2017-02-06 2 views
1

J'essaye de mettre en place un flux Twitter en utilisant Apache Spark Java API. En enregistrant le flux Twitter sur Elasticsearch, je reçois une exception. Je pense que j'essaye de sauver le tweet cru c'est pourquoi le problème est. S'il vous plaît laissez-moi savoir ce que je peux essayer de résoudre cette exception.API Apache Spark Java + Twitter4j + exception tout en enregistrant le flux Twitter vers Elasticsearch

Voici le code:

package com.twitter.streaming; 
import com.twitter.util.TwitterStreamUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.serializer.KryoSerializer; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.twitter.TwitterUtils; 
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; 
import twitter4j.Status; 

/** 
* Created by Manali on 1/28/2017. 
*/ 
public class TwitterStream { 

    private static final String[] filters = {"#football"}; 

    public static void main(String[] args) throws InterruptedException { 
     // create the spark configuration and spark context 
     System.setProperty("hadoop.home.dir", "C:\\winutil\\"); 
     SparkConf conf = new SparkConf().setAppName("SparkTwitterStreamExample").setMaster("local[2]") 
       .set("spark.serializer", KryoSerializer.class.getName()) 
       .set("es.nodes", "localhost:9200") 
       .set("es.index.auto.create", "true"); 

     // create a java streaming context and define the window (3 seconds batch) 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15)); 

     System.out.println("Initializing Twitter stream..."); 

     // create a DStream (sequence of RDD). The object tweetsStream is a DStream of tweet statuses: 
     // - the Status class contains all information of a tweet 
     // See http://twitter4j.org/javadoc/twitter4j/Status.html 
     // and fill the keys and tokens in the Streamutils class! 
     JavaDStream<Status> twitterStream = TwitterUtils.createStream(jssc, TwitterStreamUtils.getAuth()); 

     JavaDStream<String> statuses = twitterStream.map(
       new Function<Status, String>() { 
        public String call(Status status) { return status.toString(); } 
       } 
     ); 
     statuses.print(); 

     statuses.foreachRDD(tweets->{ 
        // save tweet to Elasticsearch 
        JavaEsSpark.saveJsonToEs(tweets, "spark/tweets"); 
        return null; 
       }); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

Trace de la pile:

------------------------------------------- 
Time: 1486397175000 ms 
------------------------------------------- 
StatusJSONImpl{createdAt=Mon Feb 06 10:06:11 CST 2017, id=828635913144016896, text='夢王國超強大的XDDD 
托托大愛(´▽`)ノ 
發棉花糖的執事超高超帥wwwww 

#夢100 #CWT45', rel="nofollow">Twitter for Android</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=0, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=0, isPossiblySensitive=false, lang='ja', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[], urlEntities=[], hashtagEntities=[HashtagEntityJSONImpl{text='夢100'}, HashtagEntityJSONImpl{text='CWT45'}], mediaEntities=[MediaEntityJSONImpl{id=828635824715505665, symbolEntities=[], currentUserRetweetId=-1, user=UserJSONImpl{id=4298859732, name='草加美燕', screenName='mU7oEb6DVbCda4S', location='臺灣 新北市中和', description='17歲的高 
17/02/06 10:06:16 INFO BlockGenerator: Pushed block input-0-1486397175800 
17/02/06 10:06:16 ERROR TaskContextImpl: Error in TaskCompletionListener 
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Invalid UTF-8 start byte 0x89 
at [Source: [[email protected]; line: 1, column: 3] 
    at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:478) 
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) 
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:426) 
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:153) 
    at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:225) 
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:248) 
    at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) 
    at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:130) 
    at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:42) 
    at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:68) 
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79) 
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77) 
    at org.apache.spark.scheduler.Task.run(Task.scala:90) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/02/06 10:06:16 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 6) 
org.apache.spark.util.TaskCompletionListenerException: Invalid UTF-8 start byte 0x89 
at [Source: [[email protected]; line: 1, column: 3] 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:90) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/02/06 10:06:16 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 7, localhost, NODE_LOCAL, 1943 bytes) 
17/02/06 10:06:16 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, localhost): org.apache.spark.util.TaskCompletionListenerException: Invalid UTF-8 start byte 0x89 
at [Source: [[email protected]; line: 1, column: 3] 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:90) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

17/02/06 10:06:16 INFO Executor: Running task 1.0 in stage 3.0 (TID 7) 
17/02/06 10:06:16 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times; aborting job 
17/02/06 10:06:16 INFO BlockManager: Found block input-0-1486397172800 locally 
17/02/06 10:06:16 INFO TaskSchedulerImpl: Cancelling stage 3 
17/02/06 10:06:16 INFO Executor: Executor is trying to kill task 1.0 in stage 3.0 (TID 7) 
17/02/06 10:06:16 INFO TaskSchedulerImpl: Stage 3 was cancelled 
17/02/06 10:06:16 INFO DAGScheduler: ResultStage 3 (foreachRDD at TwitterStream.java:47) failed in 0.589 s 
17/02/06 10:06:16 INFO DAGScheduler: Job 3 failed: foreachRDD at TwitterStream.java:47, took 0.608443 s 
17/02/06 10:06:16 INFO JobScheduler: Finished job streaming job 1486397175000 ms.1 from job set of time 1486397175000 ms 
17/02/06 10:06:16 INFO JobScheduler: Total delay: 1.086 s for time 1486397175000 ms (execution: 1.001 s) 
17/02/06 10:06:16 ERROR JobScheduler: Error running job streaming job 1486397175000 ms.1 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 6, localhost): org.apache.spark.util.TaskCompletionListenerException: Invalid UTF-8 start byte 0x89 
at [Source: [[email protected]; line: 1, column: 3] 
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:90) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Répondre

0

Il y avait un problème dans l'analyse de la valeur tweet. J'utilise ObjectMapper pour ce qui suit est le code de travail pour enregistrer le flux Twitter à Elasticsearch en utilisant Apache Spark.

package com.twitter.streaming; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.twitter.util.TwitterStreamUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.serializer.KryoSerializer; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.twitter.TwitterUtils; 
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; 
import twitter4j.Status; 

/** 
* Created by Manali on 1/28/2017. 
*/ 
public class TwitterStream { 

    private static final String[] filters = {"#trumph", "#happy"}; 

    public static void main(String[] args) throws InterruptedException { 

     // create the spark configuration and spark context 
     System.setProperty("hadoop.home.dir", "C:\\winutil\\"); 
     SparkConf conf = new SparkConf().setAppName("SparkTwitterStreamExample").setMaster("local[2]") 
       .set("spark.serializer", KryoSerializer.class.getName()) 
       .set("es.nodes", "localhost:9200") 
       .set("es.index.auto.create", "true"); 

     // create a java streaming context and define the window (3 seconds batch) 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(15)); 

     System.out.println("Initializing Twitter stream..."); 

     // create a DStream (sequence of RDD). The object tweetsStream is a DStream of tweet statuses: 
     // - the Status class contains all information of a tweet 
     // See http://twitter4j.org/javadoc/twitter4j/Status.html 
     // and fill the keys and tokens in the Streamutils class! 
     JavaDStream<Status> twitterStream = TwitterUtils.createStream(jssc, TwitterStreamUtils.getAuth()); 

     /* JavaDStream<String> statuses = twitterStream.map(
       new Function<Status, String>() { 
        public String call(Status status) { 
         return status.toString(); 
        } 
       } 
     );*/ 
     //statuses.print(); 


     // Jackson ObjectMapper for parsing 
     ObjectMapper mapper = new ObjectMapper(); 

     // parse and save Twitter stream to Elasticsearch 
     twitterStream//.map(t -> new Tweet(t.getUser().getName(), t.getText())) 
       .map(t -> mapper.writeValueAsString(t)) 
       .foreachRDD(tweets -> { 
        JavaEsSpark.saveJsonToEs(tweets, "spark/tweets"); 
        return null; 
       }); 


     jssc.start(); 
     jssc.awaitTermination(); 
    } 
}