“How to” MLRun and storage

jboothomas
3 min read · Oct 13, 2021


In this blog I will cover how to access file or object storage from an MLRun job. The entire process was done using the Iguazio and Pure Storage MLOps platform, as per this post.

As per the documentation, MLRun is an open-source MLOps framework that offers an integrative approach to managing machine-learning pipelines, from early development through model development to full pipeline deployment in production; please refer to the official documentation for more detail.

MLRun can leverage either Portworx storage or scalable S3/NFS storage from a FlashBlade.

Accessing FlashBlade from within an MLRun job is simple, using either a shared RWX mapping into the container(s) running the job or by passing the required S3 parameters for access to our S3 bucket(s). The multidimensional performance of FlashBlade is well suited for ingestion, clean & transform and training; its shared nature simplifies the exploration stage of an AI/ML workflow.

MLRun and NFS share(s)

For this I created a proxy storage class in Portworx to present my NFS share as a PVC. The following two YAML files were used:

$ cat px-proxy-sc.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: portworx-proxy-fb-nfs
provisioner: kubernetes.io/portworx-volume
parameters:
  proxy_endpoint: "nfs://10.21.200.4"
  proxy_nfs_exportpath: "/jbtfs"
  mount_options: "vers=4.1"
allowVolumeExpansion: true

And

$ cat fbnfspvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fb-nfs-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 200Gi
  storageClassName: portworx-proxy-fb-nfs

These were both applied to the Kubernetes cluster using:

kubectl apply -n default-tenant -f <yaml_file>

Within my MLRun notebook I then add the following after the mlrun.code_to_function call:

from mlrun.platforms import auto_mount

taxi_func = taxi_func.apply(auto_mount(pvc_name='fb-nfs-pvc', volume_mount_path='/mnt/pipeline', volume_name=None))

When we issue the <function>.run it will mount our PVC at the desired location.
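To confirm the PVC actually lands in the job pod, a throwaway handler that lists the mount point works well. This is a minimal sketch of my own (check_mount and the check-mount function are illustrations, not part of the original notebook):

import mlrun
from mlrun.platforms import auto_mount

def check_mount(context, mount_path: str = '/mnt/pipeline'):
    # Log the share contents so the job output proves the NFS mount is live
    import os
    context.logger.info('Files under {}: {}'.format(mount_path, os.listdir(mount_path)))

check_func = mlrun.code_to_function(name='check-mount', kind='job', image='mlrun/mlrun')
check_func = check_func.apply(auto_mount(pvc_name='fb-nfs-pvc', volume_mount_path='/mnt/pipeline', volume_name=None))
check_run = check_func.run(handler='check_mount', local=False)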

Jupyter notebook code for PVC proxy FlashBlade share

# Install prerequisites
%pip install mlrun
from os import path
import mlrun
project_name_base = 'nyc-taxi-pvc'
project_name, artifact_path = mlrun.set_environment(project=project_name_base, user_project=True)
print(f'Project name: {project_name}')
print(f'Artifact path: {artifact_path}')

# mlrun: start-code
from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem
def fetch_data(context: MLClientCtx, taxi_records_csv_path: DataItem, zones_csv_path: DataItem):
    context.logger.info('Reading taxi records data from {}'.format(taxi_records_csv_path))
    taxi_records_dataset = taxi_records_csv_path.as_df()
    context.logger.info('Reading zones data from {}'.format(zones_csv_path))
    zones_dataset = zones_csv_path.as_df()

# mlrun: end-code
taxi_records_csv_path = '/mnt/pipeline/csv/iguazio/yellow_tripdata_2020-12.csv'
zones_csv_path = '/mnt/pipeline/csv/taxizones/taxi_zones.csv'

taxi_func = mlrun.code_to_function(name='taxi',
                                   kind='job',
                                   image='mlrun/mlrun',
                                   requirements=[])

from mlrun.platforms import mount_pvc

taxi_func = taxi_func.apply(mount_pvc(pvc_name='fb-nfs-pvc', volume_mount_path='/mnt/pipeline', volume_name='pvc-5b2c597c-d542-4a00-8915-caaedbd160fa'))

fetch_data_run = taxi_func.run(handler='fetch_data',
                               inputs={'taxi_records_csv_path': taxi_records_csv_path, 'zones_csv_path': zones_csv_path},
                               local=False)

When run we get the following output:

MLRun and S3 bucket(s)

Here is the required code to read from and write to an S3 bucket from within an MLRun pipeline.

Jupyter notebook code for S3 FlashBlade bucket

# Install prerequisites
%pip install mlrun
from os import path
import mlrun
project_name_base = 'nyc-taxi-s3'
project_name, artifact_path = mlrun.set_environment(project=project_name_base, user_project=True)
print(f'Project name: {project_name}')
print(f'Artifact path: {artifact_path}')

The MLRun code:

# mlrun: start-code
import pandas as pd
from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem
def fetch_data(context: MLClientCtx, taxi_records_csv_path: DataItem, zones_csv_path: DataItem, storage_options_key, storage_options_secret, storage_options_endpoint):
    context.logger.info('Reading taxi records data from {}'.format(taxi_records_csv_path))
    taxi_records_dataset = pd.read_csv(f"{taxi_records_csv_path}",
                                       storage_options={'key': storage_options_key,
                                                        'secret': storage_options_secret,
                                                        'client_kwargs': {
                                                            'endpoint_url': storage_options_endpoint},
                                                        'use_ssl': False})
    context.logger.info('Reading zones data from {}'.format(zones_csv_path))
    zones_dataset = zones_csv_path.as_df()
# mlrun: end-code

Then I define the variables for the S3 backend object store:

taxi_records_csv_path = 's3://jbtbucket/iguazio/yellow_tripdata_2020-12.csv'
zones_csv_path = 'https://s3.wasabisys.com/iguazio/data/Taxi/taxi_zones.csv'
storage_options_key = "PSFBSAZ-----------------------HCCMBFNNN"
storage_options_secret = "E1351C22----------------------23PFNO"
storage_options_endpoint = 'http://1.2.3.4'
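Before handing these to the MLRun job, it can help to sanity-check the credentials directly from the notebook. A minimal sketch, assuming s3fs is installed alongside pandas (pandas uses it under the hood for s3:// paths):

import pandas as pd

# Reuse the variables defined above; a failed read here means the job will fail too
df = pd.read_csv(taxi_records_csv_path,
                 storage_options={'key': storage_options_key,
                                  'secret': storage_options_secret,
                                  'client_kwargs': {'endpoint_url': storage_options_endpoint},
                                  'use_ssl': False})
print(df.head())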

Then I create and run the function with the variables passed:

taxi_func = mlrun.code_to_function(name='taxi',
                                   kind='job',
                                   image='mlrun/mlrun',
                                   requirements=[])

fetch_data_run = taxi_func.run(handler='fetch_data',
                               inputs={'taxi_records_csv_path': taxi_records_csv_path,
                                       'zones_csv_path': zones_csv_path},
                               params={'storage_options_key': storage_options_key,
                                       'storage_options_secret': storage_options_secret,
                                       'storage_options_endpoint': storage_options_endpoint},
                               local=False)

When run we get the following output:
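Writing back to the bucket works the same way: pandas accepts the same storage_options dictionary for output paths (pandas 1.2 and later). A hedged sketch, with a hypothetical output key and helper of my own:

import pandas as pd

def write_results(df: pd.DataFrame, key: str, secret: str, endpoint: str):
    # Same s3fs-backed storage_options as the read path, now used for a write
    df.to_csv('s3://jbtbucket/output/yellow_tripdata_clean.csv',  # hypothetical output key
              index=False,
              storage_options={'key': key,
                               'secret': secret,
                               'client_kwargs': {'endpoint_url': endpoint},
                               'use_ssl': False})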

I hope to have shown how simple it is to access Pure Storage's fast, scalable object storage, FlashBlade, from an MLRun job.
