Machine Learning with 2nd Generation Operators

Quite a number of blogs have been written about how to use SAP Data Intelligence for Machine Learning scenarios. Have a look, for example, at the excellent and very instructive blogs of Andreas Foster. With the 2nd generation operators of SAP Data Intelligence we have gained some additional conveniences for creating productive training and prediction pipelines. In the following I assume that a data scientist has already done the data exploration and wants to deploy the training and the prediction pipeline in an enterprise environment. I'm using one of the “Drosophila” ML use cases: the price prediction of used cars. If you would like to build it on your own, you can download the data “Ebay Used Car Sales Data” from Kaggle.

The ML process consists of 2 pipelines and 2 custom operators (built as a kind of template):

  1. Training Pipeline
    1. Read training data (Standard: Table Consumer)
    2. Train the model (Custom Operator)
    3. Save the model as binary to an object store
  2. Prediction Pipeline
    1. Read data (Standard: Table Consumer)
    2. Read model (Currently a custom operator, but will soon be replaced by a standard operator)
    3. Predict (Custom Operator)
    4. Save result

The particular advantage of using generation 2 operators is that you can use the very convenient input and output operators of the Structured Data Operators, as you will see when we sew the pipelines together.

Data

After downloading the data from Kaggle I have created a HANA table and imported the csv-file.

CREATE COLUMN TABLE "DEMO"."USEDCARS"(
    "CAR_ID" INTEGER,
    "BRAND" NVARCHAR(14),
    "MODEL" NVARCHAR(11),
    "VEHICLETYPE" NVARCHAR(10),
    "YEAROFREGISTRATION" INTEGER,
    "HP" INTEGER,
    "FUELTYPE" NVARCHAR(6),
    "GEARBOX" NVARCHAR(9),
    "KILOMETER" INTEGER,
    "PRICE" INTEGER
)

Dockerfile

Because I am using Python-packages that are not part of the standard Docker images I have to create a new Dockerfile and build it.

ARG DEPENDENCY_BASE_IMAGE_VERSION=2107.0.1
FROM §/com.sap.datahub.linuxx86_64/sles:${DEPENDENCY_BASE_IMAGE_VERSION}
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
# The PyPI package is named scikit-learn; it is imported as sklearn
RUN python3.9 -m pip install scikit-learn
RUN python3.9 -m pip install boto3

It is very important to run the installation with python3.9 to make the image available for generation 2 pipelines/operators. This is necessary until one of the next two releases, when all of SAP Data Intelligence has switched to Python 3.9. You can tag the Docker image with

  • sklearn
  • boto3
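
Before building pipelines on top of the image, it can help to confirm that the interpreter and the packages are what you expect. The following is a minimal sketch and not part of the original setup: the helper name `missing_packages` is hypothetical and it uses only the Python standard library. It is meant to be run with the same interpreter the operators use, for example python3.9 inside the container.

```python
import importlib.util
import sys


def missing_packages(names):
    """Return the top-level package names that cannot be imported."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Import names used by the operators in this blog.
    required = ["sklearn", "boto3"]
    print("Python version:", ".".join(str(part) for part in sys.version_info[:3]))
    print("Missing packages:", missing_packages(required))
```

An empty list for "Missing packages" suggests that the image provides what the custom operators import.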

Because the release DI Cloud 2113 provides no Binary File Reader, I have written an S3 binary file reader for the time being. Hopefully this will become unnecessary with the release after next.

Custom Training Operator

I have designed the training operator as a template that can be reused for specific training operators.

  1. Inport: Generic Table ‘*’ in order to consume data conveyed by Structured Data Operators
  2. Receiving part that collects data until the last batch has arrived. Nothing in this part is data specific; it results in a DataFrame.
  3. Training part that is specific to the task
  4. Output part is again generic and can be reused unaltered. It sends the model as binary to the outport.

import pickle
import io

import pandas as pd
from sklearn.linear_model import LinearRegression

df = pd.DataFrame()

def on_input(msg_id, header, data):

    ### GENERIC PART (Receiving Batches)
    global df
    tbl = data.get()
    tbl_info = api.type_context.get_vtype(tbl.type_ref)
    col_names = list(tbl_info.columns.keys())
    if df.empty:
        df = pd.DataFrame(tbl, columns=col_names)
    else:
        df = pd.concat([df, pd.DataFrame(tbl, columns=col_names)])

    # In case of stream wait for other data
    if 'com.sap.headers.batch' in header and header['com.sap.headers.batch'][1] == False:
        return 1

    ### SPECIFIC (training)
    model = LinearRegression()
    model.fit(df[['YEAROFREGISTRATION', 'HP', 'KILOMETER']].values, df['PRICE'].values)

    ### GENERIC (output)
    model_binary = pickle.dumps(model)
    bstream = io.BytesIO(model_binary)
    bstream.seek(0)
    api.outputs.model.publish(bstream, -1, header)

api.set_port_callback("input", on_input)
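
The generic receiving part can be tried out outside of SAP Data Intelligence. The following is a minimal sketch using only the Python standard library. It assumes, as the operator code above suggests, that the second element of the 'com.sap.headers.batch' header is the "last batch" flag. The names `is_last_batch` and `BatchCollector` are hypothetical, and the rows are made-up values for illustration.

```python
BATCH_HEADER = "com.sap.headers.batch"


def is_last_batch(header):
    """Return True when no further batches are expected.

    Assumption taken from the operator code: the second element of the
    batch header is the "last batch" flag. A message without a batch
    header is treated as a single, complete batch.
    """
    if BATCH_HEADER not in header:
        return True
    return bool(header[BATCH_HEADER][1])


class BatchCollector:
    """Accumulates rows across batches and reports when the data is complete."""

    def __init__(self):
        self.rows = []

    def add(self, header, rows):
        """Store one batch and return True once the last batch has arrived."""
        self.rows.extend(rows)
        return is_last_batch(header)


# Simulated stream of two batches (made-up rows: year, hp, kilometer, price).
collector = BatchCollector()
done_first = collector.add({BATCH_HEADER: [0, False]}, [[2010, 105, 120000, 6500]])
done_second = collector.add({BATCH_HEADER: [1, True]}, [[2015, 150, 60000, 14900]])
```

Only when `add` returns True would the task-specific training step run on the collected rows.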

You can download the operator from my personal GitHub and import it as a solution: ml_train-operator

Training Pipeline

With the training operator in place, the pipeline is straightforward to create.

In any case, make sure to group the “Train Model” operator, because it needs to run in a separate Docker container that provides the package “sklearn”.

Custom Operator Prediction

The custom operator for the prediction is a bit more complex because we have 2 inports:

  1. Model (Binary)
  2. Data

and we need to ensure that the model and all the batches have been loaded before the prediction is calculated. I have solved this by having separate functions called as callbacks by the ports to do the preparation. The orchestration is done via global variables: Once the model is set and all the batches are read, then the actual prediction is calculated. In this case only the latter function “process” is specific whereas the “on_model” and “on_data” are generic.

import pickle
import io

import pandas as pd
from sklearn.linear_model import LinearRegression

#
# GLOBAL VARIABLES
#
df = pd.DataFrame()
all_data_loaded = False
model = None
data_header = None

#
# MODEL INPUT
#
def on_model(msg_id, header, data):

    # GENERIC PART
    global model
    bstream = io.BytesIO(data.get_reader().read(-1))
    model = pickle.load(bstream)

    if all_data_loaded:
        process()

#
# DATA INPUT
#
def on_input(msg_id, header, data):

    # GENERIC INPUT
    global df
    global all_data_loaded
    global table_ref
    global data_header

    tbl = data.get()
    tbl_info = api.type_context.get_vtype(tbl.type_ref)
    col_names = list(tbl_info.columns.keys())
    if df.empty:
        df = pd.DataFrame(tbl, columns=col_names)
    else:
        df = pd.concat([df, pd.DataFrame(tbl, columns=col_names)])

    # In case of stream wait for other data
    data_header = header
    if 'com.sap.headers.batch' in header and header['com.sap.headers.batch'][1] == False:
        return 1

    all_data_loaded = True
    if model:
        process()

api.set_port_callback("model", on_model)
api.set_port_callback("data", on_input)

#
# Custom Process
#
def process():

    # PREDICTION
    df['PRICE'] = model.predict(df[['YEAROFREGISTRATION', 'HP', 'KILOMETER']])

    # GENERIC OUTPUT
    tbl = api.Table(df.values.tolist(), 'mycompany.used_cars')
    api.outputs.prediction.publish(tbl, header=data_header)
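
The orchestration between the two inports can be tested locally without the SAP Data Intelligence `api` object. The following is a minimal sketch of the same idea using only the Python standard library: state is kept in an object instead of global variables, and the prediction runs once both the model and the last batch have arrived, regardless of the order. The class `PredictionGate` and the toy model (a plain dict of made-up coefficients, not a trained model) are hypothetical stand-ins.

```python
import io
import pickle


class PredictionGate:
    """Calls `process` once both the model and all data have arrived."""

    def __init__(self, process):
        self.process = process
        self.model = None
        self.rows = []
        self.all_data_loaded = False

    def on_model(self, model_bytes):
        # The model arrives as pickled bytes, as sent by the training operator.
        self.model = pickle.load(io.BytesIO(model_bytes))
        if self.all_data_loaded:
            self.process(self.model, self.rows)

    def on_data(self, rows, last_batch):
        self.rows.extend(rows)
        if not last_batch:
            return  # wait for further batches
        self.all_data_loaded = True
        if self.model is not None:
            self.process(self.model, self.rows)


# Stand-in "model": made-up coefficients for illustration only.
toy_model = {"intercept": 1000.0, "per_hp": 50.0}
calls = []


def process(model, rows):
    """Task-specific part: here a toy price formula applied to HP values."""
    calls.append([model["intercept"] + model["per_hp"] * hp for hp in rows])


# Data first, model last: the prediction runs only when the model arrives.
gate = PredictionGate(process)
gate.on_data([100], last_batch=False)
gate.on_data([200], last_batch=True)
calls_before_model = len(calls)
gate.on_model(pickle.dumps(toy_model))
```

The sketch checks the model with `is not None` rather than relying on truthiness, so that a model object that happens to evaluate to False would still trigger the prediction.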

You can download the operator from my personal GitHub and import it as a solution: ml_predict_price

Custom Operator read_S3

The custom operator that reads from S3 is straightforward, so I won’t comment on it further. You can download and import the operator as a solution: read_s3.

Prediction Pipeline

Now we have all the components for our prediction pipeline.

Again we have to group the custom operators because these require non-standard Python packages that are not part of the Docker-container that provides the code for the Structured Data Operators.

Again you see how much the generation 2 operators ease our life when creating pipelines. Of course there is some preparation and learning required, but once it is done you might be more productive and produce more robust data pipelines than before.