2017-08-24 1 views
0

J'ai écrit l'exemple simple pour faire couler la table mais obtenir cette exception dans Apache Flink même après avoir implémenté AppendTableSink Interface.Obtention d'une erreur en-dessous même après l'implémentation d'AppendTableStream Interface dans Apache Flink

package com.cc.flink.functionUtils; 

    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collection; 
    import java.util.Iterator; 

    import org.apache.flink.api.common.functions.IterationRuntimeContext; 
    import org.apache.flink.api.common.functions.MapFunction; 
    import org.apache.flink.api.common.functions.RichFunction; 
    import org.apache.flink.api.common.io.OutputFormat; 
    import org.apache.flink.api.common.typeinfo.TypeInformation; 
    import org.apache.flink.api.java.io.LocalCollectionOutputFormat; 
    import org.apache.flink.api.java.tuple.Tuple2; 
    import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
    import org.apache.flink.configuration.Configuration; 
    import org.apache.flink.contrib.streaming.DataStreamUtils; 
    import org.apache.flink.streaming.api.datastream.DataStream; 
    import org.apache.flink.streaming.api.datastream.DataStreamSink; 
    import org.apache.flink.streaming.api.datastream.DataStreamSource; 
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    import org.apache.flink.streaming.api.functions.sink.SinkFunction; 
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; 
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; 
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 
    import org.apache.flink.table.api.Table; 
    import org.apache.flink.table.api.TableEnvironment; 
    import org.apache.flink.table.api.java.StreamTableEnvironment; 
    import org.apache.flink.table.sinks.AppendStreamTableSink; 
    import org.apache.flink.table.sinks.RetractStreamTableSink; 
    import org.apache.flink.table.sinks.TableSink; 
    import org.apache.flink.types.Row; 




    public class MyTable implements AppendStreamTableSink<Row>{ 



     @Override 
     public TableSink<Row> configure(String[] arg0, TypeInformation<?>[] arg1) { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public String[] getFieldNames() { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public TypeInformation<?>[] getFieldTypes() { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public TypeInformation<Row> getOutputType() { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public void emitDataStream(DataStream<Row> arg0) { 
      // TODO Auto-generated method stub 
      arg0.print(); 

     } 




     public static void main(String[] args) throws Exception { 

      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
      final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() 
        .setHost("localhost") 
        .setVirtualHost("/") 
        .setUserName("guest") 
        .setPassword("guest") 
        .setPort(5672) 
        .build(); 


      final DataStream<String> stream = env 
        .addSource(new RMQSource<String>(
          connectionConfig,   // config for the RabbitMQ connection 
          "test",     // name of the RabbitMQ queue to consume 
          true,      // use correlation ids; can be false if only at-least-once is required 
          new SimpleStringSchema())) // deserialization schema to turn messages into Java objects 
        .setParallelism(1); 



      final ArrayList<String> values = new ArrayList<>(); 
      StreamTableEnvironment StreamTableEnv = TableEnvironment.getTableEnvironment(env); 
      Table fromDataStream = StreamTableEnv.fromDataStream(stream, 
        "member_id"); 
      StreamTableEnv.registerTable("emp1",fromDataStream); 
      Table output =StreamTableEnv.sql("select count(*) from emp1 where member_id Like '%test%'"); 
      fromDataStream.writeToSink(new MyTable()); 
      env.execute(); 

     } 

    }  

log4j: WARN Aucun appenders n'a pu être trouvée pour enregistreur (org.apache.calcite.sql.parser). log4j: WARN Veuillez initialiser correctement le système log4j. log4j: WARN voir http://logging.apache.org/log4j/1.2/faq.html#noconfig pour plus d'infos

Exception dans le thread "principal" org.apache.flink.table.api.TableException: Tableaux de flux ne peuvent être émis par AppendStreamTableSink, RetractStreamTable

à org.apache. flink.table.api.StreamTableenvironment.writeToSink (StreamTableenvironment.scala: 219)

à org.apache.flink.table.api.Table.writeToSink (table.scala: 800)

à org.apache. flink.table.api.Table.writeToSink (table.scala: 773)

à com.cc.flink.functionutils.MyTable.main (MyTable.java:103)

Répondre

0

Le problème dans votre exemple est que vous essayez d'utiliser un AppendTableSink mais votre requête produit rétractations. Cela est dû à COUNT(*) dans votre déclaration. Chaque fois qu'une nouvelle ligne arrive, l'ancien compte émis n'est plus valide et doit être rétracté.

S'il s'agissait simplement d'un SELECT *, chaque ligne entrante produirait exactement une ligne de sortie qui n'affecte pas les lignes précédentes.

+0

Salut twalthr Merci pour la réponse, si vous voyez le code j'essaye de couler la table nommée "fromDataStream" pas la table nommée "output" .première j'ai essayé d'implémenter le retractTableSink ça ne marchera pas alors j'ai essayé d'implémenter appendTableSink & essayé de couler le fromDataStream mais a eu la même erreur dans les deux cas –