J'utilise actuellement un maître (machine locale) et deux travailleurs (2 * 32 noyaux, mémoire 2 * 61,9 Go) pour l'algorithme classique de la SLA du Spark et produire le code suivant pour l'évaluation du temps:évolutivité Spark
import numpy as np
from scipy.sparse.linalg import spsolve
import random
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import hashlib
#Spark configuration settings
conf = SparkConf().setAppName("Temp").setMaster("spark://<myip>:7077").set("spark.cores.max","64").set("spark.executor.memory", "61g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
#first time
t1 = time.time()
#load the DataFrame and transform it into RDD<Rating>
rddob = sqlContext.read.json("file.json").rdd
rdd1 = rddob.map(lambda line:(line.ColOne, line.ColTwo))
rdd2 = rdd1.map(lambda line: (line, 1))
rdd3 = rdd2.reduceByKey(lambda a,b: a+b)
ratings = rdd3.map(lambda (line, rating): Rating(int(hash(line[0]) % (10 ** 8)), int(line[1]), float(rating)))
ratings.cache()
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 5
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
#second time
t2 = time.time()
#print results
print "Time of ALS",t2-t1
Dans ce code je maintiens tous les paramètres constants à l'exception du paramètre set("spark.cores.max","x")
pour lequel j'utilise les valeurs suivantes pour x: 1,2,4,8,16,32,64
. J'ai eu la prochaine évaluation de temps:
#cores time [s]
1 20722
2 11803
4 5596
8 3131
16 2125
32 2000
64 2051
Les résultats de l'évaluation sont un peu étranges pour moi. Je vois une bonne évolutivité linéaire par le petit nombre de cœurs. Mais dans la gamme de 16, 32 et 64 cœurs possibles, je ne vois plus d'évolutivité, ni d'amélioration de la performance temporelle. Comment est-ce possible? Mon fichier d'entrée mesure environ 70 Go et a 200 000 000 lignes.