2017-08-29 3 views
0

J'écris un programme qui transfère des données qui sont lues à un autre homologue. J'ai une méthode qui est appelée après chaque lecture de données sur socket. Cette méthode poste les données dans un brin pour l'écrire à un autre pair. Lors de l'envoi de gros blocs de données, les données renvoyées par l'application ne sont pas les mêmes que celles reçues, le problème est que les données ne sont plus commandées. C'est uniquement le cas lorsque vous utilisez plusieurs threads dans le fichier boost :: asio :: io_service. HandleGatewayReply est appelé lorsque certaines données sont lues sur le socket. À ce stade (1), après avoir écrit des données dans un fichier, je peux voir que les données sont toujours ordonnées. Après cela, postBackendReply est appelé et les données sont toujours ordonnées (2). Cependant, dans SessionConnection :: postReply, si je vide les données dans un fichier (3), je peux voir que les données ne sont plus commandées. Je ne vois pas pourquoi la commande est perdue à ce stade, j'ai essayé d'utiliser un brin dans handleGatewayReply et dans postBackendReply (comme indiqué dans le code), mais le comportement est toujours le même.La commande d'async_writes est incorrecte même si j'utilise un brin

Désolé, je ne peux pas soumettre un exemple minimal, complet et vérifiable, car le bogue est trop difficile à repérer, le transfert multithread requis d'un grand nombre de données.

void Reply::handleGatewayReply(std::stringstream* stream) 
    { 
    // Flush data to file (1) 
    m_strand.post(std::bind([=]() { 
     postBackendReply(*stream); 
     delete stream; 
    } 
    })); 

    } 

void Reply::postBackendReply(const std::stringstream& stream) 
    { 
    auto buffer = std::make_shared<Buffer>(); 
    buffer->m_buffers.push_back(stream.str()); 
    // Flush data to file (2) 
    auto connection = m_request->connection(); 
    if (connection) { 
// connection->postReply(buffer); // doesn't work either 
      m_strand.post(std::bind(&SessionConnection::postReply, connection,buffer)); 
    } 

    } 


    void SessionConnection::postReply(BufferPtr buffer) 
    { 
     // Flush data to file (3) 
     m_ioService.post(
     m_ostrand.wrap(
      std::bind(&SessionConnection::sendNext, 
        shared_from_this(), buffer))); 
    } 
    } 

Répondre

4

Lorsque vous utilisez un brin:

  1. ne posterez pas gestionnaire directement au io_service - qui est garanti à perdre la commande et briser les garanties de concurrence.

  2. strand::wrap chaque gestionnaire asynchrone.

  3. dans un gestionnaire (enveloppé), si vous devez forcer l'ordre, dispatch au brin. Seulement post si le gestionnaire posté est toujours valide s'il est exécuté à un moment donné dans le futur.

exemple:

thing.async_op(mystrand.wrap([self = shared_from_this()](auto&& error){ 
    self->mystrand.dispatch(&must_be_done_now_A); 
    self->mystrand.post(&may_be_done_out_of_order_B); 
    self->mystrand.dispatch(&must_be_done_now_C); 
}); 

ordre d'exécution sera:

  1. le gestionnaire de » thing
  2. must_be_done_now_A
  3. must_be_done_now_C
  4. gestionnaire dese termine
  5. toute autre substance qui est entré dans le io_service dans l'intervalle
  6. may_be_done_out_of_order_B
+0

1. Je posterai jamais un gestionnaire directement à io_service, j'utilise toujours un brin 2. chaque Gestionnaire async_write est déjà enveloppé dans un brin 3. J'ai essayé de remplacer le poste avec l'envoi, mais le même problème. Une autre idée? pourquoi est-il ordonné dans Reply :: postBackendReply (donc ici pas de problème avec la commande) mais pas dans postReply? Que peut-il arriver ici? –

+0

@BenD Si vous n'enveloppez pas vos gestionnaires, toute commande est une question de chance. –

+0

@BenD dans votre fonction 'postReply' vous postez effectivement un post. la commande n'est plus garantie. –