#!/usr/bin/env python3
"""
Temporal Consistency Validator for Heritage Custodian Ontology (v0.7.0)

Validates temporal and bidirectional relationship constraints across:
- CustodianCollection ↔ OrganizationalStructure (Phase 4)
- PersonObservation ↔ OrganizationalStructure (Phase 3)
- OrganizationalChangeEvent impacts (Phase 2)

NOTE: change events are loaded and counted by this script, but none of the
five rules implemented below inspects them.

Usage:
    python scripts/validate_temporal_consistency.py <yaml_file> [<yaml_file> ...]
    python scripts/validate_temporal_consistency.py schemas/20251121/examples/*.yaml

Author: Heritage Custodian Ontology Project
Date: 2025-11-22
Schema Version: v0.7.0 (Phase 5: Validation Framework)
"""

import sys
from dataclasses import dataclass, field
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    import yaml
except ImportError:
    # PyYAML is only needed to read instance files. Deferring the failure to
    # DataLoader.load() keeps the rule logic importable without it.
    yaml = None


# ============================================================================
# Validation Result Classes
# ============================================================================

@dataclass
class ValidationError:
    """Represents a validation error (or warning) with context."""

    rule: str
    message: str
    entity_id: str
    entity_type: str
    severity: str = "ERROR"  # ERROR, WARNING

    def __str__(self):
        return f"[{self.severity}] {self.rule}: {self.message}\n Entity: {self.entity_id} ({self.entity_type})"


@dataclass
class ValidationResult:
    """Validation results with errors, warnings, and statistics."""

    errors: List[ValidationError] = field(default_factory=list)
    warnings: List[ValidationError] = field(default_factory=list)
    entities_validated: int = 0
    rules_checked: int = 0

    def add_error(self, rule: str, message: str, entity_id: str, entity_type: str):
        """Add validation error."""
        self.errors.append(ValidationError(rule, message, entity_id, entity_type, "ERROR"))

    def add_warning(self, rule: str, message: str, entity_id: str, entity_type: str):
        """Add validation warning."""
        self.warnings.append(ValidationError(rule, message, entity_id, entity_type, "WARNING"))

    @property
    def is_valid(self) -> bool:
        """Check if validation passed (no errors, warnings allowed)."""
        return not self.errors

    @property
    def total_issues(self) -> int:
        """Total errors + warnings."""
        return len(self.errors) + len(self.warnings)

    def print_summary(self):
        """Print validation summary to stdout, followed by each error and warning."""
        print("\n" + "=" * 80)
        print("VALIDATION SUMMARY")
        print("=" * 80)
        print(f"Entities validated: {self.entities_validated}")
        print(f"Rules checked: {self.rules_checked}")
        print(f"Errors: {len(self.errors)}")
        print(f"Warnings: {len(self.warnings)}")
        print(f"Status: {'✅ PASS' if self.is_valid else '❌ FAIL'}")
        print("=" * 80)

        if self.errors:
            print("\n🔴 ERRORS:")
            for error in self.errors:
                print(f"\n{error}")

        if self.warnings:
            print("\n🟡 WARNINGS:")
            for warning in self.warnings:
                print(f"\n{warning}")

        if not self.total_issues:
            print("\n✅ All validation rules passed!")


# ============================================================================
# Data Loader
# ============================================================================

class DataLoader:
    """Load and organize YAML instances into typed collections."""

    def __init__(self, yaml_path: Path):
        self.yaml_path = yaml_path
        self.organizational_units: Dict[str, Dict] = {}
        self.collections: Dict[str, Dict] = {}
        self.person_observations: Dict[str, Dict] = {}
        self.change_events: Dict[str, Dict] = {}

    def load(self):
        """Load YAML file and categorize entities.

        Every YAML document that is a non-empty mapping is stored under its
        ``id`` value (``''`` when absent) in the first matching category;
        other documents are skipped.

        Returns:
            This loader, so that ``DataLoader(path).load()`` can be chained.

        Raises:
            ImportError: If PyYAML is not installed.
        """
        if yaml is None:
            raise ImportError(
                "PyYAML is required to load instance files (pip install pyyaml)"
            )

        with open(self.yaml_path, 'r', encoding='utf-8') as f:
            documents = list(yaml.safe_load_all(f))

        for doc in documents:
            if not doc or not isinstance(doc, dict):
                continue

            entity_id = doc.get('id', '')

            # Categorize by type based on fields present; the first match wins.
            if 'unit_name' in doc or 'unit_type' in doc:
                self.organizational_units[entity_id] = doc
            elif 'collection_name' in doc:
                self.collections[entity_id] = doc
            elif 'person_name' in doc or 'staff_role' in doc:
                self.person_observations[entity_id] = doc
            elif 'event_type' in doc and 'event_date' in doc:
                self.change_events[entity_id] = doc

        return self


