Je commence tout juste à apprendre le Hazelcast Jet. Ma source est des datagrammes UDP. Je veux le traiter en parallèle à certains nœuds de Jet et les renvoyer à d'autres adresses par 'domaine'. Je veux utiliser Hazelcast IMDG IMap avec loader pour obtenir 'domain' par 'source ip'.Utilisez Hazelcast IMap à Hazelcast Jet Processor
DAG dag = new DAG();
Vertex source = dag.newVertex("datagram-source",
UdpSocketP.supplier("0.0.0.0", 41813));
source.localParallelism(1);
Vertex mapper = dag.newVertex("map",
map(new DomainMapper(instance.getMap("mysqlNas"))));
Vertex sink = dag.newVertex("sink",
Sinks.writeFile("logs"));
sink.localParallelism(1);
Mais quand je tente d'utiliser IMap à DistributedFunction i get exception
Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator
Code DomainMapper:
package org.eltex.softwlc.sorm.replicator;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;
import java.io.Serializable;
import java.net.DatagramPacket;
/**
* Created by mickey on 21.07.17.
*/
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {
private final IMap<String, NasValue> map;
public DomainMapper(IMap<String, NasValue> map) {
this.map = map;
}
@Override
public IpData apply(DatagramPacket datagramPacket) {
final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
System.out.println(d);
final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
if (nasValue!=null) {
d.setDomain(nasValue.getDomain());
}
return d;
}
}
Quelle est mon erreur? Or Hazelcast Jet est mauvais choix pour mon but.
J'ai implémenté 'DomainMapper extends AbstractProcessor' et remplace 'tryProcess' pour cela. dag.newVertex ("map", DomainMapper.supplier()); fonctionne correctement. – MGaidamak
Prenez garde que 'IMap.get()' soit un appel bloquant (pouvant communiquer avec des membres distants), donc en général l'appeler dans un processeur coopératif casserait sa coopération. Vous pouvez déclarer le processeur non coopératif, mais les performances en souffriront encore (contrairement aux approches que je mentionne dans la réponse). –