2016-12-09 3 views
0

J'utilise maintenant Java pour concevoir le redis pub/sous-système et a eu un problème. Je vais vous montrer les détails:Redis abonné ne peut pas travailler avec Redis éditeur

L'éditeur ici:

public class RedisMessagePublisher implements MessagePublisher { 

public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic) 
{ 
    this.redisTemplate = redisTemplate; 
    this.topic = topic; 
} 

private StringRedisTemplate redisTemplate; 

private ChannelTopic topic; 

@Override 
public void publish(String message) { 
    redisTemplate.convertAndSend(topic.getTopic(), message); 
    } 
} 

L'éditeur est correct et peut fonctionner correctement.

Ensuite, nous allons passer à la classe d'abonnés:

public class RedisMessageSubscriber implements MessageListener { 

//action inspect here 
private Action2<Message, byte[]> action; 

public void setAction(Action2<Message, byte[]> action) { 
    logger.info("action set"); 
    this.action = action; 
} 

private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class); 

@Override 
public void onMessage(Message message, byte[] bytes) { 

    logger.info("===> redis subscribe message in <==="); 

    if (action != null) 
     action.call(message, bytes); 
    else 
     logger.info("===> action is null <==="); 
    } 
} 

En classe d'abonnés, j'ai utilisé le RxJava pour injecter l'action pour que je puisse l'utiliser beaucoup plus facilement.

Mais la question est là, après avoir publié le message de l'éditeur, je peux c que le message peut être transféré à onMessage, l'impression du journal était pas ce que je pensais:

===> redis subscribe message in <=== 
===> action is null <=== 

Ce que je pensais est-ce que quand j'ai publié un nouveau message, l'abonné l'a eu et a couru l'action que j'ai créée.

Le service que j'utilisé pour déclencher l'éditeur et l'abonné ci-dessous:

@RestController("redispubsubcontroller") 
@RequestMapping(value = "/redis") 
public class redispubsubcontroller { 

@Autowired 
private RedisMessagePublisher redisMessagePublisher; 

@Autowired 
private RedisMessageSubscriber redisMessageSubscriber; 

private static Logger logger = LogManager.getLogger(redispubsubcontroller.class); 

@RequestMapping(value = "/publisher", method = {RequestMethod.GET}) 
public ApiResponse getConfig(String message,HttpServletRequest request, 
              HttpServletResponse response) { 

    redisMessageSubscriber.setAction(new Action2<Message, byte[]>() { 
     @Override 
     public void call(Message message, byte[] bytes) { 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       String result = objectMapper.readValue(message.getBody(), String.class); 
       logger.info("receive:"+result); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    redisMessagePublisher.publish(message); 

    return new ApiResponse("success","message sent"); 
    } 
} 

De code ci-dessus, vous pouvez c que je prete le sujet et définir une nouvelle action à l'abonné:

redisMessageSubscriber.setAction(new Action2<Message, byte[]>() { 
    @Override 
    public void call(Message message, byte[] bytes) { 
     ObjectMapper objectMapper = new ObjectMapper(); 
     try { 
      String result = objectMapper.readValue(message.getBody(), String.class); 
      logger.info("receive:"+result); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
}); 

Mais je ne sais pas pourquoi, après avoir déclenché l'éditeur, l'abonné peut obtenir le message mais maintenez NULL Action toujours, l'action que j'ai créée ne lui a pas passé.

Quelqu'un peut-il aider? Y a-t-il un problème avec ce mécanisme?

==== ===== EDIT

code RedisMessageConfig ci-dessous:

@Configuration 
public class RedisMessageConfig { 

@Bean 
ChannelTopic topic() { 
    return new ChannelTopic("useraddresspubsub:queue"); 
} 

@Bean 
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber()); 
} 

@Autowired 
private RedisConnectionFactory JedisConnectionFactory; 

@Bean 
RedisMessageListenerContainer redisContainer() { 
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(JedisConnectionFactory); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
    } 
} 

==== ==== Résolu

Enfin je suis arrivé ce résolu par l'idée de mp, légèrement modifié myredismessagesubscriber à myredismessageconfig parce que le flux est de redismessageconfig à redismessagesubscriber, ainsi en redismessagonfonfig, je dois d'abord injecter l'action t o it, alors redismessageconfig créera de nouveaux redismessagesubscriber et maintiendra la nouvelle action créée. Le code ci-dessous:

@Component 
public class MyRedisMessageConfig extends RedisMessageConfig { 

private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class); 

public MyRedisMessageConfig() { 
    super.action = new Action2<Message, byte[]>() { 
     @Override 
     public void call(Message message, byte[] bytes) { 
      String result = new String(message.getBody()); 
       logger.info("received:" + result); 
      } 
     }; 
    } 
} 

image ci-dessous: enter image description here

Répondre

1

Ce n'est pas la façon dont MessageListener est destiné à fonctionner. De plus, vous créez un état mutable partagé. Deux invocations simultanées changent simultanément l'état de RedisMessageSubscriber.

Je suppose que vous rencontrez des problèmes de visibilité lorsque vous définissez action dans un thread et la réception de message se produit sur un thread différent.

Si vous avez besoin d'un comportement différent par MessageListener, créez plusieurs écouteurs qui implémentent ce comportement.

+0

Donc, vous voulez dire que le multi-thread qui a causé ce problème? – CharlieShi

+0

Non. Plusieurs threads le rendent visible pour vous. Le problème est dû à un état mutable partagé. – mp911de

+0

Ok, pourriez-vous me faire part de quelques idées exécutables pour impulser ce scénario? – CharlieShi