2015-07-30 7 views
1

Je travaille sur le développement d'une fonction pour convertir un vecteur en une ligne sql afin de le convertir en une trame de données et l'enregistrer dans une table en utilisant SQLcontext dans l'étincelle Apache. Je me développe en clojure et je me suis perdu en cours de route. Je pensais que la mise en œuvre de la solution ainsi:Convertir le vecteur clojure en flambo sql row

  1. Pour chaque RDD (vecteur) convertir en lignes
  2. Convertir les lignes à une trame de données
  3. trame de données Enregistrer une table
  4. utiliser le SqlContext pour demander des informations particulières dans la table
  5. et comment convertir le résultat de la requête en RDD pour une analyse ultérieure.

    (defn assign-ecom 
        [] 
        (let [rdd-fields (-> (:rdd @transformed-rdd) 
           (f/map #(sql/row->vec %)) 
            f/collect)] 
        (clojure.pprint/pprint rdd-fields))) 
    

J'utilise Flambo v0.60 fonction api pour abstraire fonctions Apache-étincelle, je salue aussi toute suggestion quant à la façon de s'y prendre pour résoudre le problème. Merci

Voici le lien vers la ligne Flambo -> VEC docs:

Flambo documentation:

+0

@ zero323 Merci !. J'ai essayé de l'éditer mais d'une façon ou d'une autre, le code n'était pas correctement formaté. – Jyd

+0

Si vous avez une liste, vous avez besoin d'un autre niveau d'imbrication pour le code (8 espaces au total). – zero323

Répondre

4

Je suppose que vous avez déjà spark-context (sc) et sql-context (sql-ctx). Tout d'abord permet d'importer tous les trucs que nous aurons besoin:

(import org.apache.spark.sql.RowFactory) 
(import org.apache.spark.sql.types.StructType) 
(import org.apache.spark.sql.types.StructField) 
(import org.apache.spark.sql.types.Metadata) 
(import org.apache.spark.sql.types.DataTypes) 
  1. Pour chaque RDD (vecteur) convertir en lignes

    ;; Vector to Row conversion 
    (defn vec->row [v] 
        (RowFactory/create (into-array Object v))) 
    
    ;; Example data 
    (def rows (-> (f/parallelize sc [["foo" 1] ["bar" 2]]) 
           (f/map vec->row))) 
    
  2. Convertir les lignes à une trame de données

    ;; Define schema 
    (def schema 
        (StructType. 
        (into-array StructField 
        [(StructField. "k" (DataTypes/StringType) false (Metadata/empty)) 
         (StructField. "v" (DataTypes/IntegerType) false (Metadata/empty))]))) 
    
    ;; Create data frame 
    (def df (.createDataFrame sql-ctx rows schema)) 
    
    ;; See if it works 
    (.show df) 
    
  3. Enregistrer la trame de données dans un tableau

    (.registerTempTable df "df") 
    
  4. utiliser la SqlContext à rechercher des informations notamment dans le tableau

    (def df-keys (.sql sql-ctx "SELECT UPPER(k) as k FROM df")) 
    ;; Check results 
    (.show df-keys) 
    
  5. et comment convertir le résultat de la requête en en RDD à nouveau pour une analyse ultérieure.

    (.toJavaRDD df-keys) 
    

    ou si vous voulez des vecteurs:

    (f/map (.toJavaRDD df-keys) sql/row->vec) 
    
+0

Merci de votre aide. C'est exactement ce dont j'avais besoin – Jyd