Role extraction and SOP generation added

This commit is contained in:
2024-08-31 01:29:39 +00:00
parent ccb0db21d6
commit 1f02a30a16
15 changed files with 734 additions and 11 deletions
+21
View File
@@ -0,0 +1,21 @@
import os
from flask import Flask
from src.api.routes.sops import sops_bp
def create_app():
    """Build and configure the Flask application.

    The SOP blueprint is mounted under ``/api/v1/sop`` and an ``uploads``
    directory, resolved two levels above this module's directory, is created
    if missing and published as ``app.config['UPLOAD_FOLDER']``.

    Returns:
        Flask: The configured application instance.
    """
    flask_app = Flask(__name__)

    # Mount every SOP endpoint under a versioned API prefix.
    flask_app.register_blueprint(sops_bp, url_prefix='/api/v1/sop')

    # Resolve the uploads directory relative to this file, then normalise it
    # so the stored path contains no '..' segments.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    uploads_dir = os.path.abspath(os.path.join(module_dir, '../../uploads'))

    # Request handlers save files here, so the directory must exist up front.
    os.makedirs(uploads_dir, exist_ok=True)
    flask_app.config['UPLOAD_FOLDER'] = uploads_dir

    return flask_app
View File
+116
View File
@@ -0,0 +1,116 @@
import os
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename
from src.services.sop_generator import SopGenerator
from src.utils.utils import delete_all_files_in_directory
from src.utils.document_loader import load_document
import json
# Blueprint that groups the SOP endpoints defined in this module; the URL
# prefix is chosen by whoever registers the blueprint on the application.
sops_bp = Blueprint('sops', __name__)
# Single SopGenerator instance created when this module is imported and
# shared by every request handler below.
sop_generator = SopGenerator()
# Upload extensions accepted by the endpoints (compared case-insensitively).
ALLOWED_EXTENSIONS = {'pdf', 'doc', 'docx'}


