0

Je suis un peu nouveau dans le monde de l'informatique distribuée. Je lisais le following du tutoriel officiel tensorflow, mais je suis assez confus sur ce qui se passe dans l'exemple principal du tutoriel.Pouvez-vous expliquer l'exemple de didacticiel Tensorflow distribué?

En particulier, comment les tâches ps et les travailleurs interagissent-ils? Quel est exactement le rôle des emplois ps? Leur partie correspondante dans le code est assez limitée et ils ne semblent pas faire grand-chose, alors quel est leur but? Je suppose que je ne comprends pas comment les différentes parties de notre système distribué fonctionnent ensemble.

Ce serait génial si quelqu'un pouvait expliquer ce qui se passe exactement comme vous exécutez les commandes du shell à la fin en termes de processus différents et de leurs actions.

Voici le code principal de référence:

import argparse 
import sys 

import tensorflow as tf 

FLAGS = None 

def main(_): 
    ps_hosts = FLAGS.ps_hosts.split(",") 
    worker_hosts = FLAGS.worker_hosts.split(",") 

    # Create a cluster from the parameter server and worker hosts. 
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) 

    # Create and start a server for the local task. 
    server = tf.train.Server(cluster, 
          job_name=FLAGS.job_name, 
          task_index=FLAGS.task_index) 

    if FLAGS.job_name == "ps": 
    server.join() 
    elif FLAGS.job_name == "worker": 

    # Assigns ops to the local worker by default. 
    with tf.device(tf.train.replica_device_setter(
     worker_device="/job:worker/task:%d" % FLAGS.task_index, 
     cluster=cluster)): 

     # Build model... 
     loss = ... 
     global_step = tf.contrib.framework.get_or_create_global_step() 

     train_op = tf.train.AdagradOptimizer(0.01).minimize(
      loss, global_step=global_step) 

    # The StopAtStepHook handles stopping after running given steps. 
    hooks=[tf.train.StopAtStepHook(last_step=1000000)] 

    # The MonitoredTrainingSession takes care of session initialization, 
    # restoring from a checkpoint, saving to a checkpoint, and closing when done 
    # or an error occurs. 
    with tf.train.MonitoredTrainingSession(master=server.target, 
              is_chief=(FLAGS.task_index == 0), 
              checkpoint_dir="/tmp/train_logs", 
              hooks=hooks) as mon_sess: 
     while not mon_sess.should_stop(): 
     # Run a training step asynchronously. 
     # See `tf.train.SyncReplicasOptimizer` for additional details on how to 
     # perform *synchronous* training. 
     # mon_sess.run handles AbortedError in case of preempted PS. 
     mon_sess.run(train_op) 

if __name__ == "__main__": 
    parser = argparse.ArgumentParser() 
    parser.register("type", "bool", lambda v: v.lower() == "true") 
    # Flags for defining the tf.train.ClusterSpec 
    parser.add_argument(
     "--ps_hosts", 
     type=str, 
     default="", 
     help="Comma-separated list of hostname:port pairs" 
) 
    parser.add_argument(
     "--worker_hosts", 
     type=str, 
     default="", 
     help="Comma-separated list of hostname:port pairs" 
) 
    parser.add_argument(
     "--job_name", 
     type=str, 
     default="", 
     help="One of 'ps', 'worker'" 
) 
    # Flags for defining the tf.train.Server 
    parser.add_argument(
     "--task_index", 
     type=int, 
     default=0, 
     help="Index of task within the job" 
) 
    FLAGS, unparsed = parser.parse_known_args() 
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) 

Voici les commandes shell:

$ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = ps--task_index = 0# On ps1.example.com: 
    $ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = ps--task_index = 1# On worker0.example.com: 
    $ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = worker--task_index = 0# On worker1.example.com: 
    $ python trainer.py\ 
    --ps_hosts = ps0.example.com: 2222, ps1.example.com: 2222\ 
    --worker_hosts = worker0.example.com: 2222, worker1.example.com: 2222\ 
    --job_name = worker--task_index = 1 

Répondre

1

Selon ma compréhension, le travail ps contient toutes les données partagées entre les différentes tâches, peut fonctionner sur des machines différentes (et tous partagent le même travail ps).

+0

Cela fait attendre le sens du fait qu'il ya deux ps tâches dans le code. Pourquoi devrait-il y en avoir deux? Que se passerait-il par exemple si nous utilisions le même code mais éliminions l'une des tâches ps? Si les tâches ps sont là juste pour la synchronisation, je ne vois pas pourquoi nous en avons besoin de deux. – moxox

2

Voici un diagramme schématique de la situation. Vous avez 4 processus tensorflow. Chaque processus exécute le thread de travail TensorFlow qui peut exécuter des calculs TensorFlow. En outre, deux des processus exécutent également un thread client qui émet des demandes session.run.

Chaque processus de travail est également un « appareil » dans tensorflow en vue de l'exécution du graphe de séparation par rapport aux dispositifs. Vous pouvez indiquer à TF runtime d'exécuter une partie du graphique sur le périphérique worker1 en faisant quelque chose comme with tf.device("job:worker/task:0"): pendant la construction du graphe.

Il se passe de la magie dans tf.train.replica_device_setter qui a lieu avec les annotations manuelles with tf.device et qui a pour effet d'assigner automatiquement des variables à travers les dispositifs. Plus précisément, lorsque vous avez deux fragments PS, la moitié des variables ira sur le dispositif ps1 et l'autre moitié sur le dispositif ps2. Pendant ce temps, la partie du graphique qui met à jour ces variables sera répliquée sur chaque unité de travail.

Si vous remplacez replica_device_setter avec les spécifications de l'appareil manuel, votre processus de travail serait à peu près ressembler à ceci

with tf.device('ps1'): 
    var1 = tf.Variable(...) 
with tf.device('ps2'): 
    var2 = tf.Variable(...) 
with tf.device('worker1'): 
    update_op1 = var1.assign_add(grad1) 
    update_op2 = var2.assign_add(grad2) 

while True: 
    sess.run([update_op1, update_op2]) 

La communication est automatiquement pris en charge. Lorsque vous exécutez sess.run(update_op1) dans le thread client worker1, il calcule grad1 sur worker1, puis envoyer le résultat à la tâche ps1 et déclencher thread de travail ps1 mettre à jour sa valeur de var1