2017-09-20 1 views
1

J'essaie de configurer Vert.X avec Jersey pour gérer les données POST (pas nécessairement des données de formulaire). Le ContainerRequest.setEntityStream de Jersey prend dans un InputStream qui est ce que j'essaye de construire Cependant, je ne peux pas sembler se passer autour de ces données sans lire la chose en mémoire à l'aide bodyHandler ou ma propre méthode personnalisée qui fait quelque chose de similaire, mais limite l'entréeVert.x ReadStream <Buffer> à InputStream

final Buffer body = Buffer.buffer(); 
    event 
     .handler(buffer -> { 
      if (!event.response().headWritten()) { 
       body.appendBuffer(buffer); 
       if (body.length() > 10 * 1024 * 1024) { 
        event.response() 
         .setStatusCode(REQUEST_ENTITY_TOO_LARGE.getStatusCode()) 
         .setStatusMessage(REQUEST_ENTITY_TOO_LARGE.getReasonPhrase()) 
         .end(); 
       } 
      } 
     }) 
     .endHandler(aVoid -> { 
      request.setEntityStream(new VertxBufferInputStream(body)); 
      appHandler.handle(request); 
     }); 

VertxBufferInputStream est une enveloppe simple au VertXbuffer . Juste pour économiser de la mémoire en évitant de convertir en ByteArrayInputStream(). Mais il a tout le corps.

Je veux éviter d'avoir le corps entier et le faire couler. J'ai essayé quelques trucs tout à fait hacky et mauvais code qui finalement ne fonctionne pas parce qu'il bloque la boucle d'événement parce que le handler n'est pas appelé et l'attend.

Répondre

0

Un tel beau problème à résoudre :)
Vous pouvez obtenir une source d'inspiration à https://github.com/jersey/jersey/blob/12e5d8bdf22bcd2676a1032ed69473cf2bbc48c7/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java#L124 où l'intégration Jersey avec Netty est mis en œuvre.

Je crois qu'il ya exactement les mêmes problèmes résolus (bien que pour un serveur web différent, Netty dans ce cas), qui est:

  1. La requête http est traitée sur une boucle d'événements (par Netty), comme la méthode ApplicationHandler.handle() doit être appelée sur un thread différent (boucle sans événement)
  2. L'API non bloquante doit être convertie en blocage InputStream. Cela est mis en œuvre au NettyInputStream. Il tire parti du fait que le ByteBuf de Netty peut être facilement converti en un InputStream et, par conséquent, en utilisant LinkedBlockingDeque, ces flux d'entrée sont convertis en un seul. (Juste fyi, je ne suis pas sûr à 100% si le fournisseur, le thread de boucle d'événement, est garanti pour ne jamais bloquer ce qui serait un bug.)

Par ailleurs, ceci est juste un traitement de l'entrant Les données. Pour la réponse, vous devrez implémenter une conversion de OutputStream (utilisée par Jersey) en API non bloquante Vert.X.

+0

Merci aussi j'ai regardé dedans. Une de mes tentatives était similaire à cela mais elle bloque toujours car "take()" bloquera. –

+0

Je ne comprends pas. C'est parfaitement ok; 'take()' bloque (c'est le contrat de 'InputStream.read()').La partie importante est ce que j'ai décrit à [1], le 'ApplicationHandler.handle()' doit être délégué à un thread différent. Par conséquent, la méthode 'take()' est appelée par cet autre thread et tout fonctionne parfaitement. Il n'y a pas d'autre moyen que celui-ci, à moins que Jersey ne soit équipé d'une API non bloquante, ce qui ne s'est pas produit (malheureusement, mais c'était proche). –

+0

Correct c'est le plus proche. (Je viens de le faire travailler maintenant) –

0

Deux composants sont nécessaires.

  1. vous devez vous assurer que vous séparer le traitement pour tout ce qui peut bloquer l'utilisation vertx.executeBlocking voir https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L128

  2. vous devez gérer deux événements: lorsque le nouveau tampon de données arrive et quand elle se termine. https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L123

  3. vous devez mettre en œuvre un InputStream qui serait en mesure d'accepter des données d'un autre fil et bloc lorsqu'il n'y a pas de données ainsi que la capacité de réception d'un message qu'il n'y a pas plus d'entrée. https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/internal/VertxBlockingInputStream.java