2017-05-04 1 views
0

Causée par: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel. DefaultChannelHandlerContext à org.apache.storm.serialization.SerializableSerializer.write (SerializableSerializer.java:41) ~ [storm-core-1.0.1.2.5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] au com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:628) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.serializers.MapSerializer.write (MapSerializer.java:113) ~ [kryo-3.0.3.jar :?] au com.esotericsoftware.kryo.serializers.MapSerializer.write (MapSerializer.java:39) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.Kryo.writeClassAndObject (Kryo.java:628) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.serializers.CollectionSerializer.write (CollectionSerializer.java:100) ~ [kryo-3.0.3.jar :?] à com. esotericsoftware.kryo.serializers.CollectionSerializer.write (CollectionSerializer.java:40) ~ [kryo-3.0.3.jar :?] à com.esotericsoftware.kryo.Kryo.writeObject (Kryo.java:534) ~ [ kryo-3.0.3.jar:?] au org.apache.storm.serialization.KryoValuesSerializer.serializeInto (KryoValuesSerializer.java:44) ~ [storm-core-1.0.1.2.5.0.0-1 245.jar: 1.0.1.2.5.0.0-1245] à org.apache.storm.serialization.KryoTupleSerializer.serialize (KryoTupleSerializer.java:44) ~ [storm-core-1.0.1.2.5.0.0-1245 .jar: 1.0.1.2.5.0.0-1245] au org.apache.storm.daemon.worker $ mk_transfer_fn $ transfer_fn__6723.invoke (worker.clj: 192) ~ [storm-core-1.0.1.2.5.0. 0-1245.jar: 1.0.1.2.5.0.0-1245] au org.apache.storm.daemon.executor $ start_batch_transfer__GT_worker_handler_BANG_ $ fn__6411.invoke (executor.clj: 313) ~ [storm-core-1.0.1.2 .5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] à org.apache.storm.disruptor $ clojure_handler $ reify__6005.onEvent (disruptor.clj: 40) ~ [storm-core-1.0. 1.2.5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] à org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue. java: 451) ~ [tempête-core-1.0.1.2.5.0.0-1245.jar: 1.0.1.2.5.0.0-1245] ... 6 plusCausée par: java.lang.RuntimeException: java.io.NotSerializableException: io.netty.channel.DefaultChannelHandlerContext

J'utilise une tempête mode local n'est pas un problème, mais sur le cluster sera signalé à la faute.

ceci est mon code:

public class NettySpout extends BaseRichSpout { 

private static final long serialVersionUID = 1L; 
/** 
* colloctor for spout 
*/ 
private SpoutOutputCollector collector; 

@Override 
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { 
    collector=spoutOutputCollector; 
    StormServer stormServer=new StormServer(); 
    stormServer.run(); 
} 

@Override 
public void nextTuple() { 
    Values tuple; 
    try { 
     while ((tuple = ServerHandler.queue.take()) != null) { 
      collector.emit(tuple); 
     } 
    } catch (Exception e) { 
    } 
} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declare(new Fields("value","channl")); 
} 


public class ServerHandler extends ChannelInboundHandlerAdapter{ 

private static Logger logger = LogManager.getLogger(ServerHandler.class); 
public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>(); 
public static Map<String,ChannelHandlerContext> ctxes; 

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
    JSONObject message = (JSONObject) msg; 
    queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes))); 

} 

Répondre

1

Je n'ai pas beaucoup de connaissances sur la tempête elle-même, mais il semble que vous essayez de serialise ChannelHandlerContext (comme stocké dans votre carte) qui n'est pas Serializable.

+0

Vous avez raison, le ChannelHandlerContext ne peut pas être sérialisé, peut-être que cette façon est fausse, je devrais essayer d'une autre manière. Merci pour votre réponse. – Tdz