2017-09-25 2 views
1

J'ai travaillé sur un client qui utilise PyQt5 et le module websockets qui est construit autour d'asyncio. Je pensais que quelque chose comme le code ci-dessous fonctionnerait, mais je trouve que les données entrantes (du serveur) ne sont pas mises à jour dans l'interface graphique jusqu'à ce que je clique sur entrer dans la zone d'édition de ligne. Ces messages entrants sont destinés à définir l'impulsion pour les mises à jour de l'interface graphique et transporteront les données à utiliser pour la mise à jour. Le quamash est-il une meilleure façon d'aborder cela? btw, je vais utiliser des processus pour d'autres aspects de ce code, donc je ne pense pas que ce soit exagéré (à ce stade). Ceci est Python 3.6, PyQt5.6 (ou supérieur) et quelle que soit la version de websockets qui installe actuellement avec pip. https://github.com/aaugustin/websocketswebsockets, asyncio et PyQt5 ensemble enfin. Est-ce que Quamash est nécessaire?

Le client:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 
import asyncio 
import websockets 
import sys 
import time 
from multiprocessing import Process, Pipe, Queue 
from PyQt5 import QtCore, QtGui, QtWidgets 

class ComBox(QtWidgets.QDialog): 
    def __init__(self): 
     QtWidgets.QDialog.__init__(self) 
     self.verticalLayout = QtWidgets.QVBoxLayout(self) 
     self.groupBox = QtWidgets.QGroupBox(self) 
     self.groupBox.setTitle("messages from beyond") 
     self.gridLayout = QtWidgets.QGridLayout(self.groupBox) 
     self.label = QtWidgets.QLabel(self.groupBox) 
     self.gridLayout.addWidget(self.label, 0, 0, 1, 1) 
     self.verticalLayout.addWidget(self.groupBox) 
     self.lineEdit = QtWidgets.QLineEdit(self) 
     self.verticalLayout.addWidget(self.lineEdit) 
     self.lineEdit.editingFinished.connect(self.enterPress) 

    @QtCore.pyqtSlot() 
    def enterPress(self): 
     mytext = str(self.lineEdit.text()) 
     self.inputqueue.put(mytext) 

    @QtCore.pyqtSlot(str) 
    def updategui(self, message): 
     self.label.setText(message) 

class Websocky(QtCore.QThread): 
    updatemaingui = QtCore.pyqtSignal(str) 
    def __init__(self): 
     super(Websocky, self).__init__() 
    def run(self): 
     while True: 
      time.sleep(.1) 
      message = self.outputqueue.get() 
      try: 
       self.updatemaingui[str].emit(message) 
      except Exception as e1: 
       print("updatemaingui problem: {}".format(e1)) 

async def consumer_handler(websocket): 
    while True: 
     try: 
      message = await websocket.recv() 
      outputqueue.put(message) 
     except Exception as e1: 
      print(e1) 

async def producer_handler(websocket): 
    while True: 
     message = inputqueue.get() 
     await websocket.send(message) 
     await asyncio.sleep(.1) 

async def handler(): 
    async with websockets.connect('ws://localhost:8765') as websocket: 
     consumer_task = asyncio.ensure_future(consumer_handler(websocket)) 
     producer_task = asyncio.ensure_future(producer_handler(websocket)) 
     done, pending = await asyncio.wait(
      [consumer_task, producer_task], 
      return_when=asyncio.FIRST_COMPLETED,) 
     for task in pending: 
      task.cancel() 

def start_websockets(): 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(handler()) 

inputqueue = Queue() 
outputqueue = Queue() 

app = QtWidgets.QApplication(sys.argv) 
comboxDialog = ComBox() 
comboxDialog.inputqueue = inputqueue 
comboxDialog.outputqueue = outputqueue 
comboxDialog.show() 

webster = Websocky() 
webster.outputqueue = outputqueue 
webster.updatemaingui[str].connect(comboxDialog.updategui) 
webster.start() 

p2 = Process(target=start_websockets) 
p2.start() 

sys.exit(app.exec_()) 

Le serveur:

#!/usr/bin/env python3 
# -*- coding: utf-8 -*- 
import asyncio 
import time 
import websockets 

# here we'll store all active connections to use for sending periodic messages 
connections = [] 


#@asyncio.coroutine 
async def connection_handler(connection, path): 
    connections.append(connection) # add connection to pool 
    while True: 
     msg = await connection.recv() 
     if msg is None: # connection lost 
      connections.remove(connection) # remove connection from pool, when client disconnects 
      break 
     else: 
      print('< {}'.format(msg)) 

#@asyncio.coroutine 
async def send_periodically(): 
    while True: 
     await asyncio.sleep(2) # switch to other code and continue execution in 5 seconds 
     for connection in connections: 
      message = str(round(time.time())) 
      print('> Periodic event happened.') 
      await connection.send(message) # send message to each connected client 

start_server = websockets.serve(connection_handler, 'localhost', 8765) 
asyncio.get_event_loop().run_until_complete(start_server) 
asyncio.ensure_future(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages 
asyncio.get_event_loop().run_forever() 

Répondre

0

Peu de temps après la publication de cette question que je réalise le problème. La ligne

message = inputqueue.get() 

dans la fonction producer_handler est bloquante. Cela provoque ce qui devrait être une fonction asynchrone pour bloquer tout dans ce processus jusqu'à ce qu'il voit quelque chose dans la file d'attente. Ma solution de contournement consistait à utiliser le module d'aoprocessing qui fournit des files d'attente compatibles asyncio. Donc, il ressemble plus à ceci:

import aioprocessing 

async def producer_handler(websocket): 
    while True: 
     message = await inputqueue.coro_get() 
     await websocket.send(message) 
     await asyncio.sleep(.1) 

inputqueue = aioprocessing.AioQueue() 

Le module d'aoprocessing fournit de jolies options et de la documentation. Et dans ce cas est une solution plutôt simple pour le problème. https://github.com/dano/aioprocessing Donc, pour répondre à ma question: Non, vous n'avez pas à utiliser le quamash pour ce genre de chose.