import os import json import asyncio from openai import AsyncOpenAI from pydantic import BaseModel, Field from typing import List, Dict, Optional from src.prompts.sops import * from src.models.questions_response import * from src.services.sop_document_parser import DocumentParser from src.prompts.questions import * from dotenv import load_dotenv load_dotenv() class QuestionsGenerator: def __init__(self): self.api_key = os.getenv("OPENAI_API_KEY") self.client = AsyncOpenAI(api_key=self.api_key) self.model = "gpt-4o-mini" async def generate_single_frequency_questions(self, docs, assessment_type, frequency_number, total_duration): prompt = get_questions_prompt_v3() frequency_label = f"{assessment_type} number : {frequency_number}" response = await self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": f"The SOPs are provided below."}, {"role": "user", "content": json.dumps(docs)}, {"role": "user", "content": f"Assessment Type: {assessment_type}"}, {"role": "user", "content": f"Current Frequency Number to generate: {frequency_label}"}, {"role": "user", "content": f"Duration: {total_duration}"} ], temperature=0.1, response_format={ "type": "json_object" }, max_tokens=10000 ) questions_json = json.loads(response.choices[0].message.content) return questions_json async def generate_questions(self, input_data: Dict) -> AssessmentQuestions: try: sops = input_data['sops'] assessment_type = input_data['assessment_type'] total_duration = input_data['duration'] chunk_size = 1000 docs_text = [sops[i:i + chunk_size] for i in range(0, len(sops), chunk_size)] docs = [{"type": "text", "text": text} for text in docs_text] tasks = [] for frequency_number in range(1, total_duration + 1): task = self.generate_single_frequency_questions(docs, assessment_type, frequency_number, total_duration) tasks.append(task) all_questions = await asyncio.gather(*tasks) return AssessmentQuestions(questions=Questions(questions=all_questions)) except Exception as e: print(f"An error occurred: {e}") return None # Usage async def main(): generator = QuestionsGenerator() input_data = { "sops": "Your SOP text here", "assessment_type": "weekly", "duration": 4 } result = await generator.generate_questions(input_data) print(result) if __name__ == "__main__": asyncio.run(main())