Bonjour J'essaye d'écrire un cas de test pour implémenter le support de basculement pour activeMQ.Message perdu lors de l'arrêt et du redémarrage de Embedded ActiveMQ
Voici le code
val brokerA = createBroker("A")
brokerA.start
val failoverUrl = s"failover:(vm://BrokerA?create=false)" +
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true"
val cFactory = new ActiveMQConnectionFactory(failoverUrl)
val qConnection = getQueueConnection
val session = createQueueSession(qConnection)
private def totalReadMessagesCount(queueReceiver: QueueReceiver) = {
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq
messages.size
}
private def getReceiver = {
val queueConnection = getQueueConnection
queueConnection.start()
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName)
queueReceiver
}
def getQueueConnection =cFactory.createQueueConnection("admin", "")
def createBroker(name:String) = {
val broker = new BrokerService()
val adaptor = new KahaDBPersistenceAdapter()
broker.setBrokerName("Broker" + name)
broker.addConnector(getBrokerUrl)
broker.setPersistent(true)
broker.setUseJmx(false)
broker.setUseShutdownHook(false)
broker
}
def getBrokerUrl = "tcp://localhost:0"
val queueReceiver: QueueReceiver = getReceiver
val messageCount = 500
(1 to messageCount) map {count =>
//Calling method to send message to ActiveMQ
if(count == 200){
brokerA.stop()
brokerA.waitUntilStopped()
brokerA.start(true)
}
}
val totalCount = totalReadMessagesCount(queueReceiver)
println(s"Read ${totalCount} messages")
assert(totalCount == messageCount)
Je suis en mesure de renouer avec activeMQ après le redémarrage, mais totalCount
DIFFUSE 300 au lieu de 500. Il semble les messages précédents sont perdus. Cependant, lorsque je cours le même scénario en mode non intégré. Je suis capable d'obtenir tous les messages.
S'il vous plaît aidez-moi comment puis-je éviter de perdre un message lors du redémarrage de mq actif intégré.
Vous devez avoir deux (2) cas de activeMQ pour tester le basculement. Le basculement signifie que si un client ne peut pas parler au serveur donné, il essaiera le suivant dans la liste des serveurs de la chaîne de connexion. Vous avez seulement un serveur dans votre chaîne de connexion. Voir la documentation ici: http://activemq.apache.org/failover-transport-reference.html – zloster