diff --git a/data/raw/document.pdf b/data/raw/document.pdf index 8cd88fc..657981f 100644 Binary files a/data/raw/document.pdf and b/data/raw/document.pdf differ diff --git a/src/api/routes/sops.py b/src/api/routes/sops.py index 23e22c3..e98fad6 100644 --- a/src/api/routes/sops.py +++ b/src/api/routes/sops.py @@ -2,7 +2,7 @@ import os from flask import Blueprint, request, jsonify, current_app from werkzeug.utils import secure_filename from src.services.sop_generator import (SopPersonalAssessment,SopGeneratorExecutive) -from src.services.document_parser import DocumentParser +from src.services.sop_document_parser import DocumentParser from src.utils.utils import delete_all_files_in_directory from src.utils.document_loader import load_document @@ -384,64 +384,137 @@ def generate_sops_from_questionnaire(): -@sops_bp.route('/executive/get_roles_doc', methods=['POST']) -def generate_sops_from_questionnaire(): +@sops_bp.route('/executive/get_roles_for_reference_managers', methods=['POST']) +def get_roles_for_reference_managers(): try: # Retrieve form data - reference_roles = request.get_json().get('reference_roles') # List of reference roles in JSON format - document = request.files.get('document') # The uploaded document + reference_roles_text = request.form.get('reference_roles') # Reference roles sent as text + file = request.files.get('document') # The uploaded document - if not reference_roles or not document: + if not reference_roles_text or not file: return jsonify({"error": "Missing data", "message": "Reference roles or document not provided."}), 400 + + if file.filename == '': + return jsonify({"error": "No selected file", "message": "A file was not selected for upload. Please select a valid file."}), 400 - # Use extractor to extract roles from the document - extractor = DocumentParser() - extracted_data = extractor.extract_departments_and_managers_workers([document]) + # Convert reference_roles_text to JSON (list) + try: + reference_roles = json.loads(reference_roles_text) # Convert text to JSON (list) + except json.JSONDecodeError: + return jsonify({"error": "Invalid format", "message": "The reference roles should be in a valid JSON format."}), 400 - if not extracted_data: - return jsonify({"error": "Extraction error", "message": "No roles were extracted from the document."}), 400 + if file and allowed_file(file.filename): + filename = secure_filename(file.filename) + upload_folder = current_app.config['UPLOAD_FOLDER'] + file_path = os.path.join(upload_folder, filename) + + file.save(file_path) + docs = load_document(file_path) - # Extract all managers with their name, title (position), and classification (role: PRP or SRP) - extracted_managers = [] - for department in extracted_data['departments']: - extracted_managers.extend([{ - 'name': manager['name'], - 'position': manager.get('position', 'Unknown Position'), # Assuming title is the position - 'role': manager.get('classification', 'Unknown Role') # PRP or SRP classification - } for manager in department['managers']]) + # Use extractor to extract roles from the document + extractor = DocumentParser() + roles_comparison_with_doc = extractor.extract_roles_with_reference_managers(docs=docs, reference_roles=reference_roles) - # Prepare assigned, unassigned, and unavailable managers - assigned_managers = [manager for manager in extracted_managers if manager['name'] in reference_roles] - unassigned_managers = [{'name': role, 'position': 'Reference Role', 'role': 'N/A'} for role in reference_roles if role not in [manager['name'] for manager in extracted_managers]] - unavailable_managers = [manager for manager in extracted_managers if manager['name'] not in reference_roles] + if not roles_comparison_with_doc: + return jsonify({"error": "Processing error", "message": "No roles found matching the reference roles."}), 404 - # Return the results with detailed manager information - return jsonify({ - "assigned_roles": assigned_managers, - "unassigned_roles": unassigned_managers, - "unavailable_roles": unavailable_managers - }), 200 + return jsonify({"roles_info": roles_comparison_with_doc, "message": "Roles comparison successfully generated."}), 200 except Exception as e: return jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500 - - - - assigned_roles = [role for role in reference_roles if role in extracted_roles] - unassigned_roles = [role for role in reference_roles if role not in extracted_roles] - unavailable_roles = [role for role in extracted_roles if role not in reference_roles] - # Return the results - return jsonify({ - "assigned_roles": assigned_roles, - "unassigned_roles": unassigned_roles, - "unavailable_roles": unavailable_roles - }), 200 - + +@sops_bp.route('/manager/get_roles_for_reference_workers', methods=['POST']) +def get_roles_for_reference_workers(): + try: + # Retrieve form data + reference_roles_text = request.form.get('reference_roles') # Reference roles sent as text + file = request.files.get('document') # The uploaded document + + if not reference_roles_text or not file: + return jsonify({"error": "Missing data", "message": "Reference roles or document not provided."}), 400 + + if file.filename == '': + return jsonify({"error": "No selected file", "message": "A file was not selected for upload. Please select a valid file."}), 400 + + # Convert reference_roles_text to JSON (list) + try: + reference_roles = json.loads(reference_roles_text) # Convert text to JSON (list) + except json.JSONDecodeError: + return jsonify({"error": "Invalid format", "message": "The reference roles should be in a valid JSON format."}), 400 + + if file and allowed_file(file.filename): + filename = secure_filename(file.filename) + upload_folder = current_app.config['UPLOAD_FOLDER'] + file_path = os.path.join(upload_folder, filename) + + file.save(file_path) + docs = load_document(file_path) + + # Use extractor to extract roles from the document + extractor = DocumentParser() + roles_comparison_with_doc = extractor.extract_roles_with_reference_workers(docs=docs, reference_roles=reference_roles) + + if not roles_comparison_with_doc: + return jsonify({"error": "Processing error", "message": "No roles found matching the reference roles."}), 404 + + return jsonify({"roles_info": roles_comparison_with_doc, "message": "Roles comparison successfully generated."}), 200 + except Exception as e: return jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500 + +@sops_bp.route('/manager/generate_sop_workers_doc', methods=['POST']) +def generate_sop_workers_doc(): + try: + # Check if the document is provided + if 'document' not in request.files: + return jsonify({"error": "No file part", "message": "Please upload a file with the key 'document'."}), 400 + file = request.files['document'] + + # Check if the file is selected + if file.filename == '': + return jsonify({"error": "No selected file", "message": "A file was not selected for upload. Please select a valid file."}), 400 + + # Check if the file type is allowed + if file and allowed_file(file.filename): + filename = secure_filename(file.filename) + upload_folder = current_app.config['UPLOAD_FOLDER'] + file_path = os.path.join(upload_folder, filename) + file.save(file_path) + else: + return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400 + + # Get workers information from the form data (passed as JSON text) + workers_info_text = request.form.get('workers_info') + if not workers_info_text: + return jsonify({"error": "Missing data", "message": "Workers info is required in the form data."}), 400 + + # Load workers information as a list of dictionaries + try: + workers_list = json.loads(workers_info_text) + except json.JSONDecodeError: + return jsonify({"error": "Invalid data format", "message": "Workers info should be a valid JSON array."}), 400 + + # Now load the document + docs = load_document(file_path) + + # Generate SOPs for workers by department using SopGeneratorExecutive + sop_generator = DocumentParser() + result = sop_generator.extract_sops_for_workers_by_department(docs, workers_list) + + # Clean up the file + delete_all_files_in_directory(upload_folder) + + if not result: + return jsonify({"error": "Processing error", "message": "Failed to generate SOPs for workers."}), 500 + + return jsonify({"sops": result, "message": "SOPs successfully generated for workers."}), 200 + + except Exception as e: + delete_all_files_in_directory(upload_folder) + return jsonify({"error": "Processing error", "message": f"An error occurred while processing the document: {str(e)}"}), 500 diff --git a/src/models/response_schemas.py b/src/models/response_schemas.py index 101e02a..decc9da 100644 --- a/src/models/response_schemas.py +++ b/src/models/response_schemas.py @@ -46,6 +46,10 @@ class Worker(BaseModel): position: str responsibilities: List[str] = Field(default_factory=list) +class DepartmentMembers(BaseModel): + managers: List[Manager] = Field(default_factory=list) + workers: List[Worker] = Field(default_factory=list) + class Department(BaseModel): name: str managers: List[Manager] = Field(default_factory=list) # Updated to managers @@ -60,16 +64,55 @@ class ManagerSOPs(BaseModel): shall: List[str] = Field(default_factory=list) will: List[str] = Field(default_factory=list) +class WorkerSOPs(BaseModel): + must: List[str] = Field(default_factory=list) + shall: List[str] = Field(default_factory=list) + will: List[str] = Field(default_factory=list) + class ManagerWithSOPs(BaseModel): title: str sops: ManagerSOPs +class WorkerWithSOPs(BaseModel): + name: str + position:str + sops: ManagerSOPs + class DepartmentManagerSOPs(BaseModel): name: str managers: List[ManagerWithSOPs] + +class DepartmentWorkerSOPs(BaseModel): + name: str + workers: List[WorkerWithSOPs] + + class ExecutiveManagerSOPsResponse(BaseModel): departments: List[DepartmentManagerSOPs] +class WorkerSOPsResponse(BaseModel): + departments:List[DepartmentWorkerSOPs] + + + +# Model for an assigned or unavailable role with name, position, and role fields +class RoleWithPositionAndRole(BaseModel): + name: str # Role name + position: str # Role position (e.g., "Role Position") + role: str # Role classification (e.g., "PRP" or "SRP") + +# Model for an unassigned role with just name and position fields +class RoleWithPositionOnly(BaseModel): + name: str # Role name + position: str # Reference role position + +# Main model that includes assigned, unassigned, and unavailable roles +class RolesComparisonResponse(BaseModel): + assigned_roles: List[RoleWithPositionAndRole] = Field(default_factory=list) # List of assigned roles + unassigned_roles: List[RoleWithPositionOnly] = Field(default_factory=list) # List of unassigned roles + unavailable_roles: List[RoleWithPositionAndRole] = Field(default_factory=list) + + diff --git a/src/prompts/sops.py b/src/prompts/sops.py index 7b27705..f1a797e 100644 --- a/src/prompts/sops.py +++ b/src/prompts/sops.py @@ -129,6 +129,39 @@ def get_departments_managers_workers_extraction_prompt(): If no departments, managers, or workers are found in the document, return an empty list for departments. """ +def get_managers_workers_extraction_prompt(): + return """ + Extract only the managers and workers from the document. + For each manager, include their name, position, role, and key responsibilities. + Additionally, for each worker, extract their name, position, and list 1-2 key responsibilities. + Do not add any managers or workers that are not explicitly mentioned in the document. + Managers: Include the managers (e.g., Department Head, Manager), their role, and key responsibilities. + - **PRP (Primary Responsible Person)**: A manager who has primary responsibility for decision-making and overseeing operations. + - **SRP (Secondary Responsible Person)**: A manager who supports the PRP, often assisting with tasks and providing backup in decision-making. + + Format as JSON: + { + "managers": [ + { + "name": "Manager Name", + "position": "Manager Position", + "role": "PRP or SRP", # The classification field either PRP or SRP + "responsibilities": ["Key Responsibility 1", "Key Responsibility 2"] + } + ], + "workers": [ + { + "name": "Worker Name", + "position": "Worker Position", + "responsibilities": ["Key Responsibility 1", "Key Responsibility 2"] + } + ] + } + If no managers or workers are found in the document, return an empty list for them. + """ + + + @@ -202,23 +235,17 @@ def get_sop_executive_from_questionnaire(): Ensure that each specified department has its own set of SOPs. ''' -def generate_llm_comparison_prompt(reference_roles, extracted_managers): - reference_roles_str = ', '.join(reference_roles) - extracted_managers_str = '\n'.join([f"- {manager['name']} (Position: {manager['position']}, Role: {manager['role']})" for manager in extracted_managers]) - - prompt = f""" +def get_roles_reference_comparison(): + prompt = """ You are tasked with comparing a list of reference roles with the extracted roles from a document. - Reference roles: - [{reference_roles_str}] - - Extracted roles: - {extracted_managers_str} - Please classify the roles into the following categories: 1. **Assigned Roles**: Roles that are found in both the reference list and the extracted list. 2. **Unassigned Roles**: Roles that are found in the reference list but not in the extracted list. 3. **Unavailable Roles**: Roles that are found in the extracted list but not in the reference list. + + Instruction: + 1. Use only the position to judge the extraction. Return the result in the following JSON format: { @@ -234,3 +261,34 @@ def generate_llm_comparison_prompt(reference_roles, extracted_managers): } """ return prompt + + + +def get_sop_for_department_workers(): + return '''Generate SOPs for each worker under the unique department based on the information the workers info provided + + Instructions: + 1. Focus on the provided department and worker role. + 2. Categorize SOPs into "must," "shall," and "will." + 3. SOPs should be actionable and relevant to the worker's duties. + 4. If no SOPs can be generated, return empty lists for each category. + 5. Use the provided document and the workers and department information to generate the SOP. + 6. If the provided document cannot provide SOPs for a specific worker stated, then return an empty list for the SOP for that worker. + + Example format: + { + "departments": [ + { + "name": "Department A", + "workers": [ + { + "name": "Worker A", + "must": ["Conduct weekly meetings"], + "shall": ["Submit monthly reports"], + "will": ["Improve efficiency"] + } + ] + } + ] + }s + ''' diff --git a/src/services/document_parser.py b/src/services/sop_document_parser.py similarity index 56% rename from src/services/document_parser.py rename to src/services/sop_document_parser.py index 20e65a8..c949701 100644 --- a/src/services/document_parser.py +++ b/src/services/sop_document_parser.py @@ -149,13 +149,12 @@ class DocumentParser: return False - def generate_sops_(self, docs, reference_roles): + + def extract_roles_with_reference_managers(self, docs, reference_roles): try: - # First, extract departments and managers from the document + # Extract departments and managers from the document sop_doc = DocumentParser() - departments_and_roles = sop_doc.extract_departments_and_managers(docs) - - + departments_and_roles = sop_doc.extract_departments_and_managers_workers(docs) # Prepare extracted roles (only managers) extracted_managers = [] @@ -163,35 +162,133 @@ class DocumentParser: extracted_managers.extend([ { 'name': manager['name'], - 'position': manager.get('title', 'Unknown Position'), - 'role': manager.get('classification', 'Unknown Role') # PRP or SRP classification + 'position': manager.get('position', 'Unknown Position'), + 'role': manager.get('role', 'Unknown Role') # PRP or SRP classification } - for manager in department['managerial_roles'] + for manager in department['managers'] ]) # Generate prompt for the LLM to compare reference roles with extracted roles - prompt = generate_llm_comparison_prompt(reference_roles, extracted_managers) + prompt = get_roles_reference_comparison() + # Send prompt to the LLM for comparison response = self.client.beta.chat.completions.parse( model=self.model, messages=[ - {"role": "system", "content": "You are a role comparison assistant."}, + {"role": "system", "content": f"The reference roles are{reference_roles} while the extracted roles are {extracted_managers}"}, {"role": "user", "content": prompt} ], max_tokens=1024, - temperature=0.1 + temperature=0.1, + response_format=RolesComparisonResponse ) - # Parse LLM response (assuming it returns a structured JSON with assigned, unassigned, and unavailable roles) + + comparison_result = json.loads(response.choices[0].message.content) # Return the result as a JSON response - return jsonify(comparison_result), 200 - + return comparison_result except Exception as e: - return jsonify({"error": "Processing error", "message": f"An error occurred: {str(e)}"}), 500 + print(f"Error occurred: {e}") + return False + def extract_roles_with_reference_workers(self, docs, reference_roles): + try: + # Extract departments and managers from the document + sop_doc = DocumentParser() + departments_and_roles = sop_doc.extract_departments_and_managers_workers(docs) + + # Prepare extracted roles (only managers) + extracted_workers = [] + for department in departments_and_roles['departments']: + extracted_workers.extend([ + { + 'name': worker['name'], + 'position': worker.get('position', 'Unknown Position'), + 'role': "worker" # PRP or SRP classification + } + for worker in department['workers'] + ]) + + # Generate prompt for the LLM to compare reference roles with extracted roles + prompt = get_roles_reference_comparison() + + + # Send prompt to the LLM for comparison + response = self.client.beta.chat.completions.parse( + model=self.model, + messages=[ + {"role": "system", "content": f"The reference roles are{reference_roles} while the extracted roles are {extracted_workers}"}, + {"role": "user", "content": prompt} + ], + max_tokens=1024, + temperature=0.1, + response_format=RolesComparisonResponse + ) + comparison_result = json.loads(response.choices[0].message.content) + + # Return the result as a JSON response + return comparison_result + except Exception as e: + print(f"Error occurred: {e}") + return False + + + def extract_managers_workers(self, docs): + """ + Extract departments, managers, and workers from the document. + + :param docs: List of document chunks + :return: Dictionary containing departments, their managers, and workers. + """ + try: + docs_text = self._extract_text_from_docs(docs) + prompt = get_managers_workers_extraction_prompt() # Update your prompt to handle managers and workers + + response = self.client.beta.chat.completions.parse( + model=self.model, + messages=[ + {"role": "system", "content": prompt}, + {"role": "user", "content": [{"type": "text", "text": text} for text in docs_text]} + ], + response_format=DepartmentMembers, # Use the updated response schema + max_tokens=4096, + temperature=0.1 + ) + + return json.loads(response.choices[0].message.content) + + except json.JSONDecodeError: + return False + def extract_sops_for_workers_by_department(self, docs,depts_workers): + """ + Extract departments, managers, and workers from the document. + + :param docs: List of document chunks + :return: Dictionary containing departments, their managers, and workers. + """ + try: + docs_text = self._extract_text_from_docs(docs) + prompt = get_sop_for_department_workers() # Update your prompt to handle managers and workers + + response = self.client.beta.chat.completions.parse( + model=self.model, + messages=[ + {"role": "system", "content": prompt}, + {"role": "user", "content": f"Workers information: {depts_workers}"}, + {"role": "user", "content": [{"type": "text", "text": text} for text in docs_text]} + ], + response_format=WorkerSOPsResponse, # Use the updated response schema + max_tokens=4096, + temperature=0.1 + ) + + return json.loads(response.choices[0].message.content) + + except json.JSONDecodeError: + return False \ No newline at end of file diff --git a/src/services/sop_generator.py b/src/services/sop_generator.py index ff118c5..8bce598 100644 --- a/src/services/sop_generator.py +++ b/src/services/sop_generator.py @@ -5,7 +5,7 @@ from pydantic import BaseModel, Field from typing import List, Dict, Optional from src.prompts.sops import * from src.models.response_schemas import * -from src.services.document_parser import DocumentParser +from src.services.sop_document_parser import DocumentParser from dotenv import load_dotenv load_dotenv() @@ -264,146 +264,5 @@ class SopGeneratorManager: self.client = OpenAI(api_key=self.api_key) self.model = "gpt-4o-mini" - def extract_sops_from_executive_vision_goals_doc(self, data: dict,executives:List) -> SOPsResponse: - """ - Extracts SOPs categorized into 'must,' 'shall,' and 'will' based on executive vision and goals. - - :param data: A dictionary containing vision and goals. - :return: SOPsResponse containing the SOPs for executives - """ - try: - - vision = data.get("vision", "No vision provided") - goals = data.get("goals", "No goals provided") - - prompt = get_sop_executive_from_vision_goals(executives) - - user_content = f''' - Vision: {vision} - Goals: {goals} - ''' - - response = self.client.beta.chat.completions.parse( - model=self.model, - messages=[ - { - "role": "system", - "content": f'''{prompt}''' - }, - { - "role": "user", - "content": user_content, - } - ], - response_format=Categories, - max_tokens=2048, - temperature=0.1 - ) - - extracted_text = json.loads(response.choices[0].message.content) - return extracted_text - - except Exception as e: - print(f"Error occurred: {str(e)}") - return False - - - def generate_sops_for_department_managers(self, docs): - try: - # First, extract departments and managers - sop_doc = DocumentParser() - departments_and_roles = sop_doc.extract_departments_and_managers(docs) - - if not departments_and_roles or not departments_and_roles.get('departments'): - return False - - departments_with_sops = [] - - for department in departments_and_roles['departments']: - managers_with_sops = [] - for role in department['managerial_roles']: - prompt = get_sop_for_department_managers() - response = self.client.beta.chat.completions.parse( - model=self.model, - messages=[ - {"role": "system", "content": prompt}, - {"role": "user", "content": f"Generate SOPs for {role['title']} in {department['name']} department."} - ], - response_format=ManagerSOPs, - max_tokens=1024, - temperature=0.1 - ) - manager_sops = json.loads(response.choices[0].message.content) - managers_with_sops.append(ManagerWithSOPs(title=role['title'], sops=manager_sops)) - - departments_with_sops.append(DepartmentManagerSOPs( - name=department['name'], - managers=managers_with_sops - )) - - return ExecutiveManagerSOPsResponse(departments=departments_with_sops) - - except Exception as e: - print(f"Error in generate_sops_for_department_managers: {str(e)}") - return False - - - def generate_sops_from_questionnaire(self, questionnaire_data: dict, executives: List[str], managers: List[str], departments: List[str]): - try: - prompt = get_sop_executive_from_questionnaire() - - # Prepare the questionnaire data for the prompt - user_content = json.dumps(questionnaire_data, indent=2) - - response = self.client.beta.chat.completions.parse( - model=self.model, - messages=[ - {"role": "system", "content": prompt}, - {"role": "user", "content": f"Generate SOPs based on this questionnaire:\n{user_content}\n\nExecutives to consider: {', '.join(executives)}\nManagers to consider: {', '.join(managers)}\nDepartments to consider: {', '.join(departments)}"} - ], - response_format={"type": "json_object"}, - max_tokens=4096, - temperature=0.1 - ) - - sops_data = json.loads(response.choices[0].message.content) - - # Process executive SOPs - executive_sops = {} - for executive in executives: - if executive in sops_data['executives']: - executive_sops[executive] = Categories(**sops_data['executives'][executive]) - else: - executive_sops[executive] = Categories() - - # Process department manager SOPs - departments_with_sops = [] - for dept_name in departments: - dept_data = next((d for d in sops_data['departments'] if d['name'].lower() == dept_name.lower()), None) - if dept_data: - managers_with_sops = [ - ManagerWithSOPs( - title=manager, - sops=ManagerSOPs(**dept_data['managers']) - ) - for manager in managers - - ] - if managers_with_sops: - departments_with_sops.append(DepartmentManagerSOPs( - name=dept_name, - managers=managers_with_sops - )) - - return { - "executive_sops": executive_sops, - "department_sops": ExecutiveManagerSOPsResponse(departments=departments_with_sops) - } - - except Exception as e: - print(f"Error in generate_sops_from_questionnaire: {str(e)}") - return False - - - + diff --git a/test.py b/test.py index bb19d4d..ba79129 100644 --- a/test.py +++ b/test.py @@ -9,6 +9,34 @@ docs = load_document(file_path) if __name__ == "__main__": SOP = DocumentParser() so = SopGeneratorExecutive() - info = SOP.extract_departments_and_managers_workers(docs) - print(info) + referencs_roles = ["AR Director "] + workers_list = [ + { + + "position": "AR dIRECTOR ", + "role": "Developer", + "department": "IT" + }, + { + "name": "Jane Smith", + "position": "Project Manager", + "role": "Manager", + "department": "IT" + } +] + + departments_and_roles = SOP.extract_sops_for_workers_by_department(docs,workers_list) + # Prepare extracted roles (only managers) + '''extracted_managers = [] + for department in departments_and_roles['departments']: + extracted_managers.extend([ + { + 'name': manager['name'], + 'position': manager.get('position', 'Unknown Position'), + 'role': manager.get('role', 'Unknown Role') # PRP or SRP classification + } + for manager in department['managers'] + ])''' + + print(departments_and_roles)