2017-08-31 3 views
1

Comment puis-je convertir le type Stream<Object> en InputStream? À l'heure actuelle, je reçois le iterator et boucle à travers toutes les données de conversion à un byteArray et l'ajouter à un fluxEntrée:Flux <Object> à InputStream

ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(bos); 

Iterator<MyType> myItr = MyObject.getStream().iterator(); 

while (myItr.hasNext()) { 

     oos.writeObject(myItr.next().toString() 
     .getBytes(StandardCharsets.UTF_8)); 
    } 
    oos.flush(); 
    oos.close(); 

    InputStream is = new ByteArrayInputStream(bao.toByteArray()); 

Quelle est la tête de le faire bien? Si mon flux contient un téraoctet de données, est-ce que je ne sucerais pas un téraoctet de données en mémoire? Y a-t-il un meilleur moyen d'y parvenir?

+0

Etes-vous sûr que ce genre de 'InputStream' est ce dont vous avez besoin? Vous convertissez des objets en chaînes, obtenez leur représentation UTF-8 sous la forme de tableaux d'octets et utilisez la sérialisation d'objets pour ces objets tableau. On ne sait pas très bien ce que vous voulez recevoir à l'autre bout, actuellement ce n'est ni les objets ni les cordes. Vous pouvez écrire des chaînes directement à la place ou vous pouvez écrire une représentation textuelle plus simple en utilisant un 'Writer', sans la surcharge du protocole de sérialisation de l'objet, mais aucune ne recréerait les objets originaux. – Holger

Répondre

2

Vous devriez être en mesure de convertir le OutputStream en un InputStream en utilisant un tuyau:

PipedOutputStream pos = new PipedOutputStream(); 
InputStream is = new PipedInputStream(pos); 

new Thread(() -> { 
    try (ObjectOutputStream oos = new ObjectOutputStream(pos)) { 
     Iterator<MyType> myItr = MyObject.getStream().iterator(); 
     while (myItr.hasNext()) { 
      oos.writeObject(myItr.next().toString() 
       .getBytes(StandardCharsets.UTF_8)); 
     } 
    } catch (IOException e) { 
     // handle closed pipe etc. 
    } 
}).start(); 

Inspirée par this answer.

+0

Hm ... je ne pense pas que je comprends tout à fait ce que ce code est censé faire? le pos n'est jamais assigné de données: S et il semble que nous sommes en train de faire une boucle et d'ajouter le contenu de myItr à l'oos sans aucune raison car il ne rentre en aucun cas dans le contenu que le flux d'entrée contient? Est-ce que j'ai râté quelque chose? – BigBug

+1

'is' et' oos' sont tous les deux liés à 'pos', qui achemine les données du flux de sortie vers le flux d'entrée. Mais maintenant que j'y pense, il faudrait quand même tout mettre en mémoire tampon, donc cela ne résout pas vraiment votre problème. – shmosel

+2

@BigBug Ok, je me suis trompé à ce sujet tout en mémoire tampon. Il bloque réellement une fois qu'il atteint la taille du tampon, ce qui signifie que vous devez alimenter le flux de sortie sur un thread séparé. Voir ma réponse mise à jour. – shmosel

0

Cela fonctionnerait-il pour vous?

https://gist.github.com/stephenhand/292cdd8bba7a452d83c51c00d9ef113c

Il est une implémentation InputStream qui prend Stream<byte[]> comme données d'entrée. Vous avez juste besoin de .map() vos objets abitrary à des tableaux d'octets, mais vous voulez que chaque objet soit représenté par des octets.

Il appelle seulement une opération de terminal sur le Stream lorsque le InputStream est lu, tirer des objets hors Stream que le consommateur lit plus de InputStream il ne charge jamais l'ensemble en mémoire