2009-07-07 11 views
15

J'ai un Arduino connecté à mon ordinateur qui exécute une boucle, renvoyant une valeur par le port série à l'ordinateur tous les 100   ms.pyserial - Comment lire la dernière ligne envoyée à partir d'un périphérique série

Je veux faire un script Python qui lira à partir du port série seulement toutes les quelques secondes, donc je veux juste voir la dernière chose envoyée à partir de l'Arduino.

Comment faites-vous cela dans Pyserial?

Voici le code que j'ai essayé et qui ne marche pas. Il lit les lignes de manière séquentielle.

import serial 
import time 

ser = serial.Serial('com4',9600,timeout=1) 
while 1: 
    time.sleep(10) 
    print ser.readline() #How do I get the most recent line sent from the device? 

Répondre

27

Peut-être que je suis mal compris votre question, mais comme il est une ligne série, vous devrez lire tout envoyé de la séquence Arduino - il va tamponner dans l'Arduino jusqu'à ce que vous lisez il. Si vous voulez avoir un affichage d'état qui montre la dernière chose envoyée - utilisez un fil qui incorpore le code dans votre question (moins le sommeil), et gardez la dernière ligne complète lue comme la dernière ligne de l'Arduino.

Mise à jour: exemple de code de mtasic est assez bonne, mais si l'Arduino a envoyé une ligne partielle lorsque inWaiting() est appelée, vous obtiendrez une ligne tronquée. Au lieu de cela, ce que vous voulez faire est de mettre la dernière ligne complète dans last_received, et gardez la ligne partielle dans buffer afin qu'il puisse être ajouté à la prochaine fois autour de la boucle. Quelque chose comme ceci:

def receiving(ser): 
    global last_received 

    buffer_string = '' 
    while True: 
     buffer_string = buffer_string + ser.read(ser.inWaiting()) 
     if '\n' in buffer_string: 
      lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries 
      last_received = lines[-2] 
      #If the Arduino sends lots of empty lines, you'll lose the 
      #last filled line, so you could make the above statement conditional 
      #like so: if lines[-2]: last_received = lines[-2] 
      buffer_string = lines[-1] 

En ce qui concerne l'utilisation de readline(): Voici ce que la documentation pyserial a à dire (légèrement modifié pour plus de clarté et une mention à readlines()):

Soyez prudent lorsque vous utilisez " readline ". Ne spécifier un délai d'attente lors de l'ouverture du port série , sinon il pourrait bloquer pour toujours si aucun caractère de nouvelle ligne est reçu. Notez également que "readlines()" ne fonctionne qu'avec un délai d'expiration. Il dépend d'avoir un délai et interprète cela comme EOF (fin de fichier).

ce qui me semble assez raisonnable!

10
from serial import * 
from threading import Thread 

last_received = '' 

def receiving(ser): 
    global last_received 
    buffer = '' 

    while True: 
     # last_received = ser.readline() 
     buffer += ser.read(ser.inWaiting()) 
     if '\n' in buffer: 
      last_received, buffer = buffer.split('\n')[-2:] 

if __name__ == '__main__': 
    ser = Serial(
     port=None, 
     baudrate=9600, 
     bytesize=EIGHTBITS, 
     parity=PARITY_NONE, 
     stopbits=STOPBITS_ONE, 
     timeout=0.1, 
     xonxoff=0, 
     rtscts=0, 
     interCharTimeout=None 
    ) 

    Thread(target=receiving, args=(ser,)).start() 
+0

bien qui se lit dans le total de la somme de ce qui est dans le tampon de réception. Mon impression est que le demandeur est en train de délimiter ce que l'Arduino envoie par les nouvelles lignes, donc il ne correspondra probablement pas à la taille du tampon de réception. – JosefAssad

+0

Donc, last_received aura toujours ce dont j'ai besoin? Y a-t-il un moyen de le faire avec readline? – Greg

+0

