J'ai écrit l'exemple simple pour faire couler la table mais obtenir cette exception dans Apache Flink même après avoir implémenté AppendTableSink Interface.Obtention d'une erreur en-dessous même après l'implémentation d'AppendTableStream Interface dans Apache Flink
package com.cc.flink.functionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
public class MyTable implements AppendStreamTableSink<Row>{
@Override
public TableSink<Row> configure(String[] arg0, TypeInformation<?>[] arg1) {
// TODO Auto-generated method stub
return null;
}
@Override
public String[] getFieldNames() {
// TODO Auto-generated method stub
return null;
}
@Override
public TypeInformation<?>[] getFieldTypes() {
// TODO Auto-generated method stub
return null;
}
@Override
public TypeInformation<Row> getOutputType() {
// TODO Auto-generated method stub
return null;
}
@Override
public void emitDataStream(DataStream<Row> arg0) {
// TODO Auto-generated method stub
arg0.print();
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.setPort(5672)
.build();
final DataStream<String> stream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"test", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1);
final ArrayList<String> values = new ArrayList<>();
StreamTableEnvironment StreamTableEnv = TableEnvironment.getTableEnvironment(env);
Table fromDataStream = StreamTableEnv.fromDataStream(stream,
"member_id");
StreamTableEnv.registerTable("emp1",fromDataStream);
Table output =StreamTableEnv.sql("select count(*) from emp1 where member_id Like '%test%'");
fromDataStream.writeToSink(new MyTable());
env.execute();
}
}
log4j: WARN Aucun appenders n'a pu être trouvée pour enregistreur (org.apache.calcite.sql.parser). log4j: WARN Veuillez initialiser correctement le système log4j. log4j: WARN voir http://logging.apache.org/log4j/1.2/faq.html#noconfig pour plus d'infos
Exception dans le thread "principal" org.apache.flink.table.api.TableException: Tableaux de flux ne peuvent être émis par AppendStreamTableSink, RetractStreamTable
à org.apache. flink.table.api.StreamTableenvironment.writeToSink (StreamTableenvironment.scala: 219)
à org.apache.flink.table.api.Table.writeToSink (table.scala: 800)
à org.apache. flink.table.api.Table.writeToSink (table.scala: 773)
à com.cc.flink.functionutils.MyTable.main (MyTable.java:103)
Salut twalthr Merci pour la réponse, si vous voyez le code j'essaye de couler la table nommée "fromDataStream" pas la table nommée "output" .première j'ai essayé d'implémenter le retractTableSink ça ne marchera pas alors j'ai essayé d'implémenter appendTableSink & essayé de couler le fromDataStream mais a eu la même erreur dans les deux cas –