2017-03-27 2 views
0

Pendant que je travaillais sur mon DAG dans Jet Noisette, je suis tombé sur un problème étrange. Pour vérifier l'erreur, j'ai descendu mon approche complètement et: il semble que les bords ne fonctionnent pas selon le tutoriel.Comment obtenir un simple DAG pour travailler dans Hazelcast Jet?

Le code ci-dessous est presque aussi simple que possible. Deux sommets (une source, un puits), un bord.

La source lit sur une carte, l'évier doit être placé sur une carte.

Le data.addEntryListener me dit correctement que la carte est remplie avec 100 listes (chacune avec 25 objets à 400 octets) par une autre application ... et puis rien. La carte se remplit, mais le dag n'interagit pas du tout.

Une idée où chercher le problème?

package be.andersch.clusterbench; 

import com.fasterxml.jackson.databind.ObjectMapper; 
import com.hazelcast.config.Config; 
import com.hazelcast.config.SerializerConfig; 
import com.hazelcast.core.EntryEvent; 
import com.hazelcast.jet.*; 
import com.hazelcast.jet.config.JetConfig; 
import com.hazelcast.jet.stream.IStreamMap; 
import com.hazelcast.map.listener.EntryAddedListener; 
import be.andersch.anotherpackage.myObject; 

import java.util.List; 
import java.util.concurrent.ExecutionException; 

import static com.hazelcast.jet.Edge.between; 
import static com.hazelcast.jet.Processors.*; 

/** 
* Created by abernard on 24.03.2017. 
*/ 
public class Analyzer { 
    private static final ObjectMapper mapper = new ObjectMapper(); 
    private static JetInstance jet; 
    private static final IStreamMap<Long, List<String>> data; 
    private static final IStreamMap<Long, List<String>> testmap; 

    static { 
     JetConfig config = new JetConfig(); 
     Config hazelConfig = config.getHazelcastConfig(); 
     hazelConfig.getGroupConfig().setName("name").setPassword("password"); 
     hazelConfig.getNetworkConfig().getInterfaces().setEnabled(true).addInterface("my_IP_range_here"); 
     hazelConfig.getSerializationConfig().getSerializerConfigs().add(
       new SerializerConfig(). 
         setTypeClass(myObject.class). 
         setImplementation(new OsamKryoSerializer())); 
     jet = Jet.newJetInstance(config); 
     data = jet.getMap("data"); 
     testmap = jet.getMap("testmap"); 
    } 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 

     DAG dag = new DAG(); 
     Vertex source = dag.newVertex("source", readMap("data")); 
     Vertex test = dag.newVertex("test", writeMap("testmap")); 

     dag.edge(between(source, test)); 

     jet.newJob(dag).execute()get(); 

     data.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> { 
      System.out.println("Got data: " + entryEvent.getKey() + " at " + System.currentTimeMillis() + ", Size: " + jet.getHazelcastInstance().getMap("data").size()); 
     }, true); 

     testmap.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> { 
      System.out.println("Got test: " + entryEvent.getKey() + " at " + System.currentTimeMillis()); 
     }, true); 

     Runtime.getRuntime().addShutdownHook(new Thread(() -> Jet.shutdownAll())); 
    } 
} 

Répondre

1

Le travail Jet est déjà terminé à la ligne jet.newJob(dag).execute().get(), avant même créé les auditeurs d'entrée. Cela signifie que le travail s'exécute sur une carte vide. Peut-être que votre confusion est sur la nature de ce travail: c'est un travail par lots, pas un flux de traitement infini. La version Jet 0.3 ne prend pas encore en charge le traitement de flux infini.

+0

Avez-vous aussi une suggestion pour le résoudre? J'ai un IStreamMap avec un scheduledExcetor pour faire le travail, mais c'est plutôt lent. Neil a suggéré un DAG (ce qui est logique), c'est pourquoi j'essaye cela. –

+1

Il pourrait y avoir un système de micro-lotage qui pourrait fonctionner; Dans le cas contraire, l'équipe développe activement le support du vrai traitement de flux infini. –