1

J'ai essayé d'implémenter une première recherche de profondeur distribuée dans C#. J'ai réussi jusqu'à un certain point mais j'ai une erreur de synchronisation. Je ne suis pas capable de rectifier l'erreur. Ce que j'essaye de faire est de faire communiquer chaque noeud entre eux en utilisant un flux de données parallèle à la tâche et ainsi atteindre le parallélisme dans DFS. Ci-dessous mon code:Threading dans C# pour construire un DFS distribué

public class DFS 
{ 
static List<string> traversedList = new List<string>(); 

static List<string> parentList = new List<string>(); 
static Thread[] thread_array; 
static BufferBlock<Object> buffer1 = new BufferBlock<Object>(); 

public static void Main(string[] args) 
{ 

    int N = 100; 
    int M = N * 4; 
    int P = N * 16; 

    Stopwatch stopwatch = new Stopwatch(); 
    stopwatch.Start(); 

    List<string> global_list = new List<string>(); 

    StreamReader file = new StreamReader(args[args.Length - 2]); 


    string text = file.ReadToEnd(); 

    string[] lines = text.Split('\n'); 



    string[][] array1 = new string[lines.Length][]; 

    for (int i = 0; i < lines.Length; i++) 
    { 
     lines[i] = lines[i].Trim(); 
     string[] words = lines[i].Split(' '); 

     array1[i] = new string[words.Length]; 

     for (int j = 0; j < words.Length; j++) 
     { 
      array1[i][j] = words[j]; 
     } 
    } 

    StreamWriter sr = new StreamWriter("E:\\Newtext1.txt"); 

    for (int i = 0; i < array1.Length; i++) 
    { 
     for (int j = 0; j < array1[i].Length; j++) 
     { 
      if (j != 0) 
      { 
       sr.Write(array1[i][0] + ":" + array1[i][j]); 
       Console.WriteLine(array1[i][0] + ":" + array1[i][j]); 
       sr.Write(sr.NewLine); 
      } 
     } 

    } 
    int start_no = Convert.ToInt32(args[args.Length - 1]); 
    thread_array = new Thread[lines.Length]; 
    string first_message = "root"; 
    buffer1.Post(first_message); 
    buffer1.Post(array1); 
    buffer1.Post(start_no); 
    buffer1.Post(1); 

    for (int t = 1; t < lines.Length; t++) 
    { 
     Console.WriteLine("thread" + t); 
     thread_array[t] = new Thread(new ThreadStart(thread_run)); 
     thread_array[t].Name = t.ToString(); 
     lock (thread_array[t]) 
     { 
      Console.WriteLine("working"); 
      thread_array[t].Start(); 
      thread_array[t].Join(); 
     } 

    } 
    stopwatch.Stop(); 

    Console.WriteLine(stopwatch.Elapsed); 
    Console.ReadLine(); 
} 

private static void dfs(string[][] array, int point) 
{ 
    for (int z = 1; z < array[point].Length; z++) 
    { 
     if ((!traversedList.Contains(array[point][z]))) 
     { 
      traversedList.Add(array[point][z]); 
      parentList.Add(point.ToString()); 
      dfs(array, int.Parse(array[point][z])); 
     } 

    } 
    return; 


} 
public static void thread_run() 
{ 
    try 
    { 
     string parent; 
     string[][] array1; 
     int point; 
     int id; 
     parent = (string)buffer1.Receive(); 
     array1 = (string[][])buffer1.Receive(); 
     point = (int)buffer1.Receive(); 
     id = (int)buffer1.Receive(); 
     object value; 
     Console.WriteLine("times"); 

     if (Thread.CurrentThread.Name.Equals(point.ToString())) 
     { 
      if (!traversedList.Contains(point.ToString())) 
      { 
       Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id); 
       traversedList.Add(point.ToString()); 
       parent = point.ToString(); 
       for (int x = 1; x < array1[point].Length; x++) 
       { 
        Console.WriteLine("times"); 
        if (buffer1.TryReceive(out value)) 
        { 
         array1 = (string[][])value; 
        } 
        if (buffer1.TryReceive(out value)) 
        { 
         id = (int)buffer1.Receive(); 
        } 
        id++; 
        buffer1.Post(parent); 
        buffer1.Post(array1); 
        buffer1.Post(x); 
        buffer1.Post(id); 
        Console.WriteLine("times"); 
        Monitor.PulseAll(Thread.CurrentThread); 
       } 

       //return; 
      } 
      else 
      { 
       buffer1.Post(parent); 
       buffer1.Post(array1); 
       buffer1.Post(point); 
       buffer1.Post(id); 
       Console.WriteLine("working 1"); 
       Monitor.PulseAll(Thread.CurrentThread); 
      } 
     } 
     else 
     { 
      Console.WriteLine("working 2"); 
      Monitor.Wait(Thread.CurrentThread); 
     } 
     //Console.WriteLine(parent); 
    } 
    catch (Exception ex) 
    { 
     Console.WriteLine(ex.Message); 
    } 

} 

} 

