2016-12-20 2 views
0

J'ai évalué différents composants de messagerie .Net et j'ai des problèmes avec Rebus en utilisant MSMQ que je n'ai pas été en mesure de résoudre avec des documents, des échantillons ou google.Rebus Newbie Questions

Je crois que la seule chose unique à propos de mon scénario est que mon client peut envoyer ou publier des messages. On dirait que ça devrait être simple, mais j'ai les problèmes suivants. Les extraits de code suivent les problèmes.

Questions:

  1. MetersRequest pas être manipulé. Ils sont simplement en train de s'accumuler dans la file d'attente outgoingMetersRequests.
  2. Obtention d'une erreur indiquant que la file d'attente CreditAuthRequests n'existe pas ou que je n'ai pas d'autorisations. J'ai confirmé que la file d'attente n'est pas créée, mais mon hypothèse est que Rebus s'assurera qu'elle est créée au moment où la file d'attente outgoingMetersRequests est créée. (Note de côté: l'erreur disparaît si je supprime le 'await' de Bus.Send (...), mais la file d'attente n'est toujours pas créée et les messages ne sont pas là.)

= ==========

CLIENT DE

private void InitializeBus() 
    { 
     _messageActivator = new BuiltinHandlerActivator(); 
     Configure.With(_messageActivator) 
      .Transport(t => t.UseMsmq("publisher")) 
      .Routing(r => r.TypeBased().Map<MetersRequest>("outgoingMetersRequests") 
      .Map<CreditAuthorizationRequest>("creditAuthRequests")) 
      .Start(); 
    } 

    private async Task SendCreditAuthRequestAsync(int numberToSend) 
    { 
     var cardNumber = generateCardNumber(); 
     await _messageActivator.Bus.Send(new CreditAuthorizationRequest(cardNumber)); 
     await WriteOutputAsync($"Sent credit auth request for card {cardNumber}."); 
    } 

    private async Task SendMetersRequestAsync(int numberToSend) 
    { 
     await _messageActivator.Bus.Send(new MetersRequest()); 
     await WriteOutputAsync("Sent meters request."); 
    } 

FIN =========== CLIENT

========== ==

DU SERVICE

private void InitializeBus() 
    { 
     _messageActivator = new BuiltinHandlerActivator(); 
     _messageActivator.Register<PosOnlineHandler>(() => new PosOnlineHandler(WriteOutputAsync)); 
     _messageActivator.Register<PumpDownHandler>(() => new PumpDownHandler(WriteOutputAsync)); 
     _messageActivator.Register<MetersRequestHandler>(() => new MetersRequestHandler(WriteOutputAsync, _messageActivator.Bus)); 
     _messageActivator.Register<CreditAuthorizationHandler>(() => new CreditAuthorizationHandler(WriteOutputAsync, _messageActivator.Bus)); 
     Configure.With(_messageActivator) 
      .Transport(t => t.UseMsmq("subscriber1")) 
      .Routing(r => r.TypeBased() 
       .Map<PumpDownEvent>("publisher") 
       .Map<PosOnlineEvent>("publisher") 
       .Map<MetersRequest>("outgoingMetersRequests") 
       .Map<CreditAuthorizationRequest>("creditAuthRequests")) 
     .Start(); 
     _messageActivator.Bus.Subscribe<PumpDownEvent>().Wait(); 
     _messageActivator.Bus.Subscribe<PosOnlineEvent>().Wait(); 
    } 

public class MetersRequestHandler : IHandleMessages<MetersRequest> 
{ 
    private readonly Random _randomizer = new Random(); 
    private readonly Func<string, Task> _outputDelegate2; 
    private readonly IBus _messageBus; 

    public MetersRequestHandler(Func<string, Task> outputDelegate, IBus messageBus) 
    { 
     _outputDelegate2 = outputDelegate; 
     _messageBus = messageBus; 
    } 

