Complete Enhanced Agricultural AI System - All Requirements Met
This commit is contained in:
@@ -0,0 +1,214 @@
|
||||
"""
|
||||
Batch processing utilities for handling large volumes of agricultural photos
|
||||
"""
|
||||
|
||||
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional

import pandas as pd
|
||||
|
||||
class BatchProcessor:
    """Handles batch processing of agricultural photos with progress tracking.

    The input list is cut into batches of at most ``batch_size`` paths. Each
    batch is fanned out to a thread pool of ``max_workers`` threads, and after
    every batch the cumulative results are checkpointed to CSV (the main
    output file plus a per-batch backup) and progress is logged.
    """

    def __init__(self, max_workers: int = 4, batch_size: int = 500):
        """
        Initialize batch processor

        Args:
            max_workers: Maximum number of parallel workers (must be >= 1)
            batch_size: Maximum images per batch (must be >= 1)

        Raises:
            ValueError: If ``max_workers`` or ``batch_size`` is less than 1
        """
        # Fail fast: a zero batch size would otherwise crash later inside
        # range(), and a negative one would silently process nothing.
        if max_workers < 1:
            raise ValueError(f"max_workers must be >= 1, got {max_workers}")
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        self.max_workers = max_workers
        self.batch_size = batch_size
        self.setup_logging()

    def setup_logging(self):
        """Setup logging for batch processing.

        Configures the root logger (file + console handlers) only when it has
        no handlers yet, then binds ``self.logger`` to this module's logger.
        """
        log_file = 'outputs/batch_processing.log'

        # logging.basicConfig() does nothing once the root logger has
        # handlers, so skip building handlers in that case: constructing a
        # FileHandler opens the file immediately, and doing that on every
        # instantiation would leak one open file handle per BatchProcessor.
        if not logging.getLogger().handlers:
            # FileHandler does not create missing parent directories.
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s',
                handlers=[
                    logging.FileHandler(log_file),
                    logging.StreamHandler(),
                ],
            )
        self.logger = logging.getLogger(__name__)

    def process_batch(self,
                      image_files: List[str],
                      process_function: Callable,
                      output_file: str,
                      resume_from: int = 0) -> Dict[str, Any]:
        """
        Process a batch of images with progress tracking and error handling

        Args:
            image_files: List of image file paths
            process_function: Function to process each image; called with one
                path. A falsy return value or a raised exception is recorded
                as an error for that path.
            output_file: Path to save results CSV
            resume_from: Index to resume processing from (must be >= 0).
                When greater than 0 and ``output_file`` already exists, the
                rows in it are kept and the new results are appended to them.

        Returns:
            Processing statistics. ``total_images`` is the length of the full
            ``image_files`` list, while ``successful`` and ``errors`` count
            only the images handled in this call.

        Raises:
            ValueError: If ``resume_from`` is negative
        """
        if resume_from < 0:
            # A negative index would slice from the end of the list and
            # quietly process the wrong images.
            raise ValueError(f"resume_from must be >= 0, got {resume_from}")

        start_time = time.time()
        total_images = len(image_files)

        self.logger.info("Starting batch processing of %s images", total_images)
        self.logger.info("Batch size: %s, Max workers: %s",
                         self.batch_size, self.max_workers)

        # Split into batches
        batches = self._split_into_batches(image_files[resume_from:])

        # On resume, carry the earlier checkpoint forward so that rewriting
        # output_file does not discard rows saved by the previous run.
        previous_rows = (
            self._load_previous_results(output_file) if resume_from > 0 else []
        )

        results = []
        errors = []
        processing_times = []

        for batch_idx, batch in enumerate(batches):
            batch_start = time.time()
            self.logger.info("Processing batch %s/%s (%s images)",
                             batch_idx + 1, len(batches), len(batch))

            # Process batch with parallel workers
            batch_results, batch_errors = self._process_single_batch(batch, process_function)
            results.extend(batch_results)
            errors.extend(batch_errors)

            batch_time = time.time() - batch_start
            processing_times.append(batch_time)

            # Save intermediate results
            if results:
                self._save_intermediate_results(previous_rows + results,
                                                output_file, batch_idx)

            # Progress update: failed images are also "done", otherwise the
            # percentage could never reach 100 when any image fails.
            completed = resume_from + len(results) + len(errors)
            progress = (completed / total_images) * 100
            self.logger.info("Progress: %s/%s (%.1f%%) - Batch time: %.1fs",
                             completed, total_images, progress, batch_time)

        # Final statistics
        total_time = time.time() - start_time
        stats = self._calculate_statistics(total_images, len(results), len(errors),
                                           total_time, processing_times)

        self.logger.info("Batch processing completed: %s", stats)
        return stats

    def _split_into_batches(self, image_files: List[str]) -> List[List[str]]:
        """Split image files into consecutive chunks of at most ``batch_size``."""
        return [
            image_files[start:start + self.batch_size]
            for start in range(0, len(image_files), self.batch_size)
        ]

    def _process_single_batch(self, batch: List[str], process_function: Callable) -> tuple:
        """Process a single batch with parallel workers.

        Args:
            batch: Image paths to process
            process_function: Callable invoked once per path

        Returns:
            ``(results, errors)``: ``results`` holds the truthy return values
            and ``errors`` holds ``{'file': ..., 'error': ...}`` records. Both
            follow the order of ``batch``, so output is deterministic.
        """
        results = []
        errors = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks. The callable is submitted directly so the
            # real exception reaches future.result() and can be recorded.
            submitted = [
                (img_path, executor.submit(process_function, img_path))
                for img_path in batch
            ]

            # Collect in submission order; every future must finish before
            # the batch is done, so this costs nothing over as_completed().
            for img_path, future in submitted:
                try:
                    result = future.result()
                except Exception as e:
                    self.logger.error("Error processing %s: %s", img_path, e)
                    errors.append({'file': img_path, 'error': str(e)})
                    continue

                if result:
                    results.append(result)
                else:
                    errors.append({'file': img_path, 'error': 'No result returned'})

        return results, errors

    def _safe_process_image(self, img_path: str, process_function: Callable) -> Optional[Dict]:
        """Safely process a single image with error handling.

        Returns the value of ``process_function(img_path)``, or ``None`` after
        logging if it raises. No longer used by ``_process_single_batch``
        (which needs the exception itself); kept for existing callers.
        """
        try:
            return process_function(img_path)
        except Exception as e:
            self.logger.error("Error processing %s: %s", img_path, e)
            return None

    def _load_previous_results(self, output_file: str) -> List[Dict]:
        """Return rows of an existing checkpoint CSV, or ``[]`` if unavailable."""
        if not os.path.isfile(output_file):
            return []
        try:
            return pd.read_csv(output_file).to_dict('records')
        except Exception as e:
            # Best effort: an unreadable checkpoint must not block the run.
            self.logger.warning("Could not load previous results from %s: %s",
                                output_file, e)
            return []

    @staticmethod
    def _backup_path(output_file: str, batch_idx: int) -> str:
        """Build the backup file name for a batch, keeping the extension.

        Only the final extension is touched, so a path without ``.csv`` still
        gets a distinct backup name and a ``.csv`` inside a directory name is
        left alone.
        """
        root, ext = os.path.splitext(output_file)
        return f"{root}_backup_batch_{batch_idx}{ext}"

    def _save_intermediate_results(self, results: List[Dict], output_file: str, batch_idx: int):
        """Save intermediate results to prevent data loss.

        Writes ``results`` to ``output_file`` and to a per-batch backup file.
        Failures are logged and not raised so a bad write does not abort the
        run.
        """
        try:
            out_dir = os.path.dirname(output_file)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)

            df = pd.DataFrame(results)

            # Save main file
            df.to_csv(output_file, index=False)

            # Save backup
            df.to_csv(self._backup_path(output_file, batch_idx), index=False)
        except Exception as e:
            self.logger.error("Error saving intermediate results: %s", e)

    def _calculate_statistics(self, total: int, successful: int, errors: int,
                              total_time: float, batch_times: List[float]) -> Dict[str, Any]:
        """Calculate processing statistics.

        Args:
            total: Number of images in the full input list
            successful: Number of images that produced a result
            errors: Number of images that failed
            total_time: Wall-clock duration in seconds
            batch_times: Per-batch durations in seconds

        Returns:
            Dict with counts, success rate (percent), total time (minutes),
            average batch time (seconds) and throughput (images per minute).
            Ratios fall back to 0 when their denominator is zero.
        """
        avg_batch_time = sum(batch_times) / len(batch_times) if batch_times else 0
        success_rate = (successful / total) * 100 if total > 0 else 0

        return {
            'total_images': total,
            'successful': successful,
            'errors': errors,
            'success_rate': round(success_rate, 1),
            'total_time_minutes': round(total_time / 60, 2),
            'average_batch_time': round(avg_batch_time, 2),
            'images_per_minute': round(successful / (total_time / 60), 1) if total_time > 0 else 0,
        }
