2017-07-24 3 views
1

Je commence tout juste à apprendre le Hazelcast Jet. Ma source est des datagrammes UDP. Je veux le traiter en parallèle à certains nœuds de Jet et les renvoyer à d'autres adresses par 'domaine'. Je veux utiliser Hazelcast IMDG IMap avec loader pour obtenir 'domain' par 'source ip'.Utilisez Hazelcast IMap à Hazelcast Jet Processor

DAG dag = new DAG();   
Vertex source = dag.newVertex("datagram-source", 
       UdpSocketP.supplier("0.0.0.0", 41813)); 
     source.localParallelism(1); 

     Vertex mapper = dag.newVertex("map", 
       map(new DomainMapper(instance.getMap("mysqlNas")))); 

     Vertex sink = dag.newVertex("sink", 
       Sinks.writeFile("logs")); 
     sink.localParallelism(1); 

Mais quand je tente d'utiliser IMap à DistributedFunction i get exception

Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable 
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185) 
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:101) 
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:78) 
    at com.hazelcast.jet.DAG.newVertex(DAG.java:79) 
    at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46) 
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator 

Code DomainMapper:

package org.eltex.softwlc.sorm.replicator; 

import com.hazelcast.core.IMap; 
import com.hazelcast.jet.function.DistributedFunction; 

import java.io.Serializable; 
import java.net.DatagramPacket; 

/** 
* Created by mickey on 21.07.17. 
*/ 
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable { 

    private final IMap<String, NasValue> map; 

    public DomainMapper(IMap<String, NasValue> map) { 
     this.map = map; 
    } 

    @Override 
    public IpData apply(DatagramPacket datagramPacket) { 
     final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress()); 
     System.out.println(d); 

     final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress()); 
     if (nasValue!=null) { 
      d.setDomain(nasValue.getDomain()); 
     } 

     return d; 
    } 
} 

Quelle est mon erreur? Or Hazelcast Jet est mauvais choix pour mon but.

Répondre

2

Le problème est que vous essayez de sérialiser la totalité de IMap à l'intérieur de la fonction. Une solution directe serait d'écrire un processeur personnalisé qui accède à l'instance de Hazelcast Jet à l'intérieur de sa méthode init() et recherche son IMap à partir de cela. Puisque le code init() est exécuté sur le membre cible, après toute désérialisation, cela fonctionnerait.

Cependant, d'un point de vue plus général, votre objectif semble être du type "enrichissement de données". La façon dont nous voulons supporter cela dans Jet est via une opération de "hash join", qui n'est actuellement pas de première classe; Cependant, il existe un exemple de code montrant l'approche. Vous pouvez soit canaliser tout le contenu IMap vers un sommet qui le transformera en un HashMap ordinaire et le distribuer à tous les processeurs enrichissants, ou vous pouvez préparer une Hazelcast ReplicatedMap qui sera utilisée directement par le processeur d'enrichissement.

La première approche signifie que vous travaillez sur un instantané du IMap; Dans la seconde, vous pouvez continuer à mettre à jour le ReplicatedMap lorsque le travail est en cours d'exécution. Il est préférable d'aller vérifier les échantillons: HashMapEnrichment et ReplicatedMapEnrichment.

+0

J'ai implémenté 'DomainMapper extends AbstractProcessor' et remplace 'tryProcess' pour cela. dag.newVertex ("map", DomainMapper.supplier()); fonctionne correctement. – MGaidamak

+1

Prenez garde que 'IMap.get()' soit un appel bloquant (pouvant communiquer avec des membres distants), donc en général l'appeler dans un processeur coopératif casserait sa coopération. Vous pouvez déclarer le processeur non coopératif, mais les performances en souffriront encore (contrairement aux approches que je mentionne dans la réponse). –