# Standards management
import inspect
import json
import os
import uuid
from typing import Dict, List, Optional, BinaryIO, Tuple

from loguru import logger

from app.core.models import Standard, Requirement, RequirementSeverity
from app.utils.helpers import load_standards_from_file
from app.services.standards_matcher import StandardsMatcher

# Singleton instance to ensure all parts of the application use the same standards
_standards_service_instance = None


class StandardsService:
    """Service for managing compliance standards.

    The class is a process-wide singleton: every ``StandardsService()`` call
    returns the same object, whose ``standards`` dict (standard ID ->
    ``Standard``) is the in-memory store and whose ``matcher`` is the
    ``StandardsMatcher`` used for relevance scoring.
    """

    def __new__(cls):
        """Return the shared instance, creating and initialising it on first use.

        If initialisation fails, the half-built instance is discarded so that
        a later call retries instead of handing out a broken object.
        """
        global _standards_service_instance
        if _standards_service_instance is None:
            instance = super().__new__(cls)
            # Published before initialisation (as before) so that any
            # re-entrant StandardsService() call made while loading gets this
            # object; rolled back below if initialisation fails.
            _standards_service_instance = instance
            try:
                instance.standards = {}  # In-memory storage for standards
                instance.matcher = StandardsMatcher()  # Advanced standards matching logic
                instance._load_default_standards()
            except BaseException:
                _standards_service_instance = None
                raise
        return _standards_service_instance

    def __init__(self):
        """Initialize the standards service."""
        # Initialization is done in __new__ for the singleton pattern

    def _load_default_standards(self):
        """Load default standards from the ``standard`` directory.

        The directory is resolved three levels above this file. Every
        ``*.json`` file in it is read with ``load_standards_from_file``; each
        entry under its ``"standards"`` key is registered by ID. A failure in
        one file is logged and does not prevent the remaining files from
        loading.
        """
        standards_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
            "standard",
        )

        # isdir (not exists): a plain file at this path would make listdir raise.
        if not os.path.isdir(standards_dir):
            logger.warning(f"Standards directory not found: {standards_dir}")
            return

        # Sorted so that, when two files define the same ID, the winner does
        # not depend on the filesystem's listing order.
        for filename in sorted(os.listdir(standards_dir)):
            if not filename.endswith(".json"):
                continue
            try:
                file_path = os.path.join(standards_dir, filename)
                standards_data = load_standards_from_file(file_path)

                if "standards" in standards_data:
                    for std_data in standards_data["standards"]:
                        standard = self._create_standard_from_data(std_data)
                        self.standards[standard.id] = standard
                        logger.info(f"Loaded standard: {standard.name} ({standard.id})")
            except Exception as e:
                logger.error(f"Error loading standard from {filename}: {e}")

    @staticmethod
    def _parse_severity(raw) -> RequirementSeverity:
        """Map a severity value from a definition file to ``RequirementSeverity``.

        Matching is case-insensitive. Unknown, missing, or non-string values
        fall back to ``RequirementSeverity.MINOR``.
        """
        if not isinstance(raw, str):
            return RequirementSeverity.MINOR
        by_name = {
            "critical": RequirementSeverity.CRITICAL,
            "major": RequirementSeverity.MAJOR,
            "info": RequirementSeverity.INFO,
        }
        return by_name.get(raw.lower(), RequirementSeverity.MINOR)

    def _create_standard_from_data(self, data: Dict) -> Standard:
        """
        Create a Standard object from dictionary data.

        Missing IDs are replaced with a random UUID, a missing name with
        "Unnamed Standard", and missing descriptions with an empty string.
        A missing or null "requirements" entry yields no requirements.

        Args:
            data: Dictionary containing standard data

        Returns:
            Standard object
        """
        requirements = [
            Requirement(
                id=req_data.get("id", str(uuid.uuid4())),
                description=req_data.get("description", ""),
                severity=self._parse_severity(req_data.get("severity", "minor")),
                details=req_data.get("details", None),
            )
            # `or []` tolerates an explicit `"requirements": null`.
            for req_data in (data.get("requirements") or [])
        ]

        return Standard(
            id=data.get("id", str(uuid.uuid4())),
            name=data.get("name", "Unnamed Standard"),
            description=data.get("description", ""),
            requirements=requirements,
        )

    async def get_all_standards(self) -> List[Standard]:
        """
        Get all available standards.

        Returns:
            List of Standard objects
        """
        return list(self.standards.values())

    async def get_standard(self, standard_id: str) -> Optional[Standard]:
        """
        Get a standard by ID.

        Args:
            standard_id: ID of the standard to retrieve

        Returns:
            Standard object if found, None otherwise
        """
        return self.standards.get(standard_id)

    async def get_standard_by_name(self, name: str) -> Optional[Standard]:
        """
        Get a standard by name (case-insensitive).

        Args:
            name: Name of the standard to retrieve

        Returns:
            The first Standard whose name matches, None otherwise
        """
        name_lower = name.lower()
        return next(
            (std for std in self.standards.values() if std.name.lower() == name_lower),
            None,
        )

    async def upload_standard(self, file: BinaryIO, filename: str) -> Optional[Standard]:
        """
        Upload and process a standard definition file.

        The JSON document is either a single standard object or an object
        with a "standards" list. Every standard found is registered under its
        ID, replacing any existing standard with the same ID.

        Args:
            file: The standard definition file (JSON)
            filename: Name of the uploaded file

        Returns:
            The uploaded Standard; for a multi-standard file the first one,
            or None when the "standards" list is empty

        Raises:
            ValueError: If the content is not valid JSON, is not a JSON
                object, or cannot be read as text
        """
        try:
            # Read file content
            content = await self._read_file_content(file)

            # Parse JSON
            data = json.loads(content)
            if not isinstance(data, dict):
                raise ValueError("Standard definition must be a JSON object")

            if isinstance(data.get("standards"), list):
                # Multiple standards in file. Build them all before
                # registering any, so a malformed entry cannot leave the
                # store partially updated.
                standards = [
                    self._create_standard_from_data(std_data)
                    for std_data in data["standards"]
                ]
                for standard in standards:
                    self.standards[standard.id] = standard
                    logger.info(
                        f"Uploaded standard: {standard.name} (ID: {standard.id}) "
                        f"with {len(standard.requirements)} requirements"
                    )

                # Log the current standards count after upload
                logger.info(f"Total standards in system after upload: {len(self.standards)}")

                # Return the first standard for simplicity
                return standards[0] if standards else None

            # Single standard in file
            standard = self._create_standard_from_data(data)
            self.standards[standard.id] = standard
            logger.info(
                f"Uploaded standard: {standard.name} (ID: {standard.id}) "
                f"with {len(standard.requirements)} requirements"
            )

            # Log the current standards count after upload
            logger.info(f"Total standards in system after upload: {len(self.standards)}")

            return standard

        except json.JSONDecodeError as exc:
            raise ValueError("Invalid JSON format in standard definition file") from exc
        except Exception as e:
            logger.error(f"Error processing standard file: {e}")
            raise

    async def _read_file_content(self, file: BinaryIO) -> str:
        """
        Read and decode file content.

        Bytes are decoded as UTF-8, falling back to latin-1. Content that is
        already text is returned unchanged, and an awaitable result from an
        async ``read()`` is awaited first.

        Args:
            file: The file to read

        Returns:
            File content as string

        Raises:
            ValueError: If ``read()`` returns neither bytes nor text
        """
        file_content = file.read()
        if inspect.isawaitable(file_content):
            # File objects with an async read() return a coroutine here.
            file_content = await file_content

        if isinstance(file_content, str):
            return file_content
        if not isinstance(file_content, (bytes, bytearray)):
            raise ValueError("Unable to decode file content. Please ensure file is text-based.")

        # Try to decode as UTF-8
        try:
            return file_content.decode('utf-8')
        except UnicodeDecodeError:
            # latin-1 maps every byte value to a character, so this fallback
            # cannot fail.
            return file_content.decode('latin-1')

    async def get_standard_names_for_document(self, document_content: str) -> List[str]:
        """
        Identify which standards might be relevant for a document based on content.

        Delegates scoring to the standards matcher with a relevance threshold
        of 0.1 and at most 5 results. Falls back to a fixed default list when
        no standards are loaded or none match.

        Args:
            document_content: The document content

        Returns:
            List of standard names that might be relevant
        """
        # Default standards to use if no matches are found
        DEFAULT_STANDARDS = ["ISO-9001", "IEEE-829", "RFC-2119"]

        # Log available standards for debugging
        logger.info(f"Available standards in the system: {len(self.standards)}")
        for std_id, std in self.standards.items():
            logger.info(f" - {std.name} (ID: {std_id})")

        # If no standards are available, return defaults
        if not self.standards:
            logger.warning("No standards available in the system. Using default standards.")
            return DEFAULT_STANDARDS

        # Use the standards matcher to find relevant standards
        standard_scores = self.matcher.find_relevant_standards(
            document_content=document_content,
            standards=list(self.standards.values()),
            threshold=0.1,  # Minimum relevance threshold
            max_standards=5,  # Maximum number of standards to return
        )

        # Log the matching results
        if standard_scores:
            logger.info(f"Found {len(standard_scores)} relevant standards:")
            for name, score in standard_scores:
                logger.info(f" - {name}: relevance score {score:.2f}")
        else:
            logger.info("No relevant standards found based on document content.")

        # Extract standard names from the results
        relevant_standards = [std[0] for std in standard_scores]

        # If no relevant standards found, use defaults
        if not relevant_standards:
            logger.info(f"Using default standards: {DEFAULT_STANDARDS}")
            return DEFAULT_STANDARDS

        return relevant_standards