complete document ingestion pipeline
This commit is contained in:
+168
-9
@@ -1,5 +1,15 @@
|
||||
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
|
||||
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
||||
import faiss
|
||||
from langchain_community.docstore.in_memory import InMemoryDocstore
|
||||
from langchain_community.vectorstores import FAISS
|
||||
from langchain_community.document_loaders import PyPDFLoader
|
||||
from langchain_community.document_loaders import TextLoader
|
||||
from langchain_community.document_loaders import Docx2txtLoader
|
||||
from uuid import uuid4
|
||||
from langchain_core.documents import Document
|
||||
import os
|
||||
import json
|
||||
|
||||
|
||||
# loading the embedding model
|
||||
@@ -16,21 +26,170 @@ def load_embedding_model():
|
||||
# Shared embedding model instance; functions in this file use it as the
# default value of their `embeddings` argument.
embeddings = load_embedding_model()
|
||||
|
||||
|
||||
# A function to create the vector store
|
||||
def create_vector_store(document, embeddings=embeddings):
    """Build a FAISS vector store from already-prepared documents.

    NOTE(review): a later ``create_vector_store`` definition with a different
    signature appears further down this file and rebinds the name.

    Args:
        document: The documents handed to ``FAISS.from_documents``.
        embeddings: Embedding model used to vectorise the documents;
            defaults to the module-level ``embeddings`` instance.

    Returns:
        The vector store returned by ``FAISS.from_documents``.
    """
    return FAISS.from_documents(document, embeddings)
|
||||
def create_documents(doc):
    """Split loaded documents into chunks of at most 1000 characters.

    Every chunk becomes a new ``Document`` that carries a copy of its source
    document's metadata, with ``metadata['page']`` overwritten by the chunk's
    running index (0, 1, 2, ...) across the whole input.

    Args:
        doc: List of loaded documents exposing ``page_content`` and
            ``metadata``. An empty list is accepted.

    Returns:
        A list of chunk ``Document`` objects; empty when ``doc`` is empty.
    """
    # Nothing was loaded: return early instead of failing on doc[0].
    if not doc:
        return []

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=10,
        length_function=len,
        is_separator_regex=False,
    )

    documents = []
    # Walk every loaded document (not only the first) so no content is dropped.
    for loaded in doc:
        for chunk in text_splitter.create_documents([loaded.page_content]):
            doc_metadata = loaded.metadata.copy()
            # The "page" number is the chunk's position in the output list.
            doc_metadata['page'] = len(documents)
            documents.append(
                Document(page_content=chunk.page_content, metadata=doc_metadata)
            )
    return documents
|
||||
|
||||
# A function to save the embedded data
|
||||
def save_embedded_data(docs, key="pdf"):
    """Write the vector store ``docs`` to its on-disk index directory.

    NOTE(review): a later ``save_embedded_data`` definition further down this
    file rebinds the name (its default key is "data").

    Args:
        docs: Object exposing ``save_local``.
        key: Suffix that selects the index directory name.
    """
    target_dir = f"vec-db/index/faiss_index_{key}"
    docs.save_local(target_dir)
|
||||
|
||||
def load_txt_document(document_path):
    """Load a text file and split it into chunked documents.

    Args:
        document_path: Path of the file handed to ``TextLoader``.

    Returns:
        The list produced by ``create_documents`` for the loaded text.

    Raises:
        ValueError: If loading or splitting fails. The underlying error is
            chained as ``__cause__``.
    """
    try:
        loaded = TextLoader(document_path).load()
        # implementing document splitting
        return create_documents(loaded)
    except Exception as err:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not converted into a ValueError.
        raise ValueError(f"Error loading -- {document_path}") from err
|
||||
|
||||
|
||||
def load_docx_document(document_path):
    """Load a .docx file and split it into chunked documents.

    Args:
        document_path: Path of the file handed to ``Docx2txtLoader``.

    Returns:
        The list produced by ``create_documents`` for the loaded text.

    Raises:
        ValueError: If loading or splitting fails. The underlying error is
            chained as ``__cause__``.
    """
    try:
        loaded = Docx2txtLoader(document_path).load()
        # implementing document splitting
        return create_documents(loaded)
    except Exception as err:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not converted into a ValueError.
        raise ValueError(f"Error loading -- {document_path}") from err
