2017-07-03

Azure Service Bus transactions with AMQP

I'm trying to use Azure Service Bus with Apache Qpid and Spring Integration, with transactions.

But it seems that the Azure Service Bus AMQP implementation does not support transactions. Is that true? I haven't found any information about it.

This is my JMS config:

<bean id="serviceBusConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory"> 
    <constructor-arg value="amqps://${serviceBus.host}?amqp.idleTimeout=1200000"/> 
    <property name="clientID" value="${serviceBus.clientId}"/> 
    <property name="username" value="${serviceBus.sharedAccessPolicyName}"/> 
    <property name="password" value="${serviceBus.sharedAccessPolicyKey}"/> 
    <property name="receiveLocalOnly" value="true"/> 
    <property name="receiveNoWaitLocalOnly" value="true"/> 
</bean> 
<bean id="jmsCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="serviceBusConnectionFactory" /> 
</bean> 

And this is my Spring Integration snippet:

<int-jms:inbound-channel-adapter id="resOrdJmsIn" 
           destination-name="${serviceBus.destination-name}" 
           channel="resOrdIncoming" 
           connection-factory="jmsCachingConnectionFactory" 
           acknowledge="client" 
           session-transacted="true" > 
    <int:poller fixed-rate="1000"/> 
</int-jms:inbound-channel-adapter> 

It works with session-transacted="false", but with session-transacted="true" it produces an error:

2017-07-03 10:06:27.237 ERROR 21575 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error] 
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316) 
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169) 
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487) 
    at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:754) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:138) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:111) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error] 
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:148) 
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:103) 
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167) 
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113) 
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795) 
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:92) 
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699) 
    ... 7 more 

Apache Qpid trace:

2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_FINAL 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_INIT 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_LOCAL_OPEN 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_REMOTE_OPEN 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES : SENT: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Coordinator{capabilities=[amqp:local-transactions]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null} 
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.q.j.t.netty.NettyTcpTransport  : Attempted write of: 127 bytes 
2017-07-03 10:37:29.328 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport  : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165) 
2017-07-03 10:37:29.329 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl  : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES : RECV: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=18446744073709551615, offeredCapabilities=null, desiredCapabilities=null, properties=null} 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_INIT 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_LOCAL_OPEN 
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_REMOTE_OPEN 
2017-07-03 10:37:29.432 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport  : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165) 
2017-07-03 10:37:29.433 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl  : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input 
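
For anyone trying to reproduce frame-level output like the SENT/RECV lines above, here is a minimal sketch. It assumes the Qpid JMS `amqp.traceFrames` URI option and Logback as the logging backend (the log format suggests Spring Boot defaults, but that is an assumption). The logger name is taken from the trace itself.

```xml
<!-- Connection factory from the question, with frame tracing enabled in the URI.
     '&' must be written as '&amp;' inside an XML attribute.
     The remaining properties (clientID, username, password) stay as in the question. -->
<bean id="serviceBusConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
    <constructor-arg value="amqps://${serviceBus.host}?amqp.idleTimeout=1200000&amp;amqp.traceFrames=true"/>
</bean>

<!-- In logback.xml (assumed backend): raise the frame logger to TRACE -->
<logger name="org.apache.qpid.jms.provider.amqp.FRAMES" level="TRACE"/>
```

In the trace above, the client's Attach carries target=Coordinator with the amqp:local-transactions capability, while the Attach received back has target=null, so that pair of frames is the place to look when checking how the broker reacts to the transaction coordinator link.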

Azure Service Bus does support transactions: https://docs.microsoft.com/fr-fr/azure/service-bus-messaging/service-bus-transactions


Spring Integration snippet that works transactionally with Azure Service Bus over JMS, backed by AMQP:

<int-jms:message-driven-channel-adapter id="jmsIn" 
              destination-name="${serviceBus.destination-name}" 
              connection-factory="jmsCachingConnectionFactory" 
              acknowledge="client" 
              channel="channel-si"/>
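
To show how the adapter above might plug into a flow, here is a minimal sketch of the downstream wiring. The channel id `channel-si` comes from the snippet. The `orderHandler` bean, its class `com.example.OrderHandler`, and the `handle` method are hypothetical placeholders, not part of the original post.

```xml
<!-- Channel the message-driven adapter publishes to (id taken from the adapter's channel attribute) -->
<int:channel id="channel-si"/>

<!-- Hypothetical consumer: bean name, class, and method are illustrative placeholders -->
<bean id="orderHandler" class="com.example.OrderHandler"/>

<!-- Invokes orderHandler.handle(...) for each message arriving on channel-si -->
<int:service-activator input-channel="channel-si" ref="orderHandler" method="handle"/>
```

With acknowledge="client", the listener container is expected to acknowledge a message only after the handler returns without an exception. Redelivery behavior after a failure depends on the Spring version and the broker, so it is worth verifying against a test queue.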