"""FastAPI service exposing the rescue-career chat, theme and quiz endpoints.

Endpoints:
    POST /rescue-career/chat            - one turn of the AI interview chat
    POST /rescue-career/generate-theme  - build the theme PDF and upload it
    POST /rescue-career/generate-quiz   - build a quiz from a PDF's text
    GET  /health                        - liveness probe
"""

import io
import json
import logging
import os
import secrets
import tempfile
import time
from datetime import datetime
from typing import Any, Dict, Optional, Union

import requests
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Request, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
from fastapi.security import APIKeyHeader
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from PyPDF2 import PdfReader

# Local imports keep their original relative order in case any of them has
# import-time side effects.
from utils.document_loader import load_document
from src.llm import ai_chat
from scripts.generate_pdf import create_pdf
from scripts.generate_theme import generate_theme
from scripts.generate_quiz import generate_quiz
from config import QUIZ_TYPES
from config import Config

# Load environment variables
load_dotenv()

API_KEY = Config.API_KEY_ACCESS

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Also configure uvicorn logger
uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.setLevel(logging.INFO)

base_path = os.path.join("data", "config_files")
QUESTIONS_PATH = os.path.join(base_path, "questions.json")
THEME_CONTEXT_PATH = os.path.join(base_path, "theme_context.json")
backend_base_url = Config.BACKEND_BASE_URL

# Upper bound (seconds) for every outbound HTTP call, so that an unresponsive
# peer cannot hang a request forever.
REQUEST_TIMEOUT_SECONDS = 60

# Header names whose values must never be written to the logs.
_SENSITIVE_HEADERS = frozenset(
    {"authorization", "proxy-authorization", "cookie", "x-api-key"}
)

with open(THEME_CONTEXT_PATH, "r", encoding="utf-8") as f:
    themes = json.load(f)

# Initialize FastAPI app
app = FastAPI(
    title="Fire Fighter Interview API",
    description="API For fire fighter",
    version="1.0.0"
)


def _log_and_print(message: str) -> None:
    """Emit *message* through both the module logger and stdout."""
    logger.info(message)
    print(message)


def _redacted_headers(headers) -> Dict[str, str]:
    """Return *headers* as a dict with credential-bearing values masked."""
    return {
        name: ("***" if name.lower() in _SENSITIVE_HEADERS else value)
        for name, value in headers.items()
    }


