2017-01-29 3 views
0

Je cherchais depuis un certain temps maintenant sur la façon d'utiliser un opérateur Java personnalisé sont acceptés dans les InfoSphere Streams Java APIen utilisant Java opérateur personnalisé au sein de l'API java dans InfoSphere Streams

Ce que j'ai besoin après avoir écrit un opérateur personnalisé comme ci-dessous .. .

public class Test extends AbstractOperator { 
private int i; 
private int num; 
@Override 
public synchronized void initialize(OperatorContext context) throws Exception { 
super.initialize(context); 
i = 0; .... 

Je veux l'utiliser comme ci-dessous ....

 Topology topology = new Topology("toplogy_test"); 
     TStream<String> inDataFileName = ... 
//call the "Test" operator here 

Répondre

0

Vous pouvez appeler un opérateur de l'opérateur Java/C++ de l'API de topologie en procédant comme suit:

  1. Ajouter la boîte à outils de l'opérateur Java:

    SPL.addToolkit (topologie, nouveau fichier ("/ home/streamsadmin/myTk"));

  2. convertir le flux entrant flux SPL:

    StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>"); 
    
    SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){ 
    
        @Override 
        public OutputTuple apply(String input_string, OutputTuple output_rstring) { 
        output_rstring.setString("rstring_attr_name", input_string); 
        return output_rstring; 
        }}, rstringSchema); 
    
  3. invoquons l'opérateur:

    SPLStream splOutputStream = SPL.invokeOperator ("OperatorNamespace :: YourOperatorName", splInputStream, rstringSchema, nouvelle HashMap ());

Vous pouvez trouver plus d'informations à ce sujet ici:

http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/#integrating-spl-operators-with-the-java-application-api

Sur une note de côté, si vous songez à utiliser l'API de topologie pour écrire la topologie Streams, il est plus facile d'écrire une classe Java régulière et l'invoque directement à partir de l'API de topologie.

Par exemple:

MyJavaCode someObj = new MyJavaCode(); 

Topology topology = new Topology("MyTopology"); 
TStream<String> inDataFileName = ... 
inDataFileName.transform(new Function<String, String>(){ 
     @Override 
     public String apply(String word) { 
      return someObj.someFunction(word); 
     } 
    }); 

La seule exigence est ici que votre classe Java doit implémenter Serializable.

+0

someObj.someFunction (mot), cette fonction peut-elle être un appel de service Web ou un appel d'insertion JDBC? –

+0

Oui, la fonction peut faire n'importe quoi. J'ai pris cette approche et encapsulé la bibliothèque GSON pour effectuer l'analyse JSON en continu avec l'API de topologie. L'essentiel est de s'assurer que la classe est Serializable et que les objets qui sont retournés à l'API de topologie sont également sérialisables. –