2017-02-14 2 views
0

J'ai des fichiers journaux dans différents répertoires en fonction de la date de création du fichier journal.Combinaison des journaux de plusieurs répertoires dans Spark

Par exemple

> /mypath/2017/01/20/... 
. 
. 
. 
> /mypath/2017/02/13/... 
> /mypath/2017/02/14/... 

Je voudrais combiner tous ces fichiers journaux en un seul RDD unique à l'aide pyspark pour que je puisse faire les agrégats sur ce fichier maître. Jusqu'à la date, j'ai pris des répertoires individuels, appelés sqlContext et utilisé Union pour joindre tout le fichier journal pour des dates spécifiques.

DF1 = (sqlContext.read.schema(schema).json("/mypath/2017/02/13")).union(sqlContext.read.schema(schema).json("/mypath/2017/02/14")) 

est-il un moyen facile d'obtenir le maître RDD en spécifiant les fichiers journaux de plage de dates? (Je vais de 2017/01/20 à 2017/02/14)

Je suis assez nouveau pour étinceler, s'il vous plaît corrigez-moi si je me trompais à tout moment.

+0

Aussi, si je veux filtrer à partir sur une colonne "Type" après que je rejoins tous ces journaux (disons DF1). Quel serait le processus optimal pour le faire? (J'utilise habituellement DF1.filter()). Y a-t-il un autre moyen efficace? – SpaceOddity

+0

sqlContext.read.schema (schéma) .json ("/ mypath/2017/02/[13-14]")) ne fonctionne pas. Il dit "Modèle de fichier illégal: plage de caractères illégale près de l'index 4" – SpaceOddity

Répondre

1

Si vous vous en tenez à la SqlContext puis une solution il sera facile de définir une méthode qui liste tous vos fichiers dans le répertoire d'entrée

case class FileWithDate(basePath: String, year: Int, month: Int, day: Int) { 
def path = s"${basePath}/${year}/${month}/${day}" 
} 

def listFileSources() : List[FileWithDate] = ??? // implement here 

Si vous voulez union tous les dataframes des sources que vous pouvez faites-le comme ceci:

// create an empty dataframe with the strucutre for the json 
val files = listSources() 
val allDFs = files.foldLeft(emptyDF){case (df, f) => df.union(sqlContext.read.schema(schema).json(f.path))} 

Si vous voulez filtrer les fichiers d'entrée par date, alors ce sera facile. Quelque chose comme ça

files.filter(_.year == 2016 && (_.month >=2 || _.month <=3)) 

Une autre solution, il sera d'augmenter vos dataframes (mettre des colonnes supplémentaires) avec année, le mois, le jour et faire toute la logique métier sur les nouveaux dataframes

+0

Je pense qu'avec une connaissance minimale de Scala, vous serez en mesure d'implémenter la méthode listFileSources. Ce que vous devriez faire ici est d'obtenir tous les fichiers dans le dossier mypath (récursive parcourir le sous-dossier) et créer des objets de type FileWithDate. Ces objets sont ajoutés à une liste qui sera renvoyée par la méthode. – dumitru