0

Hé, je suis en train de mettre fin à certaines mesures de strraming à CloudWatch via StreamingListenerétincelle intégration StreamingListener CloudWatch

quelque chose comme ceci:

class MyStreamingListener() 
extends StreamingListener{ 

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted):Unit={ 
val cloudWatch = new AmazonCloudWatchClient(new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)) 
    cloudWatch.setEndpoint("monitoring.eu-west-1.amazonaws.com") 
    val putMetricDataRequest = new PutMetricDataRequest() 
    putMetricDataRequest.setNamespace("my-name-space") 
    val metricDatum = new MetricDatum().withMetricName("test") 
    metricDatum.setValue(batchCompleted.batchInfo.numRecords) 
    metricDatum.setUnit(StandardUnit.fromValue("Milliseconds")) 
    putMetricDataRequest.getMetricData.add(metricDatum) 
    cloudWatch.putMetricData(putMetricDataRequest) 
} 
} 

puis l'utiliser dans saprkStreaming:

val streamingContext: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(2)) 
    streamingContext.addStreamingListener(new LoadIndexStreamingListener) 

    val dstream = KinesisUtils.createStream(
     streamingContext, "this-is-just-a-test", "my-stream", "kinesis.eu-west-1.amazonaws.com", 
     "eu-west-1", InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) 
     .map(byteArray => new String(byteArray)) 
    dstream.print() 
    streamingContext.start() 
    streamingContext.awaitTermination() 

quand j'ai fait un test avec le shell d'étincelle sur mon cluster (EMR) cela a fonctionné Ok et les mesures ont été envoyées à CloudWacth

mais quand je fis mon code dans un bocal avec sbt clean assembly et l'exécuter avec étincelle soumettre, je me suis l'erreur suivante:

java.lang.NoSuchMethodError: com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.putMetricData(Lcom/amazonaws/services/cloudwatch/model/PutMetricDataRequest;)Lcom/amazonaws/services/cloudwatch/model/PutMetricDataResult; 

c'est la commande -submit d'allumage J'ai essayé:

spark-submit --class com.me.sparkTest.App --master local[4] --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,com.amazonaws:amazon-kinesis-client:1.7.2 clowdwatch-spark-test-assembly-1.0.jar 

une idée de ce qui l'a fait échouer lors de l'utilisation de spark-submit?

+0

problème est avec votre version de dépendances, peut-être que vous utilisez une version différente dans votre pot d'assemblage – Kaushal

+0

je l'ai marqué comme prévu dans la construction .sbt donc il utilise le paquet donné dans l'étincelle soumettre .... –

+0

Mais vous compilez votre jar avec, donc vous devez utiliser la même version. – Kaushal

Répondre

0

L'erreur a été provoquée précédemment car la compilation de la classe avait une signature différente pour AmazonCloudWatchClient.putMetricData que les bibliothèques d'exécution disponibles dans le cluster EMR.

La solution est de créer un pot uber avec la dépendance suivante

<dependency> 
    <groupId>com.amazonaws</groupId> 
    <artifactId>aws-java-sdk</artifactId> 
    <version>1.10.75.1</version> 
</dependency>