2017-05-16 1 views
4

Je suis en œuvre une application étincelle, dont ci-dessous est un extrait de l'échantillon (pas le même code exact):Quand le cache et la persistance sont-ils exécutés (puisqu'ils ne ressemblent pas à des actions)?

val rdd1 = sc.textfile(HDFS_PATH) 
val rdd2 = rdd1.map(func) 
rdd2.persist(StorageLevel.MEMORY_AND_DISK) 
println(rdd2.count) 

Lors de la vérification de la performance de ce code de l'interface utilisateur Spark, je vois une entrée pour le count action, mais pas pour le persist. Le DAG pour cette action de compte possède également un nœud pour la transformation 'map' (ligne 2 du code ci-dessus).

Est-il sûr de conclure que la transformation de carte est exécutée lorsque count (dans la dernière ligne) est rencontré, et non lorsque persist est rencontré?

En outre, à quel moment est RDD2 réellement persisté? Je comprends que seulement deux types d'opérations peuvent être appelés sur les RDD - les transformations et les actions. Si le RDD est persistant paresseusement lorsque l'action count est appelée, persisterait-il être considéré comme une transformation ou une action ou pas?

Répondre

1

Est-il sûr de conclure que la transformation de la carte est exécutée lorsque le nombre (dans la dernière ligne) est rencontrée, et non quand persist est rencontré?

Oui

De plus, à quel point est RDD2 réellement persisté?

Les données sont lues, cartographié, et a persisté tout à la fois d'exécuter l'instruction de comptage

persisterait être considérée comme une transformation ou une action ou aucun des deux?

Ce n'est pas vraiment non plus, mais en termes de travail de traitement, vous pouvez le considérer comme une transformation. Spark est paresseux et ne fonctionnera que lorsque vous demanderez un résultat. Aucun résultat n'est requis lorsque vous persistez dans un bloc de données. Spark ne fonctionne donc pas. De cette façon, persist est comme une transformation

7

Les opérateurs cache et persist de Dataset sont paresseux et n'ont aucun effet jusqu'à ce que vous appeliez une action (et attendez que la mise en cache soit terminée, ce qui est le prix supplémentaire pour avoir de meilleures performances plus tard).

De la documentation officielle de Spark RDD Persistence (avec la phrase dans le mien gras):

L'une des capacités les plus importantes de Spark persiste (ou la mise en cache) un ensemble de données dans la mémoire à travers les opérations. Lorsque vous persistez dans un RDD, chaque nœud stocke toutes les partitions qu'il calcule en mémoire et les réutilise dans d'autres actions sur cet ensemble de données (ou les jeux de données qui en dérivent). Cela permet aux actions futures d'être beaucoup plus rapide (souvent de plus de 10x). La mise en cache est un outil clé pour les algorithmes itératifs et l'utilisation interactive rapide.

Vous pouvez marquer un RDD à persister à l'aide des méthodes persist() ou cache(). La première fois qu'il est calculé dans une action, il sera conservé en mémoire sur les noeuds. Le cache de Spark est tolérant aux pannes. Si une partition d'un RDD est perdue, il sera automatiquement recalculé en utilisant les transformations qui l'ont créé. (! Spark et SQL lui-même)

C'est exactement la raison pour laquelle certaines personnes font l'astuce suivante:

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count 

pour déclencher la mise en cache. L'opérateur est assez bon marché, l'effet net est que la mise en cache est exécutée presque immédiatement après la ligne (il peut y avoir un petit délai avant la fin de la mise en cache car elle s'exécute de manière asynchrone).

Alors quand vous avez demandé:

serait persist être considéré comme une transformation ou une action ou aucun des deux?

Je dirais que ce n'est ni et considère comme un indice d'optimisation (qui peut ou ne peut pas être exécuté ou pris en compte jamais).Utilisez l'onglet Stockage de l'interface utilisateur Web pour voir quels sont les jeux de données (en tant que leurs RDD sous-jacents) qui ont déjà été conservés.

enter image description here

Vous pouvez également voir la sortie des opérateurs cache ou persist en utilisant explain (ou simplement QueryExecution.optimizedPlan).

val q1 = spark.range(10).groupBy('id % 5).count.cache 
scala> q1.explain 
== Physical Plan == 
InMemoryTableScan [(id % 5)#84L, count#83L] 
    +- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
     +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)]) 
      +- Exchange hashpartitioning((id#77L % 5)#88L, 200) 
       +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)]) 
        +- *Range (0, 10, step=1, splits=8) 

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString) 
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
01 +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L]) 
02  +- Exchange hashpartitioning((id#77L % 5)#88L, 200) 
03   +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L]) 
04    +- *Range (0, 10, step=1, splits=8) 

// Cache sample table range5 using pure SQL 
// That registers range5 to contain the output of range(5) function 
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)") 
val q2 = spark.sql("SELECT * FROM range5") 
scala> q2.explain 
== Physical Plan == 
InMemoryTableScan [id#0L] 
    +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5` 
     +- *Range (0, 5, step=1, splits=8) 

opérateur physique InMemoryTableScan (avec InMemoryRelation plan logique) est de savoir comment vous pouvez vous assurer que la requête est mise en mémoire cache en mémoire et donc réutilisé.


De plus, Spark SQL lui-même utilise le même schéma pour déclencher la mise en cache de dataframe pour CACHE TABLE query de SQL (qui, contrairement à la mise en cache RDD, est par défaut impatient):

if (!isLazy) { 
    // Performs eager caching 
    sparkSession.table(tableIdent).count() 
} 

Cela signifie que selon les opérateurs vous pouvez avoir un résultat différent en ce qui concerne la mise en cache. cache et persist opérateurs sont paresseux par défaut tandis que CACHE TABLE de SQL est désireux.