What this post is about
Spark MLlib enables conventional data science algorithms on a distributed platform. Often, we build a series of transformers around the core model to get the desired results. This series of transformers can be wrapped in a pipeline and exported as a single object. Spark MLlib ships with a variety of built-in transformers to use as pipeline components, but sometimes we want a custom step that is specific to our dataset.
We can achieve that with something like,
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd
# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnSelector(Transformer):
    """
    A custom Transformer which selects all columns that have at least one of the
    words from the select_list in the name.
    """

    def __init__(self, select_list: Iterable[str]):
        super(ColumnSelector, self).__init__()
        self.select_list = select_list

    def _transform(self, df: DataFrame) -> DataFrame:
        # keep only the columns whose name contains any of the keywords
        df = df.select(*[x for x in df.columns if any(y in x for y in self.select_list)])
        return df
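To see the failure end to end, here is a minimal usage sketch; the SparkSession setup, DataFrame contents, and column names are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical sensor readings; column names are made up for illustration
df = spark.createDataFrame(
    [(1.0, 101.3, 25.0), (2.0, 99.8, 26.1)],
    ["id", "pressure_psi", "temperature_c"],
)

pipeline = Pipeline(stages=[ColumnSelector(select_list=["pressure"])])
model = pipeline.fit(df)
model.transform(df).show()  # keeps only pressure_psi

# saving the fitted pipeline is where it breaks:
model.write().overwrite().save("/tmp/column_selector_pipeline")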
Fitting and transforming work fine. However, when we try to save a pipeline containing such a custom transformer, we get:
ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'ColumnSelector_4000a099a6a3', <class '__main__.ColumnSelector'>)
This post digs into why this happens, and the solution. The short version: the Pipeline and PipelineModel writers validate that every stage implements MLWritable, and a plain Transformer subclass like ColumnSelector does not.
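You can confirm this directly; a quick check, reusing the ColumnSelector class from above:

from pyspark.ml.util import MLWritable

# a plain Transformer subclass carries no writer, so the pipeline writer refuses it
print(isinstance(ColumnSelector(select_list=['pressure']), MLWritable))  # False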
PySpark custom transformer
I want to build a custom transformer that selects all the columns in my dataset whose names contain the keyword ‘pressure’.
The fix is to extend Transformer, HasOutputCols, DefaultParamsReadable and DefaultParamsWritable. The two mixins are what make the stage serializable: DefaultParamsWritable saves the declared Params as JSON metadata, and DefaultParamsReadable reconstructs the object from that metadata.
Check out pyspark.ml.param for type converters.
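As an aside on type converters, a Param can be declared with a converter that coerces and validates its input; a minimal sketch (the keywords name is hypothetical, and the class below omits converters):

from pyspark.ml.param import Param, Params, TypeConverters

# input gets coerced to a list of strings, or a TypeError is raised
keywords = Param(
    Params._dummy(),
    "keywords",
    "list of keywords to match column names against",
    typeConverter=TypeConverters.toListString,
)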
# using https://stackoverflow.com/a/52467470/4769180
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class CustomColumnSelector(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    value = Param(
        Params._dummy(),
        "value",
        "list of keywords; columns whose names contain any of them are kept",
    )

    @keyword_only
    def __init__(self, value=['temperature']):
        super(CustomColumnSelector, self).__init__()
        self._setDefault(value=['temperature'])
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, value=['temperature']):
        """
        setParams(self, value=['temperature'])
        Sets params for this CustomColumnSelector.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """
        Sets the value of :py:attr:`value`.
        """
        return self._set(value=value)

    def getValue(self):
        """
        Gets the value of :py:attr:`value` or its default value.
        """
        return self.getOrDefault(self.value)

    def _transform(self, dataset):
        # keep only the columns whose name contains any of the configured keywords
        dataset = dataset.select(*[x for x in dataset.columns if any(y in x for y in self.getValue())])
        return dataset
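Thanks to the DefaultParamsWritable mixin, the new class now passes the writability check from earlier:

from pyspark.ml.util import MLWritable

print(isinstance(CustomColumnSelector(value=['pressure']), MLWritable))  # True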
Let’s build the pipeline
cs = CustomColumnSelector(value=['pressure'])
pipeline = Pipeline(stages=[cs])
Testing,
pm = pipeline.fit(df)
res = pm.transform(df)
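Assuming the hypothetical sensor DataFrame from the earlier sketch, only the pressure columns survive:

print(res.columns)  # e.g. ['pressure_psi']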
Let’s save the pipeline,
pm.write().overwrite().save('/dbfs/mv792/pipeline/model_rev02/')
Now we will use this saved pipeline by loading it,
from pyspark.ml import PipelineModel

pm2 = PipelineModel.load('/dbfs/mv792/pipeline/model_rev02/')
and we get an AttributeError: when the reader rebuilds the stage, it calls __import__('__main__') and looks CustomColumnSelector up on that module, and in a notebook environment the class is not registered there.
The simple solution is to register the transformer on the __main__ module by executing,
#https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel#comment95869495_52467470
m = __import__("__main__"); setattr(m, 'CustomColumnSelector', CustomColumnSelector)
Let’s try again,
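Re-running the same load, plus a transform to confirm (assuming the df from earlier):

pm2 = PipelineModel.load('/dbfs/mv792/pipeline/model_rev02/')
pm2.transform(df).show()  # loads and runs without the AttributeError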
With the ability to build such custom transformers on Spark DataFrames, and to package custom Python algorithms with scikit-learn and MLflow, the possibilities are limitless.
Happy machine learning!