2017-10-20 31 views
0

Je suis l'article https://blog.golang.org/pipelines pour implémenter quelques étapes.In golang, comment écrire un étage pipeline qui introduit un délai pour l'étape suivante?

J'ai besoin de l'une des étapes pour introduire un délai de quelques secondes avant que les événements ne passent à l'étape suivante du pipeline. Mon souci avec le code ci-dessous est qu'il donnera un nombre illimité de routines go que time.Sleep() avant de passer les événements le long. Y a-t-il de meilleurs moyens de le faire?

Merci!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    out := make(chan *Bar, 10000) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 

Répondre

1

Vous pouvez utiliser un autre canal pour limiter le nombre de goroutines actives que votre boucle est capable de créer.

const numRoutines = 10 

func fooStage(inChan <-chan *Bar) <-chan *Bar { 
    out := make(chan *Bar, 10000) 
    routines := make(chan struct{}, numRoutines) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       routines <- struct{}{} 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
        <-routines 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 
+0

Merci, cela semble être une bonne idée. Le seul défaut que je peux voir est que si le canal 'routines' est bloqué, les événements seront retardés de plus de 5 secondes. Il n'y a pas de bonne façon d'aborder cela sans avoir d'horodatage à l'intérieur de l'événement. – ultimoo

+1

@ultimoo En raison de l'attente de 5 secondes, vous pouvez facilement exécuter des centaines ou des milliers de goroutines, ce qui réduirait le temps d'attente pratique. Quelque chose comme ceci est difficile à déterminer simplement en lisant le code. Des tests et des analyses comparatives seraient nécessaires pour déterminer réellement comment les choses fonctionneraient réellement. – RayfenWindspear

+0

Absolument, c'est plus une expérience. Mon intuition est que l'exécution de quelques milliers de ces goroutines devrait être OK - puisque tout ce qu'ils font est exécuter 'time.Sleep()', ils ne seront pas programmés sur le processeur pendant la plus grande partie de leur durée de vie. – ultimoo

0

Vous pouvez utiliser time.Ticker:

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    //... some code 
    ticker := time.NewTicker(5 * time.Second) 
    <-ticker // the delay, probably need to call twice 
    ticker.Stop() 
    close(ticker.C) 
    //... rest code 
} 
+0

Pouvez-vous expliquer comment cela fonctionne avec les événements suivants? Si deux événements surviennent sur l'inChan, le deuxième événement n'attendrait-il pas 10 secondes? – ultimoo

+0

'<-ticker' devrait retourner pas plus souvent que la période spécifiée. Essayez-le –

1

Vous pouvez fixer le nombre de goroutines manuellement - à partir seul numéro.

func sleepStage(in <-chan *Bar) (out <-chan *Bar) { 
    out = make(<-chan *Bar) 
    wg := sync.WaitGroup 
    for i:=0; i < N; i++ { // Number of goroutines in parallel 
      wg.Add(1) 
      go func(){ 
       defer wg.Done() 
       for e := range in { 
        time.Sleep(5*time.Seconds) 
        out <- e 
       } 
      }() 
     } 
     go func(){} 
      wg.Wait() 
      close(out) 
     }() 
     return out 
    }