|
||||
|
||||
|
||||
# A function that loads a PDF document and splits it into chunks
|
||||
def load_pdf_document(document_path):
    """Load a PDF file and return it already split by the loader.

    Args:
        document_path: Path of the file handed to ``PyPDFLoader``.

    Returns:
        The list returned by ``PyPDFLoader.load_and_split()``.

    Raises:
        ValueError: If the PDF cannot be loaded. The underlying error is
            chained as ``__cause__``.
    """
    try:
        return PyPDFLoader(document_path).load_and_split()
    except Exception as err:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not converted into a ValueError.
        raise ValueError(f"Error loading -- {document_path}") from err
|
||||
|
||||
|
||||
|
||||
|
||||
# A general function that loads textual documents
|
||||
def load_document(document_path):
    """Dispatch a document path to the loader that matches its extension.

    The extension check is case-insensitive, so ``REPORT.PDF`` is handled
    like ``report.pdf``. Markdown files are read through the plain-text
    loader.

    Args:
        document_path: Path of the document to load.

    Returns:
        Whatever the selected loader returns (a list of documents).

    Raises:
        ValueError: If the extension is not one of .pdf, .txt, .md or .docx,
            or if the selected loader fails.
    """
    lowered = document_path.lower()
    if lowered.endswith(".pdf"):
        return load_pdf_document(document_path)
    if lowered.endswith((".txt", ".md")):
        return load_txt_document(document_path)
    if lowered.endswith(".docx"):
        return load_docx_document(document_path)
    raise ValueError(f"Unsupported document type for {document_path}")
|
||||
|
||||
|
||||
|
||||
def save_embedded_data(embeddings, key="data"):
    """Persist a vector store to disk and report completion.

    Args:
        embeddings: Despite the name, this is the object whose ``save_local``
            method is called (callers in this file pass the vector store).
        key: Suffix that selects the index directory name.
    """
    index_dir = f"vec-db/index/faiss_index_{key}"
    embeddings.save_local(index_dir)
    print("Embeddings saved")
|
||||
|
||||
# A function to load the embedded data
|
||||
def load_embedded_data(embeddings=embeddings, key="data"):
    """Load a previously saved FAISS vector store from disk.

    Args:
        embeddings: Embedding model handed to ``FAISS.load_local``; defaults
            to the module-level ``embeddings`` instance.
        key: Suffix that selects the index directory name. The default
            "data" matches the default used by ``save_embedded_data``.

    Returns:
        The vector store returned by ``FAISS.load_local``.
    """
    # NOTE(security): allow_dangerous_deserialization=True disables the
    # library's safety guard around deserialising the saved store. Only point
    # this at index directories written by this application itself.
    embed_db = FAISS.load_local(
        f"vec-db/index/faiss_index_{key}",
        embeddings,
        allow_dangerous_deserialization=True,
    )
    return embed_db
