Data is not distributed among TensorFlow workers

I have written a distributed TensorFlow program with 1 ps job and 2 worker jobs. I expected the data batches to be distributed among the workers, but that does not appear to be happening: only one worker (task=0) does all the work while the other one stays idle. Could you please help me find the problem in this program?
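For context, the cluster layout that the ps_hosts/worker_hosts flags are meant to describe is roughly the following (the hostnames and ports in this sketch are placeholders, not my actual machines):

import tensorflow as tf

# 1 ps task and 2 worker tasks; the addresses below are illustrative only.
cluster = tf.train.ClusterSpec({
    "ps": ["ps0.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
})

Here is the full program: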
import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("master_hosts", "oser502110:2222",
"Comma-separated list of hostname:port pairs")
# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
"Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
"Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
tf.app.flags.DEFINE_string("worker_grpc_url", None,
"Worker GRPC URL")
FLAGS = tf.app.flags.FLAGS
IMAGE_PIXELS = 28
def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")
  master_hosts = FLAGS.master_hosts.split(",")
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    if is_chief:
      tf.reset_default_graph()

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
      y_ = tf.placeholder(tf.float32, [None, 10])

      hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
      loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

      global_step = tf.Variable(0, trainable=False)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      # summary_op = tf.merge_all_summaries()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=is_chief,
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             recovery_wait_secs=1,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    if is_chief:
      print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
      print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)

    mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=True,
        device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.prepare_or_wait_for_session(server.target, config=sess_config) as sess:
      print("Worker %d: Session initialization complete." % FLAGS.task_index)

      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
        print("FETCHING NEXT BATCH %d" % FLAGS.batch_size)
        train_feed = {x: batch_xs, y_: batch_ys}
        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
        if step % 100 == 0:
          print("Done step %d" % step)

    # Ask for all the services to stop.
    sv.stop()


if __name__ == "__main__":
  tf.app.run()
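In case it is relevant, below is a minimal, standalone sketch (again with placeholder addresses) of the device placement I expect tf.train.replica_device_setter to produce, namely variables pinned to the ps job and compute ops pinned to the local worker task. It only builds a graph, so no servers need to be running:

import tensorflow as tf

# Placeholder addresses; only used for constructing the ClusterSpec.
cluster = tf.train.ClusterSpec({
    "ps": ["localhost:2222"],
    "worker": ["localhost:2223", "localhost:2224"],
})

with tf.device(tf.train.replica_device_setter(
    worker_device="/job:worker/task:0", cluster=cluster)):
  w = tf.Variable(tf.zeros([10]), name="w")  # expected on /job:ps/task:0
  y = w + 1                                  # expected on /job:worker/task:0

print("w:", w.device)
print("y:", y.device)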
And here are the logs from the worker at task=0:

2017-06-20 04:50:58.405431: I tensorflow/core/common_runtime/simple_placer.cc:841] Adagrad/value: (Const)/job:ps/replica:0/task:0/cpu:0
truncated_normal/stddev: (Const): /job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405456: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/stddev: (Const)/job:worker/replica:0/task:0/gpu:0
truncated_normal/mean: (Const): /job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405481: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:0/gpu:0
truncated_normal/shape: (Const): /job:worker/replica:0/task:0/gpu:0
2017-06-20 04:50:58.405506: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:0/gpu:0
Worker 0: Session initialization complete.
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
FETCHING NEXT BATCH 500
...
Done step 408800
...
But the logs from worker 2 (task=1) look like this:

2017-06-20 04:51:07.288600: I tensorflow/core/common_runtime/simple_placer.cc:841] zeros: (Const)/job:worker/replica:0/task:1/gpu:0
Adagrad/value: (Const): /job:ps/replica:0/task:0/cpu:0
2017-06-20 04:51:07.288614: I tensorflow/core/common_runtime/simple_placer.cc:841] Adagrad/value: (Const)/job:ps/replica:0/task:0/cpu:0
truncated_normal/stddev: (Const): /job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288639: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/stddev: (Const)/job:worker/replica:0/task:1/gpu:0
truncated_normal/mean: (Const): /job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288664: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:1/gpu:0
truncated_normal/shape: (Const): /job:worker/replica:0/task:1/gpu:0
2017-06-20 04:51:07.288689: I tensorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:1/gpu:0

I expected to see similar logs from both workers. Please help me understand what is going on here. Looking forward to your help.