Causée par: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel. DefaultChannelHandlerContext à org.apache.storm.serialization.SerializableSerializer.write (SerializableSerializer.java:41) ~ [storm-core-1.0.1.2.5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] au com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:628) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.serializers.MapSerializer.write (MapSerializer.java:113) ~ [kryo-3.0.3.jar :?] au com.esotericsoftware.kryo.serializers.MapSerializer.write (MapSerializer.java:39) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:628) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.serializers.CollectionSerializer.write (CollectionSerializer.java:100) ~ [kryo-3.0.3.jar :?] à com. esotericsoftware.kryo.serializers.CollectionSerializer.write (CollectionSerializer.java:40) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:534) ~ [ kryo-3.0.3.jar:?] au org.apache.storm.serialization.KryoValuesSerializer.serializeInto (KryoValuesSerializer.java:44) ~ [storm-core-1.0.1.2.5.0.0-1 245.jar: 1.0.1.2.5.0.0-1245] à org.apache.storm.serialization.KryoTupleSerializer.serialize (KryoTupleSerializer.java:44) ~ [storm-core-1.0.1.2.5.0.0-1245 .jar: 1.0.1.2.5.0.0-1245] au org.apache.storm.daemon.worker $ mk_transfer_fn $ transfer_fn__6723.invoke (worker.clj: 192) ~ [storm-core-1.0.1.2.5.0. 0-1245.jar: 1.0.1.2.5.0.0-1245] au org.apache.storm.daemon.executor $ start_batch_transfer__GT_worker_handler_BANG_ $ fn__6411.invoke (executor.clj: 313) ~ [storm-core-1.0.1.2 .5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] à org.apache.storm.disruptor $ clojure_handler $ reify__6005.onEvent (disruptor.clj: 40) ~ [storm-core-1.0. 1.2.5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] à org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue. java: 451) ~ [tempête-core-1.0.1.2.5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] ... 6 plusCausée par: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel.DefaultChannelHandlerContext
J'utilise une tempête mode local n'est pas un problème, mais sur le cluster sera signalé à la faute.
ceci est mon code:
public class NettySpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
/**
* colloctor for spout
*/
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
collector=spoutOutputCollector;
StormServer stormServer=new StormServer();
stormServer.run();
}
@Override
public void nextTuple() {
Values tuple;
try {
while ((tuple = ServerHandler.queue.take()) != null) {
collector.emit(tuple);
}
} catch (Exception e) {
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("value","channl"));
}
public class ServerHandler extends ChannelInboundHandlerAdapter{
private static Logger logger = LogManager.getLogger(ServerHandler.class);
public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>();
public static Map<String,ChannelHandlerContext> ctxes;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
JSONObject message = (JSONObject) msg;
queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes)));
}
Vous avez raison, le ChannelHandlerContext ne peut pas être sérialisé, peut-être que cette façon est fausse, je devrais essayer d'une autre manière. Merci pour votre réponse. – Tdz