2016-12-23 1 views
8

Je suis novice dans le domaine du tensorflow distribué et je recherche un bon exemple pour effectuer un entraînement synchrone sur les processeurs.Tensorflow distribué: bon exemple pour l'entraînement synchrone sur les processeurs

J'ai déjà essayé le Distributed Tensorflow Example et il peut effectuer la formation asynchrone avec succès sur 1 serveur de paramètres (1 machine avec 1 CPU) et 3 travailleurs (chaque travailleur = 1 machine avec 1 CPU). Cependant, quand il s'agit de l'entraînement synchrone, je ne suis pas capable de l'exécuter correctement, bien que j'ai suivi le tutoriel de SyncReplicasOptimizer(V1.0 and V2.0).

J'ai inséré le code officiel SyncReplicasOptimizer dans l'exemple d'apprentissage asynchrone de travail, mais le processus de formation est toujours asynchrone. Mon code détaillé est le suivant. Tout code se rapporte à la formation synchrone est dans le bloc de ******.

import tensorflow as tf 
import sys 
import time 

# cluster specification ---------------------------------------------------------------------- 
parameter_servers = ["xx1.edu:2222"] 
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"] 
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers}) 

# input flags 
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'") 
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") 
FLAGS = tf.app.flags.FLAGS 

# start a server for a specific task 
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) 

# Parameters ---------------------------------------------------------------------- 
N = 3 # number of replicas 
learning_rate = 0.001 
training_epochs = int(21/N) 
batch_size = 100 

# Network Parameters 
n_input = 784 # MNIST data input (img shape: 28*28) 
n_hidden_1 = 256 # 1st layer number of features 
n_hidden_2 = 256 # 2nd layer number of features 
n_classes = 10 # MNIST total classes (0-9 digits) 

if FLAGS.job_name == "ps": 
    server.join() 
    print("--- Parameter Server Ready ---") 
elif FLAGS.job_name == "worker": 
    # Import MNIST data 
    from tensorflow.examples.tutorials.mnist import input_data 
    mnist = input_data.read_data_sets("/tmp/data/", one_hot=True) 
    # Between-graph replication 
    with tf.device(tf.train.replica_device_setter(
     worker_device="/job:worker/task:%d" % FLAGS.task_index, 
     cluster=cluster)): 
     # count the number of updates 
     global_step = tf.get_variable('global_step', [], 
             initializer = tf.constant_initializer(0), 
             trainable = False, 
             dtype = tf.int32) 
     # tf Graph input 
     x = tf.placeholder("float", [None, n_input]) 
     y = tf.placeholder("float", [None, n_classes]) 

     # Create model 
     def multilayer_perceptron(x, weights, biases): 
      # Hidden layer with RELU activation 
      layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1']) 
      layer_1 = tf.nn.relu(layer_1) 
      # Hidden layer with RELU activation 
      layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2']) 
      layer_2 = tf.nn.relu(layer_2) 
      # Output layer with linear activation 
      out_layer = tf.matmul(layer_2, weights['out']) + biases['out'] 
      return out_layer 

     # Store layers weight & bias 
     weights = { 
      'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])), 
      'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])), 
      'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes])) 
     } 
     biases = { 
      'b1': tf.Variable(tf.random_normal([n_hidden_1])), 
      'b2': tf.Variable(tf.random_normal([n_hidden_2])), 
      'out': tf.Variable(tf.random_normal([n_classes])) 
     } 

     # Construct model 
     pred = multilayer_perceptron(x, weights, biases) 

     # Define loss and optimizer 
     cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y)) 

     # ************************* SyncReplicasOpt Version 1.0 ***************************************************** 
     ''' This optimizer collects gradients from all replicas, "summing" them, 
     then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. ''' 
     # Create any optimizer to update the variables, say a simple SGD 
     opt = tf.train.AdamOptimizer(learning_rate=learning_rate) 

     # Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables. 
     opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N, 
             replica_id=FLAGS.task_index, total_num_replicas=N) 

     # Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally 
     train = opt.minimize(cost, global_step=global_step) 

     # You can now call get_init_tokens_op() and get_chief_queue_runner(). 
     # Note that get_init_tokens_op() must be called before creating session 
     # because it modifies the graph. 
     init_token_op = opt.get_init_tokens_op() 
     chief_queue_runner = opt.get_chief_queue_runner() 
     # ************************************************************************************** 

     # Test model 
     correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1)) 
     accuracy = tf.reduce_mean(tf.cast(correct, "float")) 

     # Initializing the variables 
     init_op = tf.initialize_all_variables() 
     print("---Variables initialized---") 

    # ************************************************************************************** 
    is_chief = (FLAGS.task_index == 0) 
    # Create a "supervisor", which oversees the training process. 
    sv = tf.train.Supervisor(is_chief=is_chief, 
          logdir="/tmp/train_logs", 
          init_op=init_op, 
          global_step=global_step, 
          save_model_secs=600) 
    # ************************************************************************************** 

    with sv.prepare_or_wait_for_session(server.target) as sess: 
     # **************************************************************************************   
     # After the session is created by the Supervisor and before the main while loop: 
     if is_chief: 
      sv.start_queue_runners(sess, [chief_queue_runner]) 
      # Insert initial tokens to the queue. 
      sess.run(init_token_op) 
     # ************************************************************************************** 
     # Statistics 
     net_train_t = 0 
     # Training 
     for epoch in range(training_epochs): 
      total_batch = int(mnist.train.num_examples/batch_size) 
      # Loop over all batches 
      for i in range(total_batch): 
       batch_x, batch_y = mnist.train.next_batch(batch_size) 
       # ======== net training time ======== 
       begin_t = time.time() 
       sess.run(train, feed_dict={x: batch_x, y: batch_y}) 
       end_t = time.time() 
       net_train_t += (end_t - begin_t) 
       # =================================== 
      # Calculate training accuracy 
      # acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels}) 
      # print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc) 
      print("Epoch:", '%04d' % (epoch+1)) 
     print("Training Finished!") 
     print("Net Training Time: ", net_train_t, "second") 
     # Testing 
     print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels})) 

    sv.stop() 
    print("done") 

