2017-06-06 1 views
1

J'essaie de lire un gros fichier en morceaux à partir de S3 sans couper aucune ligne pour le traitement en parallèle. Laissez-moi vous expliquer par l'exemple: Il y a un fichier de taille 1G sur S3. Je veux diviser ce fichier en mandrins de 64 Mo. C'est facile je peux le faire comme:Comment lire un fragment de fichier par segment à partir de S3 en utilisant aws-java-sdk

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key)); 

InputStream stream = s3object.getObjectContent(); 

byte[] content = new byte[64*1024*1024]; 

while (stream.read(content) != -1) { 

//process content here 

} 

mais le problème avec le morceau est qu'il peut avoir 100 lignes complètes et une incomplète. mais je ne peux pas traiter une ligne incomplète et je ne veux pas la rejeter.

Est-il possible de gérer cette situation? signifie que tous les mandrins n'ont pas de ligne partielle.

Répondre

1

L'aws-java-sdk fournit déjà des fonctionnalités de streaming pour vos objets S3. Vous devez appeler "getObject" et le résultat sera un InputStream.

1) AmazonS3Client.getObject (GetObjectRequest getObjectRequest) -> S3Object

2) S3Object.getObjectContent()

Note: La méthode est une méthode de lecture simple et ne crée pas un flux . Si vous récupérez un objet S3Object, vous devez fermer ce flux d'entrée dès que possible, car le contenu de l'objet n'est pas mis en mémoire tampon et diffusé directement à partir d'Amazon S3. En outre, l'échec de la fermeture de ce flux peut entraîner la fermeture du pool de requêtes .

aws java docs

1

100 ligne complète et incomplète

voulez-vous dire que vous devez lire la ligne de courant en ligne? Si c'est le cas, au lieu d'utiliser un InputStream, essayez de lire le flux de l'objet s3 en utilisant BufferedReader afin de pouvoir lire le flux ligne par ligne, mais je pense que cela va ralentir un peu.

 S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key)); 
     BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent())); 
     String line; 
     while ((line = in.readLine()) != null) { 

//process line here 

     } 
0

Mon approche habituelle (InputStream ->BufferedReader.lines() -> lots de lignes ->CompletableFuture) ne fonctionnera pas ici parce que les S3ObjectInputStream fois sous-jacents sur éventuellement pour les fichiers énormes.

J'ai donc créé une nouvelle classe S3InputStream, qui ne se soucie pas du temps d'ouverture et de lecture des blocs d'octets à la demande en utilisant des appels AWS SDK de courte durée. Vous fournissez un byte[] qui sera réutilisé. new byte[1 << 24] (16Mb) semble bien fonctionner.

package org.harrison; 

import java.io.IOException; 
import java.io.InputStream; 

import com.amazonaws.services.s3.AmazonS3; 
import com.amazonaws.services.s3.AmazonS3ClientBuilder; 
import com.amazonaws.services.s3.model.GetObjectRequest; 

/** 
* An {@link InputStream} for S3 files that does not care how big the file is. 
* 
* @author stephen harrison 
*/ 
public class S3InputStream extends InputStream { 
    private static class LazyHolder { 
     private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient(); 
    } 

    private final String bucket; 
    private final String file; 
    private final byte[] buffer; 
    private long lastByteOffset; 

    private long offset = 0; 
    private int next = 0; 
    private int length = 0; 

    public S3InputStream(final String bucket, final String file, final byte[] buffer) { 
     this.bucket = bucket; 
     this.file = file; 
     this.buffer = buffer; 
     this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1; 
    } 

    @Override 
    public int read() throws IOException { 
     if (next >= length) { 
      fill(); 

      if (length <= 0) { 
       return -1; 
      } 

      next = 0; 
     } 

     if (next >= length) { 
      return -1; 
     } 

     return buffer[this.next++]; 
    } 

    public void fill() throws IOException { 
     if (offset >= lastByteOffset) { 
      length = -1; 
     } else { 
      try (final InputStream inputStream = s3Object()) { 
       length = 0; 
       int b; 

       while ((b = inputStream.read()) != -1) { 
        buffer[length++] = (byte) b; 
       } 

       if (length > 0) { 
        offset += length; 
       } 
      } 
     } 
    } 

    private InputStream s3Object() { 
     final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset, 
       offset + buffer.length - 1); 

     return LazyHolder.S3.getObject(request).getObjectContent(); 
    } 
}