Machine learning is a data analysis technique that combines data with statistical tools to predict outcomes. Businesses across many industries use these predictions to make better decisions.
PySpark provides a machine learning API called MLlib. PySpark's MLlib supports various machine learning algorithms for classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as the underlying optimization primitives. The main machine learning concepts are given below:
The pyspark.mllib library supports several classification methods, such as binary classification, multiclass classification, and regression analysis. In classification, each object is assigned to one of a set of classes; the objective is to separate the data based on its features. Random Forest, Naive Bayes, and Decision Tree are among the most widely used classification algorithms.
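As a quick illustration, here is a minimal sketch of training a classifier with the DataFrame-based API; the toy data, the column names, and the choice of DecisionTreeClassifier are assumptions made only for this example:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

spark = SparkSession.builder.appName('ClassificationSketch').getOrCreate()

# Toy data: (feature1, feature2, label) -- purely illustrative values
data = spark.createDataFrame(
    [(1.0, 0.5, 0.0), (2.0, 1.5, 0.0), (3.0, 3.5, 1.0), (4.0, 4.5, 1.0)],
    ["feature1", "feature2", "label"])

# Assemble the raw columns into the single vector column the classifier expects
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
assembled = assembler.transform(data)

# Train the decision tree and inspect its predictions on the training data
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
model = dt.fit(assembled)
model.transform(assembled).select("features", "label", "prediction").show()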
Clustering is an unsupervised machine learning problem. It is used when the data has no predefined labels and you need the algorithm to find patterns and group the data accordingly. Popular clustering algorithms include K-means clustering, Gaussian mixture models, and hierarchical clustering.
The fpm module stands for frequent pattern mining, which is used to mine frequent items, itemsets, subsequences, or other substructures. It is mostly applied to large-scale datasets.
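A minimal sketch of frequent pattern mining with the FPGrowth estimator from pyspark.ml.fpm is shown below; the toy transactions are made up purely for illustration, and an existing SparkSession named spark is assumed:

from pyspark.ml.fpm import FPGrowth

# Toy transactions -- each row is a list of purchased items (illustrative only)
transactions = spark.createDataFrame(
    [(0, ["bread", "milk"]),
     (1, ["bread", "butter", "milk"]),
     (2, ["butter", "milk"])],
    ["id", "items"])

# Mine itemsets that appear in at least 50% of the transactions
fp = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
fp_model = fp.fit(transactions)
fp_model.freqItemsets.show()        # frequent itemsets and their counts
fp_model.associationRules.show()    # rules derived from those itemsets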
The mllib.linalg utilities are used for linear algebra.
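For example, dense and sparse vectors can be built as follows; this small sketch uses the DataFrame-based pyspark.ml.linalg module, and the RDD-based pyspark.mllib.linalg module offers the same constructors:

from pyspark.ml.linalg import Vectors

dense = Vectors.dense([1.0, 0.0, 3.0])          # stores every entry
sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])  # size, indices, and values of non-zero entries
print(dense, sparse)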
Recommendation systems identify the data relevant to making a recommendation. They can predict future preferences and recommend the top items. For example, the online entertainment platform Netflix has a huge collection of movies, and people sometimes struggle to pick something they will like; this is where recommendation plays an important role.
Regression is used to find relationships and dependencies between variables. It measures the correlation between each feature of the data and uses it to predict future values.
The mllib package supports many other algorithms, classes, and functions. Here we will cover the basic concepts of pyspark.mllib.
PySpark MLlib is well suited to iterative algorithms.
Let's have a look at the essential libraries of PySpark MLlib.
Linear regression is used to find the relationship and dependencies between variables. Consider the following code:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName('Customer').getOrCreate()

# Read the CSV with its header row and inferred numeric types so the
# feature columns keep their names and can be assembled later.
dataset = spark.read.csv(r'C:\Users\DEVANSH SHARMA\Ecommerce-Customers.csv', header=True, inferSchema=True)
dataset.show(10)
Output:
In the following code, we import VectorAssembler to combine the input columns into a new vector column called Independent Features:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Combine the three input columns into a single feature vector column.
featureassembler = VectorAssembler(
    inputCols=["Avg Session Length", "Time on App", "Time on Website"],
    outputCol="Independent Features")
output = featureassembler.transform(dataset)
output.show()
Output:
# Keep only the feature vector and the label column.
finalized_data = output.select("Independent Features", "Yearly Amount Spent")
finalized_data.show()
Output:
PySpark provides the LinearRegression estimator to fit a model and generate predictions for a given dataset. The syntax is given below:
regressor = LinearRegression(featuresCol='column_name1', labelCol='column_name2')
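Continuing the e-commerce example above, a minimal sketch of fitting and evaluating the regressor could look like this (the 75/25 split ratio is an arbitrary choice for illustration):

# Split the assembled data into training and test sets.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])

regressor = LinearRegression(featuresCol="Independent Features", labelCol="Yearly Amount Spent")
model = regressor.fit(train_data)

print(model.coefficients)   # one weight per assembled input feature
print(model.intercept)

# Evaluate the fitted model on the held-out test data.
test_results = model.evaluate(test_data)
print(test_results.rootMeanSquaredError)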
The K-Means clustering algorithm is one of the most popular and commonly used clustering algorithms. It groups the data points into a predefined number of clusters. The example below shows the use of the MLlib KMeans library:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data (the libsvm reader expects libsvm-formatted input and produces a "features" vector column).
dataset = spark.read.format("libsvm").load(r"C:\Users\DEVANSH SHARMA\Iris.csv")

# Trains a k-means model with 2 clusters.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions (assigns each row to a cluster).
predictions = model.transform(dataset)

# Evaluate clustering by computing the Silhouette score.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Show the cluster centers.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
A few important parameters of the MLlib ALS (recommendation) API are given below:
ratings: an RDD of Rating objects or (userID, productID, rating) tuples.
rank: the rank of the computed feature matrices, i.e. the number of latent features.
iterations: the number of ALS iterations to run (default: 5).
lambda_: the regularization parameter (default: 0.01).
blocks: the number of blocks used to parallelize the computation.
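These parameters belong to the RDD-based ALS API in pyspark.mllib.recommendation. A minimal sketch with made-up ratings, purely for illustration, looks like this:

from pyspark.mllib.recommendation import ALS, Rating

# Hypothetical ratings, only to show the shape of the input RDD.
ratings_rdd = spark.sparkContext.parallelize([
    Rating(1, 10, 4.0),
    Rating(1, 20, 3.0),
    Rating(2, 10, 5.0),
])

# Train a model with the parameters described above; blocks=-1 lets Spark
# auto-configure how the computation is parallelized.
model = ALS.train(ratings_rdd, rank=10, iterations=5, lambda_=0.01, blocks=-1)
print(model.predict(2, 20))   # predicted rating of product 20 for user 2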
Collaborative filtering is a technique that is commonly used in recommender systems. It focuses on filling in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict the missing entries.
The regularization parameter regParam is scaled when solving each least-squares problem: by the number of ratings the user generated when updating user factors, and by the number of ratings the product received when updating product factors. This makes regParam less dependent on the scale of the dataset.
Alternating Least Squares (ALS) is the algorithm spark.ml uses to learn these latent factors. When making predictions, the model may encounter users or items in the test dataset that were not present while training the model, which yields NaN predictions. The example below handles this by setting coldStartStrategy="drop":
# importing the libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

no_of_lines = spark.read.text(r"C:\Users\DEVANSH SHARMA\MovieLens.csv").rdd
no_of_parts = no_of_lines.map(lambda row: row.value.split("::"))
ratingsRDD = no_of_parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                           rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Develop the recommendation model using ALS on the training data
# Note we set the cold start strategy to "drop" to make sure that we don't get NaN evaluation metrics.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)