Je veux ingérer de nombreux petits fichiers texte via étincelle au parquet. Actuellement, j'utilise wholeTextFiles
et effectue une analyse supplémentaire. Pour être plus précis, ces petits fichiers texte sont des fichiers ESRi ASCII Grid d'une taille maximale d'environ 400 ko. Les GeoTools sont utilisés pour les analyser comme indiqué ci-dessous.étincelle textiles entiers - beaucoup de petits fichiers
Voyez-vous des possibilités d'optimisation? Peut-être quelque chose pour éviter la création d'objets inutiles? Ou quelque chose pour mieux gérer les petits fichiers. Je me demande s'il est préférable d'obtenir seulement les chemins des fichiers et de les lire manuellement au lieu d'utiliser String -> ByteArrayInputStream
.
case class RawRecords(path: String, content: String)
case class GeometryId(idPath: String, value: Double, geo: String)
@transient lazy val extractor = new PolygonExtractionProcess()
@transient lazy val writer = new WKTWriter()
def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = {
import spark.implicits._
spark.sparkContext
.wholeTextFiles(path, parallelism)
.toDF("path", "content")
.as[RawRecords]
.mapPartitions(mapToSimpleTypes)
}
def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => {
val extractor = new PolygonExtractionProcess()
// http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html
val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null)
// TODO maybe consider optimization of known size instead of using growable data structure
val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features
val result: collection.Seq[GeometryId] with Growable[GeometryId] = mutable.Buffer[GeometryId]()
while (vectorizedFeatures.hasNext) {
val vectorizedFeature = vectorizedFeatures.next()
val geomWKTLineString = vectorizedFeature.getDefaultGeometry match {
case g: Geometry => writer.write(g)
}
val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double]
result += GeometryId(r.path, geomUserdata, geomWKTLineString)
}
result
})
S'il vous plaît voir ma réponse ici si elle aide, mais encore une fois c'est avec dataframe: https://stackoverflow.com/a/45227410/297113 –