0

Le suivant code Spark:Impossible de modifier le niveau de stockage de RDD

val model = ALS.trainImplicit(ratings = ratingsRDD, 
           rank = rank, 
           iterations = numIterations, 
           lambda = lambda, 
           alpha = alpha) 

model.productFeatures.cache() 

val modelSubsetRDD = new MatrixFactorizationModel(
    rank = rank, 
    userFeatures = model.productFeatures, 
    productFeatures = model.productFeatures) 

soulève l'exception ci-dessous:

Impossible de changer le niveau de stockage d'un RDD après avoir été déjà attribué un niveau

La même exception est soulevée avec StorageLevel.MEMORY_ONLY.

D'autre part, le code suivant fonctionne correctement:

val model = ALS.trainImplicit(ratings = ratingsRDD, 
           rank = rank, 
           iterations = numIterations, 
           lambda = lambda, 
           alpha = alpha) 
    val modelSubsetRDD = new MatrixFactorizationModel(
     rank = rank, 
     userFeatures = model.userFeatures, 
     productFeatures = model.productFeatures) 

    model.userFeatures.persist(StorageLevel.MEMORY_ONLY) 
    model.productFeatures.persist(StorageLevel.MEMORY_ONLY) 

Remarqué que cette fois userFeatures et productFeatures sont mis à deux membres différents du modèle. Cependant, je ne sais pas pourquoi cela fonctionne.

+0

Si vous rencontrez toujours des problèmes, veuillez mettre à jour votre question. Si la réponse ci-dessous résout votre problème, veuillez le marquer comme accepté. – micker

Répondre

1

Vous pouvez obtenir une certaine persistance d'ailleurs dans le code? Vous ne savez pas ce que fait ALS.trainImplicit avant de retourner le modèle.

L'appel cache() va stocker le RDD dans MEMORY_ONLY tandis que l'appel persist vous permet de modifier le type de mise en cache. Donc je devine que ce RDD a déjà été persistant quelque part et vous essayez de le ré-persister avec cache() qui est le problème. Cependant, changer le type de persistance avec persist est parfaitement acceptable.

modifier:

Essayez le code suivant:

val model = ALS.trainImplicit(ratings = ratingsRDD, 
           rank = rank, 
           iterations = numIterations, 
           lambda = lambda, 
           alpha = alpha) 
if(model.productFeatures.getStorageLevel() == StorageLevel.NONE) 
    model.productFeatures.cache() 

val modelSubsetRDD = new MatrixFactorizationModel(
    rank = rank, 
    userFeatures = model.productFeatures, 
    productFeatures = model.productFeatures) 

Cela devrait vous éviter d'essayer de mettre en cache quelque chose qui a déjà été mis en cache (soit dans la mémoire ou disque).

+0

Je reçois la même exception avec persist. Étrangement, le dernier morceau de code fonctionne. – Bob

+0

est-ce que MatrixFactorizationModel fait un individu? Vraiment difficile d'aider avec deux classes qui sont essentiellement des boîtes noires. – micker

+0

oui, mais j'utilise le même code dans les deux cas. La seule différence est que dans un cas, j'utilise deux fois le produit. – Bob