2017-04-07 1 views
1

Bonjour J'essaye d'écrire un cas de test pour implémenter le support de basculement pour activeMQ.Message perdu lors de l'arrêt et du redémarrage de Embedded ActiveMQ

Voici le code

val brokerA = createBroker("A") 
brokerA.start 
val failoverUrl = s"failover:(vm://BrokerA?create=false)" + 
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true" 


val cFactory = new ActiveMQConnectionFactory(failoverUrl) 
val qConnection = getQueueConnection 
val session = createQueueSession(qConnection) 

private def totalReadMessagesCount(queueReceiver: QueueReceiver) = { 
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq 
messages.size 
} 

private def getReceiver = { 
val queueConnection = getQueueConnection 
queueConnection.start() 
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE) 
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName) 
queueReceiver 
} 

def getQueueConnection =cFactory.createQueueConnection("admin", "") 

def createBroker(name:String) = { 
val broker = new BrokerService() 
val adaptor = new KahaDBPersistenceAdapter() 
broker.setBrokerName("Broker" + name) 
broker.addConnector(getBrokerUrl) 
broker.setPersistent(true) 
broker.setUseJmx(false) 
broker.setUseShutdownHook(false) 
broker 
} 

def getBrokerUrl = "tcp://localhost:0" 


val queueReceiver: QueueReceiver = getReceiver 
val messageCount = 500 
(1 to messageCount) map {count => 
    //Calling method to send message to ActiveMQ 
    if(count == 200){ 
    brokerA.stop() 
    brokerA.waitUntilStopped() 
    brokerA.start(true) 
    } 
} 
val totalCount = totalReadMessagesCount(queueReceiver) 
println(s"Read ${totalCount} messages") 
assert(totalCount == messageCount) 

Je suis en mesure de renouer avec activeMQ après le redémarrage, mais totalCount DIFFUSE 300 au lieu de 500. Il semble les messages précédents sont perdus. Cependant, lorsque je cours le même scénario en mode non intégré. Je suis capable d'obtenir tous les messages.

S'il vous plaît aidez-moi comment puis-je éviter de perdre un message lors du redémarrage de mq actif intégré.

+0

Vous devez avoir deux (2) cas de activeMQ pour tester le basculement. Le basculement signifie que si un client ne peut pas parler au serveur donné, il essaiera le suivant dans la liste des serveurs de la chaîne de connexion. Vous avez seulement un serveur dans votre chaîne de connexion. Voir la documentation ici: http://activemq.apache.org/failover-transport-reference.html – zloster

Répondre

1

Vous devez définir persistant vrai, je ne sais pas scala mais voici le code java

public BrokerService broker() throws Exception { 
    final BrokerService broker = new BrokerService(); 
    //broker.addConnector("tcp://localhost:61616"); 
    broker.addConnector("stomp://localhost:61613"); 
    broker.addConnector("vm://localhost"); 
    PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); 
    File dir = new File(System.getProperty("user.home") + File.separator + "kaha"); 
    if (!dir.exists()) { 
     dir.mkdirs(); 
    } 
    persistenceAdapter.setDirectory(dir); 
    broker.setPersistenceAdapter(persistenceAdapter); 
    broker.setPersistent(true); 
    return broker; 
} 
+0

Je viens de mettre à jour mon exemple. J'ai rendu persistant à vrai et ajouté KahaDB. Mais maintenant, je compte 305 points. Toute suggestion? –

+0

Vous avez créé une instance de KahaDBPersistenceAdapter mais vous ne l'avez pas configurée pour le courtier non ?? Comme ce broker.setPersistenceAdapter (adaptateur); –