2017-09-15 2 views
0

J'ai une application qui tourne autour de six mille urls.Pour minimiser ce travail, j'ai créé une RecursiveTask qui consomme un ConcurrentLinkedQueue de toutes les URL à explorer. Il se divise jusqu'à 50 off et si le vide est Qué elle directement, mais explore sinon d'abord crée une nouvelle instance de lui-même et fourches, après qu'il explore le sous-ensemble de 50 et après qu'il se joindra à la tâche en forme de fourche.ForkJoinFramework utilise seulement deux travailleurs

Maintenant vient mon problème, jusqu'à ce que chaque fil a travaillé de son 50 tous les quatre anf rapide travail en même temps. Mais après deux arrêter de travailler et attendre la jointure et seulement les deux autres travaillent et créer de nouvelles fourches et des pages d'exploration.

Pour visualiser cela, je compte le nombre des URL Mouch un thread et laisser un rampe IUG JavaFX le montrer.

Que dois-je tort de sorte que le ForkJoinFramewok utilise seulement deux de mes quatre fils permis? Que puis-je faire pour le changer?

Voici ma méthode de calcul de la tâche:

LOG.debug(
     Thread.currentThread().getId() + " Starting new Task with " 
      + urlsToCrawl.size() + " left." 
    ); 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++) 
    { 
     urlsToCrawlSubset.offer(urlsToCrawl.poll()); 
    } 
    LOG.debug(
     Thread.currentThread().getId() + " Crated a Subset with " 
     + urlsToCrawlSubset.size() + "." 
    ); 
    LOG.debug(
     Thread.currentThread().getId() 
     + " Now the Urls to crawl only left " + urlsToCrawl.size() + "." 
    ); 

    if (urlsToCrawl.isEmpty()) 
    { 
     LOG.debug(Thread.currentThread().getId() + " Crawling the subset."); 
     crawlPage(urlsToCrawlSubset); 
    } 
    else 
    { 
     LOG.debug(
      Thread.currentThread().getId() 
       + " Creating a new Task and crawling the subset." 
     ); 
     final AbstractUrlTask<T, D> otherTask = createNewOwnInstance(); 
     otherTask.fork(); 
     crawlPage(urlsToCrawlSubset); 
     taskResults.addAll(otherTask.join()); 
    } 
    return taskResults; 

Et voici un aperçu de mon schéma: enter image description here

P.S. Si j'autorise jusqu'à 80 threads, nous les utiliserons jusqu'à ce que chacun ait 50 URL explorées et en utilise seulement deux.

Et si vous êtes intéressé, voici le code source complet: https://github.com/mediathekview/MServer/tree/feature/cleanup

+1

Etes-vous sûr qu'il est juste d'appeler otherTask.join() là? – algrid

+1

Je ne peux pas traverser la montagne de code dans github. Si vous voulez de l'aide, créez un exemple sscc. http://sscce.org/ Notez également que join() stands jusqu'à 50% des fils comme indiqué ici: http://coopsoft.com/ar/Calamity2Article.html#join – edharned

+0

Pouvez-vous montrer le code qui soumet des tâches à la piscine? –

Répondre

0

Je l'ai fixé. Mon erreur était, que j'ai divisé puis travaillé une petite protion et qu'attendu au lieu de le diviser en deux, puis appelez-moi encore avec le reste de l'autre moitié, etc

En d'autres termes avant que je me suis séparé et travaillé directement mais correct est de se séparer jusqu'à ce que tout soit divisé et ensuite commencer à travailler.

Voici mon code à quoi il ressemble maintenant:

@Override 
protected Set<T> compute() 
{ 
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask()) 
    { 
     crawlPage(urlsToCrawl); 
    } 
    else 
    { 
     final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl)); 
     final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl); 
     leftTask.fork(); 
     taskResults.addAll(rightTask.compute()); 
     taskResults.addAll(leftTask.join()); 
    } 
    return taskResults; 
} 

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue) 
{ 
    final int halfSize = aBaseQueue.size()/2; 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < halfSize; i++) 
    { 
     urlsToCrawlSubset.offer(aBaseQueue.poll()); 
    } 
    return urlsToCrawlSubset; 
}