enter image description here

+1

comment est-ce différent de la question que vous avez posée il ya 3 jours @ http://stackoverflow.com/questions/10852317/depth-first-search-in-a-distributed-way? –

+0

@JamesManning: S'il vous plaît ne pas tat j'ai fait une mise en œuvre séquentielle dans la dernière question mais j'ai trouvé un moyen pour l'implémentation distribuée (threads utilisés) et ici je suis coincé avec cette erreur.En java c'est plus facile car nous utilisons essentiellement un mot-clé synchronisé mais en C# je ne trouve rien de facile. –

+0

De quelle ligne de code provient cette erreur? – Faraday

Répondre

1

Le problème est que le fil doit posséder moniteur afin d'appel en attente. Donc, vous devez verrouiller Monitor.PulseAll ainsi que Monitor.Wait afin de vous assurer que vous n'obtenez plus d'erreurs comme celle-ci.

Si vous avez besoin que je vous explique le verrouillage, ouvrez une autre question et je l'expliquerai en entier! :)

+0

Comment est-ce que je fais ça? pourriez-vous me donner un exemple de code? –

0

EDIT: Ignorer mon post - Lire le post de la place @PanagiotisKanavos ...

Cela ne compile pas, mais vous mettrez dans la bonne direction pour l'utilisation de serrures:

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.IO; 
using System.Threading; 

public class DFS 
{ 
    static List<string> traversedList = new List<string>(); 

    static List<string> parentList = new List<string>(); 
    static Thread[] thread_array; 
    //static BufferBlock<Object> buffer1 = new BufferBlock<Object>(); 

    public static void Main(string[] args) 
    { 

     int N = 100; 
     int M = N * 4; 
     int P = N * 16; 

     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     List<string> global_list = new List<string>(); 

     StreamReader file = new StreamReader(args[args.Length - 2]); 


     string text = file.ReadToEnd(); 

     string[] lines = text.Split('\n'); 



     string[][] array1 = new string[lines.Length][]; 

     for (int i = 0; i < lines.Length; i++) 
     { 
      lines[i] = lines[i].Trim(); 
      string[] words = lines[i].Split(' '); 

      array1[i] = new string[words.Length]; 

      for (int j = 0; j < words.Length; j++) 
      { 
       array1[i][j] = words[j]; 
      } 
     } 

     StreamWriter sr = new StreamWriter("E:\\Newtext1.txt"); 

     for (int i = 0; i < array1.Length; i++) 
     { 
      for (int j = 0; j < array1[i].Length; j++) 
      { 
       if (j != 0) 
       { 
        sr.Write(array1[i][0] + ":" + array1[i][j]); 
        Console.WriteLine(array1[i][0] + ":" + array1[i][j]); 
        sr.Write(sr.NewLine); 
       } 
      } 

     } 
     int start_no = Convert.ToInt32(args[args.Length - 1]); 
     thread_array = new Thread[lines.Length]; 
     string first_message = "root"; 
     //buffer1.Post(first_message); 
     //buffer1.Post(array1); 
     //buffer1.Post(start_no); 
     //buffer1.Post(1); 

     for (int t = 1; t < lines.Length; t++) 
     { 
      Console.WriteLine("thread" + t); 
      thread_array[t] = new Thread(new ThreadStart(thread_run)); 
      thread_array[t].Name = t.ToString(); 
      lock (thread_array[t]) 
      { 
       Console.WriteLine("working"); 
       thread_array[t].Start(); 
       thread_array[t].Join(); 
      } 

     } 
     stopwatch.Stop(); 

     Console.WriteLine(stopwatch.Elapsed); 
     Console.ReadLine(); 
    } 

    private static void dfs(string[][] array, int point) 
    { 
     for (int z = 1; z < array[point].Length; z++) 
     { 
      if ((!traversedList.Contains(array[point][z]))) 
      { 
       traversedList.Add(array[point][z]); 
       parentList.Add(point.ToString()); 
       dfs(array, int.Parse(array[point][z])); 
      } 

     } 
     return; 


    } 

    bool busy; 
    private readonly object syncLock = new object(); 

