
How do I query datasets in Avro format? It works with Parquet:

val sqlDF = spark.sql("SELECT DISTINCT field FROM parquet.`file-path'") 

I tried the same approach with Avro, but it keeps giving me an error, even when I use com.databricks.spark.avro.

When I run the following query:

val sqlDF = spark.sql ("SELECT Source_Product_Classification DISTINCT Avro file path.")

I get an AnalysisException. Why?

org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html;; line 1 pos 51 
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) 
    at org.apache.spark.sql.execution.datasources.ResolveDataSource$$anonfun$apply$1.applyOrElse(rules.scala:61) 
    at org.apache.spark.sql.execution.datasources.ResolveDataSource$$anonfun$apply$1.applyOrElse(rules.scala:38) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58) 
    at org.apache.spark.sql.execution.datasources.ResolveDataSource.apply(rules.scala:38) 
    at org.apache.spark.sql.execution.datasources.ResolveDataSource.apply(rules.scala:37) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) 
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69) 
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67) 
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50) 
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63) 
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592) 

Changing the format name to com.databricks.spark.avro makes no difference; the query still fails.

val sqlDF = spark.sql("SELECT DISTINCT Source_Product_Classification FROM com.databricks.spark.avro`file-path`") 

org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '.' expecting {<EOF>, ',', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'RIGHT', 'FULL', 'NATURAL', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IF', 'DIV', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 65) 

== SQL == 
SELECT DISTINCT Source_Product_Classification FROM com.databricks.spark.avro`/uat/myfile` 
-----------------------------------------------------------------^^^ 

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197) 
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99) 
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45) 
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53) 
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592) 
    ... 48 elided 

Answer


Spark SQL supports the Avro format through the separate spark-avro module.

A library for reading and writing Avro data from Spark SQL.

Please note that spark-avro is a separate module that is not included in Spark by default.

You have to load the module using spark-submit --packages, e.g.

$ bin/spark-shell --packages com.databricks:spark-avro_2.11:3.2.0 

See "With spark-shell or spark-submit" in the spark-avro documentation.
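
Once the package is on the classpath, the direct-path query from the question should resolve, because the spark-avro module registers the avro short name with Spark's data source API. Below is a minimal sketch; the /uat/myfile path is taken from the question, and the column name is assumed to exist in the Avro schema:

// Inside a spark-shell started with --packages com.databricks:spark-avro_2.11:3.2.0 

// Option 1: direct-path SQL, as attempted in the question. With the spark-avro 
// module loaded, the `avro` data source name should now be found. 
val sqlDF = spark.sql("SELECT DISTINCT Source_Product_Classification FROM avro.`/uat/myfile`") 
sqlDF.show() 

// Option 2: load the Avro files through the DataFrame reader, register a 
// temporary view, and query the view with plain SQL. 
val avroDF = spark.read 
  .format("com.databricks.spark.avro") 
  .load("/uat/myfile") 
avroDF.createOrReplaceTempView("my_avro_data") 
spark.sql("SELECT DISTINCT Source_Product_Classification FROM my_avro_data").show() 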