2017-10-06 4 views
0

Quelqu'un pourrait-il m'aider avec ce problème PySpark? J'ai passé des jours là-dessus. Je n'arrive pas à comprendre pourquoi la longueur de mon schéma change lorsque je l'ai imprimé plusieurs fois. La version de Spark est 2.2 et j'utilise Jupyter Notebook pour exécuter le code sur un cluster de 20 nœuds.Longueur des changements de schéma après flatMap et lors de l'impression de plusieurs fois dans pyspark

Voici mon code:

import myReader 

    # read data from binary files 
    data=sc.binaryFiles('Data/20171006') 

    # binary file reader convert binary file to a tuple of schema and data array 
    # the 1st item in the tuple is the schema of type StructType 
    # the 2nd item in the tuple is a numpy 2D array 
    tableWithSchemaRDD = data.map(myReader.convert) 

    # print out the length of the schema to check its length 
    # since the schema is the same for all items in the RDD, I only check the first one 
    print "1st print: ", len(tableWithSchemaRDD.first()[0]) 

    # extract the data array from RDD 
    tableRDD = tableWithSchemaRDD.map(lambda x:x[1]) 

    # print out the length of the schema to check its length again 
    print "2nd print: ", len(tableWithSchemaRDD.first()[0]) 

    # flatmap so each item in the rdd is a row instead of 2D array 
    # and sort all the rows by the last item in each row, which is a timestamp 
    rowRDD = tableRDD.flatMap(lambda y:[z for z in y]).sortBy(lambda x:x[-1]) 

    # print out the length of the schema multiple times 
    print "multiple print: " 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 

Voici la sortie:

1st print: 73 
    2nd print: 73 
    multiple print: 
    3961 
    3961 
    3961 
    3961 
    73 
    73 

Comme vous pouvez voir que le premier 2 print imprimé sur la longueur correcte du RDD, mais après la flatMap transformation, il a augmenté à un nombre beaucoup plus grand. Ensuite, après un certain temps, c'est-à-dire une déclaration de 4 print, il est revenu à la longueur correcte. Parfois, le mauvais numéro n'est pas 3961, mais tout autre nombre de multiple de 72, puis plus 1. Je suppose que c'est parce que le premier StructField de mon schéma est le nom des données, le 73rd StructField est un horodatage, donc le nombre 3961 est 72 * 51 + 1. J'ai également vu le nombre 11737, 23401, 9793, 2017, etc.

Une autre chose que je voudrais mentionner est que si je mets le code dans myReader dans le même cahier plutôt de l'importer en tant que module. Je ne vois pas ce problème. J'ai utilisé sc.addPyFile() pour distribuer mon module aux noeuds.

Tous les commentaires et suggestions seraient appréciés! Je vous remercie!

Répondre

0

Comme cette question est déjà formulée, il n'y a rien d'inattendu dans le résultat. Puisque vous tri:

.sortBy(lambda x: x[-1]) 

Spark doit mélanger les données et, en cas d'égalité, peut choisir un élément arbitraire comme first un. Par conséquent, la valeur peut changer d'un cycle à l'autre.

+0

Merci beaucoup pour la réponse. Puis-je vous demander pourquoi vous pensez qu'il y aurait des liens? Tous les horodatages sont uniques, ils sont utilisés pour 'SortBy'. – dangwh