2017-09-14 4 views
0

J'essaye d'envoyer un fichier sur le réseau en utilisant Twisted avec le protocole LineReceiver. Le problème que je vois est que quand je lis un fichier binaire et essaie d'envoyer les morceaux, ils n'envoient tout simplement pas.Python Twisted envoi d'un fichier sur un réseau

Je lis le fichier en utilisant:

import json 
import time 
import threading 
from twisted.internet import reactor, threads 
from twisted.protocols.basic import LineReceiver 
from twisted.internet import protocol 

MaximumMsgSize = 15500 

trySend = True 
connectionToServer = None 

class ClientInterfaceFactory(protocol.Factory): 

    def buildProtocol(self, addr): 
     return WoosterInterfaceProtocol(self._msgProcessor, self._logger) 

class ClientInterfaceProtocol(LineReceiver): 

    def connectionMade(self): 
     connectionToServer = self 

    def _DecodeMessage(self, rawMsg): 
     header, body = json.loads(rawMsg) 
     return (header, json.loads(body)) 

    def ProcessIncomingMsg(self, rawMsg, connObject): 
     # Decode raw message. 
     decodedMsg = self._DecodeMessage(rawMsg) 

     self.ProccessTransmitJobToNode(decodedMsg, connObject) 

    def _BuildMessage(self, id, msgBody = {}): 
     msgs = [] 

     fullMsgBody = json.dumps(msgBody) 
     msgBodyLength = len(fullMsgBody) 

     totalParts = 1 if msgBodyLength <= MaximumMsgSize else \ 
      int(math.ceil(msgBodyLength/MaximumMsgSize)) 

     startPoint = 0 
     msgBodyPos = 0 

     for partNo in range(totalParts): 
      msgBodyPos = (partNo + 1) * MaximumMsgSize 

      header = {'ID' : id, 'MsgParts' : totalParts, 
       'MsgPart' : partNo } 
      msg = (header, fullMsgBody[startPoint:msgBodyPos]) 
      jsonMsg = json.dumps(msg)  

      msgs.append(jsonMsg) 
      startPoint = msgBodyPos 

     return (msgs, '') 

    def ProccessTransmitJobToNode(self, msg, connection): 
     rootDir = '../documentation/configs/Wooster' 

     exportedFiles = ['consoleLog.txt', 'blob.dat'] 
     params = { 
      'Status' : 'buildStatus', 
      'TaskID' : 'taskID', 
      'Name' : 'taskName', 
      'Exports' : len(exportedFiles), 
      } 
     msg, statusStr = self._BuildMessage(101, params) 
     connection.sendLine(msg[0]) 

     for filename in exportedFiles: 
      with open (filename, "rb") as exportFileHandle: 
       data = exportFileHandle.read().encode('base64') 

      params = { 
       ExportFileToMaster_Tag.TaskID : taskID, 
       ExportFileToMaster_Tag.FileContents : data, 
       ExportFileToMaster_Tag.Filename : filename 
      } 
      msgs, _ = self._BuildMessage(MsgID.ExportFileToMaster, params)   
      for m in msgs: 
       connection.sendLine(m) 

    def lineReceived(self, data): 
     threads.deferToThread(self.ProcessIncomingMsg, data, self) 


def ConnectFailed(reason): 
    print 'Connection failed..' 
    reactor.callLater(20, reactor.callFromThread, ConnectToServer) 

def ConnectToServer(): 
    print 'Connecting...' 
    from twisted.internet.endpoints import TCP4ClientEndpoint 
    endpoint = TCP4ClientEndpoint(reactor, 'localhost', 8181) 

    deferItem = endpoint.connect(factory) 
    deferItem.addErrback(ConnectFailed) 

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False}) 
netThread.start() 

reactor.callFromThread(ConnectToServer) 

factory = ClientInterfaceFactory() 
protocol = ClientInterfaceProtocol() 

while 1: 
    time.sleep(0.01) 

    if connectionToServer == None: continue 

    if trySend == True: 
     protocol.ProccessTransmitJobToNode(None, None) 
     trySend = False 

Y at-il quelque chose que je fais mal fichier est envoyé, il est quand l'écriture fait partie de plusieurs ou il y a plus d'un fichier, il se bat?.

Si une seule écriture se produit alors le m Note: J'ai mis à jour la question avec un morceau brut de code d'échantillon dans l'espoir que cela soit logique.

Répondre

1

_BuildMessage renvoie un double-tuple: (msgs, '').

Votre code réseau itère sur ce:

msgs = self._BuildMessage(MsgID.ExportFileToMaster, params) 

for m in msgs: 

donc votre code réseau tente d'abord d'envoyer une liste de données codées JSON et essaie ensuite d'envoyer la chaîne vide. Cela déclenche probablement une exception car vous ne pouvez pas envoyer une liste de n'importe quoi en utilisant sendLine. Si vous ne voyez pas l'exception, vous avez oublié d'activer la journalisation. Vous devez toujours activer la journalisation pour pouvoir voir les exceptions qui se produisent.

En outre, vous utilisez time.sleep et vous ne devriez pas faire cela dans un programme basé sur Twisted. Si vous faites cela pour essayer d'éviter de surcharger le récepteur, vous devriez utiliser la contre-pression native de TCP à la place en enregistrant un producteur qui peut recevoir des notifications de pause et de reprise. Quoi qu'il en soit, time.sleep (et votre boucle sur toutes les données) bloquera l'ensemble du fil du réacteur et empêchera tout progrès. La conséquence est que la plupart des données seront tamponnées localement avant d'être envoyées.

De plus, votre code appelle LineReceiver.sendLine à partir d'un filetage non-réacteur. Cela a des résultats indéfinis mais vous pouvez probablement compter dessus pour ne pas fonctionner.

Cette boucle exécute dans le thread principal:

while 1: 
    time.sleep(0.01) 

    if connectionToServer == None: continue 

    if trySend == True: 
     protocol.ProccessTransmitJobToNode(None, None) 
     trySend = False 

alors que le réacteur fonctionne dans un autre thread:

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False}) 
netThread.start() 

ProcessTransmitJobToNode appelle simplement self.sendLine:

def ProccessTransmitJobToNode(self, msg, connection): 
    rootDir = '../documentation/configs/Wooster' 

    exportedFiles = ['consoleLog.txt', 'blob.dat'] 
    params = { 
     'Status' : 'buildStatus', 
     'TaskID' : 'taskID', 
     'Name' : 'taskName', 
     'Exports' : len(exportedFiles), 
     } 
    msg, statusStr = self._BuildMessage(101, params) 
    connection.sendLine(msg[0]) 

Vous devriez probablement supprimer l'utilisation de threading entièrement de l'applicati sur. Les événements basés sur le temps sont mieux gérés en utilisant reactor.callLater (votre boucle de thread principal génère effectivement un appel à ProcessTransmitJobToNode une fois cent fois par seconde (effets modulo du drapeau trySend)).

Vous pouvez également jeter un oeil à https://github.com/twisted/tubes comme un meilleur moyen de gérer de grandes quantités de données avec Twisted.

+0

Le message envoyé n'est pas une liste, il s'agit d'une sortie Jps dumps(). Je renvoie une liste de messages, mais pour les boucler –

+0

Désolé, il devrait être msgs, _ = self._Build ........ –

+0

Oui, oui, il devrait. –