Efficiency comparison between PySpark data frames and PySpark RDDs for higher-level models like recommender systems
Overview of Project
Problem domain and project motivation: PySpark offers two options for storing and manipulating data: a Pandas-like data frame structure (not exactly the same as a Pandas df) and the resilient distributed dataset (RDD) data structure. Users often ask which one should be chosen for the task at hand [1]. Knowing which of the two data structures (dataframe or RDD) performs better for a model-based recommender system will help Spark practitioners make that choice for similar applications.
When it comes to simple low-level actions on tables in a dataset, PySpark data frames are known to be faster than RDDs [2]. But that information alone does not say much about how they compare in higher-level modeling tasks, in terms of overall training error and training time for a predictive model (e.g. a recommender system). The goal of this project is to fill this gap by comparing the training performance of a matrix-factorization based recommender system built on a dataframe with one built on an RDD.
Background on recommenders and matrix factorization: A recommendation engine, better known as a recommender system, is a tool that analyzes data in order to suggest something a user might be interested in, such as a website, an ad banner, or a product. Collaborative filtering is one of many ways to build a recommender system: it finds similar users or items and calculates a rating based on the ratings of similar users. In this approach, similarity is not calculated using features like the age of users, the genre of the movie, or any other data about users or items. It is calculated only on the basis of the rating (explicit or implicit) a user gives to an item. One kind of collaborative filtering is the model-based approach, which involves a step to reduce or compress the large but sparse user-item association matrix. Matrix factorization is one way to achieve that. A user-item matrix A with dimensions m x n is approximated by a product of two matrices X and Y with dimensions m x p and p x n respectively, where p is the number of latent factors learned in the process.
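To make the shapes concrete, here is a minimal sketch in plain Python of a factorized matrix for 3 users, 4 items, and p = 2 latent factors. The numbers in X and Y are invented for illustration and the helper `matmul` is hypothetical; a real model learns the factors from data.

```python
# Toy illustration of A ~ X . Y with m=3 users, n=4 items, p=2 latent factors.
# All values are made up; a trained model would learn X and Y from ratings.

def matmul(X, Y):
    """Multiply an m x p matrix by a p x n matrix (lists of lists)."""
    p = len(Y)
    n = len(Y[0])
    return [[sum(row[k] * Y[k][j] for k in range(p)) for j in range(n)]
            for row in X]

X = [[1.0, 0.5],   # one row of p factors per user (m x p)
     [0.2, 1.5],
     [1.2, 0.1]]
Y = [[4.0, 1.0, 3.0, 0.5],   # one column of p factors per item (p x n)
     [0.5, 3.0, 1.0, 2.0]]

A_hat = matmul(X, Y)                       # reconstructed m x n rating matrix
shape = (len(A_hat), len(A_hat[0]))
# predicted rating of user 0 for item 1 is the dot product of X[0] and column 1 of Y
pred_0_1 = A_hat[0][1]
print(shape, pred_0_1)
```

Each predicted rating is just a dot product of one user's factor row with one item's factor column, which is why only X and Y need to be stored once the model is trained.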
Related data sets: For this comparison, I used the MovieLens dataset 'ml-latest', available for download here: https://grouplens.org/datasets/movielens/ . The data set has 27,000,000 ratings applied to 58,000 movies by 280,000 users.
Problem Statement
Do dataframes enable faster training and better learning accuracy than RDDs? Are they also fast at making predictions after training? I answer these questions by comparing the performance (using the metrics defined later) of a collaborative filtering model created on a PySpark cluster using a dataframe with that of one created using an RDD.
For the MovieLens dataset used in this project:
- user in user-item matrix is identified by userID
- item is movie rated by that user
- value at (user,item) cell is the rating given by that user
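As a small illustration of this layout, the sketch below stores a handful of (userId, movieId, rating) triples as a sparse user-item mapping in plain Python. The triples are invented and the variable names are hypothetical; the point is only that most cells of the matrix are empty.

```python
from collections import defaultdict

# (userId, movieId, rating) triples; values are made up for illustration
ratings = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 30, 2.0)]

# sparse user-item matrix: only the cells that were actually rated are stored
user_item = defaultdict(dict)
for user_id, movie_id, rating in ratings:
    user_item[user_id][movie_id] = rating

n_users = len(user_item)
n_items = len({movie_id for _, movie_id, _ in ratings})
# fraction of (user, item) cells that hold a rating
density = len(ratings) / (n_users * n_items)
print(n_users, n_items, round(density, 3))
```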
The strategy for accomplishing my goal is as follows. As matrix factorization is a well-known, efficient way of learning latent factors for a user-item association matrix, I chose the alternating least squares (ALS) [3] algorithm of spark.ml to learn these latent factors and create a collaborative filtering recommender system.
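The idea behind alternating least squares can be sketched in a few lines of plain Python for the simplest case of a single latent factor (rank 1). This is an illustrative toy under stated assumptions, not Spark's implementation: the function name `als_rank1`, the data, and the plain (unweighted) regularization term are all choices made here for the example. With the item factors fixed, each user factor has a closed-form least-squares solution, and vice versa, so the two updates are alternated.

```python
def als_rank1(ratings, n_iter=10, lam=0.01):
    """Rank-1 ALS on a dict {(user, item): rating}; returns factors and objective history."""
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    x = {u: 1.0 for u in users}   # one latent factor per user
    y = {i: 1.0 for i in items}   # one latent factor per item

    def objective():
        # squared error on observed ratings plus L2 penalty on all factors
        sq = sum((r - x[u] * y[i]) ** 2 for (u, i), r in ratings.items())
        reg = lam * (sum(v * v for v in x.values()) + sum(v * v for v in y.values()))
        return sq + reg

    history = [objective()]
    for _ in range(n_iter):
        # fix item factors, solve the regularized least-squares problem for each user
        for u in users:
            num = sum(r * y[i] for (uu, i), r in ratings.items() if uu == u)
            den = lam + sum(y[i] ** 2 for (uu, i) in ratings if uu == u)
            x[u] = num / den
        # fix user factors, solve for each item
        for i in items:
            num = sum(r * x[u] for (u, ii), r in ratings.items() if ii == i)
            den = lam + sum(x[u] ** 2 for (u, ii) in ratings if ii == i)
            y[i] = num / den
        history.append(objective())
    return x, y, history

# made-up observed ratings: (user, item) -> rating
toy = {(0, 0): 1.0, (0, 1): 0.5, (1, 0): 2.0, (1, 2): 4.0, (2, 1): 1.5, (2, 2): 6.0}
x, y, history = als_rank1(toy)
print([round(h, 4) for h in history])
```

Because each half-step exactly minimizes the objective over one set of factors, the recorded objective should not increase from one sweep to the next.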
I set up the PySpark cluster on a local machine or on GCP, create an Apache Spark session, and read the data in the expected format (dataframe or RDD). I then fix the hyperparameters of the project: the range of ranks ALS will be trained for, the maximum number of training iterations, and the value of the regularization parameter. These 3 parameters are the same for the dataframe-based ALS and the RDD-based ALS. Then, for both dataframe and RDD, I train the ALS model on the stored data, record the values of the chosen metrics, and finally compare the measurements by visualizing them.
The expected outcome is that ALS using a dataframe should train and predict faster than ALS using an RDD without compromising significantly on training accuracy.
Choice of Metrics
I want to measure the efficiency gained by choosing a dataframe over an RDD for the recommender. Typically, such efficiency can be measured in terms of either learning time or learning error. Therefore, I chose the following three metrics for the comparison.
- Validation RMSE: In matrix factorization, large errors (differences between the values predicted by a model and the ground truth values) are highly undesirable. In RMSE (root mean squared error), the errors are squared before they are averaged, so it gives a relatively high weight to large errors. This makes RMSE more useful here than other options like MAE (mean absolute error). Therefore, our first metric is the RMSE computed on the validation set used during training of ALS.
- Training Time: This is the clock time taken to train the ALS recommender model for a given number of latent factors.
- Prediction Time: The clock time taken by the trained recommender system to make a prediction for a single user.
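The three metrics above can be sketched in plain Python as follows. The helper names (`rmse`, `mae`, `timed`) and the sample ratings are hypothetical and only meant to show why RMSE penalizes one large error more than several small ones, and how a wall-clock measurement can be wrapped around any call.

```python
import math
from time import perf_counter

def rmse(y_true, y_pred):
    """Root mean squared error: errors are squared before averaging."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

def mae(y_true, y_pred):
    """Mean absolute error: every error counts in proportion to its size."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed wall-clock seconds)."""
    t0 = perf_counter()
    result = fn(*args, **kwargs)
    return result, perf_counter() - t0

# made-up ratings for illustration
truth = [4.0, 3.0, 5.0, 2.0]
small_errors = [3.5, 3.5, 4.5, 2.5]     # four errors of 0.5
one_large_error = [4.0, 3.0, 5.0, 4.0]  # a single error of 2.0

print(mae(truth, small_errors), mae(truth, one_large_error))    # same MAE
print(rmse(truth, small_errors), rmse(truth, one_large_error))  # RMSE differs
_, elapsed = timed(sum, range(1000))
```

When a helper like `timed` is used around Spark code, the timed call needs to include an action (for example a count or a collect), because Spark transformations are evaluated lazily.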
Data Exploration and Processing
The data set has 27,000,000 ratings applied to 58,000 movies by 280,000 users. It has four columns: userId, movieId, rating, timestamp. The dataset ml-latest-small is quite clean because there are no NaN or missing values. Even if there are missing values, the Spark ALS model can accommodate them: ALS assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model.
Here is a basic description of dataset.
Let us look at the overall distribution of the data. The following plots show the distribution of ratings per user and ratings per movie. It is clear that some users are considerably more active raters than others, and some movies are rated considerably more often than others. Such diversity is expected in a user-item matrix.
For using dataframes in matrix factorization, there are no issues in this dataset that will hamper the training of ALS. But since an RDD read from a text file carries no schema, I had to add structure explicitly by removing the header, reading only 3 of the 4 columns, and removing the ratings column for the validation part.
ratings_raw_data = sparkSess.sparkContext.textFile('./ml-latest-small/ratings.csv')
# re-format rdd for usage with ALS
ratings_raw_data_header = ratings_raw_data.take(1)[0]
# remove header and separate (userID, movieID, rating)
ratings_rdd = ratings_raw_data.filter(lambda line: line != ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()
# remove ratings from validation and testing datasets
validation_for_predict_ratings_rdd = validation_ratings_rdd.map(lambda x: (x[0], x[1]))
test_for_predict_ratings_rdd = test_ratings_rdd.map(lambda x: (x[0], x[1]))
As we are making a recommender, there is no need to look at outliers in ratings.
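The header removal and column selection can be tried without a cluster. The sketch below applies the same filter-then-split logic to a small list of strings in plain Python; the sample lines are made up and stand in for the contents of ratings.csv.

```python
# stand-in for the lines of ratings.csv (values invented for illustration)
lines = [
    "userId,movieId,rating,timestamp",
    "1,31,2.5,1260759144",
    "1,1029,3.0,1260759179",
]

header = lines[0]
# drop the header, split on commas, keep (userId, movieId, rating) as strings
triples = [tuple(line.split(",")[:3]) for line in lines if line != header]
# drop the rating to get the (userId, movieId) pairs used for prediction
pairs = [(user_id, movie_id) for user_id, movie_id, _ in triples]
print(triples)
print(pairs)
```

As in the RDD version, the fields stay strings at this point; conversion to numbers happens later, where the ratings are compared with predictions.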
Here is a basic description.
Post-training processing: For dataframes, a post-processing step is needed after training and before validation. NaN values assigned by ALS are undesirable when measuring RMSE. Therefore, before computing RMSE, I added a filtering step that removes rows with NaNs. (Of course, this can also be done before training!)
# remove NaN values from predictions
predicted_ratings_df_val = predict_df_val.filter(predict_df_val.prediction != float('nan'))
# evaluate rmse for predicted ratings
error = evaluator.evaluate(predicted_ratings_df_val)
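One caveat worth knowing when adapting this filter outside Spark: in plain Python, NaN compares unequal to everything, including itself, so a `!= float('nan')` test never removes anything. Spark column comparisons follow Spark's own NaN semantics, and `pyspark.sql.functions.isnan` is an explicit alternative there. The sketch below, with made-up prediction rows, shows the plain-Python behaviour and the `math.isnan` check that does work.

```python
import math

nan = float("nan")
# (userId, movieId, prediction) rows; the NaN stands for a user/item unseen in training
predictions = [(1, 10, 4.1), (1, 99, nan), (2, 10, 3.7)]

# naive comparison keeps the NaN row, because nan != nan is True in Python
naive = [p for p in predictions if p[2] != nan]
# explicit NaN check removes it
clean = [p for p in predictions if not math.isnan(p[2])]
print(len(naive), len(clean))
```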
Implementation of The Comparison
Now I explain the algorithms and steps used in implementation.
Algorithms / Techniques Used
For PySpark dataframe, we use pyspark.ml.recommendation.ALS as recommender model and pyspark.ml.evaluation.RegressionEvaluator as RMSE calculator.
# for working with sql dataframes
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
ALS method takes following arguments [4]:
- numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
- rank is the number of latent factors in the model (defaults to 10).
- maxIter is the maximum number of iterations to run (defaults to 10).
- regParam specifies the regularization parameter in ALS (defaults to 1.0).
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false, which means using explicit feedback).
- alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
- nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).
For PySpark RDD, we need to use the pyspark.mllib.recommendation version of ALS.
# import MLlib version of ALS
from pyspark.mllib.recommendation import ALS as ALS2
from pyspark.mllib.recommendation import MatrixFactorizationModel as ALS2Model
Setting Up Machine
The initial steps vary depending on whether the testing is done on a local machine or on a cluster in the cloud. I have used both, but the results are clearer on the latter.
Requirements for setting up local machine for the project:
- Your machine should have the latest versions of Python, Apache Spark and Jupyter Notebook installed.
- You also need to connect PySpark with Jupyter Notebook; many tutorials are available for this.
Requirements for setting up a cluster on cloud (GCP in this case):
- Create a cloud project on GCP
- Follow the instructions here to create a cluster and a storage bucket in the same region. Make sure anaconda and jupyter are enabled. The chosen dataset needs to be uploaded to the storage bucket and accessed from there.
Steps of Algorithm for Comparison
1. A SparkSession is the entry point to PySpark for working with RDDs and DataFrames, so we create a SparkSession first.
from pyspark.sql import SparkSession
sparkSess = SparkSession.builder.appName('movieApp').getOrCreate()
2. Define the hyperparameters for both ALS model using dataframe and ALS model using RDD.
# a range for the number of latent factors in ALS model
ranks = [4, 8, 12]
# value of regularization
reg_Param = 0.01
# maximum number of iterations for training
max_Iter = 5
3. Read data
data = sparkSess.read.csv('gs://bucket-capstoneudacitydatascience/ml-latest/ratings.csv', inferSchema=True, header=True)
4. Train and validate dataframe based ALS model for each value in the chosen range of ranks (Refinement of model). During the process, record training time and RMSE.
from time import time

# split data into training, validation and testing subsets
(training, validation, test) = data.randomSplit([0.6, 0.2, 0.2])
# initialize fixed part of ALS model
als = ALS(maxIter=max_Iter, regParam=reg_Param, userCol="userId", itemCol="movieId", ratingCol="rating")
# define RMSE evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
min_error = float('inf')
best_rank = -1
for rank in ranks:
    # Set the rank here
    als.setRank(rank)
    # record training time
    t0 = time()
    # fit the model with these parameters
    model = als.fit(training)
    tt = time() - t0
    print("Training completed in {} seconds".format(round(tt, 3)))
    # create rating predictions against the validation dataframe
    predict_df_val = model.transform(validation)
    # remove NaN values from predictions
    predicted_ratings_df_val = predict_df_val.filter(predict_df_val.prediction != float('nan'))
    # evaluate rmse for predicted ratings
    error = evaluator.evaluate(predicted_ratings_df_val)
    print('For rank %s, the validation RMSE is %s' % (rank, error))
    # record the best rank
    if error < min_error:
        min_error = error
        best_rank = rank
print('The best model was trained with rank %s' % best_rank)
5. Save the model with best RMSE.
# fit the model for best rank
als.setRank(best_rank)
t0 = time()
model = als.fit(training)
tt = time() - t0
print("Training completed in {} seconds".format(round(tt, 3)))
model.write().overwrite().save("./ml-latest-small/dfALS-model")
6. Make prediction and record the time it took
# reload the saved model
saved_model = ALSModel.load("./ml-latest-small/dfALS-model")
# try creating recommendation for a user
single_user = test.filter(test['userId'] == 11).select(['movieId', 'userId'])
recommendations = saved_model.transform(single_user)
7. Repeat the steps 3-6 for Spark RDD based ALS model
import math

# Use SparkContext to create resilient distributed dataset (RDD)
ratings_raw_data = sparkSess.sparkContext.textFile('gs://bucket-capstoneudacitydatascience/ml-latest/ratings.csv', 10)
# ratings_rdd is built from ratings_raw_data (header removed, 3 columns kept)
# as shown in the data processing section
# split data into training, validation and testing subsets
training_ratings_rdd, validation_ratings_rdd, test_ratings_rdd = ratings_rdd.randomSplit([6, 2, 2], seed=0)
# validation_for_predict_ratings_rdd is the validation set without ratings, as shown earlier
min_error = float('inf')
best_rank = -1
for rank in ranks:
    # record training time
    t0rdd = time()
    # fit the model with these parameters
    model2 = ALS2.train(training_ratings_rdd, rank, iterations=max_Iter,
                        lambda_=reg_Param)
    ttrdd = time() - t0rdd
    print("Training completed in {} seconds".format(round(ttrdd, 3)))
    # create rating predictions against the validation RDD
    predictions_val = model2.predictAll(validation_for_predict_ratings_rdd).map(lambda r: ((r[0], r[1]), r[2]))
    # join ratings and predictions to compute rmse
    rates_and_preds_val = validation_ratings_rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions_val)
    # evaluate rmse for predicted ratings
    error = math.sqrt(rates_and_preds_val.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    print('For rank %s, the validation RMSE is %s' % (rank, error))
    # record the best rank
    if error < min_error:
        min_error = error
        best_rank = rank
print('The best model was trained with rank %s' % best_rank)
# fit the model for best rank
t0rdd = time()
model2 = ALS2.train(training_ratings_rdd, best_rank, iterations=max_Iter, lambda_=reg_Param)
ttrdd = time() - t0rdd
print("Training completed in {} seconds".format(round(ttrdd, 3)))
model2.save(sparkSess.sparkContext, "./ml-latest-small/rddALS-model")
# reload the saved model and predict for a single user
saved_model2 = ALS2Model.load(sparkSess.sparkContext, "./ml-latest-small/rddALS-model")
single_user = test_ratings_rdd.filter(lambda line: line[0] == '11').map(lambda x: (x[0], x[1]))
recommendations = saved_model2.predictAll(single_user)
8. Visualize the results using matplotlib.pyplot
Improvement of Models
I tried a range of ranks [4, 8, 12] rather than a single rank value, and finally used the value corresponding to the best RMSE. Both ALS models showed the lowest validation error for rank 4. Here are the RMSE values for those ranks:
As rank was the only parameter varied, the final hyperparameter values used by both ALS models are rank=4, maxIter=5, regParam=0.01.
Challenges:
I faced a few challenges in this project. RDDs work very differently from dataframes.
The first challenge was that a dataset is not read into a dataframe the same way as into an RDD. A dataframe can be read using the read.csv method of a SparkSession. But RDDs existed before dataframes were introduced, and an RDD is read through the older Spark entry point, called 'SparkContext'. Therefore, I had to access the SparkContext of the SparkSession to read the data as an RDD.
The second challenge was using the right ALS library and ALSModel. The pyspark.ml.recommendation version of ALS is more recent than the pyspark.mllib.recommendation version. The former can be used with dataframes but not with RDDs, so I had to use the mllib version with RDDs.
Results
The following plots show the RMSE and the training time for the chosen range of ranks, for both the ALS model trained using PySpark dataframes and the one trained using PySpark RDDs. The RMSE values for both models stayed more or less around 0.85. The training times for ranks [4, 8, 12] were [75, 70, 76] seconds for the df-based ALS (around 73 seconds throughout) and [166, 91, 108] seconds for the RDD-based ALS model, respectively.
In terms of prediction times, for the same user, the ALS model using dataframes predicted movies in 0.038 seconds and the ALS model using RDDs predicted movies in 14.22 seconds.
Prediction completed in 0.038 seconds
+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   2502|    11|  4.146366|
|   2791|    11| 3.9784715|
|   1282|    11| 3.6588442|
+-------+------+----------+
Prediction completed in 14.22 seconds
[Rating(user=11, product=2502, rating=4.246173387283797),
Rating(user=11, product=3623, rating=2.532487153760588)]
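The relative differences implied by the reported timings can be worked out directly. The short calculation below uses only the training and prediction times quoted in this section; the variable names are chosen here for illustration.

```python
# reported training times in seconds for ranks 4, 8 and 12
ranks = [4, 8, 12]
df_train = [75, 70, 76]
rdd_train = [166, 91, 108]

# percentage of training time saved by the dataframe-based model at each rank
reduction_pct = [round(100 * (r - d) / r, 1) for d, r in zip(df_train, rdd_train)]

# ratio of the reported prediction times (RDD / dataframe)
speedup_pred = 14.22 / 0.038

for rank, pct in zip(ranks, reduction_pct):
    print("rank %s: dataframe training took %s%% less time" % (rank, pct))
print("prediction time ratio: %.0f" % speedup_pred)
```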
Intuition about why dataframes performed better: While an RDD offers low-level functionality and more control, a dataframe allows a custom view and structure, offers high-level and domain-specific operations, saves space, and executes at superior speeds. Inspired by Pandas dataframes, Spark dataframes were introduced as a more efficient alternative in cases where lower-level manipulation of data is not needed. A recommender system is an example of such a case. Therefore, as hypothesized at the beginning of this post, this comparison shows that for higher-level modeling like recommender systems, using data frames is a much better option.
Conclusion
The aim of this project was to compare the consequences of choosing between a PySpark dataframe and a PySpark RDD on the training and performance of a recommender system. For this purpose, I chose well-known collaborative filtering based on ALS matrix factorization, and an open source movie reviews dataset from the MovieLens site. For each data structure type (dataframe, RDD), I followed the standard process of reading data, re-formatting it, training the model on a range of hyperparameter values, recording training time and RMSE, using the best model for prediction, and recording the time for prediction. The results were as intuitively expected at the beginning.
The results showed that both the training and the prediction of ALS recommender systems using PySpark data frames were considerably faster than with PySpark RDDs. Depending on rank, the dataframe-based model took roughly 23% to 55% less time to train, and it returned predictions in 0.038 seconds versus 14.22 seconds for the RDD-based model. As shown by the RMSE plot, despite being faster than RDDs, using dataframes did not compromise on training accuracy. Both of these facts make dataframes a better choice for developing web apps. This conforms with the expected results I began with. The jupyter notebook for this project can be found here.
Improvement in project: There are a few ways to improve this project. One is to repeat the same comparison on a bigger dataset, because that will bring out a clearer difference in training and prediction times. The instructions given here won't change significantly if you want to test this project on a bigger dataset like https://www.kaggle.com/laowingkin/netflix-movie-recommendation. Another option is to vary more than one hyperparameter of the ALS model while tuning.
References
[1] https://www.adsquare.com/comparing-performance-of-spark-dataframes-api-to-spark-rdd/
[2] https://databricks.com/glossary/what-is-rdd
[3] Yehuda Koren, Robert Bell, and Chris Volinsky. 2009. Matrix Factorization Techniques for Recommender Systems. Computer 42, 8 (August 2009), 30–37. DOI: https://doi.org/10.1109/MC.2009.263
[4] https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html