    public async Task Handle(MetersRequest message) 
    { 
     var pump = _randomizer.Next(20); 
     var meters = _randomizer.Next(); 
     decimal dollars = (decimal)_randomizer.NextDouble(); 

     var response = new MetersResponse(pump, meters, dollars); 
     await _outputDelegate2($"Sending MetersResponse: (Pump={pump}) (Meters={meters}) (Dollars ={dollars}"); 
     await _messageBus.Reply(response); 
    } 
} 

============

Répondre

1

Il y a deux choses qui semblent un peu hors dans le code affiché. Je vais juste commenter sur vos problèmes, et ensuite je vais suggérer une voie à suivre :)

MètresDemande non traitée. Ils sont simplement en train de s'accumuler dans la file d'attente outgoingMetersRequests.

Lorsque vous appelez .Routing(t => t.TypeBased().Map<SomeMessage>("someQueue")) vous dites que le type SomeMessage « appartient » par le point final Rebus avec la file d'attente d'entrée someQueue.

Lorsque vous await bus.Send(yourMessage), Rebus obtiendra la file d'attente qui possède le type de yourMessage et envoyer le message là. Cela explique assez bien pourquoi vos MetersRequest vont dans la file d'attente outgoingMetersRequests.

Cependant, vous n'avez pas publié de code indiquant un noeud final dont la file d'attente d'entrée est la file d'attente outgoingMetersRequests. Quelqu'un devra traiter les messages hors de cette file d'attente pour que quelque chose se produise. La "file d'entrée" est ce que vous configurez dans la partie .Transport(t => t.UseMsmq("publisher")) - dans ce cas, la file d'entrée est publisher. Obtention d'une erreur indiquant que la file d'attente CreditAuthRequests n'existe pas ou que je n'ai pas d'autorisations.

Oui - les messages d'erreur MSMQ ne sont généralement pas le meilleur;)

J'ai confirmé la file d'attente n'est pas créé, mais mon hypothèse est que Rebus assurera qu'elle est créée, tout comme les outgoingMetersRequests la file d'attente est en cours de création. Rebus (avec MSMQ) crée UNIQUEMENT des files d'attente d'entrée et des files d'erreurs (appelées error). Cela signifie que vous devez avoir un point de terminaison en cours d'exécution en utilisant outgoingMetersRequests comme file d'attente d'entrée au moins une fois, ou peut-être avez-vous créé manuellement la file d'attente?

(Side note: L'erreur disparaît si je retire le « attendent » de Bus.Send (...), mais la file d'attente est toujours pas créé et les messages ne sont pas où trouver.)

l'erreur ne disparaît pas - l'exception est juste attrapé et passé à une continuation qui court sur un fil de threadpool que vous jamais voir parce que vous lâchez le retour de TaskSend.

Je vous suggère de faire ceci:

  1. Venez avec de bons noms pour les files d'attente d'entrée - publisher et subscriber sont trop génériques, vous devez choisir des noms qui correspondent à la responsabilité ou la raison de chaque point de terminaison pour existant dans la première endroit.

  2. Soyez conscient de vos types de message. Il semble que vous ayez raison car vous avez nommé les événements *Event, et votre utilisation de demande/réponse semble correcte aussi.

  3. Utilisez une sorte de stockage d'abonnement partagé, par ex. un serveur SQL central. De cette façon - si vous configurez le même stockage d'abonnement centralisé dans tous vos éditeurs et abonnés - vous n'avez besoin d'aucun mappage de point de terminaison pour vos types d'événement, ce qui est plutôt simple. Vous pouvez en savoir plus sur the wiki page about subscription storages dans la section sur the centralized type.

+0

Cela aide, mais comment configurer plus d'une file d'attente entrante? – scottctr

+0

Un point d'extrémité Rebus == une file d'attente d'entrée – mookid8000

+0

OK, cela fait partie de ce qui me dégoûte. Mon modèle est que mes services peuvent nécessiter des événements et/ou des demandes de types différents et que je devrais avoir une file d'attente pour chaque type. Je ne vois pas comment créer plus d'un point de terminaison dans un service. S'il vous plaît laissez-moi savoir s'il y a quelques documents que j'ai manqués. Merci!! – scottctr