Files
ds_drone_bot/src/llm/agent/flight_assesment.py
T
2025-08-01 19:33:30 +01:00

91 lines
3.0 KiB
Python

import os
import json
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from src.prompts.templates.flght_prompt import flight_prompt
from src.config.llm_config import LlmConfig
from config import Config
from src.components.data_extraction.weather_data import DroneWeatherDataExtractor
from logger import logger
class DroneAssessmentAgent:
def __init__(self):
self.llm = ChatOpenAI(
api_key=Config.OPENAI_API_KEY,
model=LlmConfig.openai.models.gpt_4o,
temperature=0.3
)
self.weather_extractor = DroneWeatherDataExtractor()
async def run(self, booking_form: str) -> dict:
"""
Run the drone environmental & safety assessment agent.
Args:
booking_form (str): Structured booking form input
Returns:
dict: AI-generated structured output
"""
logger.info("Starting DroneAssessmentAgent run...")
try:
# Step 1: Fetch weather data
weather_data = await self.weather_extractor.extract_booking_weather_data(booking_form)
booking_form_text = json.dumps(booking_form)
# Step 2: Generate prompt
prompt = flight_prompt(booking_form_text, weather_data)
messages = [
SystemMessage(content=prompt),
HumanMessage(content=booking_form_text)
]
# Step 3: Invoke LLM
logger.debug("Sending prompt to LLM...")
response = self.llm.invoke(messages)
if hasattr(response, "content"):
response_text = response.content.strip()
logger.info("Received LLM response.")
logger.debug(f"LLM Raw Output (first 300 chars):\n{response_text[:300]}")
# Extract JSON block
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx == -1 or end_idx == -1:
raise ValueError("No JSON object found in output")
json_str = response_text[start_idx:end_idx]
return json.loads(json_str)
else:
raise ValueError("LLM returned no usable content")
except Exception as e:
logger.exception("Error in DroneAssessmentAgent")
return {
"error": str(e),
"raw_response": response.content if 'response' in locals() and hasattr(response, "content") else None
}
async def main():
"""Async main function to test the drone assessment agent."""
from test2 import booking_form_input
logger.info("Launching DroneAssessmentAgent from main()...")
agent = DroneAssessmentAgent()
result = await agent.run(booking_form_input)
logger.info("Drone assessment completed.")
logger.debug("Final structured output:")
logger.debug(json.dumps(result, indent=2))
if __name__ == "__main__":
asyncio.run(main())