2017-04-04

WSO2 CEP 420 JMSEventAdapter - Event dropped at Output Adapter, job queue is full

I have a CEP flow with a high throughput of more than 100 messages per second.

I publish the result of my processing through a JMS publisher with the following configuration:

Output Event Adapter Type* : JMS 
JNDI Initial Context Factory Class: org.apache.activemq.jndi.ActiveMQInitialContextFactory 
JNDI Provider URL *: tcp://localhost:61616 
Connection Factory JNDI Name *: QueueConnectionFactory 
Destination Type *: Queue 
Destination *: myqueue 

Also, to check whether the problem was a lack of concurrent access, I added:

Concurrent Publishers: Allow 

to the JMSPublisher,

and I get the following error:

ERROR {org.wso2.carbon.event.output.adapter.jms.JMSEventAdapter} - 
     Event dropped at Output Adapter 'kpis' for tenant id '-1234', 
     Job queue is full, Task [email protected] 
     rejected from [email protected] 
     [Running, pool size =1, active threads = 1, queued tasks = 10000, 
     completed tasks = 176986] 
     java.util.concurrent.RejectedExecutionException: Task 
     [email protected] rejected from 
     [email protected][Running, pool size = 1, 
     active threads = 1, queued tasks = 10000, completed tasks = 176986] 

Is there a limit on the throughput of an ActiveMQ JMS queue?

Also, so far there is no consumer on the queue I am writing all the messages to. Can this have a negative impact on the WSO2 CEP publisher, causing this error and degrading performance?

From what I have read online, it seems this could be directly related to the pool size.

Is it possible to increase the pool size of the JMSEventAdapter? If so, how?

FULL STACK TRACE:

ERROR {org.wso2.carbon.event.output.adapter.jms.JMSEventAdapter} - Event dropped at Output Adapter 'kpis' for tenant id '-1234', Job queue is full, Task [email protected] rejected from [email protected][Running, pool size = 100, active threads = 100, queued tasks = 10000, completed tasks = 5151] 
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 100, active threads = 100, queued tasks = 10000, completed tasks = 5151] 
     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
     at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) 
     at org.wso2.carbon.event.output.adapter.jms.JMSEventAdapter.publish(JMSEventAdapter.java:142) 
     at org.wso2.carbon.event.output.adapter.core.internal.OutputAdapterRuntime.publish(OutputAdapterRuntime.java:62) 
     at org.wso2.carbon.event.output.adapter.core.internal.CarbonOutputEventAdapterService.publish(CarbonOutputEventAdapterService.java:143) 
     at org.wso2.carbon.event.publisher.core.internal.EventPublisher.process(EventPublisher.java:414) 
     at org.wso2.carbon.event.publisher.core.internal.EventPublisher.sendEvent(EventPublisher.java:226) 
     at org.wso2.carbon.event.publisher.core.internal.EventPublisher.onEvent(EventPublisher.java:294) 
     at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvents(EventJunction.java:194) 
     at org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener.receive(SiddhiOutputStreamListener.java:100) 
     at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:98) 
     at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:69) 
     at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126) 
     at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323) 
     at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46) 
     at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78) 
     at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86) 
     at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:56) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:150) 
     at org.wso2.siddhi.core.stream.StreamJunction.sendData(StreamJunction.java:214) 
     at org.wso2.siddhi.core.stream.StreamJunction.access$200(StreamJunction.java:46) 
     at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:343) 
     at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:49) 
     at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:59) 
     at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:51) 
     at org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher.sendEvent(SiddhiInputEventDispatcher.java:39) 
     at org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher.consumeEvent(AbstractSiddhiInputEventDispatcher.java:104) 
     at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvents(EventJunction.java:183) 
     at org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener.receive(SiddhiOutputStreamListener.java:100) 
     at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:98) 
     at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:69) 
     at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126) 
     at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323) 
     at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46) 
     at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78) 
     at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:102) 
     at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126) 
     at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323) 
     at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46) 
     at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78) 
     at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86) 
     at org.wso2.siddhi.core.query.input.stream.join.JoinProcessor.process(JoinProcessor.java:110) 
     at org.wso2.siddhi.core.query.processor.stream.window.LengthWindowProcessor.process(LengthWindowProcessor.java:86) 
     at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:57) 
     at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:101) 
     at org.wso2.siddhi.core.query.input.stream.join.JoinProcessor.process(JoinProcessor.java:118) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:102) 
     at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126) 
     at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323) 
     at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46) 
     at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78) 
     at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123) 
     at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86) 
     at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:56) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80) 
     at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:150) 
     at org.wso2.siddhi.core.stream.StreamJunction.sendData(StreamJunction.java:214) 
     at org.wso2.siddhi.core.stream.StreamJunction.access$200(StreamJunction.java:46) 
     at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:343) 
     at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:49) 
     at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:59) 
     at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:51) 
     at org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher.sendEvent(SiddhiInputEventDispatcher.java:39) 
     at org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher.consumeEvent(AbstractSiddhiInputEventDispatcher.java:104) 
     at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvent(EventJunction.java:146) 
     at org.wso2.carbon.event.receiver.core.internal.management.InputEventDispatcher.onEvent(InputEventDispatcher.java:27) 
     at org.wso2.carbon.event.receiver.core.internal.EventReceiver.sendEvent(EventReceiver.java:298) 
     at org.wso2.carbon.event.receiver.core.internal.EventReceiver.processMappedEvent(EventReceiver.java:222) 
     at org.wso2.carbon.event.receiver.core.internal.EventReceiver$MappedEventSubscription.onEvent(EventReceiver.java:355) 
     at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.onEvent(InputAdapterRuntime.java:110) 
     at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSMessageListener.onMessage(JMSMessageListener.java:61) 
     at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSTaskManager$MessageListenerTask.handleMessage(JMSTaskManager.java:643) 
     at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSTaskManager$MessageListenerTask.run(JMSTaskManager.java:542) 
     at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Answer

Because of the high throughput from the execution plans to the publishers, combined with the async mechanism that closes JMS connections, the ActiveMQ JMS connection pool inside the WSO2 CEP engine cannot keep up with opening and closing these connections.

This quickly exhausts all available connections, regardless of the maximum number of connections.
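The "Job queue is full" rejection in the question's log is standard `java.util.concurrent.ThreadPoolExecutor` behavior with `AbortPolicy`, which the stack trace shows is in use. Here is a minimal sketch of that behavior, assuming a fixed-size pool with a bounded queue. The class name, method name and sizes are made up for illustration and are not taken from the WSO2 code. Once every worker is busy and the queue is full, further submissions throw `RejectedExecutionException`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only mimics the pool/queue shape seen in the log.
class JobQueueFullDemo {

    // Submits `tasks` jobs that stay blocked (a stand-in for a slow publish)
    // and returns how many submissions were rejected.
    static int countRejected(int threads, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.submit(() -> {
                        try {
                            release.await(); // worker is stuck until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all workers busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the blocked jobs finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 worker + 10 queue slots accept 11 jobs; the other 89 are rejected.
        System.out.println("rejected = " + countRejected(1, 10, 100));
    }
}
```

In this sketch a larger pool or queue only delays the rejection: as long as jobs arrive faster than they complete, the queue eventually fills.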

The solution in my case was to reduce the number of messages sent per unit of time by accumulating results.
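The answer does not say how the accumulation was implemented. As a rough illustration of the idea only, the sketch below buffers events and hands them to the publisher in batches, so that one outgoing message carries many results. `EventBatcher`, the batch size and the `sink` callback are hypothetical names, and the sink stands in for whatever actually sends the JMS message:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: accumulates events and emits them in batches.
class EventBatcher {
    private final int batchSize;
    private final Consumer<List<String>> sink; // stand-in for the real publisher
    private final List<String> buffer = new ArrayList<>();

    EventBatcher(int batchSize, Consumer<List<String>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Buffers one event and flushes once the batch is full.
    synchronized void add(String event) {
        buffer.add(event);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends whatever is buffered as a single batch (no-op when empty).
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // copy so the sink owns its list
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        EventBatcher batcher = new EventBatcher(50, sent::add);
        for (int i = 0; i < 120; i++) {
            batcher.add("kpi-" + i);
        }
        batcher.flush(); // send the remaining partial batch
        System.out.println(sent.size() + " messages instead of 120");
    }
}
```

With a batch size of 50, the 120 events above leave as 3 messages. A real setup would probably also flush on a timer so that a partial batch does not wait indefinitely.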