|
||||
|
||||
class ProgressTracker:
    """Console progress reporter that prints a running count and ETA."""

    def __init__(self, total_items: int):
        """Record the expected item count and start the elapsed-time clock."""
        self.total_items = total_items
        self.completed = 0
        self.start_time = time.time()

    def update(self, increment: int = 1):
        """Advance the completed count by ``increment`` and redraw the status line."""
        self.completed += increment
        self._display_progress()

    def _display_progress(self):
        """Print the status line in place; add a summary line once finished.

        Nothing is printed when ``total_items`` is 0.
        """
        if self.total_items == 0:
            return

        progress = (self.completed / self.total_items) * 100
        elapsed = time.time() - self.start_time

        # Default label for when nothing has completed yet and no rate exists.
        eta_str = "ETA: --"
        if self.completed > 0:
            # Extrapolate the remaining time from the average time per item.
            remaining = self.total_items - self.completed
            eta = (elapsed / self.completed) * remaining
            if eta > 60:
                eta_str = f"ETA: {eta/60:.1f}m"
            else:
                eta_str = f"ETA: {eta:.0f}s"

        # The carriage return rewrites the same console line on every update.
        print(f"\rProgress: {self.completed}/{self.total_items} ({progress:.1f}%) - {eta_str}", end='', flush=True)

        if self.completed >= self.total_items:
            print(f"\nCompleted in {elapsed/60:.1f} minutes")
|
||||
|
||||
def estimate_processing_time(num_images: int, avg_time_per_image: float = 3.0) -> Dict[str, str]:
    """Estimate how long processing ``num_images`` images will take.

    Args:
        num_images: Number of images to process
        avg_time_per_image: Assumed seconds per image

    Returns:
        Dict with ``'estimate'``, a human-readable duration in seconds,
        minutes or ``"<h>h <m>m"`` depending on magnitude, and
        ``'total_seconds'``, the raw numeric product of the two arguments.
    """
    total_seconds = num_images * avg_time_per_image

    if total_seconds < 60:
        label = f"{total_seconds:.0f} seconds"
    elif total_seconds < 3600:
        label = f"{total_seconds/60:.1f} minutes"
    else:
        # Whole hours, then whole minutes out of what is left over.
        whole_hours, leftover = divmod(total_seconds, 3600)
        whole_minutes = leftover // 60
        label = f"{whole_hours:.0f}h {whole_minutes:.0f}m"

    return {'estimate': label, 'total_seconds': total_seconds}
|
||||
Reference in New Issue
Block a user