MetaFlow AI data preparation on S3

jboothomas
7 min readDec 24, 2024

--

When working with AI, three critical factors come into play: model parameters/architecture, compute resources, and data. These dimensions collectively influence the accuracy of your models:

  • Larger or better-designed models
  • Longer training durations or more powerful computation
  • Larger datasets or better-prepared data

The quality of your dataset significantly affects both the model’s accuracy and the training time. For example, I recently trained a “small language model” without preprocessing the text dataset, and the results were poor, to say the least. The model produced outputs like French words mixed with English or single letters. This happened because I skipped data cleaning intentionally to evaluate the impact of unprocessed datasets — and it clearly highlighted the importance of proper data preparation.

Today, I’ll walk you through the initial data preparation steps for handling large datasets stored on S3. We’ll be using MetaFlow, a tool that simplifies the implementation of task-specific workflows.

Metaflow code

To access data on S3, we need an access_key_id and a secret_access_key associated with our S3 account. Since I’m working with an on-premise Pure Storage FlashBlade S3 storage solution, I also need to specify the endpoint URL.

Additionally, I’ll leverage MetaFlow’s Kubernetes integration to scale the parallelism for specific tasks.

For clarity, I’ve divided the code into sections to explain each part. However, the entire code is executed as a single block within a unique Jupyter Notebook cell.

Imports and Parameters

import os
from metaflow import FlowSpec, step, pypi, NBRunner, kubernetes, S3

# Set environment variables for metaflow kubernetes
os.environ['KUBECONFIG'] = '/home/pureuser/.metaflowconfig/config'
os.environ['METAFLOW_KUBERNETES_NAMESPACE']= 'default'

# Set environment variables for S3 metaflow task artefacts
os.environ['METAFLOW_DATASTORE']= 's3'
os.environ['METAFLOW_DEFAULT_ENVIRONMENT'] = 'pypi'
os.environ['METAFLOW_DATASTORE_SYSROOT_S3'] = 's3://metaflow'
os.environ['METAFLOW_S3_VERIFY_SSL'] = 'false'
os.environ['METAFLOW_S3_ENDPOINT_URL'] = 'http://1.2.3.4'

# Set environment variables for metaflow worker parallelism
os.environ['METAFLOW_RUN_MAX_WORKERS'] = '80'

# S3 variables
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
#Pure Storage Flashblade endpoint with dataset
os.environ['S3_ENDPOINT_URL'] = 'http://1.2.3.4'
os.environ['AWS_ACCESS_KEY_ID'] = 'PSFBSAZRDK****HIABGCMD'
os.environ['AWS_SECRET_ACCESS_KEY'] = '4F4F10877****615BMCM'

#define a class to hold all our steps
class S3ParallelCellFlow(FlowSpec):

In this first code section I start by defining the Kubernetes config file to use when a given step is assigned the @kubernetes decorator and the namespace, in this case default. The next section defines the S3 variables for metaflow to access a shared S3 datastore for the tasks artefacts (logs, outputs,…). I then set the maximum metaflow run workers, this is simply the limit of the number of parallel tasks I can run, so it should be sized accordingly to your environments capabilities. In the last portion I provide the S3 account access and secret keys.

Then a class is used to tie all our steps together to a callable module.

Step 1 — start

    @step
def start(self):
#set split / batch tasks
self.split = 32

self.next(self.list_objects)

I always define a start step to store specific variables within the MetaFlow objects self keyword.

Step 2 — List objects

    @kubernetes(image='jboothomas/metaflow-image:latest', memory=6000, cpu=4, disk=1024, secrets=['fb-secret'])
@step
def list_objects(self):
import boto3

s3_client = boto3.client(
's3',
endpoint_url=os.environ['S3_ENDPOINT_URL'],
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
verify=False
)

self.bucket_name = 'warp'
self.prefix = ''