# Add request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log each request (method, URL, headers, POST body) and its response.

    Credential headers are redacted before logging. POST bodies are logged
    verbatim.
    NOTE(review): bodies may contain personal data - confirm this is
    acceptable outside of debugging.
    """
    start_time = time.time()

    # Log incoming request (using both logger and print for visibility)
    _log_and_print(f"🔥 INCOMING REQUEST: {request.method} {request.url}")
    _log_and_print(f"🔥 Headers: {_redacted_headers(request.headers)}")

    # Get request body for POST requests
    if request.method == "POST":
        body = await request.body()
        # errors="replace" keeps a non-UTF-8 body from crashing the middleware.
        body_text = body.decode('utf-8', errors='replace') if body else 'Empty'
        _log_and_print(f"🔥 Request Body: {body_text}")

        # Re-create request with body for downstream processing: the body
        # stream was consumed above, so hand the cached bytes back.
        async def receive():
            return {"type": "http.request", "body": body}

        request._receive = receive

    response = await call_next(request)

    # Log response
    process_time = time.time() - start_time
    _log_and_print(
        f"🔥 RESPONSE: {response.status_code} - Time: {process_time:.4f}s"
    )
    return response


# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Setup API key authentication
api_key_header = APIKeyHeader(name="Authorization", auto_error=False)


async def get_api_key(api_key_header: str = Security(api_key_header)) -> str:
    """Validate the ``Authorization: Bearer <key>`` header.

    Returns:
        The token that was presented.

    Raises:
        HTTPException: 401 when the header is missing, is not a Bearer
            header, or the token does not match the configured API key.
    """
    if not api_key_header or not api_key_header.startswith('Bearer '):
        raise HTTPException(
            status_code=401,
            detail={"error": "Unauthorized", "message": "API key is missing or invalid."}
        )

    token = api_key_header.split(' ')[1]
    # An unset/empty configured key must never authenticate anybody, and the
    # comparison is constant-time so it does not leak key prefixes.
    key_matches = bool(API_KEY) and secrets.compare_digest(
        token.encode("utf-8"), str(API_KEY).encode("utf-8")
    )
    if not key_matches:
        raise HTTPException(
            status_code=401,
            detail={"error": "Unauthorized", "message": "API key does not match."}
        )
    return token


class ChatRequest(BaseModel):
    """Payload of POST /rescue-career/chat."""

    resume_url: Optional[str] = None
    query: Optional[str] = None  # empty/missing -> "Let's get started"
    chat_id: int
    theme_id: Optional[int] = 1
    full_history_url: Optional[str] = None
    form_id: Optional[int] = None
    feedback: Optional[str] = None
    generate_theme: str = "NO"


class ChatResponse(BaseModel):
    """Body returned by POST /rescue-career/chat."""

    message: str
    end: bool
    pop_theme_generation: bool
    error: Optional[str] = None


class GeneratePDFRequest(BaseModel):
    """Payload of POST /rescue-career/generate-theme."""

    resume_url: Optional[str] = None
    chat_id: int
    theme_id: Optional[int] = 1
    full_history_url: Optional[str] = None
    form_id: Optional[int] = None
    generate_theme: str = "YES"


class QuizRequest(BaseModel):
    """Payload of POST /rescue-career/generate-quiz."""

    pdf_url: str
    quiz_type: int  # 1, 2, or 3 corresponding to QUIZ_TYPES


class QuizResponse(BaseModel):
    """Body returned by POST /rescue-career/generate-quiz."""

    success: bool
    message: str
    quiz_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


async def extract_pdf_text(pdf_url: str) -> Union[str, None]:
    """Download the PDF at *pdf_url* and return its extracted text.

    The PDF is parsed from memory, so no temporary file is created.

    Returns:
        The text of all non-empty pages joined by blank lines, or ``None``
        when no page yields any text.

    Raises:
        HTTPException: 400 when the download fails, 500 when parsing fails.

    NOTE(review): ``requests`` is blocking, so this call stalls the event loop
    while downloading; *pdf_url* is client-supplied and fetched without
    validation (possible SSRF) - both left as-is.
    """
    try:
        response = requests.get(pdf_url, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()

        reader = PdfReader(io.BytesIO(response.content))
        # Extract each page once; skip pages that yield no text.
        page_texts = (page.extract_text() for page in reader.pages)
        text = "\n\n".join(page_text for page_text in page_texts if page_text)

        if not text.strip():
            return None
        return text
    except requests.RequestException as e:
        raise HTTPException(
            status_code=400,
            detail=f"Error downloading PDF: {str(e)}"
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing PDF: {str(e)}"
        ) from e


def _require_theme(theme_id: Optional[int]) -> None:
    """Raise a 400 HTTPException unless *theme_id* exists in ``themes``."""
    print(f"Received request with theme_id: {theme_id}")  # Debugging print
    if not any(t["id"] == theme_id for t in themes):
        print(f"No theme found with ID: {theme_id}")  # Debugging print
        raise HTTPException(
            status_code=400,
            detail=f"No theme found with ID {theme_id}"
        )
    print(f"Validated theme ID: {theme_id}")


def _load_document_text(url: Optional[str], label: str) -> str:
    """Load the document at *url* and return its pages as a bulleted string.

    *label* ("resume" / "full history") is only used in messages. Returns an
    empty string when *url* is falsy; raises a 400 HTTPException when the
    loader returns nothing.
    """
    if not url:
        return ""

    print(f"Loading {label} from URL: {url}")  # Debugging print
    docs = load_document(url)
    if not docs:
        print(f"Invalid {label} URL: Unable to fetch document")  # Debugging print
        raise HTTPException(
            status_code=400,
            detail=f"Invalid {label} URL: Unable to fetch document"
        )
    text = "\n".join(f"- {doc.page_content}" for doc in docs)
    print(f"Loaded {label} documents: {text[:100]}")  # Debugging print
    return text


def _fetch_form_response(form_id: Optional[int], x_api_key: Optional[str]) -> str:
    """Fetch the onboarding form answer for *form_id* from the backend.

    Returns an empty string when *form_id* is falsy, otherwise the ``data``
    field of the backend response rendered as a single bullet. Raises a 400
    HTTPException when the request fails.
    """
    if not form_id:
        return ""

    print(f"Fetching form response for form_id: {form_id}")  # Debugging print
    try:
        url = f"{backend_base_url}/v3/api/custom/theme-document/answer/{form_id}?x-project={x_api_key}"
        result = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
        result.raise_for_status()  # Ensure we raise an error for bad responses
        form_response = result.json()["data"]
    except requests.RequestException as e:
        print(f"Error fetching onboarding data: {str(e)}")  # Debugging print
        raise HTTPException(
            status_code=400,
            detail="Unable to fetch onboarding data"
        ) from e

    print(f"Fetched form response: {form_response}")  # Debugging print
    # A single bullet. (str.join over one string would put a newline between
    # every character of the payload.)
    return f"- {form_response}"


@app.post("/rescue-career/chat")
async def chat_endpoint(
    request: ChatRequest,
    api_key: str = Depends(get_api_key)
):
    """Run one chat turn against the AI interviewer.

    Optional resume, full-history and onboarding-form context is loaded and
    passed to ``ai_chat`` together with the user's query.

    Raises:
        HTTPException: 400 for an unknown theme or unfetchable context,
            500 for any unexpected failure.
    """
    try:
        # Validate theme
        _require_theme(request.theme_id)

        resume_docs = _load_document_text(request.resume_url, "resume")
        full_history_docs = _load_document_text(
            request.full_history_url, "full history"
        )
        # NOTE(review): this endpoint reads the backend key from the
        # environment while generate-theme uses Config.BACKEND_XAPI_KEY;
        # kept as-is - confirm both resolve to the same value and unify.
        form_response_docs = _fetch_form_response(
            request.form_id, os.getenv("BACKEND_XAPI_KEY")
        )

        # Parse response
        print("Parsing AI response...")  # Debugging print
        query = request.query
        if not query:
            query = "Let's get started"

        response = ai_chat(
            query=query,
            conversation_id=request.chat_id,
            theme_id=request.theme_id,
            resume=resume_docs,
            full_history=full_history_docs,
            form_response=form_response_docs
        )
        print(response)

        return ChatResponse(
            message=response.get("message", ""),
            end=response.get("end", "no") == "yes",
            pop_theme_generation=response.get("pop_theme_generation", "no") == "yes",
            error=None
        )
    except HTTPException:
        # Deliberate 4xx responses raised above must not be turned into 500s.
        raise
    except Exception as e:
        print(f"Error processing chat request: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error processing chat request: {str(e)}"
        ) from e


@app.post("/rescue-career/generate-theme")
async def generate_pdf_endpoint(
    request: GeneratePDFRequest,
    api_key: str = Depends(get_api_key)
):
    """Generate the theme document, render it to PDF and upload it.

    Returns:
        ``{"theme_url": <uploaded file URL>, "theme_text": <parsed AI JSON>}``

    Raises:
        HTTPException: 400 for an unknown theme or unfetchable context, the
            upstream status code when the upload is rejected, 500 otherwise.
    """
    try:
        _require_theme(request.theme_id)

        # Resolved up front: the key is needed for the upload even when the
        # request carries no form_id.
        x_api_key = Config.BACKEND_XAPI_KEY

        resume_docs = _load_document_text(request.resume_url, "resume")
        full_history_docs = _load_document_text(
            request.full_history_url, "full history"
        )
        form_response_docs = _fetch_form_response(request.form_id, x_api_key)

        # Get AI-generated theme content
        response = ai_chat(
            query="NOW GENERATE THE STARTPOP FRAMEWORK",
            conversation_id=request.chat_id,
            theme_id=request.theme_id,
            resume=resume_docs,
            full_history=full_history_docs,
            form_response=form_response_docs,
            generate_theme="YES"
        )
        print(f"AI Response for theme: {response}")

        # Ensure AI response is valid
        if not isinstance(response, str):
            raise HTTPException(status_code=500, detail="Invalid AI response format")

        # Generate PDF
        response_data = json.loads(response)
        pdf_content = create_pdf(response_data)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f"theme_{timestamp}.pdf"

        # Upload the PDF to S3 using the API. The bytes are sent straight
        # from memory under the same filename, so nothing is written to (or
        # left behind in) the working directory.
        upload_url = f"{backend_base_url}/v3/api/custom/theme/doc-upload?x-project={x_api_key}"
        upload_response = requests.post(
            upload_url,
            files={'file': (file_name, bytes(pdf_content))},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

        # Check if the upload was successful
        if upload_response.status_code != 200:
            raise HTTPException(
                status_code=upload_response.status_code,
                detail="File upload to S3 failed: " + upload_response.text
            )

        upload_data = upload_response.json()

        # Extract the uploaded file URL
        theme_url = upload_data.get("url")  # Adjust this key based on the actual API response structure
        if not theme_url:
            raise HTTPException(
                status_code=500,
                detail="Failed to retrieve theme URL from upload response"
            )

        # Return JSON response with theme URL and text
        return {
            "theme_url": theme_url,
            "theme_text": response_data
        }
    except HTTPException:
        # Keep the status codes chosen above instead of collapsing them to 500.
        raise
    except Exception as e:
        print(f"Error generating theme: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e


@app.post("/rescue-career/generate-quiz", response_model=QuizResponse)
async def generate_quiz_endpoint(
    request: QuizRequest,
    api_key: str = Depends(get_api_key)
):
    """Generate quiz based on PDF content and quiz type.

    Raises:
        HTTPException: 400 for an invalid quiz type or failed PDF download,
            500 for PDF parsing or unexpected generation errors.
    """
    # Validate quiz type
    if request.quiz_type not in QUIZ_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid quiz type. Must be one of: {list(QUIZ_TYPES)}"
        )

    try:
        # Extract text from PDF
        pdf_text = await extract_pdf_text(request.pdf_url)

        if not pdf_text:
            return QuizResponse(
                success=False,
                message="PDF extraction completed but no text content found",
                error="Empty PDF content"
            )

        # Generate quiz using the existing function
        quiz_data = generate_quiz(
            startpop_pdf=pdf_text,
            quiz_type=request.quiz_type
        )

        if not quiz_data:
            return QuizResponse(
                success=False,
                message="Quiz generation failed",
                error="Unable to generate quiz from the provided content"
            )

        return QuizResponse(
            success=True,
            message="Quiz generated successfully",
            quiz_data=quiz_data
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected error during quiz generation: {str(e)}"
        ) from e


@app.get("/health")
async def health_check():
    """Health check endpoint to verify the service is running."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}


@app.on_event("startup")
async def startup_event():
    """Initialize required components on startup"""
    pass


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app:app", host="0.0.0.0", port=5042, reload=True)