2017-05-21 3 views
1

Je veux ingérer de nombreux petits fichiers texte via étincelle au parquet. Actuellement, j'utilise wholeTextFiles et effectue une analyse supplémentaire. Pour être plus précis, ces petits fichiers texte sont des fichiers ESRi ASCII Grid d'une taille maximale d'environ 400 ko. Les GeoTools sont utilisés pour les analyser comme indiqué ci-dessous.étincelle textiles entiers - beaucoup de petits fichiers

Voyez-vous des possibilités d'optimisation? Peut-être quelque chose pour éviter la création d'objets inutiles? Ou quelque chose pour mieux gérer les petits fichiers. Je me demande s'il est préférable d'obtenir seulement les chemins des fichiers et de les lire manuellement au lieu d'utiliser String -> ByteArrayInputStream.

case class RawRecords(path: String, content: String) 
case class GeometryId(idPath: String, value: Double, geo: String) 
@transient lazy val extractor = new PolygonExtractionProcess() 
@transient lazy val writer = new WKTWriter() 

def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = { 
    import spark.implicits._ 
    spark.sparkContext 
     .wholeTextFiles(path, parallelism) 
     .toDF("path", "content") 
     .as[RawRecords] 
     .mapPartitions(mapToSimpleTypes) 
    } 

def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => { 
    val extractor = new PolygonExtractionProcess() 

    // http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html 
    val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null) 

    // TODO maybe consider optimization of known size instead of using growable data structure 
    val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features 
    val result: collection.Seq[GeometryId] with Growable[GeometryId] = mutable.Buffer[GeometryId]() 

    while (vectorizedFeatures.hasNext) { 
     val vectorizedFeature = vectorizedFeatures.next() 
     val geomWKTLineString = vectorizedFeature.getDefaultGeometry match { 
     case g: Geometry => writer.write(g) 
     } 
     val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double] 
     result += GeometryId(r.path, geomUserdata, geomWKTLineString) 
    } 
    result 
    }) 
+0

S'il vous plaît voir ma réponse ici si elle aide, mais encore une fois c'est avec dataframe: https://stackoverflow.com/a/45227410/297113 –

Répondre

2

J'ai suggestions:

  1. utilisation wholeTextFile ->mapPartitions -> Convertir en Dataset. Pourquoi? Si vous définissez mapPartitions sur Dataset, toutes les lignes sont converties du format interne à l'objet. Cela entraîne une sérialisation supplémentaire.
  2. Exécutez Java Mission Control et échantillonnez votre application. Il montrera toutes les compilations et les temps d'exécution des méthodes
  3. Peut-être que vous pouvez utiliser binaryFiles, il vous donnera Stream, donc vous pouvez l'analyser sans lecture supplémentaire mapPartitions
+0

La compression tungstens ne devrait-elle pas compenser tous les problèmes d'objet? Y at-il quelque chose comme 'wholeBinaryFiles' qui va me donner le chemin et le fichier entier? –

+0

@GeorgHeiler Vous pouvez utiliser 'SparkContext.binaryFiles', il renvoie paire FilePath, DataStream –

+0

Quel type de flux est le' PortableDataStream'? Lors de l'utilisation de 'new ArcGridReader (r._2) .read (null)' j'obtiens une 'ERROR arcgrid: type d'entrée non pris en charge'. Tout flux d'entrée normal doit être pris en charge par https://github.com/geotools/geotools/blob/master/modules/plugin/arcgrid/src/main/java/org/geotools/gce/arcgrid/ArcGridReader.java#L119 –