self.s3_objects = []
paginator = s3_client.get_paginator('list_objects_v2')
try:
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix)
for page in pages:
if "Contents" in page:
for obj in page["Contents"]:
self.s3_objects.append(obj["Key"])
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucket':
raise ValueError(f"Bucket '{self.bucket_name}' does not exist.")
else:
raise


self.next(self.batch)

The first line of this section calls the MetaFlow Kubernetes operator, for this we need to define an image, the memory, cpu and disk to be assigned to the deployed pod and any additional information. In my case I also provide a secret that contains the S3 access, secret keys and endpoint url to my FlashBlade S3 storage.

I used the following command to create such a secret:

kubectl create secret generic fb-secret --from-literal=AWS_ACCESS_KEY_ID=PSF***CMD --from-literal=AWS_SECRET_ACCESS_KEY=4F4F****MCM --from-literal=S3_ENDPOINT_URL=http://1.2.3.4

The next section of code defines a boto3 S3 client and the bucket to list the objects from. Using the self metaflow keyword variable I create a list to hold all the keys returned: self.s3_objects = []

Listing is a lengthy process as one has to iterate over pages of 1000 objects, if you are doing repetitive processing of a bucket that has no content change I would recommend storing this list and pulling it back in to save time.

Step 3 — Split into batches

@step
def batch(self):
"""
Splits the list into n batches.
"""
split = self.split
batch_size = len(self.s3_objects) // split
self.batches = [
self.s3_objects[i * batch_size:(i + 1) * batch_size]
for i in range(split)
]
self.next(self.get_headers, foreach="batches")

In the batch step I split the total objects returned by the list into a defined number of sub batches that are passed into the next step via the foreach. This will spin up n pods equal to the split count upto our maximum metaflow worker setting (global limit on flow parallelism).

Step 4 —Per batch get headers

@kubernetes(image='jboothomas/metaflow-image:latest', memory=1000, cpu=1, disk=1024, secrets=['fb-secret'])   
@step
def get_headers(self):
import time
import boto3

geth_start_time = time.time()

s3_client = boto3.client(
's3',
endpoint_url=os.environ['S3_ENDPOINT_URL'],
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
verify=False
)

s3_batch_size = len(self.input)
print(f"this batch length: {s3_batch_size}")

self.headers = []
for key in self.input:
try:
response = s3_client.head_object(Bucket=self.bucket_name, Key=key)
header_data = { # Create a dictionary to store all relevant headers
"key": key,
"content_type": response.get('ContentType'),
"content_length": response.get('ContentLength'), # Size in bytes
"last_modified": response.get('LastModified'), # Last modified timestamp
"etag": response.get('ETag'),
"metadata": response.get('Metadata', {}), # User-defined metadata
"accept_ranges": response.get('AcceptRanges'),
"cache_control": response.get('CacheControl'),
"content_disposition": response.get('ContentDisposition'),
"content_encoding": response.get('ContentEncoding'),
"expires": response.get('Expires'),
"server_side_encryption": response.get('ServerSideEncryption'),
"sse_customer_algorithm": response.get('SSECustomerAlgorithm'),
"sse_customer_key_md5": response.get('SSECustomerKeyMD5'),
"sse_kms_key_id": response.get('SSEKMSKeyId'),
"storage_class": response.get('StorageClass'),
"website_redirect_location": response.get('WebsiteRedirectLocation'),
# Add other headers as needed
}
self.headers.append(header_data)
except botocore.exceptions.ClientError as e:
print(f"Error getting header for {key}: {e}")
self.headers.append({"key": key, "error": str(e)})

geth_end_time = time.time()
elapsed_time = geth_end_time - geth_start_time
headers_per_second = s3_batch_size / elapsed_time if elapsed_time > 0 else 0

print(f"Processed {s3_batch_size} objects in {elapsed_time:.2f} seconds. Headers/sec: {headers_per_second:.2f}")

