# File: ds_scp_task_solution/app/services/standards.py
# Commit: 0e3e22e8cb "Initial commit" by Aherobo Ovie Victor, 2025-07-17 22:20:25 +01:00
# Size: 251 lines, 9.4 KiB, Python

# Standards management
import json
import os
from typing import Dict, List, Optional, BinaryIO, Tuple
import uuid
from loguru import logger
from app.core.models import Standard, Requirement, RequirementSeverity
from app.utils.helpers import load_standards_from_file
from app.services.standards_matcher import StandardsMatcher
# Singleton instance to ensure all parts of the application use the same standards.
# It starts as None; StandardsService.__new__ creates the instance on first
# use and returns this same object on every later instantiation.
_standards_service_instance: Optional["StandardsService"] = None
class StandardsService:
    """Service for managing compliance standards.

    The class is a process-wide singleton: every ``StandardsService()`` call
    returns the same object, so all callers share one in-memory registry
    (``self.standards``, keyed by standard ID) and one ``StandardsMatcher``
    (``self.matcher``).
    """

    def __new__(cls):
        """Return the shared instance, creating and populating it on first use.

        The instance is stored in the module-level singleton slot only after
        its attributes are set and the default standards are loaded, so a
        failure during setup cannot leave a half-initialized singleton behind
        for later callers.
        """
        global _standards_service_instance
        if _standards_service_instance is None:
            instance = super().__new__(cls)
            instance.standards = {}  # In-memory storage: standard ID -> Standard
            instance.matcher = StandardsMatcher()  # Standards matching logic
            instance._load_default_standards()
            _standards_service_instance = instance
        return _standards_service_instance

    def __init__(self):
        """Intentionally do nothing.

        ``__init__`` runs on every ``StandardsService()`` call, so all state
        is set up once in ``__new__``; initializing here would reset the
        shared registry each time the service is requested.
        """

    def _load_default_standards(self):
        """Load default standards from the ``standard`` directory.

        The directory is looked up three directory levels above this file.
        Every ``*.json`` file in it is read with ``load_standards_from_file``
        and each entry of its ``"standards"`` list is registered by ID. Files
        are processed in sorted name order so that, when two files define the
        same ID, the winner is deterministic.
        """
        base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
        standards_dir = os.path.join(base_dir, "standard")
        if not os.path.isdir(standards_dir):
            logger.warning(f"Standards directory not found: {standards_dir}")
            return

        for filename in sorted(os.listdir(standards_dir)):
            if not filename.endswith(".json"):
                continue
            # Best effort: a broken file is logged and skipped so that it
            # cannot prevent the service from starting.
            try:
                file_path = os.path.join(standards_dir, filename)
                standards_data = load_standards_from_file(file_path)
                if "standards" in standards_data:
                    for std_data in standards_data["standards"]:
                        standard = self._create_standard_from_data(std_data)
                        self.standards[standard.id] = standard
                        logger.info(f"Loaded standard: {standard.name} ({standard.id})")
            except Exception as e:
                logger.error(f"Error loading standard from {filename}: {str(e)}")

    @staticmethod
    def _parse_severity(raw: object) -> "RequirementSeverity":
        """Map a severity value from a standards file to ``RequirementSeverity``.

        Matching is case-insensitive. Anything that is not one of
        ``critical``, ``major`` or ``info`` (including a missing, null or
        non-string value) maps to ``MINOR``.
        """
        severity_by_name = {
            "critical": RequirementSeverity.CRITICAL,
            "major": RequirementSeverity.MAJOR,
            "info": RequirementSeverity.INFO,
        }
        key = raw.lower() if isinstance(raw, str) else ""
        return severity_by_name.get(key, RequirementSeverity.MINOR)

    def _create_standard_from_data(self, data: Dict) -> "Standard":
        """
        Create a Standard object from dictionary data.

        Missing or null fields fall back to defaults: a fresh UUID for
        ``id``, ``"Unnamed Standard"`` for ``name``, an empty string for
        ``description`` and no requirements.

        Args:
            data: Dictionary containing standard data

        Returns:
            Standard object

        Raises:
            ValueError: If ``data`` or one of its requirements is not a dict.
        """
        if not isinstance(data, dict):
            raise ValueError("Standard definition must be a JSON object")

        requirements = []
        # ``or []`` also covers an explicit ``"requirements": null``.
        for req_data in data.get("requirements") or []:
            if not isinstance(req_data, dict):
                raise ValueError("Requirement definition must be a JSON object")
            requirements.append(
                Requirement(
                    id=req_data.get("id") or str(uuid.uuid4()),
                    description=req_data.get("description") or "",
                    severity=self._parse_severity(req_data.get("severity")),
                    details=req_data.get("details"),
                )
            )

        return Standard(
            id=data.get("id") or str(uuid.uuid4()),
            name=data.get("name") or "Unnamed Standard",
            description=data.get("description") or "",
            requirements=requirements,
        )

    async def get_all_standards(self) -> List["Standard"]:
        """
        Get all available standards.

        Returns:
            List of Standard objects (a new list; the registry is not exposed)
        """
        return list(self.standards.values())

    async def get_standard(self, standard_id: str) -> Optional["Standard"]:
        """
        Get a standard by ID.

        Args:
            standard_id: ID of the standard to retrieve

        Returns:
            Standard object if found, None otherwise
        """
        return self.standards.get(standard_id)

    async def get_standard_by_name(self, name: str) -> Optional["Standard"]:
        """
        Get a standard by name (case-insensitive).

        Args:
            name: Name of the standard to retrieve

        Returns:
            The first registered Standard whose name matches, None otherwise
        """
        wanted = name.lower()
        return next(
            (std for std in self.standards.values() if std.name.lower() == wanted),
            None,
        )

    async def upload_standard(self, file: BinaryIO, filename: str) -> Optional["Standard"]:
        """
        Upload and process a standard definition file.

        The JSON document may either be a single standard object or an object
        with a top-level ``"standards"`` list holding several standards. Every
        standard found is registered by ID, replacing any existing entry with
        the same ID.

        Args:
            file: The standard definition file (JSON)
            filename: Name of the uploaded file (not used by this method)

        Returns:
            The single standard, or the first one of a ``"standards"`` list;
            None when that list is empty.

        Raises:
            ValueError: If the content is not valid JSON, or the document or
                one of its standards is not a JSON object.
        """
        try:
            content = await self._read_file_content(file)
            data = json.loads(content)
            if not isinstance(data, dict):
                raise ValueError("Standard definition file must contain a JSON object")

            # A top-level "standards" list means several standards in one
            # file; otherwise the whole document is one standard.
            entries = data.get("standards")
            if not isinstance(entries, list):
                entries = [data]

            uploaded = []
            for std_data in entries:
                standard = self._create_standard_from_data(std_data)
                self.standards[standard.id] = standard
                uploaded.append(standard)
                logger.info(f"Uploaded standard: {standard.name} (ID: {standard.id}) with {len(standard.requirements)} requirements")

            # Log the current standards count after upload
            logger.info(f"Total standards in system after upload: {len(self.standards)}")
            return uploaded[0] if uploaded else None
        except json.JSONDecodeError as err:
            raise ValueError("Invalid JSON format in standard definition file") from err
        except Exception as e:
            logger.error(f"Error processing standard file: {str(e)}")
            raise

    async def _read_file_content(self, file: BinaryIO) -> str:
        """
        Read and decode file content.

        Bytes are decoded as UTF-8 (a leading byte-order mark is dropped,
        since ``json.loads`` rejects it), falling back to Latin-1, which
        accepts any byte sequence. Text returned by a text-mode file object
        is passed through unchanged.

        Args:
            file: The file to read

        Returns:
            File content as string

        Raises:
            ValueError: If ``file.read()`` returns neither bytes nor text.
        """
        raw = file.read()
        if isinstance(raw, str):
            return raw
        if not isinstance(raw, (bytes, bytearray)):
            raise ValueError("Unable to decode file content. Please ensure file is text-based.")
        try:
            return raw.decode("utf-8-sig")
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a character, so this cannot fail.
            return raw.decode("latin-1")

    async def get_standard_names_for_document(self, document_content: str) -> List[str]:
        """
        Identify which standards might be relevant for a document based on content.

        Delegates scoring to ``self.matcher.find_relevant_standards`` (at most
        5 results with a relevance of at least 0.1) and falls back to a fixed
        list of default names when the registry is empty or nothing matches.

        Args:
            document_content: The document content

        Returns:
            List of standard names that might be relevant
        """
        # Default standards to use if no matches are found
        default_standards = ["ISO-9001", "IEEE-829", "RFC-2119"]

        # Log available standards for debugging
        logger.info(f"Available standards in the system: {len(self.standards)}")
        for std_id, std in self.standards.items():
            logger.info(f" - {std.name} (ID: {std_id})")

        if not self.standards:
            logger.warning("No standards available in the system. Using default standards.")
            return default_standards

        standard_scores = self.matcher.find_relevant_standards(
            document_content=document_content,
            standards=list(self.standards.values()),
            threshold=0.1,  # Minimum relevance threshold
            max_standards=5,  # Maximum number of standards to return
        )

        # An empty (or missing) result means nothing matched: use defaults.
        if not standard_scores:
            logger.info("No relevant standards found based on document content.")
            logger.info(f"Using default standards: {default_standards}")
            return default_standards

        logger.info(f"Found {len(standard_scores)} relevant standards:")
        for name, score in standard_scores:
            logger.info(f" - {name}: relevance score {score:.2f}")

        # Each result is a (name, score) pair; callers only need the names.
        return [name for name, _score in standard_scores]