2010-08-09 5 views
2

Finalement, j'essaie de transférer des tampons d'une machine à une autre. Le code ci-dessous prend le flux de <id><size><data with size bytes> et lit la partie dans la fonction handleReadHeader, puis lit le nombre d'octets <size>, puis revient en arrière et attend une autre paire <id><size>. J'ai collé beaucoup de code, mais vraiment les seules fonctions que je me méfie de sont:
:: Downlink addMsgToQueue
:: Downlink writeCallback
Downlink :: startWrites() Downlink :: handleReadHeader
Downlink :: handleReadFrameDataBGRboost :: échec des transferts de paquets asio lors d'une tentative d'envoi très fréquente de paquets

using namespace std; 
using namespace boost; 
using namespace boost::asio; 

Downlink::Downlink() : 
    socket(nIO), 
    headerSize(sizeof(unsigned int)+1), 
    connected(false), 
    isWriting(false), 
    readHeaderBuffer(headerSize) 
{} 

Downlink::~Downlink() { 
    disconnect(); 
} 

bool Downlink::connect(const std::string &robotHost, unsigned int port) { 
    disconnect(); 

    ip::tcp::resolver resolver(nIO); 
    ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port)); 
    ip::tcp::resolver::iterator iterator = resolver.resolve(query); 

    ip::tcp::resolver::iterator end; 
    boost::system::error_code ec; 
    for(;iterator!=end;++iterator) { 
    socket.connect(*iterator, ec); 
    if(!ec) 
     break; 
    socket.close(); 
    } 
    if(!socket.is_open()) 
    return false; 

    async_read(socket, buffer(readHeaderBuffer), 
     bind(&Downlink::handleReadHeader, this, _1, _2)); 

    //start network thread. 
    lock_guard<mutex> l(msgMutex); 
    outgoingMessages = queue<vector<char> >(); 
    nIO.reset(); 
    t = thread(bind(&boost::asio::io_service::run, &nIO)); 
    connected = true; 
    return true; 
} 

bool Downlink::isConnected() const { 
    return connected; 
} 

void Downlink::disconnect() { 
    nIO.stop(); 
    t.join(); 
    socket.close(); 
    connected = false; 
    isWriting = false; 
    nIO.reset(); 
    nIO.run(); 
} 

void Downlink::writeToLogs(const std::string &logMsg) { 
    vector<char> newMsg(logMsg.length()+headerSize); 
    newMsg[0] = MSG_WRITE_LOG; 
    const unsigned int msgLen(logMsg.length()); 
    memcpy(&newMsg[1], &msgLen, sizeof(unsigned int)); 
    vector<char>::iterator dataBegin = newMsg.begin(); 
    advance(dataBegin, headerSize); 
    copy(logMsg.begin(), logMsg.end(), dataBegin); 
    assert(newMsg.size()==(headerSize+logMsg.length())); 
    addMsgToQueue(newMsg); 
} 

void Downlink::addMsgToQueue(const std::vector<char> &newMsg) { 
    lock_guard<mutex> l(msgMutex); 
    outgoingMessages.push(newMsg); 
    lock_guard<mutex> l2(outMutex); 
    if(!isWriting) { 
    nIO.post(bind(&Downlink::startWrites, this)); 
    } 
} 

void Downlink::writeCallback(const boost::system::error_code& error, 
     std::size_t bytes_transferred) { 
    if(error) { 
    disconnect(); 
    lock_guard<mutex> l(msgMutex); 
    outgoingMessages = queue<vector<char> >(); 
    return; 
    } 
    { 
    lock_guard<mutex> l2(outMutex); 
    isWriting = false; 
    } 
    startWrites(); 
} 


void Downlink::startWrites() { 
    lock_guard<mutex> l(msgMutex); 
    lock_guard<mutex> l2(outMutex); 
    if(outgoingMessages.empty()) { 
    isWriting = false; 
    return; 
    } 

    if(!isWriting) { 
    currentOutgoing = outgoingMessages.front(); 
    outgoingMessages.pop(); 
    async_write(socket, buffer(currentOutgoing), 
    bind(&Downlink::writeCallback, this, _1, _2)); 
    isWriting = true; 
    } 
} 