self.batch_metrics = {"headers_per_second": headers_per_second, "batch_size": s3_batch_size, "elapsed_time": elapsed_time, 'start_time': geth_start_time, 'end_time':geth_end_time}

self.next(self.join_results)

Each batch is spun up as a unique pod on the Kubernetes cluster. Each pod takes its given chunk of object keys and proceeds to pull the header information storing the results in a self.headers item.

When pre-processing our data you often need to base the following steps on given metadata that is stored within an objects header. This step gets us that information for any following steps where it may be needed.

Step 5 — Collate results

@step
def join_results(self, inputs): # The key is here

self.all_headers = []
self.all_metrics = []
start_times = []
end_times = []
for input in inputs:
self.all_headers.extend(input.headers)
if hasattr(input,"batch_metrics"):
self.all_metrics.append(input.batch_metrics)
start_times.append(input.batch_metrics["start_time"])
end_times.append(input.batch_metrics["end_time"])

self.earliest_start_time = min(start_times) if start_times else None
self.latest_end_time = max(end_times) if end_times else None

self.next(self.end)

At this point data processing steps could be applied either as more foreach batches or filtering the returned listings and processing. In this example I simply proceed to join the results and calculate the earliest start time and the latest end time to get a total elapsed for the overall get header operation.

Step 7 — Output flow metrics

    @pypi(python='3.10.11', packages={'boto3': '1.34.112', 'pandas' :'2.2.3'})
@step
def end(self):
import time
import pandas as pd
import json

df = pd.DataFrame(self.all_headers)
print(df.head())

metrics_df = pd.DataFrame(self.all_metrics)
print("\nBatch Metrics Summary:")
print(metrics_df)

total_processed_objects = metrics_df['batch_size'].sum()
total_elapsed_time = self.latest_end_time - self.earliest_start_time if self.latest_end_time and self.earliest_start_time else 0

overall_headers_per_second = total_processed_objects / total_elapsed_time if total_elapsed_time > 0 else 0
print(f"\nOverall Headers per Second: {overall_headers_per_second:.2f}")
print(f"\nOverall time elapsed in Second: {total_elapsed_time:.2f}")

## additional code to show per object obtained information ##
if self.all_headers: # Check if the list is not empty
print("\nFirst 2 Header Elements:")
for i, header in enumerate(self.all_headers[:2]): # Iterate only through the first 5
print(f" {key}: {json.dumps(value, indent=4, default=str)}")
else:
print("\nNo headers were collected.")

The end step outputs relevant metrics for the overall Metaflow operation, I am interested in the overall headers per second and total time spent listing the object headers so as to see how my parallelism impacts my processing speed.

Step 8 — Run the flow

run = NBRunner(flow=S3ParallelCellFlow).nbrun()

As I am running from within a jupyter notebook cell the above single line will execute the entire flow.

Parallelism and S3 performance

I ran the above code against a FlashBlade bucket with 1 million objects and varied the parallelism of the get_headers operation to see the impact this has on the overall performance.

In my case based on the available compute of the Kubernetes platform and the actual FlashBlade configuration, with 80 parallel workers I achieved 15'804 get header operations per second !

This would seem to be the upper bounds of my lab setup, more client compute and a bigger FlashBlade would push this result higher.

I proceeded to run the same code against an AWS Bucket with 1 million objects and compare with my FlashBlade results. If fast S3 performance is needed for your workloads, Pure Storage FlashBlade delivers!

Conclusion

In this coming age of S3 powered AI workloads S3 metadata performance will be key in order to process in either pre or post workflow steps the data. FlashBlade was built around metadata performance and the ability to scale to adapt not only to usable raw storage space but also performance. Each blade within a FlashBlade cluster adds 600'000 metadata IOPS, a single chassis can house 10 blades and multiple chassis can be stacked to create a platform that delivers true metadata performance at scale.

--

--

jboothomas
jboothomas

Written by jboothomas

Infrastructure engineering for modern data applications

No responses yet