PySpark MLlib

Machine learning is a data analysis technique that combines data with statistical tools to predict an output. Businesses across many industries use these predictions to make better decisions.

PySpark provides an API for machine learning called mllib. PySpark's mllib supports various machine learning algorithms, such as classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as the underlying optimization primitives. Some of its machine learning concepts are given below:

  • classification

The pyspark.mllib library supports several classification methods, including binary classification and multiclass classification. In classification, each object belongs to one of a set of classes, and the objective is to assign objects to classes based on their features. Random Forest, Naive Bayes, and Decision Tree are among the most useful classification algorithms.

  • clustering

Clustering is an unsupervised machine learning problem. It is used when you do not know in advance how to classify the data; the algorithm is required to find patterns and group the data accordingly. Popular clustering algorithms include K-means, Gaussian mixture models, and hierarchical clustering.

  • fpm

fpm stands for frequent pattern mining, which is used for mining frequent items, itemsets, subsequences, or other substructures. It is usually among the first steps in analyzing a large-scale dataset.
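To make the idea of a "frequent" itemset concrete, here is a minimal plain-Python sketch. It is not PySpark code and not how Spark's FPGrowth works internally; it simply counts every itemset of up to two items by brute force and keeps those whose support (the fraction of transactions containing them) reaches a threshold. The function name, the baskets, and the threshold are all invented for illustration.

```python
from collections import Counter
from itertools import combinations


def frequent_itemsets(transactions, min_support, max_size=2):
    """Return {itemset: support} for itemsets of up to max_size items
    that appear in at least min_support (a fraction) of the transactions."""
    counts = Counter()
    for transaction in transactions:
        items = sorted(set(transaction))  # ignore duplicates, fix item order
        for size in range(1, max_size + 1):
            for itemset in combinations(items, size):
                counts[itemset] += 1
    total = len(transactions)
    return {
        itemset: count / total
        for itemset, count in counts.items()
        if count / total >= min_support
    }


# Hypothetical shopping baskets, invented for this example.
baskets = [
    ["bread", "milk"],
    ["bread", "butter", "milk"],
    ["butter", "milk"],
    ["bread", "butter"],
]
frequent = frequent_itemsets(baskets, min_support=0.5)
for itemset, support in sorted(frequent.items()):
    print(itemset, support)
```

Brute-force counting grows quickly with the number of items, which is why scalable libraries rely on specialised algorithms instead.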

  • linalg

The mllib.linalg utilities are used for linear algebra.

  • recommendation

It is used to find the relevant data for making a recommendation. It can predict future preferences and recommend the top items. For example, the online entertainment platform Netflix has a huge collection of movies, and people sometimes have difficulty choosing among them. This is where recommendation plays an important role.

  • regression

Regression is used to find the relationships and dependencies between variables. It models the relationship between the features of the data and a target variable, and uses that relationship to predict future values.

The mllib package supports many other algorithms, classes, and functions. Here we will cover the basic concepts of pyspark.mllib.

MLlib Features

PySpark mllib is well suited to iterative algorithms. It provides the following tools for working with features:

  • Extraction: Extracting features from "raw" data.
  • Transformation: Scaling, converting, or modifying features.
  • Selection: Selecting a useful subset from a larger set of features.
  • Locality Sensitive Hashing (LSH): This class of algorithms combines aspects of feature transformation with other algorithms.
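As an illustration of the "Transformation" item, the following plain-Python sketch rescales a feature to the range [0, 1] (min-max scaling). It is a minimal stand-in for the idea, not PySpark's own scaler; the function name and the sample values are hypothetical.

```python
def min_max_scale(values):
    """Rescale a list of numbers linearly so the smallest maps to 0.0
    and the largest maps to 1.0."""
    lowest, highest = min(values), max(values)
    if highest == lowest:
        # All values are identical, so there is no range to scale over.
        return [0.0 for _ in values]
    return [(value - lowest) / (highest - lowest) for value in values]


# Hypothetical feature column, invented for this example.
session_minutes = [10.0, 20.0, 40.0]
scaled = min_max_scale(session_minutes)
print(scaled)
```

Scaling of this kind keeps features measured in large units from dominating features measured in small units.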

Let's have a look at the essential libraries of PySpark MLlib.

MLlib Linear Regression

Linear regression is used to find the relationship and dependencies between variables. Consider the following code:

snippet
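from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
# Create (or reuse) a Spark session.
spark = SparkSession.builder.appName('Customer').getOrCreate()
# No header option is given, so the columns are named _c0, _c1, ... and
# the header line of the file is shown as the first data row.
dataset = spark.read.csv(r'C:\Users\DEVANSH SHARMA\Ecommerce-Customers.csv')
dataset.show(10)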

Output:

Output
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|                 _c0|                 _c1|             _c2|               _c3|               _c4|               _c5|                 _c6|                _c7|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|     SaddleBrown| 34.30555662975554|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|mstephens@davidso...|14023 Rodriguez P...|MediumAquaMarine| 33.33067252364639|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
|alvareznancy@luca...|645 Martha Park A...|     FloralWhite|33.871037879341976|12.026925339755056| 34.47687762925054|   5.493507201364199|   637.102447915074|
|katherine20@yahoo...|68388 Reyes Light...|   DarkSlateBlue| 32.02159550138701|11.366348309710526| 36.68377615286961|   4.685017246570912|  521.5721747578274|
|  awatkins@yahoo.com|Unit 6538 Box 898...|            Aqua|32.739142938380326| 12.35195897300293| 37.37335885854755|  4.4342734348999375|  549.9041461052942|
|vchurch@walter-ma...|860 Lee KeyWest D...|          Salmon| 33.98777289568564|13.386235275676436|37.534497341555735|  3.2734335777477144|  570.2004089636196|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
only showing top 10 rows

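The first row of the output above holds the column names because the file was read without a header option. In the following code, we re-read the file with header=True and inferSchema=True so that the columns are named and numeric, and then use VectorAssembler to combine the selected feature columns into a single new vector column, Independent Features:

snippet
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Re-read the CSV so the first line becomes the header and numeric columns are typed.
dataset = spark.read.csv(r'C:\Users\DEVANSH SHARMA\Ecommerce-Customers.csv', header=True, inferSchema=True)
# Combine the numeric feature columns into one vector column.
featureassembler = VectorAssembler(inputCols=["Avg Session Length", "Time on App", "Time on Website"], outputCol="Independent Features")
output = featureassembler.transform(dataset)
output.show()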

Output:

Output
+--------------------+
|Independent Features|
+--------------------+
|   34.49726772511229|
|   31.92627202636016|
|  33.000914755642675|
|   34.30555662975554|
|   33.33067252364639|
|  33.871037879341976|
|   32.02159550138701|
|  32.739142938380326|
|   33.98777289568564|
+--------------------+
snippet
z = featureassembler.transform(dataset)
# Keep only the assembled feature column and the target column.
finalized_data = z.select("Independent Features", "Yearly Amount Spent")
finalized_data.show()

Output:

Output
+--------------------+-------------------+
|Independent Features|Yearly Amount Spent|
+--------------------+-------------------+
|   34.49726772511229|  587.9510539684005|
|   31.92627202636016|  392.2049334443264|
|  33.000914755642675| 487.54750486747207|
|   34.30555662975554|  581.8523440352177|
|   33.33067252364639|  599.4060920457634|
|  33.871037879341976|   637.102447915074|
|   32.02159550138701|  521.5721747578274|
|  32.739142938380326|  549.9041461052942|
|   33.98777289568564|  570.2004089636196|
+--------------------+-------------------+

PySpark provides the LinearRegression class, which fits a linear model that predicts a label column from a feature vector column. The syntax is given below:

snippet
regressor = LinearRegression(featuresCol='column_name1', labelCol='column_name2')
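To show what fitting a linear model means, here is a minimal plain-Python sketch of ordinary least squares with a single feature. It is only an illustration of the underlying idea, not PySpark's implementation, and the function name and data points are made up for the example.

```python
def fit_line(xs, ys):
    """Fit y = slope * x + intercept by ordinary least squares."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Slope is the covariance of x and y divided by the variance of x.
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    variance = sum((x - mean_x) ** 2 for x in xs)
    slope = covariance / variance
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Hypothetical points that lie exactly on y = 2x + 1.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]
slope, intercept = fit_line(xs, ys)
prediction = slope * 5.0 + intercept  # predict y for a new x
print(slope, intercept, prediction)
```

With several features, the same principle applies, but the single slope becomes one coefficient per feature.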

MLlib K-Means Clustering

The K-means clustering algorithm is one of the most popular and commonly used clustering algorithms. It groups the data points into a predefined number of clusters. The example below shows the use of the MLlib KMeans class:

snippet
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
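# Loads data. The "libsvm" reader expects LIBSVM-formatted text
# (label index1:value1 index2:value2 ...), so the file at this path
# must be stored in that format rather than as comma-separated values.
dataset = spark.read.format("libsvm").load(r"C:\Users\DEVANSH SHARMA\Iris.csv")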
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
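For intuition about what the algorithm does, the following is a minimal plain-Python sketch of the basic K-means iteration (often called Lloyd's algorithm): assign each point to its nearest center, then move each center to the mean of its points. It is not Spark's implementation; the starting centers are fixed by hand so that the result is deterministic, and the points are invented for the example.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans(points, centers, iterations=10):
    """Run a fixed number of assign-then-update steps and return the
    final centers together with the clusters from the last assignment."""
    clusters = []
    for _ in range(iterations):
        # Assignment step: put each point in the cluster of its nearest center.
        clusters = [[] for _ in centers]
        for point in points:
            nearest = min(range(len(centers)),
                          key=lambda i: squared_distance(point, centers[i]))
            clusters[nearest].append(point)
        # Update step: move each center to the mean of its cluster.
        # An empty cluster keeps its previous center.
        centers = [
            tuple(sum(axis) / len(cluster) for axis in zip(*cluster))
            if cluster else center
            for cluster, center in zip(clusters, centers)
        ]
    return centers, clusters


# Hypothetical 2-D points forming two obvious groups.
points = [(1.0, 1.0), (1.0, 2.0), (9.0, 9.0), (10.0, 9.0)]
centers, clusters = kmeans(points, centers=[(0.0, 0.0), (10.0, 10.0)])
print(centers)
```

The number of clusters is fixed in advance by the number of starting centers, which corresponds to the value passed to setK in the example above.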

Parameters of PySpark MLlib

A few important parameters of the ALS recommendation algorithm in PySpark MLlib are given below:

  • Ratings

It is an RDD of Rating objects or (userID, productID, rating) tuples.

  • Rank

It is the rank of the computed feature matrices (the number of latent features).

  • Iterations

It is the number of iterations of ALS to run (default: 5).

  • Lambda

It is the regularization parameter (default: 0.01).

  • Blocks

It is the number of blocks used to parallelize the computation.

Collaborative Filtering (mllib.recommendation)

Collaborative filtering is a technique that is commonly used in recommender systems. It aims to fill in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent (hidden) factors that can be used to predict the missing entries.
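The role of the hidden factors can be sketched in a few lines of plain Python: each user and each item gets a short vector, and a missing rating is estimated as the dot product of the two. The factor values below are invented for illustration; in practice they are learned from the known ratings.

```python
def predict_rating(user_factors, item_factors, user, item):
    """Estimate a rating as the dot product of the user's and the
    item's factor vectors."""
    return sum(u * v for u, v in zip(user_factors[user], item_factors[item]))


# Hypothetical two-factor vectors, invented for this example.
user_factors = {"alice": [1.0, 0.0], "bob": [0.0, 1.0]}
item_factors = {"action_movie": [4.0, 1.0], "drama_movie": [1.0, 5.0]}

predicted = predict_rating(user_factors, item_factors, "alice", "drama_movie")
print(predicted)
```

Because every user and every item has a vector, a score can be computed even for pairs that never appeared in the rating data.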

Scaling of the regularization parameter

The regularization parameter regParam is scaled when solving each least-squares problem: by the number of ratings the user generated when updating user factors, or by the number of ratings the product received when updating product factors. This makes regParam less dependent on the scale of the dataset.
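One common way to write this scaled objective is shown below. The notation is introduced here for illustration and does not come from the original text: r_{ui} is a known rating, x_u and y_i are the user and item factor vectors, \lambda plays the role of regParam, n_u is the number of ratings given by user u, and n_i is the number of ratings received by item i.

```latex
\min_{X,\,Y} \sum_{(u,i)\ \text{observed}} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_{u} n_u \lVert x_u \rVert^2 + \sum_{i} n_i \lVert y_i \rVert^2 \right)
```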

Cold-start strategy

When making predictions with an ALS (Alternating Least Squares) model, it is common to encounter users or items in the test dataset that were not present when the model was trained. This typically occurs in the two scenarios given below:

  • In production, for new users or items that have no rating history and on which the model has not been trained (this is the cold-start problem).
  • During cross-validation, when the data is split between training and evaluation sets. It is very common to encounter users and items in the evaluation set that are not in the training set.
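The effect of dropping such rows before evaluation can be sketched in plain Python. This is an illustration only, assuming that a prediction for an unseen user or item is represented as NaN; the sample predictions are invented.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (prediction, actual) pairs."""
    return math.sqrt(sum((p - a) ** 2 for p, a in pairs) / len(pairs))


# Hypothetical (prediction, actual rating) pairs. The NaN stands for a
# user or item that was not seen during training.
predictions = [(4.0, 5.0), (3.0, 3.0), (float("nan"), 4.0)]

# A single NaN prediction makes the whole metric NaN.
rmse_with_nan = rmse(predictions)

# Dropping the rows with NaN predictions gives a usable metric.
kept = [(p, a) for p, a in predictions if not math.isnan(p)]
rmse_after_drop = rmse(kept)
print(rmse_with_nan, rmse_after_drop)
```

This is the reason for choosing a "drop" cold-start strategy when the goal is to obtain a meaningful evaluation metric.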
snippet
# Import the libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# Read the ratings file; the code below expects userId::movieId::rating::timestamp per line
no_of_lines = spark.read.text(r"C:\Users\DEVANSH SHARMA\MovieLens.csv").rdd
no_of_parts = no_of_lines.map(lambda row: row.value.split("::"))
# Map the split fields (not the raw lines) to typed Row objects
ratingsRDD = no_of_parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                           rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Develop the recommendation model using ALS on the training data
# Note we set cold start strategy to make sure that we don't get NaN evaluation metrics.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
    coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
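Conceptually, a "top N" recommendation is a ranking of items by predicted score. The plain-Python sketch below shows that step in isolation, using invented scores for one user; it is not how the PySpark calls above are implemented.

```python
import heapq


def top_n(scores, n):
    """Return the n item names with the highest predicted scores,
    best first."""
    return heapq.nlargest(n, scores, key=scores.get)


# Hypothetical predicted scores for a single user.
scores = {"movie_a": 3.5, "movie_b": 4.8, "movie_c": 2.1, "movie_d": 4.1}
best = top_n(scores, 2)
print(best)
```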