Sunday, June 9, 2019

Handling imbalanced datasets in supervised learning using the SMOTE family of algorithms

Suppose you are working on a machine learning classification problem. You get an accuracy of 98% and you are very happy. But that happiness doesn’t last long when you look at the confusion matrix and realize that the majority class makes up 98% of the data and every example has been classified as the majority class. Welcome to the real world of imbalanced datasets!
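To make this concrete, here is a minimal sketch in plain Python with made-up toy labels (the 98/2 split and the helper name `confusion_counts` are illustrative assumptions, not data from a real project). A model that always predicts the majority class reaches 98% accuracy while its recall on the minority class is zero.

```python
# Toy labels (assumed for illustration): 98 majority (0) and 2 minority (1) examples.
y_true = [0] * 98 + [1] * 2
# A "model" that predicts the majority class for every example.
y_pred = [0] * 100


def confusion_counts(y_true, y_pred, positive=1):
    """Return (tp, fp, fn, tn), treating `positive` as the minority class."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    tn = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p != positive)
    return tp, fp, fn, tn


tp, fp, fn, tn = confusion_counts(y_true, y_pred)
accuracy = (tp + tn) / len(y_true)
# Recall on the minority class: how many of the rare cases were caught.
recall = tp / (tp + fn) if (tp + fn) else 0.0
print(f"accuracy={accuracy:.2f}, minority recall={recall:.2f}")
```

This is why metrics such as recall, precision or AUROC are usually checked alongside accuracy on imbalanced data.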
Some well-known examples of imbalanced datasets are:
1- Fraud detection: where the number of fraud cases could be much smaller than the number of non-fraudulent transactions.
2- Prediction of disputed / delayed invoices: where the problem is to predict default / disputed invoices.
3- Predictive maintenance datasets, etc.

In all the above examples, the cost of misclassifying the minority class could be very high. That means, if I am not able to identify the fraud cases correctly, the model won’t be useful.

There are multiple ways of handling imbalanced datasets. Some of them are: collecting more data, trying out different ML algorithms, modifying class weights, penalizing the models, using anomaly detection techniques, and oversampling and undersampling techniques.

I am focusing mainly on SMOTE based oversampling techniques in this article. As a data scientist you might not have direct control over the collection of more data, which might need various approvals from the client and top management and could also take more time. Also, applying class weights or too much parameter tuning can lead to overfitting. Undersampling can lead to loss of important information, which is not the case with oversampling techniques. Oversampling methods can be easily tried and embedded in your framework.


Oversampling Algorithms based on SMOTE

1-   SMOTE: The Synthetic Minority Over-sampling Technique (SMOTE) takes a KNN approach: for a minority example it finds the K nearest minority neighbors and creates synthetic samples on the line segments joining the example to those neighbors. Concretely, the algorithm takes a feature vector and one of its nearest neighbors and computes the difference between the two vectors. The difference is multiplied by a random number between 0 and 1 and added back to the feature vector. SMOTE is the pioneering algorithm of this family, and many other algorithms are derived from it.
Reference: Smote Paper
R Implementation:  smotefamily, unbalanced, DMwR
Python Implementation: imblearn
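The interpolation step described above can be sketched in a few lines of plain Python. This is a simplified illustration, not the imblearn implementation: the function names, the toy points and the choice to pick the base example at random (rather than looping over every minority example) are my own assumptions.

```python
import math
import random


def k_nearest(point, candidates, k):
    """Return the k candidates closest to `point` (Euclidean), excluding the point itself."""
    others = [c for c in candidates if c is not point]
    return sorted(others, key=lambda c: math.dist(point, c))[:k]


def smote_sketch(minority, n_new, k=3, seed=0):
    """Create n_new synthetic points by interpolating between minority neighbors."""
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_new):
        x = rng.choice(minority)                        # a minority example
        neighbor = rng.choice(k_nearest(x, minority, k))  # one of its k nearest minority neighbors
        gap = rng.random()                              # random number in [0, 1)
        # new point = x + gap * (neighbor - x), i.e. a point on the joining segment
        synthetic.append([xi + gap * (ni - xi) for xi, ni in zip(x, neighbor)])
    return synthetic


# Hypothetical 2-D minority examples.
minority = [[1.0, 1.0], [1.2, 0.9], [0.8, 1.1], [1.1, 1.3]]
new_points = smote_sketch(minority, n_new=5)
for p in new_points:
    print(p)
```

Because every synthetic point lies between two existing minority examples, the new samples stay inside the region already occupied by the minority class.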


2- ADASYN: ADAptive SYNthetic sampling (ADASYN) is based on the idea of adaptively generating minority data samples according to their distributions using K nearest neighbors. The algorithm adaptively updates the distribution and makes no assumptions about the underlying distribution of the data. It uses Euclidean distance for the KNN step. The key difference between ADASYN and SMOTE is that the former uses a density distribution as a criterion to automatically decide the number of synthetic samples to generate for each minority sample, adaptively changing the weights of the different minority samples to compensate for the skewed distribution. The latter generates the same number of synthetic samples for each original minority sample.
Reference: ADASYN Paper
R Implementation: smotefamily
Python Implementation:  imblearn
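A rough sketch of that allocation step in plain Python, assuming the usual formulation of ADASYN: each minority sample is weighted by the share of majority points among its K nearest neighbors, and the weights are normalized into per-sample counts. The function name `adasyn_allocation`, the `beta` balance parameter and the toy points are illustrative assumptions; only the counts are computed here, not the synthetic points themselves.

```python
import math


def adasyn_allocation(minority, majority, k=3, beta=1.0):
    """Return how many synthetic samples to generate for each minority point."""
    data = [(p, 1) for p in minority] + [(p, 0) for p in majority]
    # Total number of synthetic samples needed to reach the desired balance.
    total = (len(majority) - len(minority)) * beta
    ratios = []
    for x in minority:
        others = [(p, lab) for p, lab in data if p is not x]
        nearest = sorted(others, key=lambda pl: math.dist(x, pl[0]))[:k]
        # Share of majority points among the k nearest neighbors.
        ratios.append(sum(1 for _, lab in nearest if lab == 0) / k)
    norm = sum(ratios)
    if norm == 0:
        return [0] * len(minority)
    # Normalize the ratios into a distribution and scale by the total.
    return [round(r / norm * total) for r in ratios]


# Hypothetical data: four minority points in a "safe" cluster and one
# minority point surrounded by majority points.
minority = [[0.0, 0.0], [0.1, 0.0], [0.0, 0.1], [0.1, 0.1], [2.0, 2.0]]
majority = [[2.1, 2.0], [2.0, 2.1], [1.9, 2.0], [3.0, 3.0],
            [3.1, 3.0], [3.0, 3.1], [4.0, 4.0], [4.0, 4.1]]
allocation = adasyn_allocation(minority, majority)
print(allocation)
```

On this toy data all synthetic samples are allocated to the hard-to-learn point near the majority class, whereas SMOTE would spread them evenly over the five minority points.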

3-  ANS: Adaptive Neighbor Synthetic (ANS) dynamically adapts the number of neighbors needed for oversampling around different minority regions. This algorithm eliminates the parameter K of SMOTE and assigns a different number of neighbors to each positive instance. Every parameter of this technique is set automatically within the algorithm, making it parameter free.
Reference: ANS Paper
R Implementation: smotefamily

4-   Borderline-SMOTE: Borderline-SMOTE generates synthetic samples along the borderline between the minority and majority classes. This also helps in separating the minority and majority classes.
Reference: Border SMOTE
R Implementation: smotefamily

5-   Safe Level SMOTE: The safe level is defined as the number of positive instances among the k nearest neighbors. If the safe level of an instance is close to 0, the instance is nearly noise. If it is close to k, the instance is considered safe. Each synthetic instance is generated in a safe position by considering the safe level ratio of instances. In contrast, SMOTE and Borderline-SMOTE may generate synthetic instances in unsuitable locations, such as overlapping regions and noise regions.
Reference: SLS Paper
R Implementation: smotefamily
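The safe level itself is easy to compute. The sketch below, in plain Python with hypothetical toy points and a hypothetical function name, counts the positive (minority) instances among the k nearest neighbors of each minority instance. It covers only the safe level, not the full Safe Level SMOTE generation step.

```python
import math


def safe_levels(minority, majority, k=3):
    """Safe level = number of minority instances among the k nearest neighbors."""
    data = [(p, 1) for p in minority] + [(p, 0) for p in majority]
    levels = []
    for x in minority:
        others = [(p, lab) for p, lab in data if p is not x]
        nearest = sorted(others, key=lambda pl: math.dist(x, pl[0]))[:k]
        levels.append(sum(lab for _, lab in nearest))
    return levels


# Hypothetical data: a tight minority cluster plus one isolated minority point
# sitting inside the majority class.
minority = [[0.0, 0.0], [0.1, 0.0], [0.0, 0.1], [0.1, 0.1], [2.0, 2.0]]
majority = [[2.1, 2.0], [2.0, 2.1], [1.9, 2.0], [3.0, 3.0]]
levels = safe_levels(minority, majority)
print(levels)
```

With k = 3, the clustered points get a safe level of 3 (safe) and the isolated point gets 0 (nearly noise).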

6-  DBSMOTE: Density-Based Synthetic Minority Over-sampling Technique (DBSMOTE) is based on the clustering algorithm DBSCAN. The clusters are discovered by DBSCAN, and DBSMOTE then generates synthetic instances along the shortest path from each positive instance to the pseudo-centroid of its minority-class cluster.
Reference: DBSMOTE
R Implementation: smotefamily

Combining SMOTE and Undersampling Algorithms :

1-   SMOTETomek: Tomek links can be used as an under-sampling method or as a data cleaning method. SMOTETomek applies Tomek links to the over-sampled training set as a data cleaning method. Thus, instead of removing only the majority class examples that form Tomek links, examples from both classes are removed.
Reference: SMOTE Tomek
Python Implementation: imblearn

2-   SMOTEENN: Like Tomek links, Edited Nearest Neighbor (ENN) is used as a cleaning step. ENN removes any example whose class label differs from the class of at least two of its three nearest neighbors. In other words, the ENN method removes the instances of the majority class for which the KNN prediction differs from the majority class. ENN can remove both noisy and borderline examples, providing a smoother decision surface. ENN tends to remove more examples than Tomek links do, so it is expected to provide a more in-depth data cleaning.
Python Implementation: imblearn
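Since Tomek links are not defined above, here is a small sketch in plain Python under the usual definition: two examples of opposite classes form a Tomek link when each is the other's nearest neighbor. The helper names and toy points are hypothetical; this shows only the link-finding and removal step that would be applied after oversampling, not the imblearn implementation.

```python
import math


def nearest_index(i, points):
    """Index of the point closest to points[i], excluding i itself."""
    return min((j for j in range(len(points)) if j != i),
               key=lambda j: math.dist(points[i], points[j]))


def tomek_links(points, labels):
    """Return index pairs (i, j) of opposite-class mutual nearest neighbors."""
    links = []
    for i in range(len(points)):
        j = nearest_index(i, points)
        if i < j and labels[i] != labels[j] and nearest_index(j, points) == i:
            links.append((i, j))
    return links


# Hypothetical (already oversampled) data: points 2 and 3 sit right next to
# each other but belong to different classes.
points = [[0.0, 0.0], [0.2, 0.0], [1.0, 1.0], [1.1, 1.0], [3.0, 3.0]]
labels = [0, 0, 0, 1, 1]

links = tomek_links(points, labels)
# Cleaning as in SMOTETomek: drop both members of every link.
to_drop = {idx for pair in links for idx in pair}
cleaned = [(p, lab) for idx, (p, lab) in enumerate(zip(points, labels)) if idx not in to_drop]
print(links)
print(cleaned)
```

Used for undersampling only, one would drop just the majority-class member of each link instead of both.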

Hiring the right data scientist for the organisation

Every organisation needs talented, hardworking and skilled employees, irrespective of department, business unit or team. But finding and nurturing such talent can be challenging. In the data science field, with rapid changes in technology and growing demand, many organisations have set up data science teams. A successful data science team has 3 major strengths: A- availability of data, B- infrastructure and, most importantly, C- the “right” data scientists.
The biggest risk for a data science group is hiring the wrong data scientist. A wrong hire can lead to project failures and harm the reputation of the team within the organisation. Hiring the wrong talent is not only ruinous in terms of cost but also damaging for the organisation's new data science initiatives. In this article, I suggest 3 tips to avoid this situation.
  • Right Selection Process
  • Knowledge and Passion over education
  • Scientist attitude with business acumen

1-   Do I have right selection process? 

The data scientist should go through multiple rounds of interviews before final selection. The first round should involve solving a business problem with sample or masked data. This should be an assignment, with the solution submitted in ~1 to 2 weeks. The hiring manager may let candidates choose their own preferred coding language and reporting tools. In the case of lateral hiring, the candidate may need to work on the problem after office hours. A week or two ensures that they get enough time to think about innovative solutions and to code them. Even if the candidate is not selected in later stages, the organisation gets the benefit of hearing new ideas for solving a business problem.
The second round is explaining the solution to the interview panel. This makes sure that the candidate will also be able to explain solutions to business stakeholders in future. The same interview can be extended with a regular technical round.
A data science project is team work, where we need to work with data engineers, domain experts, business leaders and our own data science team members. The final round would be an HR interview, where the panel can check the person-organisation fit and the right attitude. This interview can be conducted by experts in this field.

2-   Knowledge and Passion over educational qualification

When it comes to hiring good candidates in a technical field, the first choice of many organisations is to look for candidates from premier institutes. But this should not be the ‘only’ criterion while shortlisting. Data science is a new, emerging and evolving field. Much of the development in this field is driven by people who have a passion for data science, and who have often gathered their knowledge in unconventional ways.
In the case of college campus recruitment, passion and knowledge can be assessed from participation in competitions / hackathons, GitHub profiles, research papers etc. This is also true for experienced professionals whose expertise lies in some other field but who now want to move in to data science. If the candidate has previous work experience in data science, the evaluation could be a combination of past projects and self-learning. The goal here is to get data scientists who are self-motivated and eager to learn new technologies and concepts. I believe that a combination of passion, knowledge seeking and hard work outshines an educational qualification alone.

3-   "Scientist" attitude with business acumen

In the end, what we are looking for in a candidate are the traits of a good scientist, such as curiosity, clarity and creativity. But a data scientist who will be working in a business setting should also have business acumen. A data scientist should be able to translate the model output in to business benefits, say cost savings, revenue generation or work efficiency. This might not directly apply to an entry level data scientist, but if we are hiring a senior resource or a leader, the person has to interact with various stakeholders and explain the model in simple business language.
As a closing remark, I understand that finding a ‘jack of all trades’ is like finding a needle in a haystack. Some of the points or skill requirements can be relaxed as needed. The key takeaway is that the risk of hiring the wrong data scientist can be mitigated if we have the right selection process, ensuring the candidate has the right attitude and skills to help the organisation succeed and prosper.
Please let me know your thoughts and suggestions in the comments below.
Disclaimer: The views and opinions expressed in this article are those of the author and do not necessarily reflect the official policy or position of any current or previous employer, organization, committee, other group or individual. Analysis performed within this article is based on limited, dated open source information. Assumptions made within the analysis are not reflective of the position of any previous or current employer.

Building machine learning models in Apache Spark using Scala in 6 steps

Introduction:

When building machine learning models, data scientists spend most of their time on 2 main tasks:
Pre-processing and Cleaning
The major portion of time goes in to collecting, understanding, analysing and cleaning the data and then building features. All of these steps are critical to building a successful machine learning model.
Iterations
The optimization algorithms and the process of finalizing the model access the data over multiple iterations.
Now, when it comes to big data, we need a tool which can efficiently pre-process and iterate on large data sets, and this is where Spark is useful. Compared with plain MapReduce, Spark can pass intermediate results directly to the next stage in the pipeline. Spark also has a provision to cache data for in-memory operations. This makes Spark well suited for machine learning algorithms.
The next part of the article uses a data set from the UCI data repository to build a machine learning model in Scala. I have assumed that the user has access to a cluster where Scala and Spark are already installed. Spark can also work on a local system, but I highly recommend trying out this code on a cluster to realize the true power of Spark and Scala. I have used shell commands everywhere.

Content

  • Why Scala for Spark?
  • About Data
  • Download and Store
  • Read, understand and preprocess the data
  • First cut machine learning model
  • Better Machine Learning Models

1-    Why Scala for Spark

Most of us, including me, are comfortable with and prefer using R and Python for solving machine learning problems. Spark has libraries for R and Python, namely SparkR and PySpark. But Spark is written in Scala, and it does make sense to build machine learning models in the same language in which Spark is written.

2-    About Data

I am using the HEPMASS data from the UCI Repository. The data contains 10.5 million rows including train and test data. Though this data is not as “big” as real life big data sets, it gives a fair idea of the computations and coding in Scala. The data set consists of ~7 million points as training data and the remainder as test data. The label takes the values 1 and 0, so it is a binary classification problem. The data has 28 features, of which 27 are already normalized.

3-    Download and store data in HDFS

The first step is to create a folder to save the data. Once the folder is ready, we download the train and test data in gzip format. The extracted csv files are then put in to HDFS, from where Spark can access them.
# create directory
mkdir hepmass
cd hepmass
# use curl to access the data and download the data to the directory
# use gzip to unzip and extract the files in to specified directory

gzip -d all_train.csv.gz
gzip -d all_test.csv.gz
# Check if the files are downloaded and extracted
ls
# Create the directories in hdfs for storing the files
hadoop fs -mkdir hepmassTrain
hadoop fs -mkdir hepmassTest

# put the downloaded csv files in to hdfs
hadoop fs -put all_train.csv hepmassTrain
hadoop fs -put all_test.csv hepmassTest
# In case you need to remove files from hdfs, following code can be used.
# hadoop fs -rm -R hepmassTest   # To remove the data from hdfs

4-    Read, Understand and preprocess the Data

Now comes the stage where we actually start a Spark instance.
# Start Spark Instance
spark-shell --master yarn --deploy-mode client
Earlier versions of Spark extensively used RDDs for data operations. Recent versions use the DataFrame approach, and we will use DataFrames in this code. The following piece of code reads the data as Spark DataFrames. After reading, we look at the schema of each DataFrame; with inferSchema enabled, Spark identifies the column types.
// Read Train Data
val trainDataRead = spark.read.option("inferSchema", true).option("header", true).csv("hepmassTrain")
val testDataRead = spark.read.option("inferSchema", true).option("header", true).csv("hepmassTest")
// Print the Schema and check the header to ensure data is loaded correctly
trainDataRead.printSchema
trainDataRead.head
testDataRead.printSchema
testDataRead.head
The schema will show us the column name and type of the column. If you simply want to extract the column names, you can use following code.
// Check Column Names
trainDataRead.columns
testDataRead.columns
The column name of the label is “# label”. First we will rename the column to “label”. At the same time we will also count the number of records.
// Rename the columns
val trainData1 = trainDataRead.withColumnRenamed("# label","label")
val testData1 = testDataRead.withColumnRenamed("# label","label")
// Counting records
trainData1.count()
testData1.count()
Sometimes the inferred column types are not the ones we need. E.g. a date column could be read as a string, or we might have to convert a string to a double. The following function can come in handy in those cases. As far as the hepmass data is concerned, we are already dealing with pretty clean data, which surely will not be the case in real life datasets.
// Function to convert the type of column
def convertColumnType(df: org.apache.spark.sql.DataFrame, name: String, newType: String) = {
  // Rename the column to a temporary name, cast it back under the original name, then drop the temporary column
  val df1 = df.withColumnRenamed(name, "swap")
  df1.withColumn(name, df1.col("swap").cast(newType)).drop("swap")
}
In the next piece of code, we check the distribution of the label. The distribution looks well balanced. The interesting point to note here is that if we do not append ‘show’, the code runs instantly. This is because Spark evaluates lazily: transformations such as groupBy and orderBy are only recorded, and the actual computation starts when you ask for the results.
// Check the number of Ones and zeros.
trainData1.groupBy("label").count().orderBy($"count".desc).show()
testData1.groupBy("label").count().orderBy($"count".desc).show()

The summary of all the variables can be obtained with a single command. The summary gives the count, mean, standard deviation, min and max. This is very useful because we might not be able to plot many graphs to understand big data. The summary comes in handy for checking for outliers. In our case we have normalized data, so the mean will be close to 0 and the standard deviation will be close to 1.
// Check Summary of Other columns
val summary = trainData1.describe()
// Show the selected columns
summary.select("summary", "f0", "f1","f2","f3","mass").show()
But wait! The column mass seems to be a categorical value. We will have to treat mass separately. Let's check the values of mass.
// check the occurrences of various values in mass using groupby
trainData1.groupBy("mass").count().orderBy($"count".desc).show()
We will first round the value 499.999 up to 500 and then use a string indexer to turn mass in to a single categorical index column. We will apply this operation on train as well as test. We are creating the new column as massIndex.
// Round the values of column mass up to the nearest integer (499.999 -> 500)
import org.apache.spark.sql.functions.ceil
val trainData2 = trainData1.withColumn("mass", ceil($"mass"))
val testData2 = testData1.withColumn("mass", ceil($"mass"))
// Rather than using one hot encoding we will transform the categorical in to an index.
// Note: by default StringIndexer assigns indices by label frequency, not by the mass value.
import org.apache.spark.ml.feature.StringIndexer
// Fit the indexer on train only and reuse it on test so that both share the same mapping
val indexer = new StringIndexer().setInputCol("mass").setOutputCol("massIndex").fit(trainData2)
val trainData3 = indexer.transform(trainData2).drop("mass")
val testData3 = indexer.transform(testData2).drop("mass")
// Check the transformation again
trainData3.groupBy("massIndex").count().orderBy($"count".desc).show()

5-    First Cut Machine Learning Model


Spark lets us cache the processed data so that it can be used iteratively by models. We will cache our train, test and validation data. When to cache the data is an art. It is a trade-off between lower memory usage and faster execution of algorithms. The rule of thumb is that the cache should be used after data cleaning and pre-processing, when the features and labels are ready. For the first cut model the training data is divided in to train and validation. We have the option of k fold cross-validation, but when dealing with big data it is computationally expensive. We will not use k fold cross-validation in this example.
val testDataFinal = testData3
val trainDataFinal = trainData3
// Cache the data
trainDataFinal.cache()
testDataFinal.cache()
// Simple Method
val Array(trainData, validData) = trainData3.randomSplit(Array(0.7, 0.3))
trainData.cache()
validData.cache()
We will first build a random forest model with default parameters and check its performance. We will use the Spark ML library to build the model. Spark offers pipeline functionality, and we will use that for building better models in the next section. The following code builds the model and evaluates the performance.
// import
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.Pipeline
import scala.util.Random
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.evaluation.MulticlassMetrics
// Select the feature columns and group them in to features
val featureColsNames = trainData.columns.filter(_ != "label")
val assembler = new VectorAssembler().setInputCols(featureColsNames).setOutputCol("featureVector")
val assembledTrainData = assembler.transform(trainData)
val assembledValidData = assembler.transform(validData)
// random forest classifier
// New Classifier
val classifier = new RandomForestClassifier().setSeed(Random.nextLong()).setLabelCol("label").
setFeaturesCol("featureVector").setPredictionCol("prediction")
// Build model
val model = classifier.fit(assembledTrainData)
// Predict
val predictions = model.transform(assembledValidData)
For evaluation we will use binary as well as multi class metrics to cover the implementation of both. We will first convert the predictions and labels in to RDD format.
// Create RDD
val predictionAndLabels = predictions.select("prediction", "label").as[(Double,Double)].rdd
val metricsBinary = new BinaryClassificationMetrics(predictionAndLabels)
// AUROC
val auROC = metricsBinary.areaUnderROC
println("Area under ROC = " + auROC)
// Show confusion matrix
val metricsMulti = new MulticlassMetrics(predictionAndLabels)
println("Confusion matrix:")
println(metricsMulti.confusionMatrix)
//  Show accuracy
val accuracy = metricsMulti.accuracy
println("Summary Statistics")
println(s"Accuracy = $accuracy")

6-    Building better model

In this section we will build better models using a parameter grid and a pipeline. Spark gives us the facility to build a pipeline of operations, and it is very useful. Though some of the steps are repeated from earlier sections, I have purposefully kept them so that the entire chain of events is clear. Here we first create the assembler, evaluator, parameter grid and validator. We will use the inbuilt ‘TrainValidationSplit’ to divide the data in to train and validation. All the computations will start when we call fit on the train data. This step will take time to run because of the heavy computations.
// Building Better Models
// Parameter Tuning
val featureColsNames = trainData3.columns.filter(_ != "label")
val assembler = new VectorAssembler().setInputCols(featureColsNames).setOutputCol("featureVector")
val multiclassEval = new MulticlassClassificationEvaluator().
setLabelCol("label").
setPredictionCol("prediction").
setMetricName("accuracy")

// Set Pipeline
val pipeline = new Pipeline().setStages(Array(assembler, classifier))
// Set Parameter Grid
val paramGrid = new ParamGridBuilder().
addGrid(classifier.impurity, Seq("gini", "entropy")).
addGrid(classifier.maxDepth, Seq(5,10)).
addGrid(classifier.numTrees, Seq(10,100)).
build()

// Validate
import org.apache.spark.ml.tuning.TrainValidationSplit
val validator = new TrainValidationSplit().
setSeed(Random.nextLong()).
setEstimator(pipeline).
setEvaluator(multiclassEval).
setEstimatorParamMaps(paramGrid).
setTrainRatio(0.7)
// This process should take time
val validatorModel = validator.fit(trainDataFinal)
Once the model is ready, we can inspect the best model and use it to get the final predictions.
// Check the parameters of the best model
import org.apache.spark.ml.PipelineModel
val bestModel = validatorModel.bestModel
bestModel.asInstanceOf[PipelineModel].stages.last.extractParamMap
// Check accuracy of best model on the validation split
validatorModel.validationMetrics.max
// use the best model for test data
multiclassEval.evaluate(bestModel.transform(testDataFinal))