Files
ds-fire-fighter/app.py
T
2025-02-12 00:12:02 +01:00

438 lines
16 KiB
Python

import os
from typing import Optional
from fastapi import FastAPI, HTTPException, Security, Depends
from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from utils.document_loader import load_document
import json
from pydantic import BaseModel
from src.llm import ai_chat
from langchain_openai import ChatOpenAI
import requests
import tempfile
from scripts.generate_pdf import create_pdf
from scripts.generate_theme import generate_theme
from scripts.generate_quiz import generate_quiz
from typing import Dict, Any
from fastapi.responses import Response
from datetime import datetime
from fastapi import HTTPException
from pydantic import BaseModel
from typing import Optional, Union, Dict, Any
import os
import requests
import os
from PyPDF2 import PdfReader
from config import QUIZ_TYPES
# Load environment variables
load_dotenv()
API_KEY = os.getenv("API_KEY_ACCESS")
base_path = os.path.join("data", "config_files")
QUESTIONS_PATH = os.path.join(base_path, "questions.json")
THEME_CONTEXT_PATH = os.path.join(base_path, "theme_context.json")
with open(THEME_CONTEXT_PATH, "r", encoding="utf-8") as f:
themes = json.load(f)
# Initialize FastAPI app
app = FastAPI(
title="Fire Fighter Interview API",
description="API For fire fighter",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Setup API key authentication
api_key_header = APIKeyHeader(name="Authorization", auto_error=False)
async def get_api_key(api_key_header: str = Security(api_key_header)) -> str:
"""Validate API key from header"""
if not api_key_header or not api_key_header.startswith('Bearer '):
raise HTTPException(
status_code=401,
detail={"error": "Unauthorized", "message": "API key is missing or invalid."}
)
token = api_key_header.split(' ')[1]
if token != API_KEY:
raise HTTPException(
status_code=401,
detail={"error": "Unauthorized", "message": "API key does not match."}
)
return token
class ChatRequest(BaseModel):
resume_url: Optional[str] = None
query: str=None
chat_id: int
theme_id: Optional[int] = 1
full_history_url: Optional[str] = None
form_id:Optional[int] = None
feedback: Optional[str] = None
generate_theme:str="NO"
class ChatResponse(BaseModel):
message: str
end: bool
pop_theme_generation:bool
error: Optional[str] = None
class GeneratePDFRequest(BaseModel):
resume_url: Optional[str] = None
chat_id: int
theme_id: Optional[int] = 1
full_history_url: Optional[str] = None
form_id:Optional[int] = None
generate_theme:str="YES"
class QuizRequest(BaseModel):
pdf_url: str
quiz_type: int # 1, 2, or 3 corresponding to QUIZ_TYPES
class QuizResponse(BaseModel):
success: bool
message: str
quiz_data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
async def extract_pdf_text(pdf_url: str) -> Union[str, None]:
"""Extract text from PDF and handle potential errors."""
try:
response = requests.get(pdf_url)
response.raise_for_status()
# Create a temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf:
temp_pdf.write(response.content)
temp_path = temp_pdf.name
# Extract text from PDF
reader = PdfReader(temp_path)
text = "\n\n".join(
page.extract_text() for page in reader.pages if page.extract_text()
)
# Clean up temporary file
os.unlink(temp_path)
if not text.strip():
return None
return text
except requests.RequestException as e:
raise HTTPException(
status_code=400,
detail=f"Error downloading PDF: {str(e)}"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error processing PDF: {str(e)}"
)
@app.post("/rescue-career/chat")
async def chat_endpoint(
request: ChatRequest,
api_key: str = Depends(get_api_key)
):
try:
# Validate theme
print(f"Received request with theme_id: {request.theme_id}") # Debugging print
matching_themes = [t for t in themes if t["id"] == request.theme_id]
if not matching_themes:
print(f"No theme found with ID: {request.theme_id}") # Debugging print
raise HTTPException(
status_code=400,
detail=f"No theme found with ID {request.theme_id}"
)
print(f"Validated theme ID: {request.theme_id}") # Print statement added
resume_docs = ""
if request.resume_url:
print(f"Loading resume from URL: {request.resume_url}") # Debugging print
docs = load_document(request.resume_url)
if not docs:
print("Invalid resume URL: Unable to fetch document") # Debugging print
raise HTTPException(
status_code=400,
detail="Invalid resume URL: Unable to fetch document"
)
resume_docs = "\n".join(f"- {doc.page_content}" for doc in docs)
print(f"Loaded resume documents: {resume_docs}") # Debugging print
full_history_docs = ""
if request.full_history_url:
print(f"Loading full history from URL: {request.full_history_url}") # Debugging print
docs = load_document(request.full_history_url)
if not docs:
print("Invalid full history URL: Unable to fetch document") # Debugging print
raise HTTPException(
status_code=400,
detail="Invalid full history URL: Unable to fetch document"
)
full_history_docs = "\n".join(f"- {doc.page_content}" for doc in docs)
print(f"Loaded full history documents: {full_history_docs}") # Debugging print
form_response_docs = ""
if request.form_id:
print(f"Fetching form response for form_id: {request.form_id}") # Debugging print
try:
x_api_key = os.getenv("BACKEND_XAPI_KEY")
url = f"{os.getenv('BACKEND_BASE_URL')}/v3/api/custom/theme-document/answer/{request.form_id}?x-project={x_api_key}"
result = requests.get(url)
result.raise_for_status() # Ensure we raise an error for bad responses
form_response = result.json()["data"] # Return response in JSON format
form_response_docs = "\n".join(f"- {form_response}")
print(f"Fetched form response: {form_response}") # Debugging print
except requests.RequestException as e:
print(f"Error fetching onboarding data: {str(e)}") # Debugging print
raise HTTPException(
status_code=400,
detail="Unable to fetch onboarding data"
)
# Parse response
print("Parsing AI response...") # Debugging print
query = request.query
if not query:
query = "Let's get started"
response = ai_chat(
query=query,
conversation_id=request.chat_id,
theme_id=request.theme_id,
resume=resume_docs,
full_history=full_history_docs,
form_response=form_response_docs
)
print(response)
return ChatResponse(
message=response.get("message", ""),
end=response.get("end", "no") == "yes",
pop_theme_generation=response.get("pop_theme_generation","no") == "yes",
error=None
)
except Exception as e:
print(f"Error processing chat request: {str(e)}") # Print statement added
raise HTTPException(
status_code=500,
detail=f"Error processing chat request: {str(e)}"
)
@app.post("/rescue-career/generate-theme")
async def generate_pdf_endpoint(
request: GeneratePDFRequest,
api_key: str = Depends(get_api_key)
):
try:
print(f"Received request with theme_id: {request.theme_id}") # Debugging print
matching_themes = [t for t in themes if t["id"] == request.theme_id]
if not matching_themes:
print(f"No theme found with ID: {request.theme_id}") # Debugging print
raise HTTPException(
status_code=400,
detail=f"No theme found with ID {request.theme_id}"
)
print(f"Validated theme ID: {request.theme_id}") # Print statement added
resume_docs = ""
if request.resume_url:
print(f"Loading resume from URL: {request.resume_url}") # Debugging print
docs = load_document(request.resume_url)
if not docs:
print("Invalid resume URL: Unable to fetch document") # Debugging print
raise HTTPException(
status_code=400,
detail="Invalid resume URL: Unable to fetch document"
)
resume_docs = "\n".join(f"- {doc.page_content}" for doc in docs)
print(f"Loaded resume documents: {resume_docs}") # Debugging print
full_history_docs = ""
if request.full_history_url:
print(f"Loading full history from URL: {request.full_history_url}") # Debugging print
docs = load_document(request.full_history_url)
if not docs:
print("Invalid full history URL: Unable to fetch document") # Debugging print
raise HTTPException(
status_code=400,
detail="Invalid full history URL: Unable to fetch document"
)
full_history_docs = "\n".join(f"- {doc.page_content}" for doc in docs)
print(f"Loaded full history documents: {full_history_docs}") # Debugging print
form_response_docs = ""
if request.form_id:
print(f"Fetching form response for form_id: {request.form_id}") # Debugging print
try:
x_api_key = os.getenv("BACKEND_XAPI_KEY")
url = f"{os.getenv('BACKEND_BASE_URL')}/v3/api/custom/theme-document/answer/{request.form_id}?x-project={x_api_key}"
result = requests.get(url)
result.raise_for_status() # Ensure we raise an error for bad responses
form_response = result.json()["data"] # Return response in JSON format
form_response_docs = "\n".join(f"- {form_response}")
print(f"Fetched form response: {form_response}") # Debugging print
except requests.RequestException as e:
print(f"Error fetching onboarding data: {str(e)}") # Debugging print
raise HTTPException(
status_code=400,
detail="Unable to fetch onboarding data"
)
# Here you would fetch the conversation data using the conversation_id
# This is a placeholder - replace with your actual conversation data fetching logic
# Get AI-generated theme content
# Get AI-generated theme content
response = ai_chat(
query="NOW GENERATE THE STARTPOP FRAMEWORK",
conversation_id=request.chat_id,
theme_id=request.theme_id,
resume=resume_docs,
full_history=full_history_docs,
form_response=form_response_docs,
generate_theme="YES"
)
print(f"AI Response for theme: {response}")
# Ensure AI response is valid
if not isinstance(response, str):
raise HTTPException(status_code=500, detail="Invalid AI response format")
# Generate PDF
response_data = json.loads(response)
pdf_content = create_pdf(response_data)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
file_path = f"theme_{timestamp}.pdf"
# Save the PDF locally temporarily
with open(file_path, "wb") as file:
file.write(pdf_content)
# Upload the PDF to S3 using the API
upload_url = f"{os.getenv('BACKEND_BASE_URL')}/v3/api/custom/theme/doc-upload?x-project={x_api_key}"
with open(file_path, 'rb') as file:
files = {'file': file}
upload_response = requests.post(upload_url, files=files)
# Check if the upload was successful
if upload_response.status_code != 200:
raise HTTPException(status_code=upload_response.status_code, detail="File upload to S3 failed: " + upload_response.text)
upload_data = upload_response.json() # Get the response in JSON format
# Extract the uploaded file URL
theme_url = upload_data.get("url") # Adjust this key based on the actual API response structure
if not theme_url:
raise HTTPException(status_code=500, detail="Failed to retrieve theme URL from upload response")
# Clean up the temporary file
os.remove(file_path)
# Return JSON response with theme URL and text
return {
"theme_url": theme_url,
"theme_text": response_data
}
except Exception as e:
print(f"Error generating theme: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
@app.post("/rescue-career/generate-quiz", response_model=QuizResponse)
async def generate_quiz_endpoint(
request: QuizRequest,
api_key: str = Depends(get_api_key)
):
"""Generate quiz based on PDF content and quiz type."""
# Validate quiz type
if request.quiz_type not in QUIZ_TYPES:
raise HTTPException(
status_code=400,
detail=f"Invalid quiz type. Must be one of: {list(QUIZ_TYPES)}"
)
try:
# Extract text from PDF
pdf_text = await extract_pdf_text(request.pdf_url)
if not pdf_text:
return QuizResponse(
success=False,
message="PDF extraction completed but no text content found",
error="Empty PDF content"
)
# Generate quiz using the existing function
quiz_data = generate_quiz(
startpop_pdf=pdf_text,
quiz_type=request.quiz_type
)
if not quiz_data:
return QuizResponse(
success=False,
message="Quiz generation failed",
error="Unable to generate quiz from the provided content"
)
return QuizResponse(
success=True,
message="Quiz generated successfully",
quiz_data=quiz_data
)
except HTTPException as he:
raise he
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unexpected error during quiz generation: {str(e)}"
)
async def get_conversation_data(conversation_id: str) -> dict:
"""
Fetch conversation data using the conversation ID.
Replace this with your actual implementation to fetch conversation data.
"""
try:
storage_path = "conversations.json"
with open(storage_path, 'r') as f:
convs = json.load(f)
convs_id = convs[conversation_id]
return convs_id
except Exception as e:
print(f"Error fetching conversation data: {e}")
return None
@app.on_event("startup")
async def startup_event():
"""Initialize required components on startup"""
pass
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=5042, reload=True)