Files
ds-smart-farm-project/src/utils/batch_processor.py
T
2025-07-16 20:35:20 +01:00

215 lines
8.2 KiB
Python

"""
Batch processing utilities for handling large volumes of agricultural photos
"""
import os
import time
import pandas as pd
from typing import List, Dict, Callable, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
class BatchProcessor:
"""Handles batch processing of agricultural photos with progress tracking"""
def __init__(self, max_workers: int = 4, batch_size: int = 500):
"""
Initialize batch processor
Args:
max_workers: Maximum number of parallel workers
batch_size: Maximum images per batch
"""
self.max_workers = max_workers
self.batch_size = batch_size
self.setup_logging()
def setup_logging(self):
"""Setup logging for batch processing"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('outputs/batch_processing.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def process_batch(self,
image_files: List[str],
process_function: Callable,
output_file: str,
resume_from: int = 0) -> Dict[str, any]:
"""
Process a batch of images with progress tracking and error handling
Args:
image_files: List of image file paths
process_function: Function to process each image
output_file: Path to save results CSV
resume_from: Index to resume processing from
Returns:
Processing statistics
"""
start_time = time.time()
total_images = len(image_files)
self.logger.info(f"Starting batch processing of {total_images} images")
self.logger.info(f"Batch size: {self.batch_size}, Max workers: {self.max_workers}")
# Split into batches
batches = self._split_into_batches(image_files[resume_from:])
results = []
errors = []
processing_times = []
for batch_idx, batch in enumerate(batches):
batch_start = time.time()
self.logger.info(f"Processing batch {batch_idx + 1}/{len(batches)} ({len(batch)} images)")
# Process batch with parallel workers
batch_results, batch_errors = self._process_single_batch(batch, process_function)
results.extend(batch_results)
errors.extend(batch_errors)
batch_time = time.time() - batch_start
processing_times.append(batch_time)
# Save intermediate results
if results:
self._save_intermediate_results(results, output_file, batch_idx)
# Progress update
completed = resume_from + len(results)
progress = (completed / total_images) * 100
self.logger.info(f"Progress: {completed}/{total_images} ({progress:.1f}%) - Batch time: {batch_time:.1f}s")
# Final statistics
total_time = time.time() - start_time
stats = self._calculate_statistics(total_images, len(results), len(errors),
total_time, processing_times)
self.logger.info(f"Batch processing completed: {stats}")
return stats
def _split_into_batches(self, image_files: List[str]) -> List[List[str]]:
"""Split image files into manageable batches"""
batches = []
for i in range(0, len(image_files), self.batch_size):
batch = image_files[i:i + self.batch_size]
batches.append(batch)
return batches
def _process_single_batch(self, batch: List[str], process_function: Callable) -> tuple:
"""Process a single batch with parallel workers"""
results = []
errors = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all tasks
future_to_file = {
executor.submit(self._safe_process_image, img_path, process_function): img_path
for img_path in batch
}
# Collect results
for future in as_completed(future_to_file):
img_path = future_to_file[future]
try:
result = future.result()
if result:
results.append(result)
else:
errors.append({'file': img_path, 'error': 'No result returned'})
except Exception as e:
errors.append({'file': img_path, 'error': str(e)})
return results, errors
def _safe_process_image(self, img_path: str, process_function: Callable) -> Optional[Dict]:
"""Safely process a single image with error handling"""
try:
return process_function(img_path)
except Exception as e:
self.logger.error(f"Error processing {img_path}: {e}")
return None
def _save_intermediate_results(self, results: List[Dict], output_file: str, batch_idx: int):
"""Save intermediate results to prevent data loss"""
try:
df = pd.DataFrame(results)
# Save main file
df.to_csv(output_file, index=False)
# Save backup
backup_file = output_file.replace('.csv', f'_backup_batch_{batch_idx}.csv')
df.to_csv(backup_file, index=False)
except Exception as e:
self.logger.error(f"Error saving intermediate results: {e}")
def _calculate_statistics(self, total: int, successful: int, errors: int,
total_time: float, batch_times: List[float]) -> Dict[str, any]:
"""Calculate processing statistics"""
avg_batch_time = sum(batch_times) / len(batch_times) if batch_times else 0
success_rate = (successful / total) * 100 if total > 0 else 0
return {
'total_images': total,
'successful': successful,
'errors': errors,
'success_rate': round(success_rate, 1),
'total_time_minutes': round(total_time / 60, 2),
'average_batch_time': round(avg_batch_time, 2),
'images_per_minute': round(successful / (total_time / 60), 1) if total_time > 0 else 0
}
class ProgressTracker:
"""Track and display processing progress"""
def __init__(self, total_items: int):
self.total_items = total_items
self.completed = 0
self.start_time = time.time()
def update(self, increment: int = 1):
"""Update progress"""
self.completed += increment
self._display_progress()
def _display_progress(self):
"""Display current progress"""
if self.total_items == 0:
return
progress = (self.completed / self.total_items) * 100
elapsed = time.time() - self.start_time
if self.completed > 0:
eta = (elapsed / self.completed) * (self.total_items - self.completed)
eta_str = f"ETA: {eta/60:.1f}m" if eta > 60 else f"ETA: {eta:.0f}s"
else:
eta_str = "ETA: --"
print(f"\rProgress: {self.completed}/{self.total_items} ({progress:.1f}%) - {eta_str}", end='', flush=True)
if self.completed >= self.total_items:
print(f"\nCompleted in {elapsed/60:.1f} minutes")
def estimate_processing_time(num_images: int, avg_time_per_image: float = 3.0) -> Dict[str, str]:
"""Estimate processing time for given number of images"""
total_seconds = num_images * avg_time_per_image
if total_seconds < 60:
return {'estimate': f"{total_seconds:.0f} seconds", 'total_seconds': total_seconds}
elif total_seconds < 3600:
return {'estimate': f"{total_seconds/60:.1f} minutes", 'total_seconds': total_seconds}
else:
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
return {'estimate': f"{hours:.0f}h {minutes:.0f}m", 'total_seconds': total_seconds}