This commit is contained in:
2025-01-31 15:59:51 +00:00
parent 8ce331b023
commit 1a68b4407e
14 changed files with 5436 additions and 274 deletions
+13
View File
@@ -0,0 +1,13 @@
assessment_id,assessment_name,start_date,user_name,total_assigned_items,completed_items,area
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Agile Methodologies
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,API Developement
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Code Review
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Collaboration with Cross-Functional Teams
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Continuous Integration/Continuous Deployment (CI/CD)
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Debugging Techniques
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Documentation
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Performance Optimization
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,System Design
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Unit Testing
1,Test Assesment,2024-10-14T00:00:00.000Z,Saba JoeJoe,36,1,Version Control
1,Test Assesment,2024-10-14T00:00:00.000Z,Tahsin Protik,1,0,Performance Optimization
1 assessment_id assessment_name start_date user_name total_assigned_items completed_items area
2 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Agile Methodologies
3 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 API Developement
4 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Code Review
5 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Collaboration with Cross-Functional Teams
6 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Continuous Integration/Continuous Deployment (CI/CD)
7 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Debugging Techniques
8 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Documentation
9 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Performance Optimization
10 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 System Design
11 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Unit Testing
12 1 Test Assesment 2024-10-14T00:00:00.000Z Saba JoeJoe 36 1 Version Control
13 1 Test Assesment 2024-10-14T00:00:00.000Z Tahsin Protik 1 0 Performance Optimization
File diff suppressed because it is too large Load Diff
+170
View File
@@ -0,0 +1,170 @@
import requests
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
import os
DATA_KEY = os.getenv("AI_DATA_KEY")
# Constants for API requests
URL = "https://erpai.mkdlabs.com/v3/api/custom/erpai/common/get-data-ai"
HEADERS = {
"x-project": DATA_KEY # Replace with your actual key
}
# JSON bodies for API requests
def create_json_body(area_type, company_id):
return {
"type": area_type,
"options": {
"company_id": company_id
}
}
# Function to fetch data from the API
def fetch_data(json_body):
json_body["options"]["company_id"] = json_body["options"].get("company_id") # Ensure company_id is included
response = requests.post(URL, headers=HEADERS, json=json_body)
response.raise_for_status() # Raise an error for bad responses
return response.json()
def convert_assessment_data_to_dataframe(assessment_data):
if not assessment_data or "data" not in assessment_data or not assessment_data["data"]:
# Return empty DataFrame with expected columns
return pd.DataFrame(columns=[
"assessment_id", "assessment_name", "start_date", "open_items_overall",
"completed_items_overall", "total_assigned_items_overall", "user_name",
"user_total_assigned_items", "user_completed_items", "area", "red_flags"
])
df_assessment = []
for assessment in assessment_data.get("data", []):
assessment_id = assessment.get("assessment_id", "")
assessment_name = assessment.get("assessment_name", "")
start_date = assessment.get("start_date", "")
open_items = assessment.get("open_items", 0)
completed_items = assessment.get("completed_items", 0)
total_assigned_items = assessment.get("total_assigned_items", 0)
red_flags = assessment.get("red_flags", 0)
for user in assessment.get("user_details", []):
user_name = user.get("name", "")
user_total_items = user.get("total_assigned_items", 0)
user_completed_items = user.get("completed_items", 0)
for area in user.get("area_list", []):
df_assessment.append({
"assessment_id": assessment_id,
"assessment_name": assessment_name,
"start_date": start_date,
"open_items_overall": open_items,
"completed_items_overall": completed_items,
"total_assigned_items_overall": total_assigned_items,
"user_name": user_name,
"user_total_assigned_items": user_total_items,
"user_completed_items": user_completed_items,
"area": area,
"red_flags": red_flags
})
return pd.DataFrame(df_assessment)
def generate_summary_statistics(df):
if df.empty:
return {
"total_assessments": 0,
"avg_open_items_per_assessment": 0,
"avg_completed_items_per_assessment": 0,
"avg_total_assigned_items_per_assessment": 0,
"avg_red_flags": 0,
"total_users": 0,
"avg_user_total_assigned_items": 0,
"avg_user_completed_items": 0,
"completion_rate_per_user": 0,
"area_summary": {}
}
total_assessments = df['assessment_id'].nunique()
avg_open_items = df.groupby('assessment_id')['open_items_overall'].mean().mean()
avg_completed_items = df.groupby('assessment_id')['completed_items_overall'].mean().mean()
avg_total_assigned_items = df.groupby('assessment_id')['total_assigned_items_overall'].mean().mean()
avg_red_flags = df['red_flags'].mean()
total_users = df['user_name'].nunique()
avg_user_total_items = df.groupby('user_name')['user_total_assigned_items'].mean().mean()
avg_user_completed_items = df.groupby('user_name')['user_completed_items'].mean().mean()
completion_rate_per_user = (df['user_completed_items'].sum() / df['user_total_assigned_items'].sum()) * 100 if df['user_total_assigned_items'].sum() > 0 else 0
area_summary = df['area'].value_counts()
return {
"total_assessments": total_assessments,
"avg_open_items_per_assessment": avg_open_items,
"avg_completed_items_per_assessment": avg_completed_items,
"avg_total_assigned_items_per_assessment": avg_total_assigned_items,
"avg_red_flags": avg_red_flags,
"total_users": total_users,
"avg_user_total_assigned_items": avg_user_total_items,
"avg_user_completed_items": avg_user_completed_items,
"completion_rate_per_user": completion_rate_per_user,
"area_summary": area_summary.to_dict()
}
def generate_extended_statistics(df):
if df.empty:
return {
"top_5_efficient_users": {},
"bottom_5_least_efficient_users": {},
"areas_with_most_uncompleted_items": {}
}
df['user_completion_rate'] = (df['user_completed_items'] / df['user_total_assigned_items']).fillna(0) * 100
top_5_efficient_users = df.groupby('user_name')['user_completion_rate'].mean().nlargest(5).to_dict()
bottom_5_least_efficient_users = df.groupby('user_name')['user_completion_rate'].mean().nsmallest(5).to_dict()
df['uncompleted_items'] = df['user_total_assigned_items'] - df['user_completed_items']
areas_with_most_uncompleted_items = df.groupby('area')['uncompleted_items'].sum().nlargest(5).to_dict()
return {
"top_5_efficient_users": top_5_efficient_users,
"bottom_5_least_efficient_users": bottom_5_least_efficient_users,
"areas_with_most_uncompleted_items": areas_with_most_uncompleted_items
}
def generate_problematic_area_statistics(df):
if df.empty:
return pd.DataFrame(columns=["total_open_items", "total_red_flags"])
total_open_items = df.groupby('name')['open_items'].sum().sort_values(ascending=False)
total_red_flags = df.groupby('name')['red_flags'].sum().sort_values(ascending=False)
return pd.DataFrame({
"total_open_items": total_open_items,
"total_red_flags": total_red_flags
}).fillna(0)
def generate_summary_stats(assessment_data, area_data):
# Handle empty or invalid assessment data
assessment_df = convert_assessment_data_to_dataframe(assessment_data)
# Handle empty or invalid area data
problematic_area_df = pd.DataFrame(area_data.get("data", []) if area_data and "data" in area_data else [])
summary_stats = generate_summary_statistics(assessment_df)
extended_stats = generate_extended_statistics(assessment_df)
summary_stats["users(Workers) based stats"] = extended_stats
problematic_stats = generate_problematic_area_statistics(problematic_area_df)
summary_stats["Area based stats"] = problematic_stats.to_dict(orient='index')
return summary_stats
if __name__ == "__main__":
json_body_area = create_json_body("problematic-areas", 106)
json_body_assessment = create_json_body("user-stats-by-assessment", 106)
# Fetching data
problematic_areas_data = fetch_data(json_body_area)
assessment_data = fetch_data(json_body_assessment)
example_result = generate_summary_stats(assessment_data, problematic_areas_data)
print(example_result)
+6 -6
View File
@@ -70,11 +70,11 @@ def predict_next_n_assessments():
try: try:
# Retrieve JSON data from the request # Retrieve JSON data from the request
data = request.get_json() data = request.get_json()
company_info = data.get('company_info') #company_info = data.get('company_info')
companyid = data.get('companyid') companyid = data.get('companyid')
N = data.get('N') N = data.get('N')
if not company_info or not companyid or N is None: if not companyid or N is None:
return jsonify({"error": "Missing data", "message": "Company info, company ID, or N value not provided."}), 400 return jsonify({"error": "Missing data", "message": "Company info, company ID, or N value not provided."}), 400
# Instantiate the chatbot service # Instantiate the chatbot service
@@ -82,7 +82,7 @@ def predict_next_n_assessments():
# Call the prediction method # Call the prediction method
response = chatbot.predict_next_n_assessment( response = chatbot.predict_next_n_assessment(
company_info=company_info, #company_info=company_info,
companyid=companyid, companyid=companyid,
N=N N=N
) )
@@ -96,7 +96,7 @@ def predict_next_n_assessments():
@bot.route('/use_bot_predict_assessments', methods=['POST']) @bot.route('/direct-prompt-bot', methods=['POST'])
@auth_check @auth_check
def use_bot_predict_assessments(): def use_bot_predict_assessments():
try: try:
@@ -106,8 +106,8 @@ def use_bot_predict_assessments():
companyid = data.get('companyid') companyid = data.get('companyid')
query = data.get('query') query = data.get('query')
if not company_info or not companyid or query is None: if not companyid or query is None:
return jsonify({"error": "Missing data", "message": "Company info, company ID, or query value not provided."}), 400 return jsonify({"error": "Missing data", "message": "company ID, or query value not provided."}), 400
# Instantiate the chatbot service # Instantiate the chatbot service
chatbot = Chatbot() chatbot = Chatbot()
+189 -27
View File
@@ -7,6 +7,7 @@ from src.utils.auth import auth_check
from src.utils.utils import delete_all_files_in_directory from src.utils.utils import delete_all_files_in_directory
from src.utils.document_loader import load_document from src.utils.document_loader import load_document
from flask import Blueprint, jsonify, request, make_response
import json import json
# Initialize the Blueprint # Initialize the Blueprint
sops_bp = Blueprint('sops', __name__) sops_bp = Blueprint('sops', __name__)
@@ -74,7 +75,7 @@ def get_roles_questionnaire():
if not questionnaire_data.get('questionnaire_response'): if not questionnaire_data.get('questionnaire_response'):
return jsonify({ return jsonify({
"error": "Missing required fields", "error": "Missing required fields",
"message": "Please provide questionnaire_data in the request body" "message": "Please provide questionnaire_response in the request body"
}), 400 }), 400
generator = SopPersonalAssessment() generator = SopPersonalAssessment()
@@ -195,28 +196,32 @@ def generate_sops_by_roles_and_areas():
Generate SOPs based on the roles, SOP types (will, shall, must), and areas provided in the request body. Generate SOPs based on the roles, SOP types (will, shall, must), and areas provided in the request body.
""" """
try: try:
# Get the roles data from the request body
# Get the roles and qna data from the request body
roles = request.json.get('roles', None) roles = request.json.get('roles', None)
qna = request.json.get('qna', None)
sop_generator = SopPersonalAssessment() sop_generator = SopPersonalAssessment()
# Validate the presence of roles data # Validate the presence of roles and qna data
if not roles or not isinstance(roles, list): if not roles or not isinstance(roles, list):
return jsonify({"error": "Invalid input", "message": "The 'roles' field should be a non-empty list."}), 400 return make_response(jsonify({"error": "Invalid input", "message": "The 'roles' field should be a non-empty list."}), 400)
# Generate SOPs for all roles at once # Generate SOPs for all roles at once
sops_response = sop_generator.generate_sops_by_role_and_area(roles=roles) sops_response = sop_generator.generate_sops_by_role_and_area(roles=roles,qna=qna)
if not sops_response:
return jsonify({ return make_response(jsonify({
"sops": sops_response, "error": True,
"message": "SOPs successfully generated for all provided roles." "message": "Error in generating sops, please try again"
}), 200 }), 200)
# Return the generated SOPs
return make_response(jsonify(sops_response), 200)
except Exception as e: except Exception as e:
return jsonify({ return make_response(jsonify({
"error": "Processing error", "error": "Processing error",
"message": f"An error occurred while generating SOPs: {str(e)}" "message": f"An error occurred while generating SOPs: {str(e)}"
}), 500 }), 500)
@@ -297,6 +302,76 @@ def generate_executive_sops_from_doc():
return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400 return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400
@sops_bp.route('/executive/generate_mission_vision_doc', methods=['POST'])
@auth_check
def generate_executive_goals_from_doc():
"""
Generate SOPs for executives based on a document containing vision and mission.
"""
# Check if the POST request has the file part and roles
if 'document' not in request.files:
return jsonify({"error": "No file part", "message": "Please upload a file with the key 'document'."}), 400
if 'departments' not in request.form:
return jsonify({"error": "department missing", "message": "Please provide departments'."}), 400
try:
departments = request.form.get('departments')
# Manually load roles from the string to JSON
departments = json.loads(departments)
except json.JSONDecodeError:
return jsonify({"error": "Invalid JSON", "message": "The departments must be a valid JSON array."}), 400
except ValueError as e:
return jsonify({"error": "Invalid roles format", "message": str(e)}), 400
file = request.files['document']
# If the user does not select a file, the browser may also submit an empty part without filename
if file.filename == '':
return jsonify({"error": "No selected file", "message": "A file was not selected for upload. Please select a valid file."}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
upload_folder = current_app.config['UPLOAD_FOLDER']
file_path = os.path.join(upload_folder, filename)
# Save the file to the upload folder
file.save(file_path)
try:
# Use the utility function to generate docs from the file
docs = load_document(file_path)
sop_doc = DocumentParser()
vision_mission = sop_doc.extract_vision_mission2(docs,departments)
if not vision_mission:
return jsonify({"error": "Vision and Mission generation error ", "message": "Error in generating mssion and viso."}), 400
# Check if both vision and mission are empty
if not vision_mission.get('vision') and not vision_mission.get('mission'):
# Cleanup: Delete all files in the upload directory if parsing fails
delete_all_files_in_directory(upload_folder)
return jsonify({"vision": [], "mission": [], "message": "The document does not contain mission and vision."}), 200
print(f"Vision and mission: {vision_mission}")
vission = vision_mission.get('vision')
mission = vision_mission.get('mission')
return jsonify({"mission": mission, "vission": vission, "message": "vision and mission generated successfully"}), 200
except Exception as e:
# Cleanup: Delete all files in the upload directory if an error occurs
delete_all_files_in_directory(upload_folder)
return jsonify({"error": "Processing error", "message": f"An error occurred while processing the document: {str(e)}"}), 500
return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400
@sops_bp.route('/executive/generate_sop_managers_doc', methods=['POST']) @sops_bp.route('/executive/generate_sop_managers_doc', methods=['POST'])
@auth_check @auth_check
@@ -340,6 +415,24 @@ def generate_sop_managers_doc():
return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400 return jsonify({"error": "File type not allowed", "message": "The uploaded file type is not allowed. Please upload a PDF, DOC, or DOCX file."}), 400
@sops_bp.route('/executive/generate_dept_sops_from_questionnaire', methods=['POST'])
@auth_check
def generate_sops_dept_from_questionnaire():
try:
data = request.json
questionnaire_data = data.get('questionnaire')
sop_generator = SopGeneratorExecutive()
result = sop_generator.generate_sops_from_questionnaire2(questionnaire_data)
if not result:
return make_response(jsonify({"error": "Processing error", "message": "Failed to generate SOPs from questionnaire."}), 500)
return make_response(jsonify({"sops": result, "message": "SOPs successfully generated from questionnaire."}), 200)
except Exception as e:
return make_response(jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500)
@sops_bp.route('/executive/generate_sops_from_questionnaire', methods=['POST']) @sops_bp.route('/executive/generate_sops_from_questionnaire', methods=['POST'])
@auth_check @auth_check
@@ -348,30 +441,39 @@ def generate_sops_from_questionnaire():
data = request.json data = request.json
questionnaire_data = data.get('questionnaire') questionnaire_data = data.get('questionnaire')
executives = data.get('executives', []) executives = data.get('executives', [])
managers = data.get('managers', [])
departments = data.get('departments', [])
if not questionnaire_data or not executives or not managers or not departments: if not questionnaire_data or not executives:
return jsonify({"error": "Missing data", "message": "Please provide questionnaire data, executives, managers, and departments."}), 400 return make_response(jsonify({"error": "Missing data", "message": "Please provide questionnaire data, executives, managers, and departments."}), 400)
sop_generator = SopGeneratorExecutive() sop_generator = SopGeneratorExecutive()
result = sop_generator.generate_sops_from_questionnaire(questionnaire_data, executives, managers, departments) result = sop_generator.generate_sops_from_questionnaire(questionnaire_data, executives)
if not result: if not result:
return jsonify({"error": "Processing error", "message": "Failed to generate SOPs from questionnaire."}), 500 return make_response(jsonify({"error": "Processing error", "message": "Failed to generate SOPs from questionnaire."}), 500)
# Convert Pydantic models to dictionaries return make_response(jsonify({"sops": result, "message": "SOPs successfully generated from questionnaire."}), 200)
serializable_result = {
"executive_sops": {exec: sops.dict() for exec, sops in result["executive_sops"].items()},
"department_sops": result["department_sops"].dict()
}
return jsonify({"sops": serializable_result, "message": "SOPs successfully generated from questionnaire."}), 200
except Exception as e: except Exception as e:
return jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500 return make_response(jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500)
@sops_bp.route('/executive/generate_mission_goals_from_questionnaire', methods=['POST'])
@auth_check
def generate_vision_goals_quest():
try:
data = request.json
questionnaire_data = data.get('questionnaire')
sop_generator = SopGeneratorExecutive()
vision_mission = sop_generator.generate_vision_mission_from_questionnaire(questionnaire_data)
vission = vision_mission.get('vision')
mission = vision_mission.get('mission')
return jsonify({"mission": mission, "vission": vission, "message": "vision and mission generated successfully"}), 200
except Exception as e:
return make_response(jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500)
@sops_bp.route('/executive/get_roles_for_reference_managers', methods=['POST']) @sops_bp.route('/executive/get_roles_for_reference_managers', methods=['POST'])
@@ -416,6 +518,66 @@ def get_roles_for_reference_managers():
@sops_bp.route('/executive/extract_vision_dept_goals_doc', methods=['POST'])
@auth_check
def get_vision_dpt_goals():
try:
# Check Content-Type
if 'multipart/form-data' not in request.content_type:
return jsonify({"error": "Invalid Content-Type", "message": "Request must have Content-Type 'multipart/form-data'."}), 415
# Retrieve uploaded file
file = request.files.get('document')
if not file:
return jsonify({"error": "Missing data", "message": "Document not provided."}), 400
if file.filename == '':
return jsonify({"error": "No selected file", "message": "No file selected for upload. Please select a valid file."}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
upload_folder = current_app.config['UPLOAD_FOLDER']
file_path = os.path.join(upload_folder, filename)
file.save(file_path)
docs = load_document(file_path)
# Use extractor to extract roles from the document
extractor = DocumentParser()
dept_goals = extractor.extract_dept_goals(docs=docs)
if not dept_goals:
return jsonify({"error": "Processing error", "message": "Error generating dept goals"}), 404
return jsonify({"response": dept_goals, "message": "Vision and dept goals successfully generated."}), 200
except Exception as e:
return jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500
@sops_bp.route('/executive/extract_vision_dept_goals-questionnaire', methods=['POST'])
@auth_check
def generate_dept_goals_quest():
try:
data = request.json
questionnaire_data = data.get('questionnaire')
if not questionnaire_data:
return make_response(jsonify({"error": "Missing data", "message": "Please provide questionnaire data"}), 400)
sop_generator = SopGeneratorExecutive()
result = sop_generator.generate_dept_goals_from_questionnaire(questionnaire_data)
if not result:
return make_response(jsonify({"error": "Processing error", "message": "Failed to generate SOPs from questionnaire."}), 500)
return make_response(jsonify({"sops": result, "message": "SOPs successfully generated from questionnaire."}), 200)
except Exception as e:
return make_response(jsonify({"error": "Processing error", "message": f"An error occurred while processing the request: {str(e)}"}), 500)
@sops_bp.route('/manager/get_roles_for_reference_workers', methods=['POST']) @sops_bp.route('/manager/get_roles_for_reference_workers', methods=['POST'])
+18
View File
@@ -12,6 +12,9 @@ class RoleSops(BaseModel):
role:str role:str
sops:Categories sops:Categories
class RoleSopssLists(BaseModel):
sops:List[RoleSops]
#class RoleSOPs(BaseModel): #class RoleSOPs(BaseModel):
# sops: SOPs # sops: SOPs
class Roles_response(BaseModel): class Roles_response(BaseModel):
@@ -28,6 +31,14 @@ class SOPsResponse(BaseModel):
mission: List[str] mission: List[str]
vission:List[str] vission:List[str]
class DeptMisiion(BaseModel):
departments: str
goals: List[str]
class VisionMissionResponse2(BaseModel):
vision: List[str]
mission: List[str]
class VisionMissionResponse(BaseModel): class VisionMissionResponse(BaseModel):
vision: List[str] vision: List[str]
mission: List[str] mission: List[str]
@@ -117,3 +128,10 @@ class RolesComparisonResponse(BaseModel):
class DeptGoal(BaseModel):
name :str
goals: List[str]
class DeptGoalsVisssion(BaseModel):
vision: List[str]
mission: List[str]
department_goals: List[DeptGoal]
+104 -56
View File
@@ -17,61 +17,26 @@ def validate_worker_prompt() -> str:
result:"validated" result:"validated"
} }
""" """
def predict_based_past_assessment_prompt(query,company_info, summary_stats): def predict_based_past_assessment_prompt(query,company_info, summary_stats):
# Extract company information from the dictionary # Extract company information from the dictionary
company_name = company_info['company_name']
company_size = company_info['company_size']
departments = company_info['departments']
# Create the prompt with the provided company info and summary statistics # Create the prompt with the provided company info and summary statistics
prompt = f""" prompt = f"""
**Prompt for the Chatbot:** **Prompt for the Chatbot:**
**Context:** **Context:**
You are an AI assistant working for {company_name}, and your primary responsibility is to provide **insights**, **predictions**, and **recommendations** based on the company's past assessment data and organizational structure. You are not allowed to respond to any queries outside of this domain. You are an AI assistant and work as assessment anaylyst, and your primary responsibility is to provide **insights**, **predictions**, and **recommendations** based on the company's past assessment data and organizational structure. You are not allowed to respond to any queries outside of this domain.
**General Company Information:** **General Company Information:**
- **Company Name**: {company_name} Company info is {company_info}
- **Company Size**: {company_size} (e.g., Small, Medium, Large)
- **Departments**:
{', '.join(departments)}
**Assessment Summary**: **Assessment Summary**:
The following is a detailed summary of past assessments at {company_name}. Use this information to provide predictions and recommendations based on trends and data points. The following is a detailed summary of past assessments will be provided. Use this information to provide predictions and recommendations based on trends and data points.
- **Open Items and Red Flags**: Past assessment summary : {summary_stats}
- Total Open Items: {summary_stats['Open Items and Red Flags']['Total Open Items']}
- Average Open Items per Assessment: {summary_stats['Open Items and Red Flags']['Average Open Items per Assessment']}
- Total Red Flags: {summary_stats['Open Items and Red Flags']['Total Red Flags']}
- Average Red Flags per Assessment: {summary_stats['Open Items and Red Flags']['Average Red Flags per Assessment']}
- Max Red Flags in a Single Assessment: {summary_stats['Open Items and Red Flags']['Max Red Flags in a Single Assessment']}
- Most Common Area with Red Flags: {summary_stats['Open Items and Red Flags']['Most Common Area with Red Flags']}
- **Assessment Frequency**:
- Weekly: {summary_stats['Assessment Frequency']['Assessment Type Breakdown'].get('Weekly', 0) * 100}%
- Bi-Weekly: {summary_stats['Assessment Frequency']['Assessment Type Breakdown'].get('Bi-Weekly', 0) * 100}%
- Quarterly: {summary_stats['Assessment Frequency']['Assessment Type Breakdown'].get('Quarterly', 0) * 100}%
- Average Time Between Assessments: {summary_stats['Assessment Frequency']['Average Time Between Assessments']} days
- Average Assessment Duration: {summary_stats['Assessment Frequency']['Average Assessment Duration']} days
- **Assessment Start and End Dates**:
- Longest Assessment Duration: {summary_stats['Assessment Start and End Dates']['Longest Assessment Duration (days)']} days
- Shortest Assessment Duration: {summary_stats['Assessment Start and End Dates']['Shortest Assessment Duration (days)']} days
- **Assessment Areas**:
- Most Assessed Area: {summary_stats['Assessment Areas']['Most Assessed Area']}
- Most Open Items in Area: {summary_stats['Assessment Areas']['Most Open Items in Area']}
- Area with Most Red Flags: {summary_stats['Assessment Areas']['Area with Most Red Flags']}
- **Assessment Status**:
- Completed: {summary_stats['Assessment Status']['Assessment Status Distribution'].get('Completed', 0) * 100}%
- In Progress: {summary_stats['Assessment Status']['Assessment Status Distribution'].get('In Progress', 0) * 100}%
- Incomplete: {summary_stats['Assessment Status']['Assessment Status Distribution'].get('Incomplete', 0) * 100}%
- **Assessment Admin**:
- Most Frequent Admin: {summary_stats['Assessment Admin']['Most Frequent Admin']}
- Admin with Fewest Red Flags: {summary_stats['Assessment Admin']['Admin with Fewest Red Flags']}
- Admin with Most Open Items: {summary_stats['Assessment Admin']['Admin with Most Open Items']}
**Instructions:** **Instructions:**
Use the above information to answer user queries. You should: Use the above information to answer user queries. You should:
@@ -102,38 +67,38 @@ def predict_next_n_assessments_prompt():
**Prompt for the Chatbot:** **Prompt for the Chatbot:**
**Context:** **Context:**
You are an AI assistant responsible for analyzing the past assessment data of , and your primary responsibility is to provide **predictions** for the next {n} assessments. You are an AI assistant responsible for thoroughly analyzing the past assessment data of a company. Your primary responsibility is to provide **dynamic predictions** for the next {n} assessments, considering different scenarios based on the frequency of assessments: **weekly**, **bi-weekly**, or **quarterly**. Use the company's past performance to predict the following for each of the next {n} assessments:
These assessments can occur on a **weekly**, **bi-weekly**, or **quarterly** basis. Use the company's past performance to predict the following for each of the next {n} assessments:
- **Number of Open Items**. - **Number of Open Items**.
- **Number of Red Flags**. - **Number of Red Flags**.
- **Predictions for Weekly, Bi-Weekly, and Quarterly assessments**. - **Predictions for Weekly, Bi-Weekly, and Quarterly assessments**.
input :
- company basic info **Input:**
- past assessment statitics - Company basic info
- Past assessment statistics
- N - number of next assessments to be predicted - N - number of next assessments to be predicted
**General Company Information:** **General Company Information:**
**Assessment Summary (Past Data)**: **Assessment Summary (Past Data)**:
The Detailed information on past asssessment will be provided. Use this information to make predictions for the next {n} assessments. Detailed information on past assessments will be provided. Use this information to make informed predictions for the next {n} assessments, taking into account the frequency of assessments.
**Instructions**: **Instructions**:
- Predict the number of open items and red flags for the next n assessments if they are conducted on a weekly, bi-weekly, or quarterly basis. - Analyze the historical data to predict the number of open items and red flags for the next n assessments based on whether they are conducted weekly, bi-weekly, or quarterly.
- Use the historical summary statistics provided above to guide your predictions. - Consider the implications of each frequency on the predictions, and ensure your analysis reflects the potential outcomes for each scenario.
- Return the response in the following JSON format: - Return the response in the following JSON format:
**Response Format**: **Response Format**:
{ {
"assessment 1": [ "assessment_1": [
{ {
"weekly": {"open_items": X, "red_flags": Y}}, "weekly": {"open_items": X, "red_flags": Y},
"biweekly": {{"open_items": X, "red_flags": Y}}, "biweekly": {"open_items": X, "red_flags": Y},
"quarterly": {{"open_items": X, "red_flags": Y}} "quarterly": {"open_items": X, "red_flags": Y}
} }
], ],
"assessment 2": [ "assessment_2": [
{ {
"weekly": {"open_items": X, "red_flags": Y}, "weekly": {"open_items": X, "red_flags": Y},
"biweekly": {"open_items": X, "red_flags": Y}, "biweekly": {"open_items": X, "red_flags": Y},
@@ -143,12 +108,95 @@ def predict_next_n_assessments_prompt():
// assuming N is 2 // assuming N is 2
} }
``` ```
Ensure each assessment is provided with three predictions: one for Weekly, one for Bi-Weekly, and one for Quarterly assessments. Ensure each assessment is provided with three predictions: one for Weekly, one for Bi-Weekly, and one for Quarterly assessments, and analyze the data dynamically to reflect the different scenarios.
""" """
return prompt return prompt
def predict_next_n_assessments_prompt_v2():
prompt = """
You are an AI analyst specializing in company team assessments. Using the provided summary statistics, predict the next N assessments.
**Input:**
1. Summary Statistics containing:
- Total assessments completed
- Average open/completed items per assessment
- User efficiency metrics
- Area-based statistics
- Red flag distributions
- Team completion rates
2. N: number of future assessments to predict
**Validation Rules:**
1. Ensure summary statistics are complete and consistent
2. Check number of historical assessments:
- If only 1 assessment: Mark as "invalid" with reason explaining need for more data
- If 2+ assessments: Proceed with predictions
3. Verify that area-based stats align with overall metrics
**Analysis Instructions:**
1. Use the average metrics as baseline predictions
2. Adjust predictions based on:
- Current completion rates per user
- Area-specific trends in open items and red flags
- Distribution of work across different areas
- Historical red flag patterns
3. Consider frequency impacts:
- Weekly: More frequent checks = faster issue detection and resolution
- Biweekly: Moderate item accumulation
- Quarterly: Longer accumulation period but more time for resolution
**Output Format:**
{
"state": "valid" | "invalid",
"reason": string | null, // Required if state is "invalid"
"predictions": [
{
"assessment_1": {
"weekly": {
"open_items": number,
"red_flags": number
},
"biweekly": {
"open_items": number,
"red_flags": number
},
"quarterly": {
"open_items": number,
"red_flags": number
}
}
},
// ... N predictions
]
}
NOTE: Do not add extra anything also any ``JSON`` formatter tags. Only output JSON.
Use the following guidelines for predictions:
1. Weekly predictions should be based on:
- avg_open_items_per_assessment
- completion_rate_per_user
- current area-based distribution of open items
2. Biweekly predictions should consider:
- Accumulation over two weeks
- Historical completion patterns
- Area-specific red flag frequencies
3. Quarterly predictions should account for:
- Longer-term accumulation patterns
- Team efficiency metrics
- Historical area-based completion rates
Make predictions that reflect realistic patterns based on the team's performance metrics and area-specific challenges.
"""
return prompt
def recommend_assessment_frequency_prompt(): def recommend_assessment_frequency_prompt():
return ''' return '''
You are provided with the Standard Operating Procedures (SOPs) for various roles within a company, along with options (e.g., ['weekly', 'monthly', 'biweekly']) for how frequently assessments should be performed. Your task is to recommend the best assessment type and frequency (weekly, biweekly, or quarterly) for all employees, based on the overall nature of the SOPs. You are provided with the Standard Operating Procedures (SOPs) for various roles within a company, along with options (e.g., ['weekly', 'monthly', 'biweekly']) for how frequently assessments should be performed. Your task is to recommend the best assessment type and frequency (weekly, biweekly, or quarterly) for all employees, based on the overall nature of the SOPs.
+8 -6
View File
@@ -24,6 +24,8 @@ def get_questions_prompt():
3. All questions are "yes" or "no" questions nothing extra and precise ,not long 3. All questions are "yes" or "no" questions nothing extra and precise ,not long
4. Generate a total of at least 20 questions all rounda based on the sops and roles for each frequency number 4. Generate a total of at least 20 questions all rounda based on the sops and roles for each frequency number
5. make sure the questions are up to 20 for the current frequency 5. make sure the questions are up to 20 for the current frequency
NOTE: !!! MAKE SURE YOU CORRECTLY ATTACH "assigned_to" AS THE ID OF THE MEMBER OF THE ROLE AS STATED IN THE SOP. CHECK MEMBERS UNDER THE ROLE IN THE PROVIDED SOP AND USE THE CORRECT ID OF THE MEMBER, DO NOT USE MEMBER iD THAT IS NOT PROVIDED AS "assigned_to" pls !!!
Example response: Example response:
questions questions
@@ -34,7 +36,7 @@ def get_questions_prompt():
"frequency_number": "2", "frequency_number": "2",
"questions": [ "questions": [
{ {
"assigned_to": "name", "assigned_to": "id",----id of the member attached to the role, check the member unde the role and attacj the id here
"role": "person role", "role": "person role",
"question": "e.g., Is the internal project team being followed according to the SOP?" "question": "e.g., Is the internal project team being followed according to the SOP?"
"area_tag":"timeline", "area_tag":"timeline",
@@ -47,7 +49,7 @@ def get_questions_prompt():
"frequency_number": "3", "frequency_number": "3",
"questions": [ "questions": [
{ {
"assigned_to": "name", "assigned_to": "id",----id of the member attached to the role, check the member unde the role and attacj the id here
"role": "person role", "role": "person role",
"question": "e.g., Have communication protocols been followed for the task at hand?". "question": "e.g., Have communication protocols been followed for the task at hand?".
"area_tag":"communication", "area_tag":"communication",
@@ -125,7 +127,7 @@ def get_questions_prompt_v3():
5. Tag each question with the provided assigned_to ID (the ID of the person in charge) (e.g., 1, 2, 3, 4, 5). 5. Tag each question with the provided assigned_to ID (the ID of the person in charge) (e.g., 1, 2, 3, 4, 5).
6. The questions should evolve in detail as assessments progress over time. 6. The questions should evolve in detail as assessments progress over time.
7. For each frequency_number, generate at least 15 - 20 questions. 7. For each frequency_number, generate at least 15 - 20 questions.
NOTE: !!! MAKE SURE YOU CORRECTLY ATTACH "assigned_to" AS THE ID OF THE MEMBER OF THE ROLE AS STATED IN THE SOP. CHECK MEMBERS UNDER THE ROLE IN THE PROVIDED SOP AND USE THE CORRECT ID OF THE MEMBER, DO NOT USE MEMBER iD THAT IS NOT PROVIDED AS "assigned_to" pls !!!
Provide the response in the following JSON format: Provide the response in the following JSON format:
{ {
"questions": { "questions": {
@@ -134,10 +136,10 @@ def get_questions_prompt_v3():
"frequency_number": 1, "frequency_number": 1,
"items": [ "items": [
{ {
"area_tag": 5, "area_tag": id of the rea tag,
"assigned_to": 8, "assigned_to": "id",----id of the member attached to the role, check the member unde the role and attacj the id here ,
"questions": "Has the content calendar been developed and shared with the team?", "questions": "Has the content calendar been developed and shared with the team?",
"role": 4 "role": 4"role id" in SOP
}, },
... ...
] ]
+224 -17
View File
@@ -18,6 +18,7 @@ def get_roles_extraction_from_questionnaire():
return '''Your task is to extract the "Roles" from the provided questionnaire responses. return '''Your task is to extract the "Roles" from the provided questionnaire responses.
You must identify and categorize the roles based on the information provided. You must identify and categorize the roles based on the information provided.
Instructions: Instructions:
1. **Roles**: Extract the roles mentioned in the questionnaire. 1. **Roles**: Extract the roles mentioned in the questionnaire.
2. **Vision**: If applicable, extract the vision of the company or organization as it relates to the roles. 2. **Vision**: If applicable, extract the vision of the company or organization as it relates to the roles.
@@ -48,20 +49,53 @@ def get_sop_personalassessment_from_questionnaire():
Provide the generated SOPs based on the questionnaire responses.''' Provide the generated SOPs based on the questionnaire responses.'''
def get_sop_personalassessment_from_area_role(role,areas,sop_types): def get_sop_personalassessment_from_area_role(role,sop_types,qna=None,areas=None):
return f"""Your job is to generate Standard Operating Procedures (SOPs) for the role of "{role}" with a focus on the areas "{areas}" based on the following instructions: if not areas:
areas = "Not provided"
return f"""Your job is to generate Standard Operating Procedures (SOPs) for the role of "{role}" with a focus on the prvided area
"{areas}" based on the following instructions:
Instructions: Instructions:
Categorization: Organize the SOPs under the selected categories: a checkboxex of the three categories "must" , "shall" and "will" Categorization: Organize the SOPs under the selected categories: a checkboxex of the three categories "must" , "shall" and "will"
So use the selected sop types categories: {sop_types} So use the selected sop types categories: {sop_types} #adhere strictly to these sop types alone and make sure they are not missing
Direct Instructions: The SOPs should directly address responsibilities, objectives, and challenges related to the area of "{areas}" for the role of "{role}". Direct Instructions: The SOPs should directly address responsibilities, objectives, and challenges related to the area of "{areas}" for the role of "{role}".
Questions and Anwer context: If extra questions and answers answered based on role is the provided, use that as more context(related to this role)
Contextual Inference: If SOPs for the area are not explicitly stated, infer them from the role and area context provided. Contextual Inference: If SOPs for the area are not explicitly stated, infer them from the role and area context provided.
Empty Lists: If no SOPs are generated, return an empty list for each category.
Format: The SOPs should be direct and concise. Format: The SOPs should be direct and concise.
provided questions and answer: {qna}
INSTRUCTIONS : STRICTLY ADHERE TO THESE INSTRUCTIONS
NOTE: IF AREAS ARE NOT PROVIDED (AREA IS "NOT PROVIDED"), INTUITIVELY PROVIDE THE SOP BASED ON THE ROLE NAME.
NOTE: MAKE SURE SOPS ARE NOT MISSING FOR THE PROVIDED TYPES.
NOTE: FOR SOP TYPES NOT SELECTED RETURN AN EMPTY LIST and not "null" E.G IF "SHALL" AND "WILL" ARE SELECTED BUT "MUST" IS NOT AMONG, MUST WILL BE AN EMPTY LIST
""" """
def get_sop_personalassessment_from_area_rolev2():
return f"""Your job is to generate Standard Operating Procedures (SOPs) for the information provided on different roles role of with a focus on the prvided areas
based on the following instructions:
Instructions:
Categorization: Organize the SOPs under the selected categories: a checkboxex of the three categories "must" , "shall" and "will"
So use the selected sop types categories for the specified role : #adhere strictly to these sop types alone and make sure they are not missing
Direct Instructions: The SOPs should directly address responsibilities, objectives, and challenges related to the area of specified to the specific role".
Questions and Anwer context: If extra questions and answers answered based on roles is the provided, use that as more context to generate the sop
Contextual Inference: If SOPs for the area are not explicitly stated, infer them from the role and area context provided.
Format: The SOPs should be direct and concise.(5-7) bullet point per sop type is okay for each role but 3-4 on average is very good
INSTRUCTIONS : STRICTLY ADHERE TO THESE INSTRUCTIONS
NOTE: IF AREAS ARE NOT PROVIDED (AREA IS "NOT PROVIDED"), INTUITIVELY PROVIDE THE SOP BASED ON THE ROLE NAME.
NOTE: MAKE SURE SOPS ARE NOT MISSING FOR THE PROVIDED TYPES.
NOTE: FOR SOP TYPES NOT SELECTED RETURN AN EMPTY LIST and not "null" E.G IF "SHALL" AND "WILL" ARE SELECTED BUT "MUST" IS NOT AMONG, MUST WILL BE AN EMPTY LIST
NOTE !!!: IF A ROLE POINTS TO A SPECIFIC SOP TYPE (E.G., "SHALL" AND "MUST"), THESE TWO MUST NEVER BE EMPTY FOR THAT ROLE.
: FORMAT: SOPS SHOULD BE CLEAR, DIRECT, AND CONCISE. EACH ROLE SHOULD HAVE 5-7 BULLET POINTS PER SOP TYPE ("WILL," "SHALL," OR "MUST"). FOR COMPLEX ROLES, EACH SOP TYPE MAY HAVE A MAXIMUM OF 7-10 BULLET POINTS, NOT TOTAL ACROSS ALL TYPES, BUT PER SOP TYPE.
"""
def get_sop_executive_from_vision_goals(executive): def get_sop_executive_from_vision_goals(executive):
return f"""Your task is to generate Standard Operating Procedures (SOPs) for the executive namely: {executive}, based on the provided vision and goals/mission. return f"""Your task is to generate Standard Operating Procedures (SOPs) for the executive namely: {executive}, based on the provided vision and goals/mission.
@@ -86,6 +120,8 @@ def get_sop_executive_from_vision_goals(executive):
def get_vision_mission_extraction_from_doc(): def get_vision_mission_extraction_from_doc():
return """Extract vision and mission statements from the document: return """Extract vision and mission statements from the document:
You are provided with an organization document , your goal is to extract vision and mission(or goals) from the document
1. Analyze for explicit or implicit statements. 1. Analyze for explicit or implicit statements.
2. Vision: Long-term aspirations or ideal future state. 2. Vision: Long-term aspirations or ideal future state.
3. Mission: Organization's purpose, core functions, or primary objectives. 3. Mission: Organization's purpose, core functions, or primary objectives.
@@ -93,9 +129,31 @@ def get_vision_mission_extraction_from_doc():
5. Infer from context if not explicit. 5. Infer from context if not explicit.
6. Format as two lists: vision and mission. 6. Format as two lists: vision and mission.
7. Return empty list if none found for either category. 7. Return empty list if none found for either category.
8. If vision and mission is found in the document , extract them as it is ,no changes
NOTE: MAKE SURE YOU EXTRACT EVERY INFORMATION FOUND FOR VISION AND GOALS FROM THE DOCUMENT.DO NOT OMIT ANY
PROVIDE EXTRACTED OR INFERRED VISION AND MISSION STATEMENTS."""
Provide extracted or inferred vision and mission statements."""
def get_vision_mission_extraction_from_doc2():
return """
You are provided with a organization document and the departments in the organization and your role is to extract the vision and mission(alo called goals) from the document
If the vision and mission are clearly stated in the document
- Extract the vision of the organization as they are
- Extract the goals(mission) of each the company based on the provided goals of the department and overall questionairre response
if the vision and mission are not clearly stated in the document
- Infer the vision and mission from the context of the document
- Analyze for explicit or implicit statements.
2. Vision: Long-term aspirations or ideal future state.
3. Mission: Organization's purpose, core functions, or primary objectives.
4. Include multiple statements if found.
5. Infer from context if not explicit.
7. Return empty list if none found for either category.
8. If vision and mission is found in the document , extract them as it is ,no changes
NOTED: if the goal(mission) and vision cant not be found at all, make it empty please
NOTE: MAKE SURE YOU EXTRACT EVERY INFORMATION FOUND FOR VISION AND GOALS FROM THE DOCUMENT.DO NOT OMIT ANY
"""
''' def get_sop_executive_for_managers(): ''' def get_sop_executive_for_managers():
@@ -114,6 +172,26 @@ def get_vision_mission_extraction_from_doc():
Provide the extracted sections exactly as they appear in the document. Provide the extracted sections exactly as they appear in the document.
''' '''
def get_vision_mission_extraction_from_questionnaire_executive():
return """
You are provided with an organization's response from a questionnaire, and your role is to extract the vision and mission (also called goals) from the questionnaire response:
- Generate the vision(at least one paragraph)of the organization based on the questionnaire and
- Generate the goals (mission) of the company based on the provided departmental goals and overall questionairre response
If the vision and mission are not clearly stated in the questionnaire:
- Infer the vision and mission from the context of the questionnaire.
- Analyze for explicit or implicit statements.
1. Vision: Long-term aspirations or ideal future state.
2. Mission: Organization's purpose, core functions, or primary objectives.
3. Include multiple statements if found.
4. Infer from context if not explicitly stated.
5. Return an empty list if none are found for either category.
NOTE: If the goal and mission of a can not be gotten from the questionaire response, make it empty.
NOTE: Ensure you extract every piece of information found for the vision and goals from the questionnaire. DO NOT OMIT ANYTHING.
"""
def get_departments_managers_workers_extraction_prompt(): def get_departments_managers_workers_extraction_prompt():
@@ -215,49 +293,132 @@ def get_sop_for_department_managers():
''' '''
def get_sop_executive_from_questionnaire():
return '''Generate Standard Operating Procedures (SOPs) for specific executives and department managers based on the provided questionnaire responses.
Format the response as a JSON object with the following structure:
{
"executives": {
"must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...]
},
"departments": [
{
"name": "Department Name",
"must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...]
},
{
"name": "Department Name",
"must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...]
}
]
}
Ensure that each specified department has its own set of SOPs and if managers are empty arrays- make sop empty for that department
'''
def get_sop_executive_from_questionnaire(): def get_sop_executive_from_questionnaire():
return '''Generate Standard Operating Procedures (SOPs) for specific executives and department managers based on the provided questionnaire responses. return '''Generate Standard Operating Procedures (SOPs) for specific executives and department managers based on the provided questionnaire responses.
Instructions: Instructions:
1. Use the organizational vision and strategic goals to create overarching SOPs for each executive. 1. Use the organizational vision and strategic goals and department strategic goals to create overarching SOPs for the executives.
2. Use the departmental strategic goals to create specific SOPs for each department's managers. 2. Use the departmental strategic goals and team questionnaire to create specific SOPs for each department.
3. Categorize all SOPs into "must," "shall," and "will" categories. 3. Categorize all SOPs into "must," "shall," and "will" categories.
4. Ensure SOPs are actionable, clear, and directly related to the provided information. 4. Ensure SOPs are actionable, clear, and directly related to the provided information.
5. For executives, focus on high-level, strategic SOPs that align with the overall vision and goals. 5. For executives, focus on high-level, strategic SOPs that align with the overall vision and goals.
6. For department managers, create department-specific SOPs based on their strategic goals. 6. For departments, create SOPs based on their department strategic goals and team questionaiire response
7. Only generate SOPs for the specified departments.
Format the response as a JSON object with the following structure: Format the response as a JSON object with the following structure:
{ {
"executives": { "executives": [{
"Executive Name 1": { "role":"exceutive role name":
"sops": {
"must": ["SOP 1", "SOP 2", ...], "must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...], "shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...] "will": ["SOP 1", "SOP 2", ...]
}, },
"Executive Name 2": { {"role":"exceutitve role 2 name"
"sops": {
"must": ["SOP 1", "SOP 2", ...], "must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...], "shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...] "will": ["SOP 1", "SOP 2", ...]}
}, ],
... ...
}, },
"departments": [ "departments": [
{ {
"name": "Department Name", "name": "Department Name",
"managers": { "managers": [
"role":"manager role name",
"sops":{
"must": ["SOP 1", "SOP 2", ...], "must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...], "shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...] "will": ["SOP 1", "SOP 2", ...]
} },
{
"name": "Department Name 2",
"managers": [
"role":"manager role name",
"sops":{
"must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...]
}]
}, },
... ...
] ]
} }
Ensure that each specified department has its own set of SOPs. Ensure that each specified department has its own set of SOPs.if managers are not provided or empty array for a department, make sop for that department empty--> do not form managers and create sop
''' '''
def get_sop_executive_from_questionnaire2():
return '''Generate Standard Operating Procedures (SOPs) for specific department managers based on the provided questionnaire responses.
Instructions:
2. Use the departmental strategic goals and team questionnaire and role bases answers if provided to create specific SOPs for each department.
3. Categorize all SOPs into "must," "shall," and "will" categories.
4. Ensure SOPs are actionable, clear, and directly related to the provided information.
5. For executives, focus on high-level, strategic SOPs that align with the overall vision and goals.
6. For departments, create SOPs based on their department strategic goals and team questionaiire response
Format the response as a JSON object with the following structure:
{"departments": [
{
"name": "Department Name",
"managers": [
"role":"manager role name",
"sops":{
"must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...]
},
{
"name": "Department Name 2",
"managers": [
"role":"manager role name",
"sops":{
"must": ["SOP 1", "SOP 2", ...],
"shall": ["SOP 1", "SOP 2", ...],
"will": ["SOP 1", "SOP 2", ...]
}]
},
...
]
}
Ensure that each specified department has its own set of SOPs.if managers are not provided or empty array for a department, make sop for that department empty--> do not form managers and create sop
'''
def get_roles_reference_comparison(): def get_roles_reference_comparison():
prompt = """ prompt = """
You are tasked with comparing a list of reference roles with the extracted roles from a document. You are tasked with comparing a list of reference roles with the extracted roles from a document.
@@ -348,3 +509,49 @@ def get_sop_for_department_managers():
''' '''
def get_dept_vision_mission_extraction_from_doc2():
return """
You are provided with an organizational document and a list of departments within the organization. Your role is to extract the vision of the organization and the goals/mission of each department from the document. Follow these guidelines:
If the vision and mission are clearly stated in the document:
Extract the vision of the organization exactly as stated, without modification.
Extract the goals/mission of each department exactly as provided in the document, ensuring no information is omitted.
If the vision and mission are not clearly stated:
Infer the vision and mission from the context of the document by analyzing explicit or implicit statements.
Vision: Identify the long-term aspirations or ideal future state of the organization.
Mission: Identify the organization's purpose, core functions, or primary objectives, including departmental goals where applicable.
Additional Instructions:
If multiple statements for vision or mission are found, include all relevant statements.
If vision and mission are entirely absent, return an empty list for the missing category.
Ensure all extracted or inferred information is complete and accurately reflects the document's content.
Do not omit any relevant details for vision and mission found in the document.
Note: If the vision or mission is found in the document, extract it exactly as it appears without making any changes.
"""
def get_dept_vision_mission_extraction_from_questionaiire():
return """
You are provided with an questionaiire response within the organization. Your role is to extract the vision of the organization and the goals/mission of each department from the document. Follow these guidelines:
If the vision and mission are clearly stated in the document:
Extract the vision of the organization exactly as stated, without modification.
Extract the goals/mission of each department exactly as provided in the document, ensuring no information is omitted.
If the vision and mission are not clearly stated:
Infer the vision and mission from the context of the document by analyzing explicit or implicit statements.
Vision: Identify the long-term aspirations or ideal future state of the organization.
Mission: Identify the organization's purpose, core functions, or primary objectives, including departmental goals where applicable.
Additional Instructions:
If multiple statements for vision or mission are found, include all relevant statements.
If vision and mission are entirely absent, return an empty list for the missing category.
Ensure all extracted or inferred information is complete and accurately reflects the respone's content.
Do not omit any relevant details for vision and mission found in the document.
"""
+105 -21
View File
@@ -9,9 +9,70 @@ from src.models.sop_response_schemas import *
from src.models.bot_response_schema import * from src.models.bot_response_schema import *
from scripts.assessment_data import generate_summary_stats_v2 from scripts.assessment_data import generate_summary_stats_v2
from dotenv import load_dotenv from dotenv import load_dotenv
from scripts.statistics_data import (generate_summary_stats,create_json_body,fetch_data)
load_dotenv() load_dotenv()
import random
import json
from datetime import datetime, timedelta
import pandas as pd
# Define possible areas
areas = [
"Agile Methodologies", "API Development", "Code Review", "Collaboration with Cross-Functional Teams",
"Continuous Integration/Continuous Deployment (CI/CD)", "Debugging Techniques", "Documentation",
"Performance Optimization", "System Design", "Unit Testing", "Version Control"
]
# Generate random dates
def random_date():
start_date = datetime(2024, 1, 1)
return (start_date + timedelta(days=random.randint(1, 300))).strftime('%Y-%m-%dT00:00:00.000Z')
# Generate dummy assessments
def generate_dummy_data(num_assessments=50, max_users_per_assessment=5):
data = []
for i in range(1, num_assessments + 1):
assessment_id = str(i)
assessment_name = f"Assessment {i}"
start_date = random_date()
red_flags = random.randint(0, 5)
open_items = random.randint(0, 50)
completed_items = random.randint(0, 50)
total_assigned_items = open_items + completed_items
# Generate random users for each assessment
user_details = []
num_users = random.randint(1, max_users_per_assessment)
for _ in range(num_users):
user_name = f"User_{random.randint(1, 100)}"
user_total_items = random.randint(1, 50)
user_completed_items = random.randint(0, user_total_items)
area_list = random.sample(areas, random.randint(1, 5))
user_details.append({
"name": user_name,
"total_assigned_items": user_total_items,
"completed_items": user_completed_items,
"area_list": area_list
})
data.append({
"assessment_id": assessment_id,
"red_flags": red_flags,
"open_items": open_items,
"completed_items": completed_items,
"total_assigned_items": total_assigned_items,
"assessment_name": assessment_name,
"start_date": start_date,
"user_details": user_details
})
return {"error": False, "data": data}
# Generate dummy data
dummy_data = generate_dummy_data(num_assessments=100, max_users_per_assessment=5)
#print(dummy_data)
#SopGeneratorDocument #SopGeneratorDocument
class Chatbot: class Chatbot:
def __init__(self): def __init__(self):
@@ -123,10 +184,14 @@ class Chatbot:
""" """
try: try:
# Define the path to the company's assessment data (stored as a CSV) # Define the path to the company's assessment data (stored as a CSV)
data_path = os.path.join('data', 'raw', 'erp_company_assessment', f'{companyid}_raw_data.csv') json_body_area = create_json_body("problematic-areas", companyid)
json_body_assessment = create_json_body("user-stats-by-assessment", companyid)
# Fetching data
problematic_areas_data = fetch_data(json_body_area)
assessment_data = fetch_data(json_body_assessment)
# Generate summary statistics from the company's assessment data summary_stats = generate_summary_stats(assessment_data, problematic_areas_data)
summary_stats = generate_summary_stats_v2(file_path=data_path) print(summary_stats)
# Generate the prompt using the company info and the summary statistics # Generate the prompt using the company info and the summary statistics
@@ -164,7 +229,7 @@ class Chatbot:
return None return None
def predict_next_n_assessment(self, company_info, companyid, N) -> AssessmentPredictionsResponse: def predict_next_n_assessment(self,companyid, N) -> AssessmentPredictionsResponse:
""" """
This method generates predictions based on past assessment data of a company. It queries the backend for the This method generates predictions based on past assessment data of a company. It queries the backend for the
company's assessment data, generates a prompt, and then uses the GPT-4 model to return predictions based on the query. company's assessment data, generates a prompt, and then uses the GPT-4 model to return predictions based on the query.
@@ -176,51 +241,62 @@ class Chatbot:
:return: Result containing the prediction result or None if an error occurs. :return: Result containing the prediction result or None if an error occurs.
""" """
try: try:
json_body_area = create_json_body("problematic-areas", companyid)
json_body_assessment = create_json_body("user-stats-by-assessment", companyid)
# Fetching data
problematic_areas_data = fetch_data(json_body_area)
assessment_data = fetch_data(json_body_assessment)
summary_stats = generate_summary_stats(assessment_data, problematic_areas_data)
print(summary_stats)
# Define the path to the company's assessment data (stored as a CSV) # Define the path to the company's assessment data (stored as a CSV)
data_path = os.path.join('data', 'raw', 'erp_company_assessment', f'{companyid}_raw_data.csv') #data_path = os.path.join('data', 'raw', 'erp_company_assessment', f'{companyid}_raw_data.csv')
# Generate summary statistics from the company's assessment data # Generate summary statistics from the company's assessment data
summary_stats = generate_summary_stats_v2(file_path=data_path) #summary_stats = generate_summary_stats_v2(file_path=data_path)
# Generate the prompt using the company info and the summary statistics # Generate the prompt using the company info and the summary statistics
prompt = predict_next_n_assessments_prompt() prompt = predict_next_n_assessments_prompt_v2()
# Interact with GPT-4 model to get a response # Interact with GPT-4 model to get a response
MODEL = "gpt-4o"
response = self.client.beta.chat.completions.parse( response = self.client.beta.chat.completions.parse(
model=self.model, model=MODEL,
messages=[ messages=[
{ {
"role": "system", "role": "system",
"content": f"{prompt}" "content": f"{prompt}"
}, },
{ {
"role": "user", "role": "user",
"content": f"company info: {company_info}--> N-value is {N} ", "content": f"Summary stattistics of company past assessment: {summary_stats}",
}, },
{ {
"role": "user", "role": "user",
"content": f"Summary stats: {summary_stats}", "content": f"Number of next assessments to predict: {N}",
} }
], ],
response_format=AssessmentPredictionsResponse, #response_format=AssessmentPredictionsResponse,
max_tokens=1024, max_tokens=1024,
temperature=0.1 temperature=0.1
) )
# Extract the response from the GPT-4 model # Extract the response from the GPT-4 model
extracted_text = json.loads(response.choices[0].message.content) preds = json.loads(response.choices[0].message.content)
# Initialize dictionary to store assessments with dynamic names # Initialize dictionary to store assessments with dynamic names
predictions = {} #predictions = {}
# Loop through the predicted assessments and rename them dynamically # Loop through the predicted assessments and rename them dynamically
for i in range(N): #for i in range(N):
assessment_key = f"assessment_{i + 1}" #assessment_key = f"assessment_{i + 1}"
predictions[assessment_key] = extracted_text["predictions"][i]['AssessmentN'] #predictions[assessment_key] = extracted_text["predictions"][i]['AssessmentN']
# Return the dynamically named assessments # Return the dynamically named assessments
return predictions return preds
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") print(f"An error occurred: {e}")
@@ -279,10 +355,18 @@ class Chatbot:
""" """
try: try:
# Define the path to the company's assessment data (stored as a CSV) # Define the path to the company's assessment data (stored as a CSV)
data_path = os.path.join('data', 'raw', 'erp_company_assessment', f'{companyid}_raw_data.csv') #data_path = os.path.join('data', 'raw', 'erp_company_assessment', f'{companyid}_raw_data.csv')
# Generate summary statistics from the company's assessment data # Generate summary statistics from the company's assessment data
summary_stats = generate_summary_stats_v2(file_path=data_path) #summary_stats = generate_summary_stats_v2(file_path=data_path)
json_body_area = create_json_body("problematic-areas", companyid)
json_body_assessment = create_json_body("user-stats-by-assessment", companyid)
# Fetching data
problematic_areas_data = fetch_data(json_body_area)
assessment_data = fetch_data(json_body_assessment)
summary_stats = generate_summary_stats(assessment_data, problematic_areas_data)
# Generate the prompt using the company info and the summary statistics # Generate the prompt using the company info and the summary statistics
prompt = predict_goal_achievement_probability_prompt() prompt = predict_goal_achievement_probability_prompt()
@@ -301,7 +385,7 @@ class Chatbot:
}, },
{ {
"role": "user", "role": "user",
"content": f"Summary stats: {summary_stats}", "content": f"Summary stats of past assement: {summary_stats}",
} }
], ],
response_format=AchievementPrediction, response_format=AchievementPrediction,
+37 -1
View File
@@ -59,7 +59,7 @@ class DocumentParser:
def extract_vision_mission(self, docs) -> VisionMissionResponse: def extract_vision_mission(self, docs) -> VisionMissionResponse:
""" """
Extracts Vision, Mission, and SOPs categorized into 'must,' 'shall,' and 'will' from the document. Extracts Vision, Mission' from the document.
:param docs: The document(s) from which to extract information. :param docs: The document(s) from which to extract information.
:return: VisionMissionResponse containing the vision, mission, and role-specific SOPs. :return: VisionMissionResponse containing the vision, mission, and role-specific SOPs.
@@ -93,6 +93,42 @@ class DocumentParser:
except: except:
return False return False
def extract_vision_mission2(self, docs) -> VisionMissionResponse2:
"""
Extracts Vision, Mission, and SOPs categorized into 'must,' 'shall,' and 'will' from the document.
:param docs: The document(s) from which to extract information.
:return: VisionMissionResponse containing the vision, mission, and role-specific SOPs.
"""
try:
docs_text = self._extract_text_from_docs(docs)
prompt = get_vision_mission_extraction_from_doc2()
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{
"role": "system",
"content": f'''{prompt}'''
},
{
"role": "user",
"content": [{"type": "text", "text": text} for text in docs_text],
}
],
response_format=VisionMissionResponse2,
max_tokens=10000,
temperature=0.1
)
# Parse the response from the LLM
extracted_text = json.loads(response.choices[0].message.content)
return extracted_text
except:
return False
'''def extract_departments_and_managers(self, docs): '''def extract_departments_and_managers(self, docs):
""" """
Extract departments and managerial roles from the document. Extract departments and managerial roles from the document.
+76
View File
@@ -97,6 +97,82 @@ class DocumentParser:
except: except:
return False return False
def extract_vision_mission2(self, docs,departments) -> VisionMissionResponse2:
"""
Extracts Vision, Mission, and SOPs categorized into 'must,' 'shall,' and 'will' from the document.
:param docs: The document(s) from which to extract information.
:return: VisionMissionResponse containing the vision, mission, and role-specific SOPs.
"""
try:
docs_text = self._extract_text_from_docs(docs)
prompt = get_vision_mission_extraction_from_doc2()
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{
"role": "system",
"content": f'''{prompt}'''
},
{
"role": "user",
"content": f"Department to consider for the company goals generation{departments}"
},
{
"role": "user",
"content": [{"type": "text", "text": text} for text in docs_text],
}
],
response_format=VisionMissionResponse2,
max_tokens=10000,
temperature=0.1
)
# Parse the response from the LLM
extracted_text = json.loads(response.choices[0].message.content)
return extracted_text
except:
return False
def extract_dept_goals(self, docs) -> VisionMissionResponse2:
"""
Extracts Vision, Mission, and SOPs categorized into 'must,' 'shall,' and 'will' from the document.
:param docs: The document(s) from which to extract information.
:return: VisionMissionResponse containing the vision, mission, and role-specific SOPs.
"""
try:
docs_text = self._extract_text_from_docs(docs)
prompt = get_vision_mission_extraction_from_doc2()
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{
"role": "system",
"content": f'''{prompt}'''
},
{
"role": "user",
"content": [{"type": "text", "text": text} for text in docs_text],
}
],
response_format=DeptGoalsVisssion,
max_tokens=10000,
temperature=0.1
)
# Parse the response from the LLM
extracted_text = json.loads(response.choices[0].message.content)
return extracted_text
except:
return False
def get_roles(self, docs): def get_roles(self, docs):
# Extract the text content from the Document objects # Extract the text content from the Document objects
docs_text = [doc.page_content for doc in docs] docs_text = [doc.page_content for doc in docs]
+142 -45
View File
@@ -7,6 +7,13 @@ from src.prompts.sops import *
from src.models.sop_response_schemas import * from src.models.sop_response_schemas import *
from src.services.sop_document_parser import DocumentParser from src.services.sop_document_parser import DocumentParser
from dotenv import load_dotenv from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import json
from openai import OpenAI
import os
import time
from collections import deque
load_dotenv() load_dotenv()
@@ -16,6 +23,9 @@ class SopPersonalAssessment:
self.api_key = os.getenv("OPENAI_API_KEY") self.api_key = os.getenv("OPENAI_API_KEY")
self.client = OpenAI(api_key=self.api_key) self.client = OpenAI(api_key=self.api_key)
self.model = "gpt-4o-mini" self.model = "gpt-4o-mini"
self.rpm_limit = 3 # Requests per minute limit
self.batch_size = 3 # Process 3 roles at a time
self.retry_delays = [4, 8, 16, 32]
# Existing methods... # Existing methods...
@@ -73,41 +83,46 @@ class SopPersonalAssessment:
return False return False
def generate_sops_by_role_and_area(self, roles: List[dict]) -> RoleSops: def generate_sops_by_role_and_area(self, roles,qna) -> RoleSops:
try: try:
sops_by_role = []
for role_info in roles:
role = role_info['role']
sop_types = role_info['sop_types'] # List of SOP types: ["will", "shall", "must"]
areas = role_info['areas'] # List of areas: ["communication", "development", etc.]
prompt = get_sop_personalassessment_from_area_role(role,areas,sop_types) MODEL = "gpt-4o-mini"
prompt = get_sop_personalassessment_from_area_rolev2()
response = self.client.beta.chat.completions.parse( response = self.client.beta.chat.completions.parse(
model=self.model, model=MODEL,
messages=[ messages=[
{ {
"role": "system", "role": "system",
"content": f'''{prompt} "content": f'''{prompt}
''', ''',
},
{
"role": "user",
"content": f'''povided Roles Data {roles}
''',
},
{
"role": "user",
"content": f'''Extra question and answers for more context {qna}
''',
} }
], ],
response_format=RoleSops, response_format=RoleSopssLists,
max_tokens=16000, max_tokens=6000,
temperature=0.1 temperature=0.1
) )
extracted_text = json.loads(response.choices[0].message.content) extracted_text = json.loads(response.choices[0].message.content)
# You can customize this to generate SOPs based on the role, SOP types, and areas # You can customize this to generate SOPs based on the role, SOP types, and areas
sops_by_role.append(extracted_text)
return extracted_text["sops"]
except Exception as e:
return sops_by_role print(f"Error occurred: {str(e)}")
except:
return False return False
def generate_roles_from_questionnaire(self, questionnaire_data: List[dict]) -> Roles_response: def generate_roles_from_questionnaire(self, questionnaire_data: List[dict]) -> Roles_response:
try: try:
@@ -165,6 +180,7 @@ class SopGeneratorExecutive:
Find the mission and vision provided below Find the mission and vision provided below
Vision: {vision} Vision: {vision}
Goals: {goals} Goals: {goals}
''' '''
response = self.client.beta.chat.completions.parse( response = self.client.beta.chat.completions.parse(
@@ -232,9 +248,82 @@ class SopGeneratorExecutive:
return False return False
def generate_sops_from_questionnaire(self, questionnaire_data: dict, executives: List[str], managers: List[str], departments: List[str]): def generate_sops_from_questionnaire(self, questionnaire_data: dict, executives: List[str]):
try: try:
print("Generating SOPs from questionnaire...")
prompt = get_sop_executive_from_questionnaire() prompt = get_sop_executive_from_questionnaire()
print(f"Prompt: {prompt}")
# Prepare the questionnaire data for the prompt
user_content = json.dumps(questionnaire_data, indent=2)
print(f"User content: {user_content}")
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"Generate SOPs based on this questionnaire response:\n{user_content}\n\nExecutives to consider: {', '.join(executives)}"}
],
response_format={"type": "json_object"},
max_tokens=16000,
temperature=0.1
)
print("Response received from API.")
sops_data = json.loads(response.choices[0].message.content)
print(f"SOPs data: {sops_data}")
# Process executive SOPs
return {
"response": sops_data
}
except Exception as e:
print(f"Error in generate_sops_from_questionnaire: {str(e)}")
return False
def generate_vision_mission_from_questionnaire(self, questionnaire_data: dict):
"""
Generate vision and mission based on the questionnaire data provided in the request body.
The request body is expected to contain plain-text information for vision and mission."""
try:
print("Generating vsion and mission from questionnaire...")
prompt = get_vision_mission_extraction_from_questionnaire_executive()
print(f"Prompt: {prompt}")
# Prepare the questionnaire data for the prompt
user_content = json.dumps(questionnaire_data, indent=2)
print(f"User content: {user_content}")
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"questionnaire response:\n{user_content}"}
],
response_format=VisionMissionResponse2,
max_tokens=16000,
temperature=0.1
)
print("Response received from API.")
response = json.loads(response.choices[0].message.content)
return response
except Exception as e:
print(f"Error in generate_sops_from_questionnaire: {str(e)}")
return False
def generate_dept_goals_from_questionnaire(self, questionnaire_data):
try:
print("Generating SOPs from questionnaire...")
prompt = get_dept_vision_mission_extraction_from_questionaiire()
print(f"Prompt: {prompt}")
# Prepare the questionnaire data for the prompt # Prepare the questionnaire data for the prompt
user_content = json.dumps(questionnaire_data, indent=2) user_content = json.dumps(questionnaire_data, indent=2)
@@ -243,45 +332,54 @@ class SopGeneratorExecutive:
model=self.model, model=self.model,
messages=[ messages=[
{"role": "system", "content": prompt}, {"role": "system", "content": prompt},
{"role": "user", "content": f"Generate SOPs based on this questionnaire:\n{user_content}\n\nExecutives to consider: {', '.join(executives)}\nManagers to consider: {', '.join(managers)}\nDepartments to consider: {', '.join(departments)}"} {"role": "user", "content": f"Generate based on this questionnaire response:\n{user_content}\n"}
],
response_format=DeptGoalsVisssion,
max_tokens=16000,
temperature=0.1
)
print("Response received from API.")
sops_data = json.loads(response.choices[0].message.content)
# Process executive SOPs
return sops_data
except Exception as e:
print(f"Error in generate_sops_from_questionnaire: {str(e)}")
return False
def generate_sops_from_questionnaire2(self, questionnaire_data: dict):
try:
print("Generating SOPs from questionnaire...")
prompt = get_sop_executive_from_questionnaire2()
print(f"Prompt: {prompt}")
# Prepare the questionnaire data for the prompt
user_content = json.dumps(questionnaire_data, indent=2)
print(f"User content: {user_content}")
response = self.client.beta.chat.completions.parse(
model=self.model,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": f"Generate SOPs based on this questionnaire response:\n{user_content}\n"}
], ],
response_format={"type": "json_object"}, response_format={"type": "json_object"},
max_tokens=16000, max_tokens=16000,
temperature=0.1 temperature=0.1
) )
print("Response received from API.")
sops_data = json.loads(response.choices[0].message.content) sops_data = json.loads(response.choices[0].message.content)
print(f"SOPs data: {sops_data}")
# Process executive SOPs # Process executive SOPs
executive_sops = {}
for executive in executives:
if executive in sops_data['executives']:
executive_sops[executive] = Categories(**sops_data['executives'][executive])
else:
executive_sops[executive] = Categories()
# Process department manager SOPs
departments_with_sops = []
for dept_name in departments:
dept_data = next((d for d in sops_data['departments'] if d['name'].lower() == dept_name.lower()), None)
if dept_data:
managers_with_sops = [
ManagerWithSOPs(
title=manager,
sops=ManagerSOPs(**dept_data['managers'])
)
for manager in managers
]
if managers_with_sops:
departments_with_sops.append(DepartmentManagerSOPs(
name=dept_name,
managers=managers_with_sops
))
return { return {
"executive_sops": executive_sops, "response": sops_data
"department_sops": ExecutiveManagerSOPsResponse(departments=departments_with_sops)
} }
except Exception as e: except Exception as e:
@@ -289,7 +387,6 @@ class SopGeneratorExecutive:
return False return False
class SopGeneratorManager: class SopGeneratorManager:
def __init__(self): def __init__(self):
self.api_key = os.getenv("OPENAI_API_KEY") self.api_key = os.getenv("OPENAI_API_KEY")
+133 -71
View File
@@ -1,78 +1,140 @@
import os import requests
import json import pandas as pd
import asyncio
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from src.prompts.sops import *
from src.models.questions_response import *
from src.services.sop_document_parser import DocumentParser
from src.prompts.questions import *
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
import os
class QuestionsGenerator: DATA_KEY = os.getenv("AI_DATA_KEY")
def __init__(self): # Constants for API requests
self.api_key = os.getenv("OPENAI_API_KEY") URL = "https://erpai.mkdlabs.com//v3/api/custom/erpai/common/get-data-ai"
self.client = AsyncOpenAI(api_key=self.api_key) HEADERS = {
self.model = "gpt-4o-mini" "x-project": DATA_KEY # Replace with your actual key
}
async def generate_single_frequency_questions(self, docs, assessment_type, frequency_number, total_duration): # JSON bodies for API requests
prompt = get_questions_prompt_v3() def create_json_body(area_type, company_id):
frequency_label = f"{assessment_type} number : {frequency_number}" return {
"type": area_type,
response = await self.client.chat.completions.create( "options": {
model=self.model, "company_id": company_id
messages=[ }
{"role": "system", "content": prompt},
{"role": "user", "content": f"The SOPs are provided below."},
{"role": "user", "content": json.dumps(docs)},
{"role": "user", "content": f"Assessment Type: {assessment_type}"},
{"role": "user", "content": f"Current Frequency Number to generate: {frequency_label}"},
{"role": "user", "content": f"Duration: {total_duration}"}
],
temperature=0.1,
response_format={ "type": "json_object" },
max_tokens=10000
)
questions_json = json.loads(response.choices[0].message.content)
return questions_json
async def generate_questions(self, input_data: Dict) -> AssessmentQuestions:
try:
sops = input_data['sops']
assessment_type = input_data['assessment_type']
total_duration = input_data['duration']
chunk_size = 1000
docs_text = [sops[i:i + chunk_size] for i in range(0, len(sops), chunk_size)]
docs = [{"type": "text", "text": text} for text in docs_text]
tasks = []
for frequency_number in range(1, total_duration + 1):
task = self.generate_single_frequency_questions(docs, assessment_type, frequency_number, total_duration)
tasks.append(task)
all_questions = await asyncio.gather(*tasks)
return AssessmentQuestions(questions=Questions(questions=all_questions))
except Exception as e:
print(f"An error occurred: {e}")
return None
# Usage
async def main():
generator = QuestionsGenerator()
input_data = {
"sops": "Your SOP text here",
"assessment_type": "weekly",
"duration": 4
} }
result = await generator.generate_questions(input_data)
print(result) # Function to fetch data from the API
def fetch_data(json_body):
json_body["options"]["company_id"] = json_body["options"].get("company_id") # Ensure company_id is included
response = requests.post(URL, headers=HEADERS, json=json_body)
response.raise_for_status() # Raise an error for bad responses
return response.json()
def convert_assessment_data_to_dataframe(assessment_data):
df_assessment = []
for assessment in assessment_data.get("data", []):
assessment_id = assessment["assessment_id"]
assessment_name = assessment["assessment_name"]
start_date = assessment["start_date"]
open_items = assessment["open_items"]
completed_items = assessment["completed_items"]
total_assigned_items = assessment["total_assigned_items"]
red_flags = assessment["red_flags"]
for user in assessment.get("user_details", []):
user_name = user["name"]
user_total_items = user["total_assigned_items"]
user_completed_items = user["completed_items"]
for area in user.get("area_list", []):
df_assessment.append({
"assessment_id": assessment_id,
"assessment_name": assessment_name,
"start_date": start_date,
"open_items_overall": open_items,
"completed_items_overall": completed_items,
"total_assigned_items_overall": total_assigned_items,
"user_name": user_name,
"user_total_assigned_items": user_total_items,
"user_completed_items": user_completed_items,
"area": area,
"red_flags": red_flags
})
return pd.DataFrame(df_assessment)
# Convert to DataFrame
# Summary statistics for overall assessment level
def generate_summary_statistics(df):
total_assessments = df['assessment_id'].nunique()
avg_open_items = df.groupby('assessment_id')['open_items_overall'].mean().mean()
avg_completed_items = df.groupby('assessment_id')['completed_items_overall'].mean().mean()
avg_total_assigned_items = df.groupby('assessment_id')['total_assigned_items_overall'].mean().mean()
avg_red_flags = df['red_flags'].mean()
total_users = df['user_name'].nunique()
avg_user_total_items = df.groupby('user_name')['user_total_assigned_items'].mean().mean()
avg_user_completed_items = df.groupby('user_name')['user_completed_items'].mean().mean()
completion_rate_per_user = (df['user_completed_items'].sum() / df['user_total_assigned_items'].sum()) * 100 if df['user_total_assigned_items'].sum() > 0 else 0
area_summary = df['area'].value_counts()
return {
"total_assessments": total_assessments,
"avg_open_items_per_assessment": avg_open_items,
"avg_completed_items_per_assessment": avg_completed_items,
"avg_total_assigned_items_per_assessment": avg_total_assigned_items,
"avg_red_flags": avg_red_flags,
"total_users": total_users,
"avg_user_total_assigned_items": avg_user_total_items,
"avg_user_completed_items": avg_user_completed_items,
"completion_rate_per_user": completion_rate_per_user,
"area_summary": area_summary.to_dict()
}
# Additional statistics for efficiency and areas
def generate_extended_statistics(df):
df['user_completion_rate'] = (df['user_completed_items'] / df['user_total_assigned_items']).fillna(0) * 100
top_5_efficient_users = df.groupby('user_name')['user_completion_rate'].mean().nlargest(5).to_dict()
bottom_5_least_efficient_users = df.groupby('user_name')['user_completion_rate'].mean().nsmallest(5).to_dict()
df['uncompleted_items'] = df['user_total_assigned_items'] - df['user_completed_items']
areas_with_most_uncompleted_items = df.groupby('area')['uncompleted_items'].sum().nlargest(5).to_dict()
return {
"top_5_efficient_users": top_5_efficient_users,
"bottom_5_least_efficient_users": bottom_5_least_efficient_users,
"areas_with_most_uncompleted_items": areas_with_most_uncompleted_items
}
# Generate statistics for problematic areas
def generate_problematic_area_statistics(df):
total_open_items = df.groupby('name')['open_items'].sum().sort_values(ascending=False)
total_red_flags = df.groupby('name')['red_flags'].sum().sort_values(ascending=False)
return pd.DataFrame({
"total_open_items": total_open_items,
"total_red_flags": total_red_flags
}).fillna(0)
def generate_summary_stats(assessment_data, area_data):
assessment_df = convert_assessment_data_to_dataframe(assessment_data)
problematic_area_df = pd.DataFrame(area_data.get("data", []))
summary_stats = generate_summary_statistics(assessment_df)
extended_stats = generate_extended_statistics(assessment_df)
summary_stats["users(Workers) based stats"] = extended_stats
problematic_stats = generate_problematic_area_statistics(problematic_area_df)
summary_stats["Area based stats"] = problematic_stats.to_dict(orient='index')
return summary_stats
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) from src.services.chatbot import Chatbot
bot = Chatbot()
res = bot.predict_next_n_assessment(companyid=12,N=3)
print(res)