2017-10-19 21 views
4

J'ai besoin de lire les données de parquet de aws s3. Si j'utilise sdk aws pour ce que je peux obtenir InputStream comme ceci:Lire les données de parquet du seau AWS s3

S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, bucketKey)); 
InputStream inputStream = object.getObjectContent(); 

Mais le lecteur parquet apache utilise uniquement un fichier local comme celui-ci:

ParquetReader<Group> reader = 
        ParquetReader.builder(new GroupReadSupport(), new Path(file.getAbsolutePath())) 
          .withConf(conf) 
          .build(); 
reader.read() 

Je ne sais pas comment flux d'entrée parse pour le fichier parquet. Par exemple, pour les fichiers CSV, CSVParser utilise inputstream.

Je connais la solution pour utiliser des étincelles pour ce but. Comme ceci:

SparkSession spark = SparkSession 
       .builder() 
       .getOrCreate(); 
Dataset<Row> ds = spark.read().parquet("s3a://bucketName/file.parquet"); 

Mais je ne peux pas utiliser étincelle.

Quelqu'un pourrait-il me dire des solutions pour lire des données de parquet à partir de s3?

Répondre

0
String SCHEMA_TEMPLATE = "{" + 
         "\"type\": \"record\",\n" + 
         " \"name\": \"schema\",\n" + 
         " \"fields\": [\n" + 
         "  {\"name\": \"timeStamp\", \"type\": \"string\"},\n" + 
         "  {\"name\": \"temperature\", \"type\": \"double\"},\n" + 
         "  {\"name\": \"pressure\", \"type\": \"double\"}\n" + 
         " ]" + 
         "}"; 
String PATH_SCHEMA = "s3a"; 
Path internalPath = new Path(PATH_SCHEMA, bucketName, folderName); 
Schema schema = new Schema.Parser().parse(SCHEMA_TEMPLATE); 
Configuration configuration = new Configuration(); 
AvroReadSupport.setRequestedProjection(configuration, schema); 
ParquetReader<GenericRecord> = AvroParquetReader.GenericRecord>builder(internalPath).withConf(configuration).build(); 
GenericRecord genericRecord = parquetReader.read(); 

while(genericRecord != null) { 
     Map<String, String> valuesMap = new HashMap<>(); 
     genericRecord.getSchema().getFields().forEach(field -> valuesMap.put(field.name(), genericRecord.get(field.name()).toString())); 

     genericRecord = parquetReader.read(); 
} 

Gradle dépendances

compile 'com.amazonaws:aws-java-sdk:1.11.213' 
    compile 'org.apache.parquet:parquet-avro:1.9.0' 
    compile 'org.apache.parquet:parquet-hadoop:1.9.0' 
    compile 'org.apache.hadoop:hadoop-common:2.8.1' 
    compile 'org.apache.hadoop:hadoop-aws:2.8.1' 
    compile 'org.apache.hadoop:hadoop-client:2.8.1'