def allowed_file(filename):
    """Return True when *filename* ends in one of the allowed extensions.

    The extension is the text after the last dot, compared in lower case.
    A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
@sops_bp.route('/get_roles', methods=['POST'])
def get_roles():
    """Extract the roles mentioned in an uploaded document.

    Expects a multipart POST whose ``document`` part is a PDF, DOC, or DOCX
    file. The file is saved to the configured upload folder, loaded, and
    passed to the shared ``sop_generator``. The upload folder is emptied
    again whether processing succeeds or fails.

    Returns:
        A ``(json, status)`` pair: 200 with ``roles`` on success, 400 for a
        missing, unnamed, or disallowed file, and 500 when processing raises.
    """
    # Guard: the multipart body must contain a 'document' part at all.
    if 'document' not in request.files:
        return jsonify({"error": "No file part", "message": "Please upload a file with the key 'document'."}), 400

    uploaded = request.files['document']

    # Guard: browsers submit an empty part without a filename when nothing was chosen.
    if uploaded.filename == '':
        return jsonify({"error": "No selected file", "message": "A file was not selected for upload. Please select a valid file."}), 400

    # Guard: only the allowed extensions are processed.
    if not (uploaded and allowed_file(uploaded.filename)):
        return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400

    safe_name = secure_filename(uploaded.filename)
    upload_dir = current_app.config['UPLOAD_FOLDER']
    saved_path = os.path.join(upload_dir, safe_name)
    uploaded.save(saved_path)

    try:
        documents = load_document(saved_path)
        extracted_roles = sop_generator.get_roles(documents)["roles"]
        # Processing is done; remove everything left in the upload folder.
        delete_all_files_in_directory(upload_dir)
        success_body = {"roles": extracted_roles, "message": "Roles successfully extracted from the document."}
        return jsonify(success_body), 200
    except Exception as exc:
        # Failure path: still clear the upload folder before reporting the error.
        delete_all_files_in_directory(upload_dir)
        failure_body = {"error": "Processing error", "message": f"An error occurred while processing the document: {str(exc)}"}
        return jsonify(failure_body), 500
@sops_bp.route('/generate_sops', methods=['POST'])
def generate_sops():
    """Generate SOPs for the requested roles from an uploaded document.

    Expects a multipart POST with:

    * ``document`` - a PDF, DOC, or DOCX file.
    * ``roles`` - a form field holding a JSON list of role names.

    The file is saved to the configured upload folder, loaded, checked for
    SOP coverage of the requested roles, and then passed to the shared
    ``sop_generator``. The upload folder is emptied on every exit path once
    the file has been saved.

    Returns:
        A ``(json, status)`` pair: 200 with ``sops`` on success, 400 for
        invalid input or when the document cannot provide SOPs for the
        roles, and 500 when processing raises.
    """
    if 'document' not in request.files:
        return jsonify({"error": "No file part", "message": "Please upload a file with the key 'document'."}), 400

    upload = request.files['document']

    roles_json = request.form.get('roles')  # roles arrive as a JSON string
    if not roles_json:
        return jsonify({"error": "No roles provided", "message": "Please provide a list of roles in the 'roles' field."}), 400

    try:
        roles = json.loads(roles_json)
    except json.JSONDecodeError:
        return jsonify({"error": "Invalid JSON", "message": "The 'roles' field contains invalid JSON."}), 400

    # Valid JSON is not necessarily a list of names: a bare string would be
    # iterated character by character downstream, so reject other shapes here.
    if not isinstance(roles, list) or not all(isinstance(role, str) for role in roles):
        return jsonify({"error": "Invalid roles", "message": "The 'roles' field must be a JSON list of role names."}), 400

    current_app.logger.debug("generate_sops requested for roles: %s", roles)

    # Browsers submit an empty part without a filename when nothing was chosen.
    if upload.filename == '':
        return jsonify({"error": "No selected file", "message": "A file was not selected for upload. Please select a valid file."}), 400

    if not (upload and allowed_file(upload.filename)):
        return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400

    filename = secure_filename(upload.filename)
    upload_folder = current_app.config['UPLOAD_FOLDER']
    file_path = os.path.join(upload_folder, filename)
    upload.save(file_path)

    try:
        docs = load_document(file_path)

        # Ask the model first whether the document covers the requested roles.
        status_check = sop_generator.check_role_sop(roles=roles, docs=docs)
        if not status_check["status"]:
            return jsonify({"error": "Document cannot extract SOPs", "message": status_check["message"]}), 400

        sops = sop_generator.generate_sops(roles, docs)
        return jsonify({"sops": sops, "message": "SOPs successfully generated for the roles from the document."}), 200
    except Exception as e:
        return jsonify({"error": "Processing error", "message": f"An error occurred while processing the document: {str(e)}"}), 500
    finally:
        # Runs on success, on the early 400 above, and on errors, so the
        # uploaded file is never left behind in the upload folder.
        delete_all_files_in_directory(upload_folder)
+113
View File
@@ -0,0 +1,113 @@
import os
import json
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
class SOPs(BaseModel):
    """SOP statements for one role, grouped into "must", "shall" and "will"."""

    # Each category is an optional list of strings; a missing category
    # defaults to a fresh empty list.
    must: Optional[List[str]] = Field(default_factory=list)
    shall: Optional[List[str]] = Field(default_factory=list)
    will: Optional[List[str]] = Field(default_factory=list)
class RoleSOPs(BaseModel):
    """Response schema for a single role's SOPs.

    Passed as ``response_format`` by ``SopGenerator.generate_sops``.
    """

    # The categorised SOP statements for the role.
    sops: SOPs
class SOPsFound(BaseModel):
    """Response schema for the SOP availability check.

    Passed as ``response_format`` by ``SopGenerator.check_role_sop``.
    """

    # Short explanation of the outcome.
    message: str
    # True when SOPs for the requested roles were found, False otherwise.
    status: bool
class RolesResponse(BaseModel):
    """Response schema for role extraction.

    Passed as ``response_format`` by ``SopGenerator.get_roles``.
    """

    # Role or position names extracted from the document.
    roles: List[str]
class SOPsResponse(BaseModel):
    """Mapping from string keys to ``SOPs`` objects.

    NOTE(review): the keys are presumably role names, and this model is only
    used as a return annotation in the visible code - confirm whether it is
    still needed.
    """

    # Defaults to an empty dict when no entries are supplied.
    roles_sops: Dict[str, SOPs] = Field(default_factory=dict)
class SopGenerator:
    """Extract roles and role-specific SOPs from documents via the OpenAI chat API.

    Every public method sends the document text to the model together with a
    task-specific system prompt and a pydantic ``response_format``, then
    returns the model's JSON reply decoded into plain Python objects (dicts
    and lists), not pydantic instances.
    """

    def __init__(self):
        # The API key is read from the environment when the instance is created.
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.client = OpenAI(api_key=self.api_key)
        self.model = "gpt-4o-mini"

    def _extract_text_from_docs(self, docs):
        """Extract text content from document objects."""
        return [doc.page_content for doc in docs]

    def _request_json(self, system_prompt, docs_text, response_format):
        """Send one structured chat request and return the decoded JSON reply.

        Args:
            system_prompt: Instruction text for the system message.
            docs_text: List of strings; each becomes one text part of the
                user message.
            response_format: Pydantic model class describing the expected reply.

        Returns:
            The reply content decoded with ``json.loads``.

        Raises:
            ValueError: If the model returned no content (for example a refusal).
        """
        response = self.client.beta.chat.completions.parse(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": [{"type": "text", "text": text} for text in docs_text],
                },
            ],
            response_format=response_format,
            max_tokens=1024,
            temperature=0.1,
        )
        message = response.choices[0].message
        if message.content is None:
            # json.loads(None) would raise an opaque TypeError; report the cause instead.
            refusal = getattr(message, "refusal", None)
            raise ValueError(f"The model returned no content (refusal: {refusal}).")
        return json.loads(message.content)

    def get_roles(self, docs) -> dict:
        """Return the roles found in *docs* as a dict shaped like ``RolesResponse``.

        Args:
            docs: Iterable of objects exposing a ``page_content`` attribute.

        Returns:
            The decoded reply, e.g. ``{"roles": ["financial analyst", ...]}``.
        """
        system_prompt = (
            'Suppose you are a role/position extractor from a company document.\n'
            'You extract the roles as a list, e.g., ["financial analyst", "data scientist", etc.].\n'
            'If no roles are found, return an empty list.'
        )
        docs_text = self._extract_text_from_docs(docs)
        return self._request_json(system_prompt, docs_text, RolesResponse)

    def check_role_sop(self, roles: List[str], docs) -> dict:
        """Check whether *docs* can provide SOPs for *roles*.

        Args:
            roles: The requested roles; interpolated into the prompt as-is.
            docs: Iterable of objects exposing a ``page_content`` attribute.

        Returns:
            The decoded reply shaped like ``SOPsFound``: a dict with
            ``"status"`` and ``"message"`` keys.
        """
        system_prompt = (
            f'Your role is to check if the SOPs for the provided roles "{roles}" are found in the document.\n'
            'You are validating if the document can provide the SOPs.\n'
            'Return status=True with a proper message if found, and status=False with a proper message if not.\n'
            f'Keep the message short, e.g., "SOPs found for the role: {roles}" or "SOPs not found for the role: {roles}".'
        )
        docs_text = self._extract_text_from_docs(docs)
        return self._request_json(system_prompt, docs_text, SOPsFound)

    def generate_sops(self, roles: List[str], docs) -> Dict[str, dict]:
        """Extract the SOPs for each role in *roles* from *docs*.

        One request is sent per role.

        Args:
            roles: Role names to extract SOPs for.
            docs: Iterable of objects exposing a ``page_content`` attribute.

        Returns:
            A dict mapping each role to its decoded reply shaped like
            ``RoleSOPs``, i.e. ``{"sops": {"must": [...], "shall": [...],
            "will": [...]}}``.
        """
        # The document text does not depend on the role, so extract it once.
        docs_text = self._extract_text_from_docs(docs)
        roles_sops_all = {}
        for role in roles:
            system_prompt = (
                'You are a Standard Operating Procedure (SOP) extractor.\n'
                f'Your task is to find SOPs for the role "{role}" in the provided text.\n'
                'SOPs should be categorized under "must", "shall", and "will".\n'
                'If the SOPs for the role are not explicitly stated, you are required to infer them from the context provided in the document,\n'
                'but only if there is clear evidence within the text.\n'
                'Do not generate or assume SOPs that are not directly supported by the document.\n'
                'Your extraction should strictly adhere to the content of the document, ensuring that no information is fabricated or inferred beyond what is present.\n'
                'If no SOPs are found for the role, return an empty list for each category.'
            )
            roles_sops_all[role] = self._request_json(system_prompt, docs_text, RoleSOPs)
        return roles_sops_all
+48
View File
@@ -0,0 +1,48 @@
import os
from spire.doc import Document, FileFormat
from langchain_community.document_loaders import PyPDFLoader
def convert_word_to_pdf(doc_path: str) -> str:
    """
    Convert a .doc or .docx file to PDF using Spire.Doc.

    The PDF is written next to the source file, with the same base name and
    a ``.pdf`` extension.

    Args:
        doc_path (str): The path to the .doc or .docx file.

    Returns:
        str: The path to the converted PDF file.
    """
    pdf_path = os.path.splitext(doc_path)[0] + '.pdf'

    document = Document()
    try:
        document.LoadFromFile(doc_path)
        document.SaveToFile(pdf_path, FileFormat.PDF)
    finally:
        # Close the document even when loading or saving fails, so it is
        # not leaked on the error path.
        document.Close()

    return pdf_path
def load_document(file_path: str):
    """
    Load a PDF, DOC, or DOCX file through ``PyPDFLoader``.

    Word files are converted to PDF first; PDF files are loaded directly.
    The extension check is case-insensitive.

    Args:
        file_path (str): The path to the file to load.

    Returns:
        The result of ``PyPDFLoader(...).load()`` for the (possibly converted) PDF.

    Raises:
        ValueError: If the file extension is not .pdf, .doc, or .docx.
    """
    suffix = os.path.splitext(file_path)[1].lower()

    if suffix == '.pdf':
        pdf_source = file_path
    elif suffix in ('.doc', '.docx'):
        # Word documents are converted so one loader handles every input.
        pdf_source = convert_word_to_pdf(file_path)
    else:
        raise ValueError(f"Unsupported file type: {suffix}. Only .pdf, .docx, and .doc are supported.")

    return PyPDFLoader(pdf_source).load()
+20
View File
@@ -0,0 +1,20 @@
import os
def delete_file(file_path):
    """Remove a single file and print the outcome.

    Best-effort: an ``OSError`` (for example a missing file) is printed,
    not raised.
    """
    try:
        os.remove(file_path)
    except OSError as err:
        print(f"Error deleting file {file_path}: {err}")
    else:
        print(f"Deleted file: {file_path}")
import os
def delete_all_files_in_directory(directory_path):
    """Delete every regular file directly inside *directory_path*.

    Subdirectories are left untouched. The operation is best-effort: errors
    are printed, never raised, and a failure on one file does not stop the
    remaining files from being deleted.

    Args:
        directory_path: Path of the directory to empty.
    """
    try:
        filenames = os.listdir(directory_path)
    except OSError as e:
        print(f"Error deleting files in {directory_path}: {e}")
        return

    for filename in filenames:
        file_path = os.path.join(directory_path, filename)
        if not os.path.isfile(file_path):
            continue
        try:
            os.remove(file_path)
            print(f"Deleted file: {file_path}")
        except OSError as e:
            # Handle each file separately so one locked or vanished file
            # does not leave the rest of the directory uncleaned.
            print(f"Error deleting file {file_path}: {e}")