void Downlink::handleReadHeader(const boost::system::error_code& error, 
    std::size_t bytes_transferred) { 
    //TODO: how to handle disconnect on errors? 
    cout<<"handleReadHeader"<<endl; 
    if(error) { 
    return; 
    } 
    assert(bytes_transferred==headerSize); 
    if(bytes_transferred!=headerSize) { 
    cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl; 
    } 
    currentPacketID = readHeaderBuffer[0]; 

    memcpy(&currentPacketLength, &readHeaderBuffer[1], sizeof(unsigned int)); 
    dataStream.resize(currentPacketLength); 
    switch(currentPacketID) { 
    case MSG_FRAME_BGR: { 
    cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl; 
    async_read(socket, asio::buffer(dataStream), 
     boost::asio::transfer_at_least(currentPacketLength), 
     bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));  
    } break; 
    default: { 
    cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl; 
    cout<<"  "<<(int)currentPacketID<<endl; 
    async_read(socket, asio::buffer(dataStream), 
     boost::asio::transfer_at_least(currentPacketLength), 
     bind(&Downlink::handleReadData, this, _1, _2)); 
    } break; 
    } 
} 

void Downlink::handleReadData(const boost::system::error_code& error, 
    std::size_t bytes_transferred) { 
    cout<<"handleReadData"<<endl; 
    if(error) { 
    return; 
    } 
    if(bytes_transferred!=currentPacketLength) { 
    cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl; 
    } 
    assert(bytes_transferred==currentPacketLength); 

    switch(currentPacketID) { 
    case MSG_ASCII: { 
    string msg(dataStream.begin(), dataStream.end()); 
    textCallback(&msg); 
    } break; 
    case MSG_IMU: { 
    Eigen::Vector3d a,g,m; 
    unsigned int stamp; 
    memcpy(a.data(), &dataStream[0], sizeof(double)*3); 
    memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3); 
    memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3); 
    memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int)); 
    imuCallback(a,m,g,stamp); 
    } break; 
    default: 
    //TODO: handle this better? 
    cout<<"Unknown packet ID."<<endl; 
    } 

    async_read(socket, buffer(readHeaderBuffer), 
     boost::asio::transfer_at_least(headerSize), 
     bind(&Downlink::handleReadHeader, this, _1, _2)); 
} 

void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error, 
      std::size_t bytes_transferred) { 
    cout<<"Got a frame"<<endl; 
    if(error) { 
    return; 
    } 
    if(bytes_transferred!=currentPacketLength) { 
    cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl; 
    } 
    assert(bytes_transferred==currentPacketLength); 
    unsigned int imageWidth, imageHeight, cameraID; 

    unsigned char *readOffset = (unsigned char*)&dataStream[0]; 
    memcpy(&imageWidth, readOffset, sizeof(unsigned int)); 
    readOffset += sizeof(unsigned int); 
    memcpy(&imageHeight, readOffset, sizeof(unsigned int)); 
    readOffset += sizeof(unsigned int); 
    memcpy(&cameraID, readOffset, sizeof(unsigned int)); 
    readOffset += sizeof(unsigned int); 

    cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl; 

    frameCallback(readOffset, imageWidth, imageHeight, cameraID); 

    async_read(socket, buffer(readHeaderBuffer), 
     boost::asio::transfer_at_least(headerSize), 
     bind(&Downlink::handleReadHeader, this, _1, _2)); 
} 


boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) { 
    return textCallback.connect(s); 
} 

boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) { 
    return imuCallback.connect(s); 
} 

boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) { 
    return frameCallback.connect(s); 
} 

Voici le code à l'autre extrémité. C'est presque exactement la même chose que l'autre code, mais l'erreur pourrait être dans les deux extrémités.

using namespace std; 
using namespace boost; 
using namespace boost::asio; 

Uplink::Uplink(unsigned int port) : 
    socket(nIO), 
    acceptor(nIO), 
    endpoint(ip::tcp::v4(), port), 
    headerSize(sizeof(unsigned int)+1), //id + data size 
    headerBuffer(headerSize) 
{ 
    //move socket into accept state. 
    acceptor.open(endpoint.protocol()); 
    acceptor.set_option(ip::tcp::acceptor::reuse_address(true)); 
    acceptor.bind(endpoint); 
    acceptor.listen(1); //1 means only one client in connect queue. 
    acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1)); 
    //start network thread. 
    nIO.reset(); 
    t = thread(boost::bind(&boost::asio::io_service::run, &nIO)); 
} 

