2017-10-17 26 views
0

en ligne système, la tempête Bolt obtenir NullPointerException, bien que je pense que je le vérifie avant la ligne 61; Il obtient NullPointerException une fois de temps en temps;AbstractStringBuilder.ensureCapacityInternal obtenir NullPointerException dans le boulon de tempête

import ***.KeyUtils; 
import ***.redis.PipelineHelper; 
import ***.redis.PipelinedCacheClusterClient; 
import **.redis.R2mClusterClient; 
import org.apache.commons.lang3.StringUtils; 
import org.apache.storm.task.OutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.IRichBolt; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.tuple.Tuple; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.context.ApplicationContext; 
import org.springframework.context.support.ClassPathXmlApplicationContext; 

import java.util.Map; 

/** 
* RedisBolt batch operate 
*/ 
public class RedisBolt implements IRichBolt { 
    static final long serialVersionUID = 737015318988609460L; 
    private static ApplicationContext applicationContext; 
    private static long logEmitNumber = 0; 
    private static StringBuffer totalCmds = new StringBuffer(); 
    private Logger logger = LoggerFactory.getLogger(getClass()); 
    private OutputCollector _collector; 
    private R2mClusterClient r2mClusterClient; 

    @Override 
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { 
     _collector = outputCollector; 
     if (applicationContext == null) { 
      applicationContext = new ClassPathXmlApplicationContext("spring/spring-config-redisbolt.xml"); 
     } 
     if (r2mClusterClient == null) { 
      r2mClusterClient = (R2mClusterClient) applicationContext.getBean("r2mClusterClient"); 
     } 


    } 

    @Override 
    public void execute(Tuple tuple) { 
     String log = tuple.getString(0); 
     String lastCommands = tuple.getString(1); 

     try { 
      //log count 
      if (StringUtils.isNotEmpty(log)) { 
       logEmitNumber++; 
      } 

      if (StringUtils.isNotEmpty(lastCommands)) { 
       if(totalCmds==null){ 
        totalCmds = new StringBuffer(); 
       } 
       totalCmds.append(lastCommands);//line 61 
      } 

      //日志数量控制 
      int numberLimit = 1; 
      String flow_log_limit = r2mClusterClient.get(KeyUtils.KEY_PIPELINE_LIMIT); 
      if (StringUtils.isNotEmpty(flow_log_limit)) { 
       try { 
        numberLimit = Integer.parseInt(flow_log_limit); 
       } catch (Exception e) { 
        numberLimit = 1; 
        logger.error("error", e); 
       } 
      } 

      if (logEmitNumber >= numberLimit) { 
       StringBuffer _totalCmds = new StringBuffer(totalCmds); 
       try { 
        //pipeline submit 
        PipelinedCacheClusterClient pip = r2mClusterClient.pipelined(); 
        String[] commandArray = _totalCmds.toString().split(KeyUtils.REDIS_CMD_SPILT); 
        PipelineHelper.cmd(pip, commandArray); 
        pip.sync(); 
        pip.close(); 
        totalCmds = new StringBuffer(); 
       } catch (Exception e) { 
        logger.error("error", e); 
       } 

       logEmitNumber = 0; 
      } 
     } catch (Exception e) { 
      logger.error(new StringBuffer("====RedisBolt error for log=[ ").append(log).append("] \n commands=[").append(lastCommands).append("]").toString(), e); 
      _collector.reportError(e); 
      _collector.fail(tuple); 
     } 

     _collector.ack(tuple); 
    } 

    @Override 
    public void cleanup() { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; 
    } 

} 

information d'exception:

java.lang.NullPointerException à java.lang.AbstractStringBuilder.ensureCapacityInternal (AbstractStringBuilder.java:113) à java.lang.AbstractStringBuilder.append (AbstractStringBuilder.java:415) à java.lang.StringBuffer.append (StringBuffer.java:237) at com.jd.jr.dataeye.storm.bolt.RedisBolt.execute (RedisBolt.java:61) sur org.apache.storm.daemon.executor $ fn__5044 $ tuple_action_fn__5046.invoke (executor.clj: 727) à org.apache.storm.daemon.executor $ mk_task_receiver $ fn__4965.invoke (executor.clj: 459) à org.apache.storm.disruptor $ clojure_handler $ reify__4480.onEvent (disruptor. clj: 40) à org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (D isruptorQueue.java:472) sur org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable (DisruptorQueue.java:451) sur org.apache.storm.disruptor $ consume_batch_when_available.invoke (disruptor.clj: 73) sur org.apache.storm .daemon.executor $ fn__5044 $ fn__5057 $ fn__5110.invoke (executor.clj: 846) à org.apache.storm.util $ async_loop $ fn__557.invoke (util.clj: 484) à clojure.lang.AFn.run (AFn .java: 22) à java.lang.Thread.run (Thread.java:745)

quelqu'un peut-il me donner quelques conseils pour trouver la raison.

+0

et la version java « 1.7 .0_71 " – Fanl

Répondre

0

C'est vraiment étrange. S'il vous plaît lire le code pour deux classes.

https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/AbstractStringBuilder.java

https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/StringBuffer.java

AbstractStringBuilder a constructeur sans args qui n'allouent pas le champ 'valeur', ce qui rend l'accès au champ 'valeur' ​​étant NPE. Tous les constructeurs dans StringBuffer utilisent ce constructeur. Alors peut-être que quelque chose d'étrange se produit dans la sérialisation/désérialisation et, malheureusement, le champ 'value' dans AbstractStringBuilder est nul.

Il se peut que l'initialisation de totalCmds dans prepare() soit meilleure et que vous ayez à prendre en compte la synchronisation (thread-safety) entre les boulons. prepare() peut être appelé par instance de boulon afin que les champs soient thread-safe, mais les champs de classe ne sont pas thread-safe.

0

Je pense que je trouve le problème peut-être;

le point clé est

"StringBuffer _totalCmds = new StringBuffer (totalCmds);" et "totalCmds.append (lastCommands); // ligne 61"

quand un objet nouveau, il prend des mesures Serval:

(1) allouer de la mémoire et de retour de référence

(2) initialiser

si append après (1) et avant (2) alors StringBuffer.java étend AbstractStringBuilder.java

/** 
* The value is used for character storage. 
*/ 
char[] value; 
valeur

n'est pas initialisé, donc cela se null:

@Override 
public synchronized void ensureCapacity(int minimumCapacity) { 
    if (minimumCapacity > value.length) { 
     expandCapacity(minimumCapacity); 
    } 
} 

cette tache a une autre question, certaines données peut-être perdu dans un environnement multithread