2017-04-19 8 views
1

Lorsque vous appelez SimilarityAnalysis Apache Mahout pour CCO je reçois une exception fatale d'un NegativeArraySizeException.Apache Mahout SimilarityAnalysis pour lancer CCO NegativeArraySizeException

Le code que je suis en cours d'exécution ressemble à ceci:

val result = SimilarityAnalysis.cooccurrencesIDSs(myIndexedDataSet:Array[IndexedDataset], 
     randomSeed = 1234, 
     maxInterestingItemsPerThing = 3, 
     maxNumInteractions = 4) 

Je vois l'erreur suivante et correspondant trace de la pile:

17/04/19 20:49:09 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 20) 
java.lang.NegativeArraySizeException 
    at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57) 
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73) 
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/04/19 20:49:09 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 21) 
java.lang.NegativeArraySizeException 
    at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57) 
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73) 
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/04/19 20:49:09 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 20, localhost): java.lang.NegativeArraySizeException 
    at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57) 
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73) 
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

J'utilise la version Apache Mahout 0.13.0

Répondre

1

Cela signifie toujours que l'une des matrices d'entrée est vide. Combien de matrices y a-t-il dans le tableau? Quel est le nombre de lignes et de colonnes dans chaque? Il y a un objet compagnon pour IndexedDatasetSpark qui fournit un constructeur, appelé apply à Scala, qui prend un RDD[String, String] donc si vous pouvez obtenir vos données dans le RDD, juste construire le IndexedDatasetSpark avec cela. Ici les paires de chaînes sont user-id, item-id pour un événement comme un achat.

Voir le Compagnon objet ici: https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala#L75

Un peu de recherche va trouver le code pour transformer un csv en RDD [String, String] avec une ligne de code ou si. Il ressemblera à quelque chose comme ceci:

val rawPurchaseInteractions = sc.textFile("/path/in/hdfs").map { line => 
    (line.split("\,")(0), (line.split("\,")(1)) 
} 

Bien que cela divise deux fois, il attend une liste délimitée par des virgules des lignes dans un fichier texte avec user-id,item-id pour un certain type d'interaction comme « achat ». S'il y a d'autres champs dans le fichier, il suffit de le diviser pour obtenir l'identifiant de l'utilisateur et l'identifiant de l'élément. La ligne dans la fonction map renvoie une paire de chaînes de sorte que le RDD résultant sera du bon type, à savoir RDD[String, String]. Transmettez-le à l'IndexedDatasetSpark avec:

val purchasesRdd = IndexedDatasetSpark(rawPurchaseInteractions)(sc) 

où sc est votre contexte Spark. Cela devrait vous donner un non-vide IndexedDatasetSpark, que vous pouvez vérifier en regardant la taille des enveloppées BiDictionary s ou en appelant des méthodes sur le Mahout enveloppé DRM.

BTW Cela suppose qu'il n'y a pas d'en-tête au csv. Ceci est délimité par du texte et non par csv. En utilisant d'autres méthodes dans Spark, vous pouvez lire de vrais fichiers CSV, mais cela peut être inutile.

+0

Merci @pferrel votre réponse, je me suis dit ce que la question était (voir ci-dessous) qui n'a rien à voir avec Mahout. – ldeluca

0

Le problème avait en fait rien à voir avec Mahout, mais une ligne plus tôt:

inputRDD.filter(_ (1) == primaryFilter).map(o => (o(0), o(2))) 

La gamme était hors, j'avais 1 à 3 au lieu de 0 à 2. Je pensais que pour que l'endroit où il a été fait était dans Mahout étant donné l'erreur, mais s'avère que c'était le vrai problème.