ce droit, selon votre question – mtasic85

2

Vous aurez besoin d'une boucle pour lire tout ce qui est envoyé, avec le dernier appel à readline() bloquant jusqu'au timeout. Alors:

def readLastLine(ser): 
    last_data='' 
    while True: 
     data=ser.readline() 
     if data!='': 
      last_data=data 
     else: 
      return last_data 
2

modification légère à mtasic Code de & Vinay Sajip:

Alors que je trouve ce code me très utile pour une application similaire, je avais besoin toutes les lignes qui reviennent d'un dispositif de série enverrait des informations périodiquement. J'ai opté pour pop le premier élément du haut, l'enregistrer, puis rejoindre les éléments restants comme le nouveau tampon et continuer à partir de là.

Je me rends compte que c'est pas ce que Greg demandait, mais je pensais que ça valait la peine d'être partagé en note annexe.

def receiving(ser): 
    global last_received 

    buffer = '' 
    while True: 
     buffer = buffer + ser.read(ser.inWaiting()) 
     if '\n' in buffer: 
      lines = buffer.split('\n') 
      last_received = lines.pop(0) 

      buffer = '\n'.join(lines) 
7

Ces solutions monopolisent le processeur en attendant les caractères.

Vous devez faire au moins un appel de blocage à lire (1)

while True: 
    if '\n' in buffer: 
     pass # skip if a line already in buffer 
    else: 
     buffer += ser.read(1) # this will block until one more char or timeout 
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars 

... et faire la chose divisée comme avant.

2

L'utilisation de .inWaiting() à l'intérieur d'une boucle infinie peut être problématique. Il peut monopoliser toute la CPU en fonction de la mise en œuvre. Au lieu de cela, je recommanderais d'utiliser une taille spécifique de données à lire. Donc, dans ce cas, ce qui suit devrait se faire par exemple:

ser.read(1024) 
3

Cette méthode vous permet de contrôler séparément le délai d'attente pour recueillir toutes les données pour chaque ligne, et un délai d'attente différent d'attente sur les lignes supplémentaires.

# get the last line from serial port 
lines = serial_com() 
lines[-1]    

def serial_com(): 
    '''Serial communications: get a response''' 

    # open serial port 
    try: 
     serial_port = serial.Serial(com_port, baudrate=115200, timeout=1) 
    except serial.SerialException as e: 
     print("could not open serial port '{}': {}".format(com_port, e)) 

    # read response from serial port 
    lines = [] 
    while True: 
     line = serial_port.readline() 
     lines.append(line.decode('utf-8').rstrip()) 

     # wait for new data after each line 
     timeout = time.time() + 0.1 
     while not serial_port.inWaiting() and timeout > time.time(): 
      pass 
     if not serial_port.inWaiting(): 
      break 

    #close the serial port 
    serial_port.close() 
    return lines 
+0

Merci pour ça. Je n'ai pas trouvé d'autre réponse qui retournerait toutes les lignes d'une réponse en série. – Timmay

2

Vous pouvez utiliser ser.flushInput() pour débusquer toutes les données série qui est actuellement dans le tampon.

Après avoir effacé les anciennes données, vous pouvez utiliser ser.readline() pour obtenir les données les plus récentes du périphérique série. Je pense que c'est un peu plus simple que les autres solutions proposées sur ici. J'ai travaillé pour moi, j'espère que ça vous convient.

2

complications Trop

Quelle est la raison de diviser l'objet octets par saut de ligne ou par d'autres manipulations de tableau? J'écris la méthode la plus simple, qui va résoudre votre problème:

import serial 
s = serial.Serial(31) 
s.write(bytes("ATI\r\n", "utf-8")); 
while True: 
    last = '' 
    for byte in s.read(s.inWaiting()): last += chr(byte) 
    if len(last) > 0: 
     # Do whatever you want with last 
     print (bytes(last, "utf-8")) 
     last = '' 
Questions connexes