|
||||
|
||||
|
||||
# creating a function to load all documents from a directory.
|
||||
def load_documents_from_directory(directory_path: str):
    """Load every text document in a directory and record a manifest.

    Regular files whose extension (case-insensitive) is a text type are
    passed to ``load_document``. Files that ``load_document`` rejects with a
    ``ValueError`` are reported and skipped, so one bad file does not abort
    the whole run. A fresh hex UUID is generated per loaded document, and the
    names, ids and page counts are written to ``documents.json`` inside the
    directory. Any other keys already present in that file are preserved.

    Args:
        directory_path: Directory to scan (not recursive).

    Returns:
        A tuple ``(documents, docs_id, num_pages)`` of three parallel lists:
        the loaded documents, their generated ids, and ``len()`` of each
        loaded document.
    """
    # Extensions treated as text documents; every other file is ignored.
    text_doc = {'pdf', 'txt', 'docx', 'doc', 'md'}

    documents = []
    doc_names = []
    num_pages = []

    # sorted() makes the processing order, and so the manifest, deterministic.
    for file in sorted(os.listdir(directory_path)):
        path = os.path.join(directory_path, file)
        # Sub-directories can carry a document-like name; only load real files.
        if not os.path.isfile(path):
            continue
        # splitext keeps dots inside the name ("a.b.pdf" -> "a.b") and gives
        # an empty extension for names without a dot.
        doc_name, extension = os.path.splitext(file)
        if extension[1:].lower() not in text_doc:
            continue
        try:
            doc = load_document(path)
        except ValueError as err:
            print(f"Skipping {file}: {err}")
            continue
        documents.append(doc)
        num_pages.append(len(doc))
        doc_names.append(doc_name)
        print(f"Document {doc_name} loaded")

    # so we need to create a document id for each document
    docs_id = [uuid4().hex for _ in documents]

    # Update the manifest in place when it exists, otherwise start a new one.
    json_file = os.path.join(directory_path, "documents.json")
    data = {}
    if os.path.exists(json_file):
        with open(json_file, 'r') as f:
            data = json.load(f)
    data['doc_names'] = doc_names
    data['docs_id'] = docs_id
    data['num_pages'] = num_pages
    with open(json_file, 'w') as f:
        json.dump(data, f)

    return documents, docs_id, num_pages
|
||||
|
||||
|
||||
# A function to create vector store
|
||||
def create_vector_store(embeddings, documents: list, docs_id: list, num_pages: list):
    """Create a FAISS vector store, fill it with documents and save it.

    Each entry of ``documents`` is itself a list of page/chunk documents.
    Page ids are built as ``<doc_id><page index>``. The finished store is
    saved through ``save_embedded_data`` under the key "data".

    Args:
        embeddings: Embedding model used by the store.
        documents: List of per-document page lists.
        docs_id: One id string per entry of ``documents``.
        num_pages: Number of pages for each entry of ``documents``.

    Returns:
        The populated vector store.
    """
    # Size the index from the model itself instead of assuming 384 dimensions,
    # so switching the embedding model does not break the index.
    dimension = len(embeddings.embed_query("dimension probe"))
    index = faiss.IndexFlatL2(dimension)

    # Initialize the FAISS vector store
    vector_store = FAISS(
        embedding_function=embeddings,
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
    )

    # Now adding the documents to the store.
    for position, pages in enumerate(documents):
        # A document that produced no pages has nothing to embed.
        if not pages:
            continue
        doc_id = docs_id[position]
        page_ids = [f"{doc_id}{page_no}" for page_no in range(num_pages[position])]
        vector_store.add_documents(documents=pages, ids=page_ids)

    # saving the vector store automatically
    save_embedded_data(vector_store, key="data")

    return vector_store
|
||||
|
||||
# creating a function to add documents to the vector store
|
||||
def add_documents_to_vector_store(embeddings, documents: list, docs_id: list, num_pages: list):
    """Add documents to the saved vector store and persist the result.

    The store saved under the key "data" is loaded, the new pages are added
    with ids of the form ``<doc_id><page index>``, and the store is saved
    again so the additions are not lost when the function returns.

    Args:
        embeddings: Embedding model used to load the store.
        documents: List of per-document page lists.
        docs_id: One id string per entry of ``documents``.
        num_pages: Number of pages for each entry of ``documents``.

    Returns:
        The updated vector store.
    """
    # loading the vector store
    vector_store = load_embedded_data(embeddings, key="data")

    for position, pages in enumerate(documents):
        # A document that produced no pages has nothing to embed.
        if not pages:
            continue
        doc_id = docs_id[position]
        page_ids = [f"{doc_id}{page_no}" for page_no in range(num_pages[position])]
        vector_store.add_documents(documents=pages, ids=page_ids)

    # Without this save the additions exist only in the local object.
    save_embedded_data(vector_store, key="data")
    print ("Documents added to the vector store")

    return vector_store
|
||||
|
||||
|
||||
# A document search function
|
||||
def search(db, query, k=4):
|
||||
docs = db.similarity_search(query, k)
|
||||
|
||||
Reference in New Issue
Block a user