
I have written a distributed TensorFlow program with 1 ps job and 2 worker jobs. I expected the data batches to be distributed among the workers, but that does not seem to be the case: I can see that a single worker (task=0) does all the work while the other one stays idle. Could you please help me find the problem with this program: Data is not distributed among TensorFlow workers

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("master_hosts", "oser502110:2222",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

tf.app.flags.DEFINE_string("worker_grpc_url", None,
                           "Worker GRPC URL")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28


def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    master_hosts = FLAGS.master_hosts.split(",")

    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":

        is_chief = (FLAGS.task_index == 0)
        if is_chief:
            tf.reset_default_graph()

        # Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):

            # Variables of the hidden layer
            hid_w = tf.Variable(
                tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                                    stddev=1.0 / IMAGE_PIXELS), name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

            # Variables of the softmax layer
            sm_w = tf.Variable(
                tf.truncated_normal([FLAGS.hidden_units, 10],
                                    stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
                name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])

            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)

            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

            global_step = tf.Variable(0, trainable=False)

            train_op = tf.train.AdagradOptimizer(0.01).minimize(
                loss, global_step=global_step)

            saver = tf.train.Saver()
            # summary_op = tf.merge_all_summaries()
            init_op = tf.initialize_all_variables()

        # Create a "supervisor", which oversees the training process.
        sv = tf.train.Supervisor(is_chief=is_chief,
                                 logdir="/tmp/train_logs",
                                 init_op=init_op,
                                 recovery_wait_secs=1,
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=600)
        if is_chief:
            print("Worker %d: Initializing session..." % FLAGS.task_index)
        else:
            print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)

        mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=True,
            device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        with sv.prepare_or_wait_for_session(server.target, config=sess_config) as sess:
            print("Worker %d: Session initialization complete." % FLAGS.task_index)
            # Loop until the supervisor shuts down or 1000000 steps have completed.
            step = 0
            while not sv.should_stop() and step < 1000000:
                # Run a training step asynchronously.
                # See `tf.train.SyncReplicasOptimizer` for additional details on how to
                # perform *synchronous* training.

                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                print("FETCHING NEXT BATCH %d" % FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}

                _, step = sess.run([train_op, global_step], feed_dict=train_feed)
                if step % 100 == 0:
                    print("Done step %d" % step)

        # Ask for all the services to stop.
        sv.stop()


if __name__ == "__main__":
    tf.app.run()
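
For context, with one ps task and two worker tasks the host flags above resolve to a cluster spec of roughly the following shape (the ports and the second host name are placeholders; only oser502110 appears in the original flags):

# Hypothetical example values; only oser502110 is taken from the question's flags.
cluster = tf.train.ClusterSpec({
    "ps": ["oser502110:2221"],
    "worker": ["oser502110:2222", "oser502111:2223"],
})

Each of the three processes builds this same ClusterSpec and then starts its own tf.train.Server with its job_name and task_index.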

and here are the logs from the worker at task=0:

2017-06-20 04:50:58.405431: I tensorflow/core/common_runtime/simple_placer.cc:841] Adagrad/value: (Const)/job:ps/replica:0/task:0/cpu:0
2017-06-20 04:50:58.405456: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/stddev: (Const)/job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405481: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405506: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:0/gpu:0
Worker 0: Session initialization complete.
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
Done step 408800
... ...

but the logs from worker 2 (task=1) look like this:

2017-06-20 04:51:07.288600: I tensorflow/core/common_runtime/simple_placer.cc:841] zeros: (Const)/job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288614: I tensorflow/core/common_runtime/simple_placer.cc:841] Adagrad/value: (Const)/job:ps/replica:0/task:0/cpu:0
2017-06-20 04:51:07.288639: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/stddev: (Const)/job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288664: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288689: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:1/gpu:0

I expected similar logs from both workers. Please help me figure this out. Looking forward to your help.

Answer


You need to manually partition the data for each worker.

# Get the subset of data for this worker.
# num_workers and task_index come from the surrounding training script,
# e.g. num_workers = len(FLAGS.worker_hosts.split(",")) and task_index = FLAGS.task_index.

mnist = input_data.read_data_sets('/tmp/mnist_temp', one_hot=True)
num_old = mnist.train._num_examples
# Keep every num_workers-th example, starting at this worker's task_index.
ids = list(range(task_index, mnist.train._num_examples, num_workers))
mnist.train._images = mnist.train._images[ids,]
mnist.train._labels = mnist.train._labels[ids,]
mnist.train._num_examples = mnist.train._images.shape[0]

print("subset of training examples ", mnist.train._num_examples, "/", num_old)