# ============================================================================
# Date Utilities
# ============================================================================

def parse_date(date_str: Any) -> Optional[date]:
    """Parse a date-like value to a plain ``date``.

    Accepts ``date`` and ``datetime`` objects as well as ISO-format strings
    (a trailing ``Z`` is treated as ``+00:00``).

    Returns:
        The calendar date, or ``None`` when the value is ``None``, is an
        unparseable string, or is of an unsupported type.
    """
    if date_str is None:
        return None
    # datetime must be tested first: it subclasses date, so the reverse order
    # would hand back a datetime that cannot be compared with plain dates.
    if isinstance(date_str, datetime):
        return date_str.date()
    if isinstance(date_str, date):
        return date_str
    if isinstance(date_str, str):
        try:
            return datetime.fromisoformat(date_str.replace('Z', '+00:00')).date()
        except ValueError:
            return None
    return None


def date_within_range(check_date: Optional[date], start_date: Optional[date], end_date: Optional[date]) -> bool:
    """Check if date falls within range (handles None as open-ended).

    A missing ``check_date`` is treated as within range because there is
    nothing to validate.
    """
    if check_date is None:
        return True  # Can't validate if date missing
    if start_date is not None and check_date < start_date:
        return False
    if end_date is not None and check_date > end_date:
        return False
    return True


