2017-09-21 6 views
0

J'essaie d'utiliser Brighter pour la commande/l'approvisionnement d'événements. J'ai une solution contenant un service .NET Core Web Api pour mettre des messages dans une file d'attente et une autre solution contenant un projet de console .NET Core pour faire sortir les messages de la file d'attente. Les services sont isolés et ne sont pas dans la même solution.Des messages plus clairs du consommateur ne sont pas acheminés vers le gestionnaire

Le répartiteur de messages extrait les messages de Rabbit et les achemine vers MessageMapper, mais le message ne trouve pas son chemin vers le gestionnaire à traiter.

Visual Studio 2015, .NET Core 1.1, Paramore.Brighter.MessagingGateway.RMQ 7.1.5, Paramore.Brighter.ServiceActivator 7.1.5, StructureMap.Microsoft.DependencyInjection 1.4.0.

La configuration dans l'application de la console:

public static void Main(string[] args) 
{ 
    RetryPolicy retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new List<TimeSpan>() 
    { 
     TimeSpan.FromMilliseconds(50), 
     TimeSpan.FromMilliseconds(100), 
     TimeSpan.FromMilliseconds(150) 
    }); 

    CircuitBreakerPolicy circuitBreakerPolicy = Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500)); 
    PolicyRegistry policyRegistry = new PolicyRegistry() { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }; 

    var subscriberRegistry = new SubscriberRegistry(); 
    subscriberRegistry.Register<ApplicationUpdateCommand, ApplicationUpdateCommandHandler>(); 

    var rmqConnnection = new RmqMessagingGatewayConnection 
    { 
     AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:[email protected]:5672/%2f")), 
     Exchange = new Exchange("api.coverage.exchange"), 
    }; 

    var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnnection); 
    var rmqMessageProducerFactory = new RmqMessageProducerFactory(rmqConnnection); 

    Dispatcher dispatcher = null; 
    var container = new Container(); 
    container.Configure(config => 
    { 
     config.For<IHandleRequests<ApplicationUpdateCommand>>().Use<ApplicationUpdateCommandHandler>(); 
     var servicesMessageMapperFactory = new ServicesMessageMapperFactory(container); 
     var messageMapperRegistry = new MessageMapperRegistry(servicesMessageMapperFactory) 
     { 
      {typeof(ApplicationUpdateCommand), typeof(ApplicationUpdateCommandMessageMapper) } 
     }; 

     var servicesHandlerFactory = new ServicesHandlerFactory(container); 

     var commandProcessor = CommandProcessorBuilder.With() 
      .Handlers(new HandlerConfiguration(subscriberRegistry, servicesHandlerFactory)) 
      .Policies(policyRegistry) 
      .NoTaskQueues() 
      .RequestContextFactory(new InMemoryRequestContextFactory()) 
      .Build(); 

      dispatcher = DispatchBuilder.With() 
             .CommandProcessor(commandProcessor) 
             .MessageMappers(messageMapperRegistry) 
             .DefaultChannelFactory(new InputChannelFactory(rmqMessageConsumerFactory, rmqMessageProducerFactory)) 
             .Connections(new List<Connection>() 
             { 
              new Connection<ApplicationUpdateCommand> 
              (
               new ConnectionName("Application.Update"), 
               new ChannelName("Application.Update"), 
               new RoutingKey("Application.Update") 
              ) 
             }).Build(); 
     }); 

     dispatcher.Receive(); 

     Console.WriteLine("Press enter to stop ..."); 
     Console.ReadLine(); 

     dispatcher.End().Wait(); 
    } 

Code pour la MessageMapper, commande et gestionnaire:

public class ApplicationUpdateCommandMessageMapper : IAmAMessageMapper<ApplicationUpdateCommand> 
{ 
    public Message MapToMessage(ApplicationUpdateCommand request) 
    { 
     var header = new MessageHeader(messageId: request.Id, topic: "Application.Update", messageType: MessageType.MT_EVENT); 
     var body = new MessageBody(JsonConvert.SerializeObject(request)); 
     var message = new Message(header, body); 
     return message; 
    } 

    public ApplicationUpdateCommand MapToRequest(Message message) 
    { 
     // dispatcher will route message here but that is it 
     ApplicationUpdateCommand command = JsonConvert.DeserializeObject<ApplicationUpdateCommand>(message.Body.Value); 
     return command; 
    } 
} 

public class ApplicationUpdateCommand : Command 
{ 
    public int ApplicationId { get; private set; } 
    public string ApplicantName { get; private set; } 

    public ApplicationUpdateCommand(Guid id, int applicationId, string applicantName) 
     : base(id) 
    { 
     ApplicationId = applicationId; 
     ApplicantName = applicantName; 
    } 
} 

public class ApplicationUpdateCommandHandler : RequestHandler<ApplicationUpdateCommand> 
{ 
    private readonly IAmACommandProcessor _commandProcessor; 

    public ApplicationUpdateCommandHandler(IAmACommandProcessor commandProcessor) 
    { 
     _commandProcessor = commandProcessor; 
    } 

    public override ApplicationUpdateCommand Handle(ApplicationUpdateCommand command) 
    { 
     // would like to get here to handle command 

     return base.Handle(command); 
    } 
} 

Répondre

1

Vous identifiez comme MessageType.MT_EVENt dans l'en-tête mais dérivez de commande. Les deux doivent être d'accord, soit dériver de l'événement ou utiliser MT_COMMAND