Uplink::~Uplink() { 
    nIO.stop(); //tell the network thread to stop. 
    t.join(); //wait for the network thread to stop. 
    acceptor.close(); //close listen port. 
    socket.close(); //close active connections. 
    nIO.reset(); 
    nIO.run(); //let clients know that we're disconnecting. 
} 

void Uplink::parse_header(const boost::system::error_code& error, 
    std::size_t bytes_transferred) { 
    if(error || bytes_transferred!=headerSize) { 
    disconnect(); 
    return; 
    } 
    currentPacketID = headerBuffer[0]; 
    memcpy(&currentPacketLength, &headerBuffer[1], sizeof(unsigned int)); 
    //move to read data state 

    //TODO: move to different states to parse various packet types. 

    async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength), 
     bind(&Uplink::parse_data, this, _1, _2)); 
} 

void Uplink::parse_data(const boost::system::error_code& error, 
    std::size_t bytes_transferred) { 
    if(error) { 
    disconnect(); 
    return; 
    } 

    if(bytes_transferred != currentPacketLength) { 
    cout<<"bytes_transferred != currentPacketLength"<<endl; 
    disconnect(); 
    return; 
    } 

    //move back into the header reading state 
    async_read(socket, buffer(headerBuffer), 
     bind(&Uplink::parse_header, this, _1, _2)); 
} 

void Uplink::disconnect() { 
    acceptor.close(); 
    socket.close(); 
    acceptor.open(endpoint.protocol()); 
    acceptor.set_option(ip::tcp::acceptor::reuse_address(true)); 
    acceptor.bind(endpoint); 
    acceptor.listen(1); //1 means only one client in connect queue. 
    acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1)); 
} 

void Uplink::accept_handler(const boost::system::error_code& error) 
{ 
    if (!error) { 
    //no more clents. 
    acceptor.close(); 
    //move to read header state. 
    async_read(socket, buffer(headerBuffer), 
     bind(&Uplink::parse_header, this, _1, _2)); 
    } 
} 

void Uplink::sendASCIIMessage(const std::string &m) { 
    //Format the message 
    unsigned int msgLength(m.length()); 
    vector<char> outBuffer(msgLength+headerSize); 
    outBuffer[0] = MSG_ASCII; 
    memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int)); 
    vector<char>::iterator dataBegin(outBuffer.begin()); 
    advance(dataBegin, headerSize); 
    copy(m.begin(), m.end(), dataBegin); 
    //queue the message 
    addToQueue(outBuffer); 
} 

void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) { 
    //Format the message. 
    //a,g,m, 3 components each plus a stamp 
    const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int)); 
    vector<char> outBuffer(msgLength+headerSize); 
    outBuffer[0] = MSG_IMU; 
    memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int)); 

    const Eigen::Vector3d a(d->getAccel()); 
    const Eigen::Vector3d m(d->getMag()); 
    const Eigen::Vector3d g(d->getGyro()); 
    const unsigned int s(d->getUpdateStamp()); 

    memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3); 
    memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3); 
    memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3); 
    memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int)); 

    /* 
    cout<<"----------------------------------------"<<endl; 
    cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl; 
    cout<<"Mag = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl; 
    cout<<"Gyro = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl; 
    cout<<"Stamp = "<<s<<endl; 
    cout<<"----------------------------------------"<<endl; 
    */ 

    //queue the message 
    addToQueue(outBuffer); 
} 

void Uplink::send_handler(const boost::system::error_code& error, 
    std::size_t bytes_transferred) { 
    { 
    lock_guard<mutex> l(queueLock); 
    lock_guard<mutex> l2(sendingLock); 
    if(outQueue.empty()) { 
     currentlySending = false; 
     return; 
    } 
    } 
    startSend(); 
} 

void Uplink::addToQueue(const std::vector<char> &out) { 
    bool needsRestart = false; 
    { 
    lock_guard<mutex> l(queueLock); 
    lock_guard<mutex> l2(sendingLock); 
    outQueue.push(out); 
    needsRestart = !currentlySending; 
    } 
    if(needsRestart) 
    nIO.post(bind(&Uplink::startSend, this)); 
} 

