2016-04-11 2 views
1

J'utilise actuellement un maître (machine locale) et deux travailleurs (2 * 32 noyaux, mémoire 2 * 61,9 Go) pour l'algorithme classique de la SLA du Spark et produire le code suivant pour l'évaluation du temps:évolutivité Spark

import numpy as np 
from scipy.sparse.linalg import spsolve 
import random 
import time 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating 
import hashlib 

#Spark configuration settings 
conf = SparkConf().setAppName("Temp").setMaster("spark://<myip>:7077").set("spark.cores.max","64").set("spark.executor.memory", "61g") 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

#first time 
t1 = time.time() 

#load the DataFrame and transform it into RDD<Rating> 
rddob = sqlContext.read.json("file.json").rdd 
rdd1 = rddob.map(lambda line:(line.ColOne, line.ColTwo)) 
rdd2 = rdd1.map(lambda line: (line, 1)) 
rdd3 = rdd2.reduceByKey(lambda a,b: a+b) 
ratings = rdd3.map(lambda (line, rating): Rating(int(hash(line[0]) % (10 ** 8)), int(line[1]), float(rating))) 
ratings.cache() 

# Build the recommendation model using Alternating Least Squares 
rank = 10 
numIterations = 5 
model = ALS.train(ratings, rank, numIterations) 

# Evaluate the model on training data 
testdata = ratings.map(lambda p: (p[0], p[1])) 
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2])) 
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions) 
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean() 
print("Mean Squared Error = " + str(MSE)) 

#second time 
t2 = time.time() 

#print results 
print "Time of ALS",t2-t1 

Dans ce code je maintiens tous les paramètres constants à l'exception du paramètre set("spark.cores.max","x") pour lequel j'utilise les valeurs suivantes pour x: 1,2,4,8,16,32,64. J'ai eu la prochaine évaluation de temps:

#cores time [s] 
1  20722 
2  11803 
4  5596 
8  3131 
16  2125 
32  2000 
64  2051 

Les résultats de l'évaluation sont un peu étranges pour moi. Je vois une bonne évolutivité linéaire par le petit nombre de cœurs. Mais dans la gamme de 16, 32 et 64 cœurs possibles, je ne vois plus d'évolutivité, ni d'amélioration de la performance temporelle. Comment est-ce possible? Mon fichier d'entrée mesure environ 70 Go et a 200 000 000 lignes.

Répondre

2

L'évolutivité linéaire dans un système distribué tel que Spark n'est que dans une petite partie du fait de l'augmentation du nombre de cœurs. La partie la plus importante est l'opportunité de distribuer des IO disque/réseau. Si vous avez un nombre constant de travailleurs et que vous n'échelonnez pas le stockage en même temps, vous arriverez rapidement au point où le débit est limité par IO.