    public static void thread_run() 
    { 
     try 
     { 
      string parent; 
      string[][] array1; 
      int point; 
      int id; 
      //parent = (string)buffer1.Receive(); 
      //array1 = (string[][])buffer1.Receive(); 
      //point = (int)buffer1.Receive(); 
      //id = (int)buffer1.Receive(); 
      object value; 
      Console.WriteLine("times"); 

      if (Thread.CurrentThread.Name.Equals("Point.ToString()")) 
      { 
       if (!traversedList.Contains("Point.ToString()")) 
       { 
        for (int x = 1; x < 99999; x++) 
        { 
         Console.WriteLine("times"); 
         //if (buffer1.TryReceive(out value)) 
         //{ 
         // array1 = (string[][])value; 
         //} 
         //if (buffer1.TryReceive(out value)) 
         //{ 
         // id = (int)buffer1.Receive(); 
         //} 
         //id++; 
         //buffer1.Post(parent); 
         //buffer1.Post(array1); 
         //buffer1.Post(x); 
         //buffer1.Post(id); 
         Console.WriteLine("times"); 

         lock (syncLock) 
         { 
          while (busy) 
          { 
           busy = false; 
           Monitor.PulseAll(Thread.CurrentThread); 
          } 
          busy = true; // we've got it! 
         } 


        } 

        //return; 
       } 
       else 
       { 
        //buffer1.Post(parent); 
        //buffer1.Post(array1); 
        //buffer1.Post(point); 
        //buffer1.Post(id); 
        lock (syncLock) 
        { 
         while (busy) 
         { 
          busy = false; 
          Monitor.PulseAll(Thread.CurrentThread); 
         } 
         busy = true; // we've got it! 
        } 
       } 
      } 
      else 
      { 
       Console.WriteLine("working 2"); 
       lock (syncLock) 
       { 
        while (busy) 
        { 
         Monitor.Wait(Thread.CurrentThread); 
        } 
        busy = true; // we've got it! 
       } 

      } 
      //Console.WriteLine(parent); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine(ex.Message); 
     } 

    } 

} 
+0

ce que je fais dans le code est que je fais 10 threads différents pour appeler la même fonction. L'idée ci-dessus ne semble pas fonctionner. –

+0

Si vous souhaitez utiliser Monitor, vous devez verrouiller correctement ... Peut-être que quelqu'un d'autre peut faire la lumière sur ce ... – Faraday

+0

Existe-t-il un autre travail en dehors de Monitor? –

3

Il existe divers problèmes avec votre code.

Une utilisation incorrecte du verrouillage et du "toucher" de la traverseedist à partir de plusieurs threads est le problème le plus évident. Plus important encore, votre code n'utilise pas vraiment Dataflow, il utilise BufferBlock d'une manière similaire à ConcurrentQueue ou à toute autre collection concurrente. L'ensemble du flux de données consiste à utiliser ActionBlocks au lieu de threads pour simplifier le traitement. Par défaut, un bloc d'action n'utilisera qu'un seul thread pour le traitement, mais vous pouvez spécifier autant de threads que vous le souhaitez via la classe DataflowBlockOptions. ActionBlocks ont leurs propres tampons d'entrée et de sortie, donc vous n'avez pas besoin d'ajouter des BufferBlocks supplémentaires uniquement pour la mise en mémoire tampon.

La transmission de plusieurs valeurs associées à un bloc est un autre problème, car cela peut entraîner des erreurs et rendre le code confus. Créer une structure de données pour contenir toutes les valeurs ne coûte rien.

En supposant que vous utilisez cette classe pour contenir un message de traitement:

public class PointMessage 
    { 
     public string Message { get; set; } 
     public string[][] Lines{get;set;} 
     public int Point { get; set; } 
     public int ID { get; set; } 
    } 

Vous pouvez créer un ActionBlock pour traiter ces messages comme celui-ci:

static ActionBlock<PointMessage> _block; 
... 
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded }; 
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options); 

et traiter chaque message comme celui-ci:

private static void ProcessMessage(PointMessage arg) 
    { 
     if (...) 
     { 
      ... 
      arg.ID++; 
      _block.Post(arg); 
     } 
     else 
     { 
      ... 
      _block.Post(arg); 
     } 
    } 

Si votre fonction renvoie une valeur, vous pouvez utiliser un TransformBlock au lieu d'un ActionBlock.

Je ne comprends pas ce que fait votre code, donc je ne vais pas essayer de le réécrire avec DataFlow. Si vous le nettoyez un peu, il sera plus facile d'aider.

+0

Je veux juste envoyer une notification et recevoir des confirmations de chacun des nœuds présents dans le graphique. C'est la raison pour laquelle j'utilise la bibliothèque parallèle de tâches comme modèle d'acteur. –

+0

Le fait est que vous n'utilisez aucun acteur. BufferBlock ne gère pas les messages, ActionBlock ou TransformBlock. D'ailleurs, qu'entendez-vous par acteurs? Essayez-vous de convertir chaque noeud en acteur? C'est un énorme gaspillage de ressources. Il serait plus facile d'évaluer une fonction Contient pour chaque nœud dans une boucle Parallel.For pour permettre à l'exécution de sélectionner un nombre acceptable de thred. –

+0

Mais bufferblock possède des fonctionnalités d'envoi et de réception où je m'en sers ... Ouais! l'idée de base est que chaque nœud agira comme un système indépendant et communiquera entre eux. –