1

Quand je lance le code:similitudes PySpark récupérées par IndexedRowMatrix() columnSimilarities() ne sont pas acessible: INFO ExternalSorter: Discussion * spilling carte en mémoire

from pyspark import SparkContext 
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating 
from random import random 
import os 
from scipy.sparse import csc_matrix 
import pandas as pd 
from pyspark.mllib.linalg.distributed import RowMatrix 
from pyspark.mllib.linalg import Vectors 
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry 

from pyspark.sql import SQLContext 

sc =SparkContext() 
sqlContext = SQLContext(sc) 
df = pd.read_csv("/Users/Andre/Code/blitsy-analytics/R_D/Data/cust_item_counts.csv", header=None) 
customer_map = {x[1]:x[0] for x in enumerate(df[0].unique())} 
item_map = {x[1]:x[0] for x in enumerate(df[1].unique())} 
df[0] = df[0].map(lambda x: customer_map[x]) 
df[1] = df[1].map(lambda x: item_map[x]) 
#matrix = csc_matrix((df[2], (df[0], df[1])),shape=(max(df[0])+1, max(df[1])+1)) 

entries = sc.parallelize(df.apply(lambda x: tuple(x), axis=1).values) 
mat = CoordinateMatrix(entries).toIndexedRowMatrix() 
sim = mat.columnSimilarities() 
sim.entries.map(lambda x: x).first() 

je suis jeté dans une boucle de fils déversant sur le disque:

> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 294 
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 293 
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 292 
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 291 
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 108 spilling in-memory 
> map of 137.6 MB to disk (1 time so far) 16/04/01 12:09:42 INFO 
> ExternalSorter: Thread 112 spilling in-memory map of 158.1 MB to disk 
> (1 time so far) 16/04/01 12:09:42 INFO ExternalSorter: Thread 114 
> spilling in-memory map of 154.2 MB to disk (1 time so far) 16/04/01 
> 12:09:42 INFO ExternalSorter: Thread 113 spilling in-memory map of 
> 143.4 MB to disk (1 time so far) 

Ceci n'est pas vrai à partir de la matrice 'mat' qui retourne son entrée de première ligne.

Cela concerne-t-il la gestion de la mémoire ou la fonction columnSimilarity() elle-même?

J'ai ~ 86000 lignes et colonnes dans la variable sim.

Mon jeu de données était une liste de tuples (user_id, item_id, value). Je transforme la plage user_id et item_id en valeurs entre 0 et len ​​(user_id | tem_id). C'est ainsi qu'un identifiant de 800000 ne force pas une matrice aussi grande.

Il existe 800 000 entrées de ce type. La matrice de la variable 'mat' contient la valeur du tuple aux coordonnées de (user_id, item_id). Ceci est vérifié par moi comme étant le cas.

La matrice de 'mat' compte environ 41 000 utilisateurs et environ 86 000 éléments. La colonne Similarité crée des comparaisons entre chaque article, c'est pourquoi il a des dimensions 86k x 86k

Tout cela a été fait dans le terminal pyspark ./bin/pyspark.

+1

Quelle version d'étincelle utilise? Quelle est la configuration de votre cluster? – eliasah

+0

@eliasah Je suis juste en cours d'exécution sur local pour tester. J'utilise le spark-master actuel de github. – user1340048

+1

Pouvez-vous essayer avec la branche 1.6? Pour s'assurer que ce n'est pas seulement dans la version 2.0. Ces versions ne sont pas encore officiellement publiées. – eliasah

Répondre

1

Comme indiqué dans le commentaire, le problème est lié au fait que vous avez beaucoup de données qui n'ont pas été correctement partitionnées en fonction de la configuration de votre cluster. C'est pourquoi ça débordait sur le disque.

Vous aurez besoin de donner à votre application plus de ressources en mémoire et/ou d'augmenter les partitions de données.