Custom transformer in PySpark MLlib

Build custom data preprocessing / postprocessing steps as standard readable/writable MLlib pipeline objects


Author: Ashish Menkudale
Published on: November 1, 2020

What this post is about

Spark MLlib enables conventional data science algorithms on a distributed platform. Often, we build a series of transformers around the core model to get the desired results, and this series can be wrapped in a pipeline and exported as a single object. Spark MLlib ships with a variety of transformers out of the box as pipeline components. Sometimes, though, we need a custom step that is very specific to our dataset.
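
For reference, a pipeline assembled purely from built-in components might look like this (a minimal sketch; the column names are hypothetical):


from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer, VectorAssembler

# Two built-in stages: a SQL projection and a vector assembler.
# 'pressure_1' and 'pressure_2' are hypothetical column names.
sql = SQLTransformer(statement="SELECT *, pressure_1 + pressure_2 AS pressure_sum FROM __THIS__")
assembler = VectorAssembler(inputCols=["pressure_1", "pressure_2", "pressure_sum"], outputCol="features")
pipeline = Pipeline(stages=[sql, assembler])
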

For a custom step, we can start with something like,


from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnSelector(Transformer):
    """
    A custom Transformer which selects all columns whose names contain at
    least one of the words from select_list.
    """

    def __init__(self, select_list: Iterable[str]):
        super(ColumnSelector, self).__init__()
        self.select_list = select_list

    def _transform(self, df: DataFrame) -> DataFrame:
        # Keep only the columns whose name contains any keyword in select_list.
        return df.select(*[x for x in df.columns if any(y in x for y in self.select_list)])
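

A quick sanity check (df is a hypothetical sample with two pressure readings and one temperature reading; we reuse it throughout the post):


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1.0, 2.0, 25.0), (3.0, 4.0, 26.0)],
    ["pressure_1", "pressure_2", "temperature_1"],
)

ColumnSelector(select_list=["pressure"]).transform(df).columns
# ['pressure_1', 'pressure_2']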
        

However, when we try to save a pipeline that contains this custom transformer, we get,


ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'ColumnSelector_4000a099a6a3', <class '__main__.ColumnSelector'>)

This post dives into why this happens, and the solution.

PySpark custom transformer

I want to build a custom transformer that selects all the columns in my dataset which have the keyword ‘pressure’ in their name.

The fix is to extend Transformer together with HasOutputCols, DefaultParamsReadable, and DefaultParamsWritable, so the stage knows how to serialize and deserialize its params.

For validating and coercing param values, check out the type converters (TypeConverters) in pyspark.ml.param.


# based on https://stackoverflow.com/a/52467470/4769180
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class CustomColumnSelector(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):

    value = Param(
        Params._dummy(),
        "value",
        "keywords; columns whose names contain any of them are selected",
        typeConverter=TypeConverters.toListString,
    )

    @keyword_only
    def __init__(self, value=['temperature']):
        super(CustomColumnSelector, self).__init__()
        self._setDefault(value=['temperature'])
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, value=['temperature']):
        """
        setParams(self, value=['temperature'])
        Sets params for this CustomColumnSelector.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """
        Sets the value of :py:attr:`value`.
        """
        return self._set(value=value)

    def getValue(self):
        """
        Gets the value of :py:attr:`value` or its default value.
        """
        return self.getOrDefault(self.value)

    def _transform(self, dataset):
        # Keep only the columns whose name contains any of the keywords.
        return dataset.select(*[x for x in dataset.columns if any(y in x for y in self.getValue())])
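

A quick check of the param plumbing (setter, getter, and the default):


sel = CustomColumnSelector()   # falls back to the default ['temperature']
sel.getValue()                 # ['temperature']
sel.setValue(['pressure'])
sel.getValue()                 # ['pressure']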
        

Let’s build the pipeline


cs = CustomColumnSelector(value=['pressure'])
pipeline = Pipeline(stages=[cs])

Testing,


pm = pipeline.fit(df)
res = pm.transform(df)
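
Only the pressure columns should survive:


print(res.columns)
# ['pressure_1', 'pressure_2'] for the sample df above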

Let’s save the pipeline,


pm.write().overwrite().save('/dbfs/mv792/pipeline/model_rev02/')

Now let’s load the saved pipeline back,


from pyspark.ml import PipelineModel

pm2 = PipelineModel.load('/dbfs/mv792/pipeline/model_rev02/')

and we get an AttributeError, along the lines of module '__main__' has no attribute 'CustomColumnSelector': the reader resolves the transformer class by the qualified name recorded at save time, and it cannot find that class on the __main__ module.

The simple solution is to register the transformer on __main__ by executing,


# https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel#comment95869495_52467470
m = __import__("__main__")
setattr(m, 'CustomColumnSelector', CustomColumnSelector)

Let’s try again. With CustomColumnSelector registered on __main__, loading the saved pipeline should now go through (a sketch reusing the df from earlier),
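

pm2 = PipelineModel.load('/dbfs/mv792/pipeline/model_rev02/')
pm2.transform(df).columns
# ['pressure_1', 'pressure_2']
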

With the ability to build any such custom transformer on Spark DataFrames, and to serve any custom Python algorithm built with sklearn through MLflow, the possibilities are limitless.

Happy machine learning!