added flight assessment agent

This commit is contained in:
OwusuBlessing
2025-08-01 19:34:03 +01:00
parent d391e966cb
commit a185815e24
3 changed files with 0 additions and 455 deletions
-329
View File
@@ -1,329 +0,0 @@
import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import re
class DroneWeatherDataExtractor:
"""
Extract location data from drone booking forms and fetch structured
weather information for later use in analysis systems.
"""
def __init__(self):
self.base_url = "https://api.open-meteo.com/v1/forecast"
def extract_coordinates(self, booking_data: Dict) -> Tuple[float, float]:
"""Extract latitude and longitude from booking form data."""
try:
# Try to get from GPS coordinates field
gps_coords = booking_data.get("site_information", {}).get("gps_coordinates", {})
if isinstance(gps_coords, dict):
lat_str = gps_coords.get("latitude", "")
lng_str = gps_coords.get("longitude", "")
# Parse coordinate strings like "53.4408° N" and "2.2426° W"
lat_match = re.search(r'([\d.]+)', lat_str)
lng_match = re.search(r'([\d.]+)', lng_str)
if lat_match and lng_match:
latitude = float(lat_match.group(1))
longitude = float(lng_match.group(1))
# Handle direction indicators
if 'S' in lat_str.upper():
latitude = -latitude
if 'W' in lng_str.upper():
longitude = -longitude
return latitude, longitude
# Fallback: try to extract from site_location string
site_location = booking_data.get("form", {}).get("site_location", "")
coord_pattern = r'GPS: ([\d.]+)° ([NS]), ([\d.]+)° ([EW])'
match = re.search(coord_pattern, site_location)
if match:
lat, lat_dir, lng, lng_dir = match.groups()
latitude = float(lat) if lat_dir == 'N' else -float(lat)
longitude = float(lng) if lng_dir == 'E' else -float(lng)
print(f"Extracted coordinates: {latitude}, {longitude}")
return latitude, longitude
except (ValueError, KeyError) as e:
print(f"Error extracting coordinates: {e}")
raise ValueError("Could not extract valid coordinates from booking data")
def extract_booking_info(self, booking_data: Dict) -> Dict:
"""Extract relevant booking information in structured format."""
try:
# Extract date information
assigned_date = booking_data.get("job_overview", {}).get("assigned_date")
preferred_dates = booking_data.get("form", {}).get("preferred_dates")
selected_slot = booking_data.get("form", {}).get("selected_slot")
# Extract timing information
timing = booking_data.get("timing", {})
start_time = timing.get("start_time")
end_time = timing.get("end_time")
duration = timing.get("survey_duration")
# Extract site information
site_info = booking_data.get("site_information", {})
form_info = booking_data.get("form", {})
return {
"job_id": booking_data.get("job_id"),
"job_number": booking_data.get("job_overview", {}).get("job_number"),
"site_name": site_info.get("site_name"),
"region": site_info.get("region"),
"full_address": site_info.get("full_address"),
"asset_type": form_info.get("asset_type"),
"system_size": form_info.get("system_size"),
"survey_purpose": form_info.get("survey_purpose"),
"assigned_engineer": form_info.get("assigned_engineer"),
"contact_person": form_info.get("contact_person"),
"contact_phone": form_info.get("contact_phone"),
"dates": {
"assigned_date": assigned_date,
"preferred_dates": preferred_dates,
"selected_slot": selected_slot
},
"timing": {
"start_time": start_time,
"end_time": end_time,
"duration": duration,
"buffer_time": timing.get("buffer_time")
},
"access_details": {
"access_type": form_info.get("access_type"),
"vehicle_access": form_info.get("vehicle_access"),
"access_details": form_info.get("access_details")
},
"safety_requirements": form_info.get("special_safety_requirements"),
"additional_requirements": form_info.get("additional_requirements"),
"booking_timestamp": form_info.get("booking_timestamp"),
"booking_id": form_info.get("booking_id")
}
except Exception as e:
print(f"Error extracting booking info: {e}")
return {}
def parse_target_date(self, booking_data: Dict) -> datetime:
"""Extract and parse the target date from booking data."""
try:
# Try different date fields
date_fields = [
booking_data.get("job_overview", {}).get("assigned_date"),
booking_data.get("form", {}).get("preferred_dates"),
booking_data.get("form", {}).get("selected_slot")
]
for date_field in date_fields:
if date_field:
# Handle different date formats
if "January 23, 2025" in date_field or "January 23rd, 2025" in date_field:
return datetime(2025, 1, 23)
elif "January 24" in date_field:
return datetime(2025, 1, 24)
# Default fallback
return datetime.now() + timedelta(days=1)
except Exception as e:
print(f"Error parsing date: {e}")
return datetime.now() + timedelta(days=1)
async def fetch_weather_data(self, latitude: float, longitude: float,
target_date: datetime, days_range: int = 3) -> Dict:
"""Fetch structured weather data for specific coordinates and date range."""
# Calculate date range around target date
start_date = target_date - timedelta(days=days_range//2)
end_date = target_date + timedelta(days=days_range//2)
params = {
"latitude": latitude,
"longitude": longitude,
"current": [
"temperature_2m",
"relative_humidity_2m",
"wind_speed_10m"
# "wind_direction_10m",
# "weather_code",
# "pressure_msl",
# "cloud_cover"
],
"hourly": [
"temperature_2m",
"relative_humidity_2m"
# "wind_speed_10m",
# "wind_direction_10m",
# "precipitation",
# "weather_code",
# "visibility",
# "cloud_cover",
# "pressure_msl"
],
# "daily": [
# "temperature_2m_max",
# "temperature_2m_min"]
# # "sunrise",
# "sunset",
# "precipitation_sum",
# "wind_speed_10m_max",
# "wind_direction_10m_dominant"
# ],
# "start_date": start_date.strftime("%Y-%m-%d"),
# "end_date": end_date.strftime("%Y-%m-%d"),
# "timezone": "auto"
}
# Convert lists to comma-separated strings for API
for key, value in params.items():
if isinstance(value, list):
params[key] = ",".join(value)
async with aiohttp.ClientSession() as session:
async with session.get(self.base_url, params=params) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"Weather API request failed: {response.status}")
def structure_weather_data(self, raw_weather: Dict, target_date: datetime) -> Dict:
"""Structure raw weather data into organized format."""
try:
structured_data = {
"location": {
"latitude": raw_weather.get("latitude"),
"longitude": raw_weather.get("longitude"),
"elevation": raw_weather.get("elevation"),
"timezone": raw_weather.get("timezone")
},
"current_conditions": raw_weather.get("current", {}),
"hourly_forecast": {
"times": raw_weather.get("hourly", {}).get("time", []),
"temperature": raw_weather.get("hourly", {}).get("temperature_2m", []),
"humidity": raw_weather.get("hourly", {}).get("relative_humidity_2m", []),
"wind_speed": raw_weather.get("hourly", {}).get("wind_speed_10m", []),
"wind_direction": raw_weather.get("hourly", {}).get("wind_direction_10m", []),
"precipitation": raw_weather.get("hourly", {}).get("precipitation", []),
"weather_code": raw_weather.get("hourly", {}).get("weather_code", []),
"visibility": raw_weather.get("hourly", {}).get("visibility", []),
"cloud_cover": raw_weather.get("hourly", {}).get("cloud_cover", []),
"pressure": raw_weather.get("hourly", {}).get("pressure_msl", [])
},
"daily_forecast": {
"dates": raw_weather.get("daily", {}).get("time", []),
"temperature_max": raw_weather.get("daily", {}).get("temperature_2m_max", []),
"temperature_min": raw_weather.get("daily", {}).get("temperature_2m_min", []),
"sunrise": raw_weather.get("daily", {}).get("sunrise", []),
"sunset": raw_weather.get("daily", {}).get("sunset", []),
"precipitation_sum": raw_weather.get("daily", {}).get("precipitation_sum", []),
"wind_speed_max": raw_weather.get("daily", {}).get("wind_speed_10m_max", []),
"wind_direction_dominant": raw_weather.get("daily", {}).get("wind_direction_10m_dominant", [])
},
"target_date": target_date.strftime("%Y-%m-%d"),
"data_retrieved_at": datetime.now().isoformat()
}
return structured_data
except Exception as e:
return {"error": f"Error structuring weather data: {e}"}
async def extract_booking_weather_data(self, booking_data: Dict) -> Dict:
"""Main extraction function - returns structured data for external use."""
try:
# Extract all booking information
booking_info = self.extract_booking_info(booking_data)
# Extract coordinates and target date
latitude, longitude = self.extract_coordinates(booking_data)
print(f"Latitude: {latitude}, Longitude: {longitude}")
target_date = self.parse_target_date(booking_data)
# Fetch raw weather data
raw_weather = await self.fetch_weather_data(latitude, longitude, target_date)
# Structure the weather data
structured_weather = self.structure_weather_data(raw_weather, target_date)
# Return combined structured data
return {
# "booking_data": booking_info,
# "coordinates": {
# "latitude": latitude,
# "longitude": longitude
# },
# "target_date": target_date.strftime("%Y-%m-%d"),
"weather_data": structured_weather,
"extraction_metadata": {
"processed_at": datetime.now().isoformat(),
"api_endpoint": self.base_url,
"data_source": "open-meteo"
}
}
except Exception as e:
return {
"error": f"Data extraction failed: {e}",
"job_id": booking_data.get("job_id", "unknown"),
"processed_at": datetime.now().isoformat()
}
# Example usage
async def main():
"""Demonstrate the data extraction system."""
# Sample booking data
booking_form_input = {
"job_id": "1043",
"job_overview": {
"job_number": "Job #1043",
"site_name": "Hightower Solar Farm",
"assigned_date": "January 23, 2025",
"status": "Scheduled"
},
"site_information": {
"site_name": "Hightower Solar Farm",
"region": "North England",
"full_address": "Grange Lane, Manchester M34 7TF",
"gps_coordinates": {
"latitude": "13.41° N",
"longitude": "52.52 W"
}
},
"timing": {
"start_time": "09:00 AM",
"end_time": "10:30 AM",
"survey_duration": "3045 mins",
"buffer_time": "45 mins"
},
"form": {
"asset_type": "Solar Farm",
"system_size": "5.2 MW capacity, approximately 16,000 panels across 12 hectares",
"survey_purpose": "Insurance assessment",
"assigned_engineer": "David Wilson - 0161-555-0876",
"contact_person": "Sarah Thompson",
"contact_phone": "0161-555-0234"
}
}
# Initialize extractor
extractor = DroneWeatherDataExtractor()
# Extract structured data
result = await extractor.extract_booking_weather_data(booking_form_input)
# Display structured output
print("=== STRUCTURED DATA EXTRACTION ===")
print(json.dumps(result, indent=2))
if __name__ == "__main__":
asyncio.run(main())
-48
View File
@@ -1,48 +0,0 @@
booking_form_input = {
"job_id": "1043",
"job_overview": {
"job_number": "Job #1043",
"site_name": "Hightower Solar Farm",
"assigned_date": "January 23, 2025",
"status": "Scheduled"
},
"site_information": {
"site_name": "Hightower Solar Farm",
"region": "North England",
"full_address": "Grange Lane, Manchester M34 7TF",
"gps_coordinates": {
"latitude": "53.4408° N",
"longitude": "2.2426° W"
}
},
"timing": {
"start_time": "09:00 AM",
"end_time": "10:30 AM",
"survey_duration": "3045 mins",
"buffer_time": "45 mins"
},
"form": {
"asset_type": "Solar Farm",
"asset_details": "null",
"site_name": "Hightower Solar Farm",
"site_location": "Grange Lane, Manchester M34 7TF, GPS: 53.4408° N, 2.2426° W",
"system_size": "5.2 MW capacity, approximately 16,000 panels across 12 hectares",
"access_type": "Restricted Access",
"access_details": "Main gate access code: 7841. Contact facility manager Sarah Thompson at 0161-555-0234 for entry. Follow yellow markers to operations building. Site requires visitor registration.",
"vehicle_access": "Yes - Vehicle Accessible",
"contact_person": "Sarah Thompson",
"contact_phone": "0161-555-0234",
"contact_role": "Facility Manager",
"contact_availability": "Available on-site",
"special_safety_requirements": "Hard hats, high-vis vests, and steel-toe boots mandatory. Safety induction required (20 minutes). Keep 100m distance from substation area. Weather restriction: no flights in winds >15mph.",
"survey_purpose": "Insurance assessment",
"survey_purpose_details": "Annual insurance inspection required for policy renewal. Focus on structural integrity, panel condition, and inverter housing. Insurance company requires thermal imaging and detailed photographic documentation.",
"additional_requirements": "Morning survey preferred for optimal lighting conditions. Need detailed documentation of any panel damage or hotspots. Provide georeferenced images for insurance mapping. Weather backup date: January 24th same time.",
"timing_preference": "Within 1 week",
"preferred_dates": "January 23rd morning preferred, backup January 24th",
"selected_slot": "January 23rd, 2025 - 09:00 AM - 10:30 AM",
"assigned_engineer": "David Wilson - 0161-555-0876",
"booking_timestamp": "2025-01-15T10:45:22Z",
"booking_id": "DSV-2025-0115-007"
}
}
-78
View File
@@ -1,78 +0,0 @@
import os
import json
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from src.prompts.templates.flght_prompt import flight_prompt # You must save the prompt function here
from src.config.llm_config import LlmConfig
from config import Config
from src.components.data_extraction.weather_data import DroneWeatherDataExtractor
class DroneAssessmentAgent:
def __init__(self):
self.llm = ChatOpenAI(
api_key=Config.OPENAI_API_KEY,
model=LlmConfig.openai.models.gpt_4o,
temperature=0.3
)
self.weather_extractor = DroneWeatherDataExtractor()
async def run(self, booking_form: str) -> dict:
"""
Run the drone environmental & safety assessment agent.
Args:
booking_form_text (str): Raw text of the booking form
Returns:
dict: JSON output of the AI assessment
"""
weather_data = await self.weather_extractor.extract_booking_weather_data(booking_form)
booking_form_text = json.dumps(booking_form)
prompt = flight_prompt(booking_form_text, weather_data) # Contains the instructions + JSON template
messages = [
SystemMessage(content=prompt),
HumanMessage(content=booking_form_text)
]
try:
response = self.llm.invoke(messages)
if hasattr(response, "content"):
response_text = response.content.strip()
print("Raw LLM Response:\n", response_text[:300], "...\n")
# Attempt to parse JSON from response
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx == -1 or end_idx == -1:
raise ValueError("No JSON object found in output")
json_str = response_text[start_idx:end_idx]
return json.loads(json_str)
else:
raise ValueError("LLM returned no usable content")
except Exception as e:
print(f"Error in DroneAssessmentAgent: {str(e)}")
return {
"error": str(e),
"raw_response": response.content if 'response' in locals() else None
}
async def main():
"""Async main function to properly handle the async agent."""
from test2 import booking_form_input
agent = DroneAssessmentAgent()
result = await agent.run(booking_form_input)
print("\nStructured Assessment Output:\n")
print(json.dumps(result, indent=2))
if __name__ == "__main__":
asyncio.run(main())