2

J'utilise WebClient et la coutume BodyExtractor classe pour mon application printemps-bootComment lire correctement Flux <DataBuffer> et le convertir en un seul fluxEntrée

WebClient webLCient = WebClient.create(); 
webClient.get() 
    .uri(url, params) 
    .accept(MediaType.APPLICATION.XML) 
    .exchange() 
    .flatMap(response -> { 
    return response.body(new BodyExtractor()); 
    }) 

BodyExtractor.java

@Override 
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) { 
    Flux<DataBuffer> body = response.getBody(); 
    body.map(dataBuffer -> { 
    try { 
     JaxBContext jc = JaxBContext.newInstance(SomeClass.class); 
     Unmarshaller unmarshaller = jc.createUnmarshaller(); 

     return (T) unmarshaller.unmarshal(dataBuffer.asInputStream()) 
    } catch(Exception e){ 
     return null; 
    } 
    }).next(); 
} 

code ci-dessus fonctionne avec une petite charge utile mais pas sur un gros problème, je pense que c'est parce que je ne lis que la valeur d'un flux avec next et je ne sais pas comment combiner et lire tous dataBuffer. Je suis nouveau dans le domaine des réacteurs, donc je ne fais pas beaucoup de trucs avec le flux/monophonique.

Répondre

1

j'ai pu le faire fonctionner à l'aide Flux#collect et SequenceInputStream

@Override 
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) { 
    Flux<DataBuffer> body = response.getBody(); 
    return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream)) 
    .map(inputStream -> { 
     try { 
     JaxBContext jc = JaxBContext.newInstance(SomeClass.class); 
     Unmarshaller unmarshaller = jc.createUnmarshaller(); 

     return (T) unmarshaller.unmarshal(inputStream); 
     } catch(Exception e){ 
     return null; 
     } 
    }).next(); 
} 

InputStreamCollector.java

public class InputStreamCollector { 
    private InputStream is; 

    public void collectInputStream(InputStream is) { 
    if (this.is == null) this.is = is; 
    this.is = new SequenceInputStream(this.is, is); 
    } 

    public InputStream getInputStream() { 
    return this.is; 
    } 
} 
+1

pourquoi écrivez-vous votre propre BodyExtractor? WebFlux supporte déjà Jaxb avec Jaxb2XmlDecoder. –

+0

@BrianClozel dois-je configurer quelque chose pour que cela fonctionne? 'bodyToMono' ne semble pas ramasser mes pojo. –

+0

Qu'est-ce que 'InputStreamCollector'? –

1

Reconstruire les InputStream défaites le but d'utiliser WebClient en premier lieu parce que rien ne sera émis que l'opération collect est terminée. Pour un grand courant, cela peut être très long. Le modèle réactif ne traite pas des octets individuels, mais des blocs d'octets (comme Spring DataBuffer). Voir ma réponse ici pour une solution plus élégante: https://stackoverflow.com/a/48054615/839733