2017-08-09 1 views
1

J'essaie de transmettre des données volumineuses via des Websockets en utilisant le RPC de crossbar/autobahn. Ma configuration est la suivante:Renvoi de grandes données à partir de RPC (Crossbar + Autobahn | Python)

  • Python 2.7
  • Un routeur barre transversale (version 17.8.1.post1)
  • Un back-end qui va essayer d'envoyer un grand dataframe de pandas géants en tant que chaîne JSON
  • Un frontal qui voudra recevoir cette chaîne

En substance, mon frontal essaie d'appeler une fonction qui retournera une grande chaîne.

class MyComponent(ApplicationSession): 

@inlineCallbacks 
def onJoin(self, details): 
    print("session ready") 
    try: 
     res = yield self.call(u'data.get') 

Et je reçois cette erreur:

2017-08-09T16:38:10+0200 session closed with reason wamp.close.transport_lost [WAMP transport was lost without closing the session before] 
2017-08-09T16:38:10+0200 Cancelling 1 outstanding requests 
2017-08-09T16:38:10+0200 call error: ApplicationError(error=<wamp.close.transport_lost>, args=[u'WAMP transport was lost without closing the session before'], kwargs={}, enc_algo=None) 

Il semble la barre transversale est me donner des coups parce que ma session cliente semble mort lui, mais je pensais que autobahn serait gros morceau de mes données et que l'appel serait ne pas bloquer le réacteur client.

J'ai activé un certain nombre de choses dans ma configuration de barre transversale pour améliorer le traitement de socket Web; Merci à cela, j'ai été en mesure de transmettre une plus grande quantité de données, mais je finirais par atteindre une limite (fichier de configuration en grande partie copié et collé à partir de sam & max).

     "options": { 
          "enable_webstatus": false, 
          "max_frame_size": 16777216, 
          "auto_fragment_size": 65536, 
          "fail_by_drop": true, 
          "open_handshake_timeout": 2500, 
          "close_handshake_timeout": 1000, 
          "auto_ping_interval": 10000, 
          "auto_ping_timeout": 5000, 
          "auto_ping_size": 4, 
          "compression": { 
           "deflate": { 
            "request_no_context_takeover": false, 
            "request_max_window_bits": 11, 
            "no_context_takeover": false, 
            "max_window_bits": 11, 
            "memory_level": 4 
           } 
          } 
         } 

Des idées, des prises, des choses que je fais mal?

Merci,


Code client:

from __future__ import print_function 
import pandas as pd 

from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet.defer import inlineCallbacks 


class MyComponent(ApplicationSession): 

    @inlineCallbacks 
    def onJoin(self, details): 
     print("session ready") 
     try: 
      res = yield self.call(u'data.get') 
      print('Got the data') 
      data = pd.read_json(res) 
      print("call result: {}".format(data.head())) 
      print("call result shape: {0}, {1}".format(*data.shape)) 
     except Exception as e: 
      print("call error: {0}".format(e)) 


if __name__ == "__main__": 
    from autobahn.twisted.wamp import ApplicationRunner 

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1") 
    runner.run(MyComponent) 

Code Backend

from __future__ import absolute_import, division, print_function 

from twisted.internet.defer import inlineCallbacks 
from autobahn.twisted.wamp import ApplicationSession 
from twisted.internet import reactor, defer, threads 

# Imports 
import pandas as pd 


def get_data(): 
    """Returns a DataFrame of stuff as a JSON 

    :return: str, data as a JSON string 
    """ 
    data = pd.DataFrame({ 
     'col1': pd.np.arange(1000000), 
     'col2': "I'm big", 
     'col3': 'Like really big', 
    }) 
    print("call result shape: {0}, {1}".format(*data.shape)) 
    print(data.memory_usage().sum()) 
    print(data.head()) 
    return data.to_json() 


class MyBackend(ApplicationSession): 

    def __init__(self, config): 
     ApplicationSession.__init__(self, config) 

    @inlineCallbacks 
    def onJoin(self, details): 

     # Register a procedure for remote calling 
     @inlineCallbacks 
     def async_daily_price(eqt_list): 
      res = yield threads.deferToThread(get_data) 
      defer.returnValue(res) 

     yield self.register(async_daily_price, u'data.get') 


if __name__ == "__main__": 
    from autobahn.twisted.wamp import ApplicationRunner 

    runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1") 
    runner.run(MyBackend) 

Configuration

{ 
"version": 2, 
"controller": {}, 
"workers": [ 
    { 
     "type": "router", 
     "realms": [ 
      { 
       "name": "realm1", 
       "roles": [ 
        { 
         "name": "anonymous", 
         "permissions": [ 
          { 
           "uri": "", 
           "match": "prefix", 
           "allow": { 
            "call": true, 
            "register": true, 
            "publish": true, 
            "subscribe": true 
           }, 
           "disclose": { 
            "caller": false, 
            "publisher": false 
           }, 
           "cache": true 
          } 
         ] 
        } 
       ] 
      } 
     ], 
     "transports": [ 
      { 
       "type": "universal", 
       "endpoint": { 
        "type": "tcp", 
        "port": 8080 
       }, 
       "rawsocket": { 
       }, 
       "websocket": { 
        "ws": { 
         "type": "websocket", 
         "options": { 
          "enable_webstatus": false, 
          "max_frame_size": 16777216, 
          "auto_fragment_size": 65536, 
          "fail_by_drop": true, 
          "open_handshake_timeout": 2500, 
          "close_handshake_timeout": 1000, 
          "auto_ping_interval": 10000, 
          "auto_ping_timeout": 5000, 
          "auto_ping_size": 4, 
          "compression": { 
           "deflate": { 
            "request_no_context_takeover": false, 
            "request_max_window_bits": 11, 
            "no_context_takeover": false, 
            "max_window_bits": 11, 
            "memory_level": 4 
           } 
          } 
         } 
        } 
       }, 
       "web": { 
        "paths": { 
         "/": { 
          "type": "static", 
          } 
         } 
        } 
       } 
      ] 
     } 
    ] 
} 

Répondre

0

La solution proposée par le groupe crossbar.io consistait à utiliser l'option de résultat progressif du RPC.

Un exemple de travail complet est situé à https://github.com/crossbario/autobahn-python/tree/master/examples/twisted/wamp/rpc/progress

Dans mon code, je devais ajouter un l'Chunking du résultat dans le back-end

 step = 10000 
     if details.progress and len(res) > step: 
      for i in xrange(0, len(res), step): 
       details.progress(res[i:i+step]) 
     else: 
      defer.returnValue(res) 

Et à l'appelant

 res = yield self.call(
      u'data.get' 
      options=CallOptions(
       on_progress=partial(on_progress, res=res_list) 
      ) 
     ) 

Où ma fonction on_progress ajoute les morceaux à une liste de résultats

def on_progress(x, res): 
    res.append(x) 

Choisir la bonne taille de morceau fera l'affaire.