Quelque chose ne va pas avec mon code? Ou puis-je avoir un bon exemple à suivre?

+0

Le code ressemble superficiellement correct, mais l'interface 'tf.train.SyncReplicasOptimizer' est assez compliquée, donc il pourrait encore y avoir un bug. Quand vous dites «le processus de formation est encore asynchrone», comment avez-vous observé cela? – mrry

+0

Merci pour la réponse, @mrry. Dans la formation syn idéale, nous nous attendons à voir "Epoque #i" imprimé à peu près à la même heure sur tous les travailleurs, mais ce que j'ai observé est: "Epoch 1" sur le travailleur 0 - (3 min plus tard) -> " Epoque 1 "sur travailleur 1 - (3 min plus tard) ->" Epoque 1 "sur travailleur 2 - (3 min plus tard) ->" Epoque 2 "sur travailleur 0 - (3 min plus tard) -> "Epoch 2" sur le travailleur 1 - (3 min plus tard) -> "Epoch 2" sur le travailleur 2 - (3 min plus tard) -> "Epoch 3" sur le travailleur 0 .... boucle jusqu'à la fin. Alors, que se passe-t-il exactement dans le syn-training tensorflow? Pourquoi y a-t-il une formation d'époque ordonnée? –

+0

Je suis également curieux à ce sujet. Je me demande si parfois un processeur peut prendre du retard et il agrège deux lots d'un processeur et laisse l'un des autres processeurs prendre du retard. – Aaron

Répondre

0

Je ne suis pas sûr si vous seriez intéressé par tensorflow distribuée transparente à l'utilisateur qui utilise MPI dans le backend. Nous avons récemment développé une telle version avec MaTEx: https://github.com/matex-org/matex. Par conséquent, pour TensorFlow distribué, vous n'avez pas besoin d'écrire un code SyncReplicaOptimizer, puisque toutes les modifications sont extraites de l'utilisateur.

Espérons que cela aide.

0

Je pense que votre question peut être répondue comme les commentaires dans le numéro #9596 du tensorflow. Ce problème est dû aux bogues de la nouvelle version de tf.train.SyncReplicasOptimizer(). Vous pouvez utiliser l'ancienne version de cette API pour éviter ce problème. Une autre solution vient du Tensorflow Distributed Benchmarks. Regardez le code source, et vous pouvez trouver qu'ils synchronisent manuellement les travailleurs dans la file d'attente dans le tensorflow. Grâce à des expériences, ce benchmark fonctionne exactement comme vous le souhaitez.

Espérons que ces commentaires et ressources peuvent vous aider à résoudre votre problème. Merci!

0

Un problème est que vous devez spécifier un aggregation_method dans la méthode pour minimiser pour exécuter de manière synchrone,

train = opt.minimize(cost, global_step=global_step, aggregation_method=tf.AggregationMethod.ADD_N)