Exception thrown when inserting data into Druid via Tranquility

I am pushing a Kafka stream into Druid via Tranquility. The Kafka version is 0.9.1, Tranquility is 0.8, and Druid is 0.10. Tranquility starts fine when no messages are produced, but as soon as the producer sends a message I get a JsonMappingException like this:

java.lang.IllegalArgumentException: Can not deserialize instance of java.util.ArrayList out of VALUE_STRING token 
at [Source: N/A; line: -1, column: -1] 
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:2774) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6] 
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:2700) ~[com.fasterxml.jackson.core.jackson-databind-2.4.6.jar:2.4.6] 
    at com.metamx.tranquility.druid.DruidBeams$.makeFireDepartment(DruidBeams.scala:406) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.druid.DruidBeams$.fromConfigInternal(DruidBeams.scala:291) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.druid.DruidBeams$.fromConfig(DruidBeams.scala:199) ~[io.druid.tranquility-core-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.KafkaBeamUtils$.createTranquilizer(KafkaBeamUtils.scala:40) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.KafkaBeamUtils.createTranquilizer(KafkaBeamUtils.scala) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.<init>(TranquilityEventWriter.java:64) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.writer.WriterController.createWriter(WriterController.java:171) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.writer.WriterController.getWriter(WriterController.java:98) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.8.0.jar:0.8.0] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_67] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67] 

and my kafka.json is:

    {
      "dataSources" : {
        "stock-index-topic" : {
          "spec" : {
            "dataSchema" : {
              "dataSource" : "stock-index-topic",
              "parser" : {
                "type" : "string",
                "parseSpec" : {
                  "timestampSpec" : {
                    "column" : "timestamp",
                    "format" : "auto"
                  },
                  "dimensionsSpec" : {
                    "dimensions" : ["code","name","acronym","market","tradeVolume","totalValueTraded","preClosePx","openPrice","highPrice","lowPrice","tradePrice","closePx","timestamp"],
                    "dimensionExclusions" : [
                      "timestamp",
                      "value"
                    ]
                  },
                  "format" : "json"
                }
              },
              "granularitySpec" : {
                "type" : "uniform",
                "segmentGranularity" : "DAY",
                "queryGranularity" : "none",
                "intervals" : "no"
              },
              "metricsSpec" : [
                {
                  "name" : "firstPrice",
                  "type" : "doubleFirst",
                  "fieldName" : "tradePrice"
                },
                {
                  "name" : "lastPrice",
                  "type" : "doubleLast",
                  "fieldName" : "tradePrice"
                },
                {
                  "name" : "minPrice",
                  "type" : "doubleMin",
                  "fieldName" : "tradePrice"
                },
                {
                  "name" : "maxPrice",
                  "type" : "doubleMax",
                  "fieldName" : "tradePrice"
                }
              ]
            },
            "ioConfig" : {
              "type" : "realtime"
            },
            "tuningConfig" : {
              "type" : "realtime",
              "maxRowsInMemory" : "100000",
              "intermediatePersistPeriod" : "PT10M",
              "windowPeriod" : "PT10M"
            }
          },
          "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "1",
            "topicPattern" : "stock-index-topic"
          }
        }
      },
      "properties" : {
        "zookeeper.connect" : "localhost:2181",
        "druid.discovery.curator.path" : "/druid/discovery",
        "druid.selectors.indexing.serviceName" : "druid/overlord",
        "commit.periodMillis" : "15000",
        "consumer.numThreads" : "2",
        "kafka.zookeeper.connect" : "localhost:2181",
        "kafka.group.id" : "tranquility-kafka"
      }
    }

I used kafka-console-consumer to check the data; a record looks like this:

{"code": "399982", "name": "500等权", "acronym": "500DQ", "market": "102", "tradeVolume": 0, "totalValueTraded": 0.0, "preClosePx": 0.0, "openPrice": 0.0, "highPrice": 0.0, "lowPrice": 0.0, "tradePrice": 7184.7142, "closePx": 0.0, "timestamp": "2017-05-16T09:06:39.000+08:00"} 

Any idea why? Thanks.

Answer

The problem is this part of the spec:
"metricsSpec" : [ 
      { 
       "name" : "firstPrice", 
       "type" : "doubleFirst", 
       "fieldName" : "tradePrice" 
      },{ 
       "name" : "lastPrice", 
       "type" : "doubleLast", 
       "fieldName" : "tradePrice" 
      }, { 
       "name" : "minPrice", 
       "type" : "doubleMin", 
       "fieldName" : "tradePrice" 
      }, { 
       "name" : "maxPrice", 
       "type" : "doubleMax", 
       "fieldName" : "tradePrice" 
      } 
      ] 
     }, 

This is the part that was wrong. The documentation says that first/last aggregators cannot be used in an ingestion spec and should only be specified as part of queries. Removing the doubleFirst/doubleLast metrics from the ingestion spec solved the problem.
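
For what it's worth, a minimal sketch of what the corrected ingestion-time metricsSpec could look like, keeping only the min/max aggregators from the original spec (whether you want additional ingestion-time metrics depends on your use case):

    "metricsSpec" : [
      {
        "name" : "minPrice",
        "type" : "doubleMin",
        "fieldName" : "tradePrice"
      },
      {
        "name" : "maxPrice",
        "type" : "doubleMax",
        "fieldName" : "tradePrice"
      }
    ]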
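
The first/last values can then be requested at query time instead. As a sketch against the same datasource (the minute granularity and the interval here are illustrative assumptions, not taken from the original post), a timeseries query could look like:

    {
      "queryType" : "timeseries",
      "dataSource" : "stock-index-topic",
      "granularity" : "minute",
      "aggregations" : [
        { "type" : "doubleFirst", "name" : "firstPrice", "fieldName" : "tradePrice" },
        { "type" : "doubleLast",  "name" : "lastPrice",  "fieldName" : "tradePrice" }
      ],
      "intervals" : [ "2017-05-16T00:00:00+08:00/2017-05-17T00:00:00+08:00" ]
    }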