RAG and the challenge of raw-to-embedded dataset size
With the increase in enterprise RAG deployments it is always interesting to see how things work under the covers. With that in mind, in this how-to blog I will go over the code to embed text chunks from files into a vector database and look at the storage space required.
When leveraging a vector DB for a RAG use case, we search across the embeddings based on the user query and return the most relevant hits. The text associated with these hits, together with the initial query, is then passed to the LLM so that it can generate a response based on this enhanced context.
The vector database therefore has to store not only the vector representations but also the associated raw text. So let's get that set up and then see what impact a large amount of source data has on the storage required for the vector DB.
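As a rough sketch of that retrieval flow (the query, chunks and prompt format below are purely illustrative and not taken from the code later in this post):
# Illustrative sketch of the retrieve-then-generate flow described above.
# "retrieved_chunks" stands in for the text fields returned by the vector DB search.
user_query = "Flashblade S3 statistics"
retrieved_chunks = [
    "The Performance panel includes the Latency, IOPS, and Bandwidth charts.",
    "Hover over any part of a chart to display values for a specific point in time.",
]
prompt = "Context:\n" + "\n".join(retrieved_chunks) + f"\n\nQuestion: {user_query}"
# The prompt would then be sent to whichever LLM you use to generate the final answer.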
Setup
I already have a Milvus instance running within a Kubernetes cluster using a Pure Storage FlashBlade for the S3 backend. Milvus was installed using the Milvus Operator, and the following manifest was applied during the deployment of the instance:
apiVersion: milvus.io/v1beta1
kind: Milvus
metadata:
  name: jbt-milvus
  labels:
    app: milvus
spec:
  mode: cluster
  # Omit other fields ...
  config:
    minio:
      # your bucket name
      bucketName: jbt-milvus
      # Optional, config the prefix of the bucket milvus will use
      rootPath: milvus/jbt-milvus
      useSSL: false
  dependencies:
    storage:
      # enable external object storage
      external: true
      type: S3 # MinIO | S3
      # the endpoint of AWS S3
      endpoint: flashblade.purestorage.com
      #useSSL: false
      # the secret storing the access key and secret key
      secretRef: "jbt-milvus-s3-secret"
For ease of use I also added a LoadBalancer service so as to obtain, via MetalLB, an IP visible from my workstation. Here is the quick k8s config for that:
apiVersion: v1
kind: Service
metadata:
  name: jbt-milvus-loadbalancer
spec:
  type: LoadBalancer
  selector:
    app.kubernetes.io/component: proxy
    app.kubernetes.io/instance: jbt-milvus
    app.kubernetes.io/name: milvus
    milvus.io/service: "true"
  ports:
    - protocol: TCP
      port: 19530
      targetPort: 19530
The ‘jbt-milvus-s3-secret’ secret contains the relevant S3 access and secret keys for the S3 object account. Here is the layout of the required yaml:
apiVersion: v1
kind: Secret
metadata:
  name: jbt-milvus-s3-secret
type: Opaque
stringData:
  accesskey: PSFB*****BJEIA
  secretkey: A121*****eJOEN
Applying the above yamls results in a running Milvus instance accessible from outside the k8s cluster; the pod listing should look similar to this:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
jbt-milvus-etcd-0 1/1 Running 0 43h 172.168.96.110 jbt-54-k <none> <none>
jbt-milvus-etcd-1 1/1 Running 0 45h 172.168.96.151 jbt-51-k <none> <none>
jbt-milvus-etcd-2 1/1 Running 0 13d 172.168.96.163 jbt-51-k <none> <none>
jbt-milvus-milvus-datacoord-764f9475c5-7kjcb 1/1 Running 0 18h 172.168.96.226 jbt-52-k <none> <none>
jbt-milvus-milvus-datanode-7f899c8c58-7mj5f 1/1 Running 0 18h 172.168.96.241 jbt-52-k <none> <none>
jbt-milvus-milvus-indexcoord-5b5f8655f-b5qcw 1/1 Running 0 18h 172.168.96.109 jbt-54-k <none> <none>
jbt-milvus-milvus-indexnode-8d86c6db8-2c5mt 1/1 Running 0 18h 172.168.96.216 jbt-52-k <none> <none>
jbt-milvus-milvus-proxy-5dcccff4b-chngq 1/1 Running 0 18h 172.168.96.233 jbt-52-k <none> <none>
jbt-milvus-milvus-querycoord-fff7966b7-gm89l 1/1 Running 1 (38h ago) 38h 172.168.96.189 jbt-51-k <none> <none>
jbt-milvus-milvus-querynode-0-54f76d6f6f-8cw2g 1/1 Running 0 17h 172.168.96.113 jbt-54-k <none> <none>
jbt-milvus-milvus-rootcoord-99477cd9f-zm7cp 1/1 Running 0 18h 172.168.96.229 jbt-52-k <none> <none>
jbt-milvus-pulsar-bookie-0 1/1 Running 0 28d 172.168.96.49 jbt-53-k <none> <none>
jbt-milvus-pulsar-bookie-1 1/1 Running 0 28d 172.168.96.95 jbt-54-k <none> <none>
jbt-milvus-pulsar-bookie-2 1/1 Running 0 45h 172.168.96.165 jbt-51-k <none> <none>
jbt-milvus-pulsar-broker-0 1/1 Running 0 13d 172.168.96.142 jbt-51-k <none> <none>
jbt-milvus-pulsar-proxy-0 1/1 Running 0 23h 172.168.96.230 jbt-52-k <none> <none>
jbt-milvus-pulsar-recovery-0 1/1 Running 0 45h 172.168.96.172 jbt-51-k <none> <none>
jbt-milvus-pulsar-zookeeper-0 1/1 Running 0 44h 172.168.96.156 jbt-51-k <none> <none>
jbt-milvus-pulsar-zookeeper-1 1/1 Running 0 45h 172.168.96.160 jbt-51-k <none> <none>
jbt-milvus-pulsar-zookeeper-2 1/1 Running 0 28d 172.168.96.111 jbt-54-k <none> <none>
Within a Jupyter notebook I load a kernel with the following requirements installed: pymilvus 2.4.3, PyPDF2 3.0.1, torch 2.3.0 and transformers 4.41.2.
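For reference, those packages can be installed into the notebook kernel with something like:
%pip install pymilvus==2.4.3 PyPDF2==3.0.1 torch==2.3.0 transformers==4.41.2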
Code
The first step is to load the required modules, our tokenizer and our model:
import os
import shutil
import torch
from pymilvus import connections, Collection, CollectionSchema, DataType, FieldSchema, utility
from PyPDF2 import PdfReader
from transformers import DPRContextEncoder, DPRContextEncoderTokenizer
tokenizer = DPRContextEncoderTokenizer.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
model = DPRContextEncoder.from_pretrained("facebook/dpr-ctx_encoder-single-nq-base")
Then I define the following functions, starting with the connection to our Milvus database:
def connect_to_milvus():
    connections.connect(alias="default", host="1.2.3.4", port="19530")
    print("Connected to Milvus.")
Next, the creation of a Milvus collection and a function to insert our data. Here I define the schema with id, embeddings and text as my fields:
def create_collection(collection_name, dim=768):
    exists = utility.has_collection(collection_name)
    if not exists:
        print(f"Collection {collection_name} does not exist.")
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512)
        ]
        schema = CollectionSchema(fields, description="Embedding Collection")
        collection = Collection(name=collection_name, schema=schema)
        print(f"Collection {collection_name} created.")
    else:
        collection = Collection(name=collection_name)
        print(f"Collection {collection_name} already exists.")
    return collection
def insert_embeddings(collection, embeddings, texts):
    data = [embeddings, texts]
    ids = collection.insert(data)
    collection.flush()
    return ids
Then functions to extract the text from the files in a folder, and to chunk and encode that text. Note: if you have suggestions on how best to handle chunk length and the byte size of the schema's text field, do say:
def extract_text_from_pdf(pdf_path):
    # Extract the text from every page of the PDF using PyPDF2
    reader = PdfReader(pdf_path)
    text = ''
    for page in reader.pages:
        text += page.extract_text()
    return text
def chunk_text(text, chunk_size=512):
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0
    for word in words:
        # Calculate the byte length if the word is added (plus a space separator)
        additional_length = len(word.encode('utf-8')) + (1 if current_chunk else 0)
        new_length = current_length + additional_length
        if new_length <= chunk_size:
            current_chunk.append(word)
            current_length = new_length
        else:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            current_length = len(word.encode('utf-8'))
    # Add the last chunk if it exists
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
def encode_text(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    outputs = model(**inputs)
    return outputs.pooler_output.detach().numpy()[0]  # assuming single embedding
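Since the collection's embeddings field is declared with dim=768, it is worth noting that the DPR context encoder outputs 768-dimensional vectors. A quick sanity check (the sample string is just an example):
vec = encode_text("a quick sanity check")
print(len(vec))  # 768, matching the dim used for the embeddings field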
def move_file_to_done(file_name, source_folder, destination_folder):
    # Construct full file paths
    source_path = os.path.join(source_folder, file_name)
    destination_path = os.path.join(destination_folder, file_name)
    # Move the file
    try:
        shutil.move(source_path, destination_path)
        print(f'File {file_name} moved to {destination_folder}')
    except FileNotFoundError:
        print(f'File {file_name} not found in {source_folder}')
    except Exception as e:
        print(f'Error moving file: {e}')
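The process_files function below also handles plain .txt files via extract_text_from_txt, which was not shown above; a minimal version (assuming UTF-8 text files, as with the Gutenberg dataset used later) could be:
def extract_text_from_txt(txt_path):
    # Read the whole text file; ignore undecodable bytes for simplicity
    with open(txt_path, 'r', encoding='utf-8', errors='ignore') as f:
        return f.read()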
and finally a function to process the files in a folder:
def process_files(in_directory, out_directory, collection):
    for filename in os.listdir(in_directory):
        if filename.endswith('.pdf'):
            print(f"Processing {filename}.")
            texts = []
            embeddings = []
            pdf_path = os.path.join(in_directory, filename)
            text = extract_text_from_pdf(pdf_path)
            text_chunks = chunk_text(text)
            for txt_chunk in text_chunks:
                embedding = encode_text(txt_chunk)
                texts.append(txt_chunk)
                embeddings.append(embedding)
            ids = insert_embeddings(collection, embeddings, texts)
            move_file_to_done(filename, in_directory, out_directory)
            print(f"Inserted {ids.insert_count} embeddings from {filename}.")
        elif filename.endswith('.txt'):
            print(f"Processing {filename}.")
            texts = []
            embeddings = []
            txt_path = os.path.join(in_directory, filename)
            text = extract_text_from_txt(txt_path)
            text_chunks = chunk_text(text)
            for txt_chunk in text_chunks:
                embedding = encode_text(txt_chunk)
                texts.append(txt_chunk)
                embeddings.append(embedding)
            ids = insert_embeddings(collection, embeddings, texts)
            move_file_to_done(filename, in_directory, out_directory)
            print(f"Inserted {ids.insert_count} embeddings from {filename}.")
Embedding a single PDF
I connect and create a collection to store my embeddings by running the following code:
connect_to_milvus()
collection_name = "my_test_rag"
dimension_size = 768
collection = create_collection(collection_name, dimension_size)
This will return:
Connected to Milvus.
Collection my_test_rag does not exist.
Collection my_test_rag created.
Then I run the processing of the files within a given directory:
my_source_directory = './PDFs'
my_dest_directory = './donePDFs'
process_files(my_source_directory, my_dest_directory, collection)
Running against a single PDF file inserted 1523 embeddings.
Searching the vector database
I can now query the vector database and retrieve relevant text chunks:
from pymilvus import connections, Collection, utility
from transformers import DPRQuestionEncoderTokenizer, DPRQuestionEncoder
tokenizer = DPRQuestionEncoderTokenizer.from_pretrained("facebook/dpr-question_encoder-single-nq-base")
model = DPRQuestionEncoder.from_pretrained("facebook/dpr-question_encoder-single-nq-base")
# Connect to Milvus (needed if this cell runs in a fresh session)
connections.connect(alias="default", host="1.2.3.4", port="19530")
collection = Collection(name="my_test_rag")
# Check if the index already exists, and create it if it does not
if not collection.has_index():
    index_params = {
        "index_type": "IVF_FLAT",
        "params": {"nlist": 100},
        "metric_type": "L2"
    }
    collection.create_index(field_name="embeddings", index_params=index_params)
# Load the collection into memory
collection.load()
# Prepare a query text and encode it into a vector
query_text = "Flashblade S3 statistics"
inputs = tokenizer(query_text, return_tensors="pt")
query_embedding = model(**inputs).pooler_output.detach().numpy()
# Convert the embedding to a list of floats
query_embedding = query_embedding.tolist()
# Perform the search
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=query_embedding,
    anns_field="embeddings",
    param=search_params,
    limit=5,
    expr=None,
    output_fields=["text"]
)
# Print the search results
for result in results:
    for hit in result:
        print(f"ID: {hit.id}, Score: {hit.distance}, Text: {hit.entity.get('text')}")
This will retrieve the matching text chunks, for example:
ID: 450165976472778458, Score: 95.83973693847656,
Text: Hover over any part of a chart to display values for a specific point
in time. The values that appear in the point-in-time tooltips are rounded to
two decimal places. The Performance Chart The following example displays
the IOPS statistics on the array at precisely 5:00:00 on March 12 . The
Performance panel includes the Latency, IOPS, and Bandwidth charts. Latency
The Latency chart displays the average latency times for various operations.
•Read Latency (R)
...
Embedding thousands of documents
So what happens when I build a collection based on several GBs of text files?
For this I downloaded 19,101 text files from Project Gutenberg, for a total raw volume of 7.1 GB. Using the above code this dataset was ingested into the vector database, resulting in just over 14 million embeddings. The final count of entities in the collection is obtained using:
from pymilvus import connections, Collection, utility
# Connect to Milvus
connections.connect("default", host="1.2.3.4", port="19530")
collection_name = "my_test_rag"
collection = Collection(name=collection_name)
# Get the number of entities in the collection
num_entities = collection.num_entities
print(f"The number of entities in the collection '{collection_name}' is: {num_entities}")
Result:
The number of entities in the collection 'my_test_rag' is: 14379441
Now Milvus creates index_files, insert_log and stats_log objects on the S3 backend. I waited for all the insert logs to be played into the index files, and for compaction and deep garbage collection to run on the Milvus platform, so as to get the final storage size for the database. To obtain the total storage space used for my collection, I list and summarise the objects created on the S3 storage layer:
aws s3 --endpoint-url=http://192.168.40.165 --profile fbstaines03 ls --summarize --human-readable --recursive s3://jbt-milvus/milvus/jbt-milvus/index_files | grep Total
Total Objects: 4362
Total Size: 61.0 GiB
aws s3 --endpoint-url=http://192.168.40.165 --profile fbstaines03 ls --summarize --human-readable --recursive s3://jbt-milvus/milvus/jbt-milvus/insert_log | grep Total
Total Objects: 4030
Total Size: 45.2 GiB
We have experienced roughly an 8x increase in storage required from our initial raw text dataset to its chunked and vectorised database version (61 GiB of index files versus 7.1 GB of raw text), or around 15x if we also count the 45.2 GiB of insert logs.
Another notable aspect is that during ingest the database grows to several times its final size! Data is first ingested into the insert_log objects; these are then played into the index_files, with compression, compaction and garbage collection helping to reduce the storage space required by the database. During the above test my database grew to several hundred GiB, multiple times the final total size, as shown by the measurements of the backend S3 storage usage I took over several hours.
Conclusion
The workload of an enterprise vector database implies:
- increased storage footprint from raw data to vectors in the database
- fluctuating size during ingest of the vector database
- frequent ingest to maintain searchable up to date domain knowledge
A storage platform built around simplicity, consistent performance and scalability is key when considering an enterprise RAG use case; we tend to forget that not everything is about the GPU!