J'utilise maintenant Java pour concevoir le redis pub/sous-système et a eu un problème. Je vais vous montrer les détails:Redis abonné ne peut pas travailler avec Redis éditeur
L'éditeur ici:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
L'éditeur est correct et peut fonctionner correctement.
Ensuite, nous allons passer à la classe d'abonnés:
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
En classe d'abonnés, j'ai utilisé le RxJava pour injecter l'action pour que je puisse l'utiliser beaucoup plus facilement.
Mais la question est là, après avoir publié le message de l'éditeur, je peux c que le message peut être transféré à onMessage, l'impression du journal était pas ce que je pensais:
===> redis subscribe message in <===
===> action is null <===
Ce que je pensais est-ce que quand j'ai publié un nouveau message, l'abonné l'a eu et a couru l'action que j'ai créée.
Le service que j'utilisé pour déclencher l'éditeur et l'abonné ci-dessous:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
De code ci-dessus, vous pouvez c que je prete le sujet et définir une nouvelle action à l'abonné:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
Mais je ne sais pas pourquoi, après avoir déclenché l'éditeur, l'abonné peut obtenir le message mais maintenez NULL Action toujours, l'action que j'ai créée ne lui a pas passé.
Quelqu'un peut-il aider? Y a-t-il un problème avec ce mécanisme?
==== ===== EDIT
code RedisMessageConfig ci-dessous:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
==== ==== Résolu
Enfin je suis arrivé ce résolu par l'idée de mp, légèrement modifié myredismessagesubscriber à myredismessageconfig parce que le flux est de redismessageconfig à redismessagesubscriber, ainsi en redismessagonfonfig, je dois d'abord injecter l'action t o it, alors redismessageconfig créera de nouveaux redismessagesubscriber et maintiendra la nouvelle action créée. Le code ci-dessous:
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
Donc, vous voulez dire que le multi-thread qui a causé ce problème? – CharlieShi
Non. Plusieurs threads le rendent visible pour vous. Le problème est dû à un état mutable partagé. – mp911de
Ok, pourriez-vous me faire part de quelques idées exécutables pour impulser ce scénario? – CharlieShi