def _as_id_list(value: Any) -> List[Any]:
    """Normalize a reverse-reference field to a list of ids.

    A missing or null field becomes an empty list and a lone scalar becomes a
    one-element list, so callers can safely iterate and test membership.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple, set, dict)):
        return list(value)
    return [value]


# ============================================================================
# Validation Rules
# ============================================================================

class TemporalValidator:
    """Validates temporal consistency rules."""

    def __init__(self, data: DataLoader):
        self.data = data
        self.result = ValidationResult()

    def validate_all(self) -> ValidationResult:
        """Run all validation rules and return the accumulated result."""
        print(f"\n🔍 Validating {self.data.yaml_path.name}...")
        print(f" - Organizational units: {len(self.data.organizational_units)}")
        print(f" - Collections: {len(self.data.collections)}")
        print(f" - Person observations: {len(self.data.person_observations)}")
        print(f" - Change events: {len(self.data.change_events)}")

        rules = (
            self.validate_collection_unit_temporal,       # Phase 4
            self.validate_collection_unit_bidirectional,  # Phase 4
            self.validate_custody_continuity,             # Phase 4
            self.validate_staff_unit_temporal,            # Phase 3
            self.validate_staff_unit_bidirectional,       # Phase 3
        )
        for rule in rules:
            rule()
            self.result.rules_checked += 1

        # Update entity count (change events are not part of the total)
        self.result.entities_validated = (
            len(self.data.organizational_units)
            + len(self.data.collections)
            + len(self.data.person_observations)
        )

        return self.result

    # ========================================================================
    # Rule 1: Collection-Unit Temporal Consistency (Phase 4)
    # ========================================================================

    def validate_collection_unit_temporal(self):
        """
        RULE: Collection custody dates must fit within managing unit validity period.

        Constraints:
        1. collection.valid_from >= unit.valid_from
        2. collection.valid_to <= unit.valid_to (if unit dissolved)

        Rationale: Collection cannot be managed by unit that doesn't exist.
        """
        for coll_id, collection in self.data.collections.items():
            managing_unit_id = collection.get('managing_unit')
            if not managing_unit_id:
                # No managing unit specified (might be optional in some contexts)
                continue

            unit = self.data.organizational_units.get(managing_unit_id)
            if not unit:
                self.result.add_error(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=f"Collection references non-existent managing unit: {managing_unit_id}",
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )
                continue

            # Parse dates
            coll_start = parse_date(collection.get('valid_from'))
            coll_end = parse_date(collection.get('valid_to'))
            unit_start = parse_date(unit.get('valid_from'))
            unit_end = parse_date(unit.get('valid_to'))

            # Check start date constraint
            if coll_start and unit_start and coll_start < unit_start:
                self.result.add_error(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=(
                        f"Collection custody starts ({coll_start}) before managing unit exists ({unit_start}). "
                        f"Managing unit: {unit.get('unit_name', managing_unit_id)}"
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

            # Check end date constraint (only if unit dissolved)
            if unit_end and coll_end and coll_end > unit_end:
                self.result.add_error(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=(
                        f"Collection custody extends ({coll_end}) beyond managing unit validity ({unit_end}). "
                        f"Managing unit: {unit.get('unit_name', managing_unit_id)}"
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

            # Warning: Collection still active after unit dissolved
            if unit_end and not coll_end:
                self.result.add_warning(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=(
                        f"Collection custody ongoing but managing unit dissolved ({unit_end}). "
                        f"Missing custody transfer? Managing unit: {unit.get('unit_name', managing_unit_id)}"
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

    # ========================================================================
    # Rule 2: Collection-Unit Bidirectional Consistency (Phase 4)
    # ========================================================================

    def validate_collection_unit_bidirectional(self):
        """
        RULE: Bidirectional relationships must be consistent.

        Constraints:
        1. IF collection.managing_unit = unit_id
           THEN unit.managed_collections MUST include collection_id
        2. IF unit.managed_collections includes collection_id
           THEN collection.managing_unit MUST equal unit_id

        Rationale: Forward and reverse relationships must match.
        """
        # Check forward direction (collection → unit)
        for coll_id, collection in self.data.collections.items():
            managing_unit_id = collection.get('managing_unit')
            if not managing_unit_id:
                continue

            unit = self.data.organizational_units.get(managing_unit_id)
            if not unit:
                continue  # Already flagged in temporal validation

            # Normalized so a null or scalar field cannot break the membership test
            managed_collections = _as_id_list(unit.get('managed_collections'))
            if coll_id not in managed_collections:
                self.result.add_error(
                    rule="COLLECTION_UNIT_BIDIRECTIONAL",
                    message=(
                        f"Collection references unit '{unit.get('unit_name', managing_unit_id)}' as managing_unit, "
                        f"but unit does not list collection in managed_collections. "
                        f"Add collection to unit.managed_collections."
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

        # Check reverse direction (unit → collections)
        for unit_id, unit in self.data.organizational_units.items():
            for coll_id in _as_id_list(unit.get('managed_collections')):
                collection = self.data.collections.get(coll_id)
                if not collection:
                    self.result.add_error(
                        rule="COLLECTION_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit references non-existent collection: {coll_id}. "
                            f"Remove from unit.managed_collections or create collection."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )
                    continue

                managing_unit_id = collection.get('managing_unit')
                if managing_unit_id != unit_id:
                    self.result.add_error(
                        rule="COLLECTION_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit lists collection '{collection.get('collection_name', coll_id)}' "
                            f"in managed_collections, but collection's managing_unit is "
                            f"'{managing_unit_id}' (should be '{unit_id}'). "
                            f"Update collection.managing_unit."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )

    # ========================================================================
    # Rule 3: Custody Transfer Continuity (Phase 4)
    # ========================================================================

    def validate_custody_continuity(self):
        """
        RULE: Collection custody transfers must be continuous (no gaps).

        Constraints:
        1. IF collection version 1 ends (valid_to = T1)
           AND collection version 2 exists with same collection_name
           THEN version 2 must start at T1 or T1+1 day

        Rationale: Collections don't disappear; custody must transfer during org changes.

        Note: This requires grouping collections by name to find versions.
        Overlaps are reported as errors, gaps of more than one day as warnings.
        """
        # Group collections by name
        collections_by_name: Dict[str, List[Tuple[str, Dict]]] = {}
        for coll_id, collection in self.data.collections.items():
            name = collection.get('collection_name', '')
            if name:
                collections_by_name.setdefault(name, []).append((coll_id, collection))

        # Check continuity for collections with multiple versions
        for name, versions in collections_by_name.items():
            if len(versions) < 2:
                continue  # Single version, no continuity to check

            # Sort by valid_from date; versions without one sort first
            sorted_versions = sorted(
                versions,
                key=lambda v: parse_date(v[1].get('valid_from')) or date.min
            )

            # Walk adjacent (current, next) pairs
            for (current_id, current), (next_id, next_version) in zip(sorted_versions, sorted_versions[1:]):
                current_end = parse_date(current.get('valid_to'))
                next_start = parse_date(next_version.get('valid_from'))
                if not (current_end and next_start):
                    continue

                gap_days = (next_start - current_end).days
                if gap_days < 0:
                    self.result.add_error(
                        rule="CUSTODY_CONTINUITY",
                        message=(
                            f"Collection '{name}' has overlapping custody periods: "
                            f"version ending {current_end} overlaps with version starting {next_start} "
                            f"(overlap: {abs(gap_days)} days). "
                            f"Current: {current_id}, Next: {next_id}"
                        ),
                        entity_id=current_id,
                        entity_type="CustodianCollection"
                    )
                elif gap_days > 1:
                    self.result.add_warning(
                        rule="CUSTODY_CONTINUITY",
                        message=(
                            f"Collection '{name}' has custody gap: "
                            f"version ending {current_end}, next version starting {next_start} "
                            f"(gap: {gap_days} days). "
                            f"Expected continuous custody transfer. "
                            f"Current: {current_id}, Next: {next_id}"
                        ),
                        entity_id=current_id,
                        entity_type="CustodianCollection"
                    )

    # ========================================================================
    # Rule 4: Staff-Unit Temporal Consistency (Phase 3)
    # ========================================================================

    def validate_staff_unit_temporal(self):
        """
        RULE: Staff role dates must fit within organizational unit validity period.

        Constraints:
        1. person_obs.role_start_date >= unit.valid_from
        2. person_obs.role_end_date <= unit.valid_to (if unit dissolved)

        Rationale: Person cannot work for unit that doesn't exist.
        """
        for person_id, person_obs in self.data.person_observations.items():
            unit_affiliation_id = person_obs.get('unit_affiliation')
            if not unit_affiliation_id:
                continue

            unit = self.data.organizational_units.get(unit_affiliation_id)
            if not unit:
                self.result.add_error(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=f"Person references non-existent unit: {unit_affiliation_id}",
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )
                continue

            # Parse dates (PersonObservation uses role_start_date / role_end_date)
            role_start = parse_date(person_obs.get('role_start_date'))
            role_end = parse_date(person_obs.get('role_end_date'))
            unit_start = parse_date(unit.get('valid_from'))
            unit_end = parse_date(unit.get('valid_to'))

            # Check start date constraint
            if role_start and unit_start and role_start < unit_start:
                self.result.add_error(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=(
                        f"Staff role starts ({role_start}) before unit exists ({unit_start}). "
                        f"Unit: {unit.get('unit_name', unit_affiliation_id)}, "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

            # Check end date constraint
            if unit_end and role_end and role_end > unit_end:
                self.result.add_error(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=(
                        f"Staff role extends ({role_end}) beyond unit validity ({unit_end}). "
                        f"Unit: {unit.get('unit_name', unit_affiliation_id)}, "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

            # Warning: Role ongoing after unit dissolved
            if unit_end and not role_end:
                self.result.add_warning(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=(
                        f"Staff role ongoing but unit dissolved ({unit_end}). "
                        f"Missing staff reassignment? "
                        f"Unit: {unit.get('unit_name', unit_affiliation_id)}, "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

    # ========================================================================
    # Rule 5: Staff-Unit Bidirectional Consistency (Phase 3)
    # ========================================================================

    def validate_staff_unit_bidirectional(self):
        """
        RULE: Bidirectional staff-unit relationships must be consistent.

        Constraints:
        1. IF person_obs.unit_affiliation = unit_id
           THEN unit.staff_members MUST include person_obs_id
        2. IF unit.staff_members includes person_obs_id
           THEN person_obs.unit_affiliation MUST equal unit_id

        Rationale: Forward and reverse relationships must match.
        """
        # Check forward direction (person → unit)
        for person_id, person_obs in self.data.person_observations.items():
            unit_affiliation_id = person_obs.get('unit_affiliation')
            if not unit_affiliation_id:
                continue

            unit = self.data.organizational_units.get(unit_affiliation_id)
            if not unit:
                continue  # Already flagged in temporal validation

            # Normalized so a null or scalar field cannot break the membership test
            staff_members = _as_id_list(unit.get('staff_members'))
            if person_id not in staff_members:
                self.result.add_error(
                    rule="STAFF_UNIT_BIDIRECTIONAL",
                    message=(
                        f"Person references unit '{unit.get('unit_name', unit_affiliation_id)}' as unit_affiliation, "
                        f"but unit does not list person in staff_members. "
                        f"Add person to unit.staff_members. "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

        # Check reverse direction (unit → staff)
        for unit_id, unit in self.data.organizational_units.items():
            for person_id in _as_id_list(unit.get('staff_members')):
                person_obs = self.data.person_observations.get(person_id)
                if not person_obs:
                    self.result.add_error(
                        rule="STAFF_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit references non-existent person: {person_id}. "
                            f"Remove from unit.staff_members or create PersonObservation."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )
                    continue

                unit_affiliation_id = person_obs.get('unit_affiliation')
                if unit_affiliation_id != unit_id:
                    self.result.add_error(
                        rule="STAFF_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit lists person '{person_obs.get('person_name', person_id)}' "
                            f"in staff_members, but person's unit_affiliation is "
                            f"'{unit_affiliation_id}' (should be '{unit_id}'). "
                            f"Update person_obs.unit_affiliation."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )


# ============================================================================
# CLI Interface
# ============================================================================

def main():
    """Main validation entry point.

    Validates every file named on the command line and exits with status 0
    only when all of them exist and produce no errors; otherwise exits 1.
    """
    if len(sys.argv) < 2:
        print("Usage: python scripts/validate_temporal_consistency.py <yaml_file> [<yaml_file> ...]")
        print("\nExample:")
        print(" python scripts/validate_temporal_consistency.py schemas/20251121/examples/collection_department_integration_examples.yaml")
        sys.exit(1)

    yaml_files = [Path(arg) for arg in sys.argv[1:]]

    print("\n" + "=" * 80)
    print("HERITAGE CUSTODIAN ONTOLOGY - TEMPORAL CONSISTENCY VALIDATOR")
    print("Schema Version: v0.7.0 (Phase 5)")
    print("=" * 80)

    all_results = []
    missing_files = []

    for yaml_file in yaml_files:
        if not yaml_file.exists():
            print(f"\n❌ File not found: {yaml_file}")
            missing_files.append(yaml_file)
            continue

        # Load data
        data = DataLoader(yaml_file).load()

        # Validate
        validator = TemporalValidator(data)
        result = validator.validate_all()
        all_results.append((yaml_file.name, result))

        # Print result
        result.print_summary()

    # Overall summary
    print("\n" + "=" * 80)
    print("OVERALL VALIDATION SUMMARY")
    print("=" * 80)
    print(f"Files validated: {len(all_results)}")
    if missing_files:
        print(f"Files not found: {len(missing_files)}")

    total_errors = sum(len(r.errors) for _, r in all_results)
    total_warnings = sum(len(r.warnings) for _, r in all_results)

    print(f"Total errors: {total_errors}")
    print(f"Total warnings: {total_warnings}")

    # A requested file that could not be found is a failure: otherwise a run
    # that validated nothing would report success.
    all_valid = not missing_files and all(r.is_valid for _, r in all_results)
    print(f"Overall status: {'✅ ALL PASS' if all_valid else '❌ SOME FAILURES'}")
    print("=" * 80)

    # Exit with error code if validation failed
    sys.exit(0 if all_valid else 1)


if __name__ == "__main__":
    main()