2017-08-31 1 views
0

Je suis en train de traiter des données avec Scala 2.11.7 et Flink 1.3.2. Maintenant, je voudrais stocker le org.apache.flink.api.scala.DataSet résultant dans une base de données de graphes neo4j.neo4j avec Flink et Scala

Il y a des projets Github pour la compatibilité:

  • Flink avec Neo4j: https://github.com/s1ck/flink-neo4j
  • Scala avec Neo4j: _https: //github.com/FaKod/neo4j-scala
  • bibliothèque graphique de Flink « Gelly "avec neo4j: _https: //github.com/albertodelazzari/gelly-neo4j

Quelle est la voie la plus prometteuse? Ou devrais-je mieux utiliser directement l'API REST de neo4j?

(BTW: Pourquoi stackoverflow limiter le nombre de liens postet ...?)

J'ai essayé Flink-Neo4j, mais il semble qu'il ya des problèmes avec le mélange des classes Java et Scala:

package dummy.neo4j 

import org.apache.flink.api.common.io.OutputFormat 
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat 
import org.apache.flink.api.java.tuple.{Tuple, Tuple2} 
import org.apache.flink.api.scala._ 

object Neo4jDummyWriter { 

    def main(args: Array[String]) { 
    val env = ExecutionEnvironment.getExecutionEnvironment 

    val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/") 
    .setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})") 
    .addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish 

    val tuple1: Tuple = new Tuple2("abc", 1) 
    val tuple2: Tuple = new Tuple2("def", 2) 

    val test = env.fromElements[Tuple](tuple1, tuple2) 
    println("test: " + test.getClass) 
    test.output(outputFormat) 
    } 

} 

Exception dans le fil "principal" java.lang.ClassCastException: [Ljava.lang.Object; ne peut pas être converti en [Lorg.apache.flink.api.common.typeinfo.TypeInformation; à dummy.neo4j.Neo4jDummyWriter $ .main (Neo4jDummyWriter.scala: 20) à dummy.neo4j.Neo4jDummyWriter.main (Neo4jDummyWriter.scala)

et

Type mismatch, attendu: OutputFormat [tuple], réelle: OutputFormat [_ <: tuple]

Répondre

0

La solution est de ne pas modifier les objets Tuple2 à tuple:

package dummy.neo4j 

import org.apache.flink.api.common.io._ 
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat 
import org.apache.flink.api.java.tuple.Tuple2 
import org.apache.flink.api.scala._ 

object Neo4jDummyWriter { 

    def main(args: Array[String]) { 
    val env = ExecutionEnvironment.getExecutionEnvironment 

    val tuple1 = ("user9", 1978) 
    val tuple2 = ("user10", 1996) 
    val datasetWithScalaTuples = env.fromElements(tuple1, tuple2) 
    val dataset: DataSet[Tuple2[String, Int]] = datasetWithScalaTuples.map(tuple => new Tuple2(tuple._1, tuple._2)) 

    val outputFormat = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/").setUsername("neo4j").setPassword("...") 
    .setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})") 
    .addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish.asInstanceOf[OutputFormat[Tuple2[String, Int]]] 

    dataset.output(outputFormat) 
    env.execute 
    } 

}