void Uplink::startSend() { 
    lock_guard<mutex> l(queueLock); 
    lock_guard<mutex> l2(sendingLock); 
    if(outQueue.empty()) 
    return; 
    currentlySending = true; 
    currentWrite = outQueue.front(); 
    outQueue.pop(); 
    async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler, 
     this, _1, _2)); 
} 

void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height, 
      const unsigned int cameraID, const unsigned char *frameData) { 
    //        image data   image metadata  header 
    const unsigned int packetSize(width*height*3 + sizeof(unsigned int)*3 + headerSize); 
    const unsigned int dataSize(width*height*3 + sizeof(unsigned int)*3); 
    vector<char> outgoingBuffer(packetSize); 
    outgoingBuffer[0] = MSG_FRAME_BGR; 
    memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int)); 
    char *writePtr = &outgoingBuffer[headerSize]; 
    memcpy(writePtr, &width, sizeof(unsigned int)); 
    writePtr += sizeof(unsigned int); 
    memcpy(writePtr, &height, sizeof(unsigned int)); 
    writePtr += sizeof(unsigned int); 
    memcpy(writePtr, &cameraID, sizeof(unsigned int)); 
    writePtr += sizeof(unsigned int); 
    memcpy(writePtr, frameData, width*height*3*sizeof(char)); 

    //TODO: can we avoid the whole image copy here? 
    //TODO: should come up with a better packet buffer build system. 
    //IDEA!: maybe have a "request buffer" funxction so the Uplink 
    //class can have sole ownership, rather than do the copy in "addtoQueue" 
    addToQueue(outgoingBuffer); 
} 

Ce programme fonctionne la plupart du temps, mais rarement, lors de l'envoi d'un grand nombre de données sans délai entre les paquets, il échouera. Par exemple:

après avoir manipulé une image vidéo en Downlink il remonte à la hadleHeaderData et attend un paquet qui est plusieurs méga-octets de longueur et pour un ID de paquet qui n'existe pas. D'une certaine manière, le flux est corrompu. Je ne sais pas pourquoi. Je ne m'intéresse pas vraiment au code que j'ai écrit maintenant, donc si quelqu'un connaît une bonne classe ou bibliothèque pour analyser les flux sur TCP en blocs de tampons pour moi, je préfère l'utiliser.

EDIT:
Voici le code exact qui exécute l'envoi de données:

if(frontImage) { 
     uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0, 
        (unsigned char*)frontImage->imageData); 
     cout<<"Sent"<<endl; 
     //sleep(1); //works fine if this is uncommented ! 
    } 

    uplink.sendASCIIMessage("Alive..."); 
    sleep(1); 
    uplink.sendIMUDataBlock(imuDataBlock.get()); 
    cout<<"Loop"<<endl; 
    sleep(1); 
    } 
+0

Pouvez-vous afficher le code qui appelle 'sendVideoFrameBGR/ASCIIMessage'? Mon soupçon est que vous appelez les deux avant qu'une écriture soit terminée. –

+0

Bon point. Cependant, si vous vérifiez Uplink :: addToQueue vous verrez qu'il y a une variable "outQueue". Cela stocke une file d'attente de messages sortants. Un seul est envoyé à la fois. C'est exactement le genre de chose que je n'aime pas dans ce code. La fonction addToQueue est sujette aux erreurs. –

+0

"problèmes avec certains asio code boost." Fixez le titre s'il vous plaît! – unixman83

Répondre

3

Le problème est plus probable que votre objet ioservice a plus d'un travail de manipulation de fil.

Lorsque vous appelez la deuxième fonction d'envoi immédiatement après la première, les deux objets de fonction affichés dans le ioservice sont probablement délégués à différents threads. Donc, fondamentalement, deux écritures se produisent sur le même socket en parallèle. Ceci est très probablement illégal. L'utilisation de Winsock2 avec des sockets non bloquantes provoquerait la corruption des données sortantes.

Même si vous utilisez un booléen pour vérifier s'il est en cours d'envoi, le bool n'est pas vérifié tant que l'un des threads ioservice ne gère pas la fonction. Si deux threads ioservice sont actifs lorsque vous publiez les deux tâches, les deux envois peuvent être envoyés en même temps, ce qui provoque l'apparition asynchrone des deux fonctions d'envoi sur des threads distincts. Le contrôle 'envoie actuellement' peut renvoyer faux dans les deux appels, puisque les deux envois sont en parallèle.