Write data from HDFS to ElasticSearch: EsHadoopSerializationException

2017-05-09

I am using ElasticSearch 5.4 with Hadoop 2.7.3 and want to write data from HDFS to Elasticsearch. My data file blog.json (one JSON document per line):

{"id":"1","title":"git简介","posttime":"2016-06-11","content":"svn与git的最主要区别..."} 
{"id":"2","title":"ava中泛型的介绍与简单使用","posttime":"2016-06-12","content":"基本操作:CRUD ..."} 
{"id":"3","title":"SQL基本操作","posttime":"2016-06-13","content":"svn与git的最主要区别..."} 
{"id":"4","title":"Hibernate框架基础","posttime":"2016-06-14","content":"Hibernate框架基础..."} 
{"id":"5","title":"Shell基本知识","posttime":"2016-06-15","content":"Shell是什么..."} 

I put blog.json into HDFS:

hadoop fs -put blog.json /work 

Then I start ElasticSearch 5.4 and run my Java code:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.elasticsearch.hadoop.mr.EsOutputFormat; 

import java.io.IOException; 

/** 
* Created by bee on 4/1/17. 
*/ 
public class HdfsToES { 

    public static class MyMapper extends Mapper<Object, Text, NullWritable, 
      BytesWritable> { 

     public void map(Object key, Text value, Mapper<Object, Text, 
       NullWritable, BytesWritable>.Context context) throws IOException, InterruptedException { 
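      // Each input line is one complete JSON document; forward its raw bytes unchanged. 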
      byte[] line = value.toString().trim().getBytes(); 
      BytesWritable blog = new BytesWritable(line); 
      context.write(NullWritable.get(), blog); 
     } 
    } 


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 

     Configuration conf = new Configuration(); 
     conf.setBoolean("mapred.map.tasks.speculative.execution", false); 
     conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); 
     conf.set("es.nodes", "localhost:9200/"); 
     conf.set("es.resource", "blog/csdn"); 
     conf.set("es.mapping.id", "id"); 
     conf.set("es.input.json", "yes"); 

     Job job = Job.getInstance(conf, "hadoop es write test"); 
     job.setMapperClass(HdfsToES.MyMapper.class); 
     job.setInputFormatClass(TextInputFormat.class); 
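     // EsOutputFormat sends each map output value to Elasticsearch, so no HDFS output path is set. 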
     job.setOutputFormatClass(EsOutputFormat.class); 

     job.setMapOutputKeyClass(NullWritable.class); 
     job.setMapOutputValueClass(BytesWritable.class); 

     // set the input path 
     FileInputFormat.setInputPaths(job, new Path 
       ("hdfs://localhost:9000//work/blog.json")); 

     job.waitForCompletion(true); 
    } 
} 
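A side note on the configuration: the two speculative-execution keys above are the deprecated Hadoop 1 names. They still work on 2.7.3, but the Hadoop 2 spellings are:

     conf.setBoolean("mapreduce.map.speculative", false); 
     conf.setBoolean("mapreduce.reduce.speculative", false); 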

I get an empty index with no data, and the following ElasticSearch exception:

java.lang.Exception: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.codehaus.jackson.JsonParseException: Unexpected character ('b' (code 98)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 3] 

It works with ElasticSearch 2.3 but not with 5.4. How should I update my code?

Answer

It seems ElasticSearch 5.4's connector no longer accepts a BytesWritable value when es.input.json is set; switching the mapper's output value type to Text fixed it:

public static class MyMapper extends Mapper<Object, Text, NullWritable, 
     Text> { 

    public void map(Object key, Text value, Mapper<Object, Text, 
      NullWritable, Text>.Context context) throws IOException, InterruptedException { 
     // Emit each JSON line as-is (trimmed); the connector parses it on the way to Elasticsearch. 
     context.write(NullWritable.get(), new Text(value.toString().trim())); 
    } 
}
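
With the mapper emitting Text, the driver must declare the matching value type; assuming the rest of the job setup from the question stays the same, the one line in main() that changes is:

     job.setMapOutputValueClass(Text.class); // was BytesWritable.class 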