#!/usr/bin/env python3
"""
LinkML Custom Validators for Heritage Custodian Ontology

Implements validation rules as Python functions that can be called during
LinkML data loading. These complement SHACL shapes (Phase 7) by providing
validation at the YAML instance level before RDF conversion.

Usage:
    from linkml_validators import validate_collection_unit_temporal

    # organizational_units maps unit IDs to OrganizationalStructure dicts
    errors = validate_collection_unit_temporal(collection, organizational_units)
    if errors:
        print(f"Validation failed: {errors}")

Author: Heritage Custodian Ontology Project
Date: 2025-11-22
Schema Version: v0.7.0 (Phase 8: LinkML Constraints)
"""

from datetime import date, datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass


# ============================================================================
# Validation Error Class
# ============================================================================

@dataclass
class ValidationError:
    """Validation error with context.

    Despite the name this is a plain record, not an exception: validators
    collect instances in a list and return them instead of raising.
    """

    rule_id: str                  # identifier of the violated rule
    message: str                  # human-readable description of the finding
    entity_id: str                # ID of the entity that failed validation
    entity_type: str              # class name of that entity
    field: Optional[str] = None   # offending field, when one can be named
    severity: str = "ERROR"  # ERROR, WARNING, INFO

    def __str__(self):
        """Render the finding as a two-line, human-readable report entry."""
        field_str = f" (field: {self.field})" if self.field else ""
        return (
            f"[{self.severity}] {self.rule_id}: {self.message}{field_str}\n"
            f" Entity: {self.entity_id} ({self.entity_type})"
        )


# ============================================================================
# Helper Functions
# ============================================================================

def parse_date(date_value: Any) -> Optional[date]:
    """Parse date from various formats.

    Args:
        date_value: A ``datetime``, a ``date``, an ISO 8601 string, or
            anything else (which is treated as "no date").

    Returns:
        A plain ``date`` (never a ``datetime``), or ``None`` when the value
        is missing, of an unsupported type, or not parseable.
    """
    # datetime must be tested before date: datetime is a subclass of date,
    # so the reverse order would return datetimes unconverted, and a later
    # date-vs-datetime comparison would raise TypeError.
    if isinstance(date_value, datetime):
        return date_value.date()
    if isinstance(date_value, date):
        return date_value
    if isinstance(date_value, str):
        text = date_value
        # fromisoformat() only accepts a trailing "Z" from Python 3.11 on;
        # spell the UTC designator as an explicit offset so older
        # interpreters parse it as well.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(text).date()
        except ValueError:
            return None
    return None


def get_field_value(entity: Dict[str, Any], field: str) -> Any:
    """Safely get field value from entity dict.

    Returns ``None`` when the key is absent instead of raising ``KeyError``.
    """
    return entity.get(field)
# ============================================================================
# Internal Helpers
# ============================================================================

def _referenced_ids(references: Any) -> List[Any]:
    """Normalize the value of an inverse-reference slot to a list of IDs.

    A bare ``in`` test on the raw slot value is unreliable: on a scalar
    string it performs substring matching ("col-1" in "col-10" is True) and
    on other scalars it raises TypeError.

    Args:
        references: Raw slot value - ``None``, a scalar ID, a list of IDs
            and/or inlined dicts, or a dict keyed by ID.

    Returns:
        Flat list of the referenced identifiers (empty if there are none).
    """
    if not references:
        return []
    if isinstance(references, dict):
        # Dict-valued slot: membership is decided on the keys, exactly as a
        # plain ``in`` test on the dict would do.
        return list(references)
    if not isinstance(references, (list, tuple, set, frozenset)):
        # Single scalar reference instead of a list
        return [references]

    ids = []
    for ref in references:
        if isinstance(ref, dict):
            # Assumes inlined objects carry their identifier under 'id',
            # like the entities validated in this module - TODO confirm
            # against the schema.
            if 'id' in ref:
                ids.append(ref['id'])
        else:
            ids.append(ref)
    return ids


# ============================================================================
# Rule 1: Collection-Unit Temporal Consistency
# ============================================================================

def validate_collection_unit_temporal(
    collection: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List["ValidationError"]:
    """
    Validate that collection custody dates fit within managing unit's validity period.

    Rule 1.1: Collection.valid_from >= OrganizationalStructure.valid_from
    Rule 1.2: Collection.valid_to <= OrganizationalStructure.valid_to (if unit dissolved)

    A collection without a managing unit is not checked. A managing unit
    that cannot be resolved is reported as an error. A collection with no
    end date whose unit has one yields a WARNING rather than an ERROR.

    Args:
        collection: CustodianCollection instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    errors = []

    collection_id = get_field_value(collection, 'id')
    managing_unit_id = get_field_value(collection, 'managing_unit')

    # Skip validation if no managing unit
    if not managing_unit_id:
        return errors

    # Get managing unit
    managing_unit = organizational_units.get(managing_unit_id)
    if not managing_unit:
        errors.append(ValidationError(
            rule_id="COLLECTION_UNIT_TEMPORAL",
            message=f"Managing unit not found: {managing_unit_id}",
            entity_id=collection_id,
            entity_type="CustodianCollection",
            field="managing_unit",
            severity="ERROR"
        ))
        return errors

    # Parse dates; parse_date returns None for missing or unparseable
    # values, and a check is skipped when one of its operands is None.
    collection_start = parse_date(get_field_value(collection, 'valid_from'))
    collection_end = parse_date(get_field_value(collection, 'valid_to'))
    unit_start = parse_date(get_field_value(managing_unit, 'valid_from'))
    unit_end = parse_date(get_field_value(managing_unit, 'valid_to'))

    # Rule 1.1: Collection starts on or after unit founding
    if collection_start is not None and unit_start is not None:
        if collection_start < unit_start:
            errors.append(ValidationError(
                rule_id="COLLECTION_UNIT_TEMPORAL_START",
                message=f"Collection valid_from ({collection_start}) must be >= managing unit valid_from ({unit_start})",
                entity_id=collection_id,
                entity_type="CustodianCollection",
                field="valid_from",
                severity="ERROR"
            ))

    # Rule 1.2: Collection ends on or before unit dissolution (if unit dissolved)
    if unit_end is not None:
        if collection_end is None:
            # Warning: Collection ongoing but unit dissolved.
            # NOTE: an unparseable valid_to also ends up here, because
            # parse_date maps it to None.
            errors.append(ValidationError(
                rule_id="COLLECTION_UNIT_TEMPORAL_ONGOING",
                message=f"Collection has ongoing custody (no valid_to) but managing unit was dissolved on {unit_end}. Missing custody transfer?",
                entity_id=collection_id,
                entity_type="CustodianCollection",
                field="valid_to",
                severity="WARNING"
            ))
        elif collection_end > unit_end:
            errors.append(ValidationError(
                rule_id="COLLECTION_UNIT_TEMPORAL_END",
                message=f"Collection valid_to ({collection_end}) must be <= managing unit valid_to ({unit_end}) when unit is dissolved",
                entity_id=collection_id,
                entity_type="CustodianCollection",
                field="valid_to",
                severity="ERROR"
            ))

    return errors


# ============================================================================
# Rule 2: Collection-Unit Bidirectional Relationships
# ============================================================================

def validate_collection_unit_bidirectional(
    collection: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List["ValidationError"]:
    """
    Validate bidirectional relationship between collection and managing unit.

    Rule: If collection.managing_unit = unit, then unit.managed_collections
    must include collection.

    An unresolvable managing unit is deliberately not reported here; the
    temporal validator already reports it.

    Args:
        collection: CustodianCollection instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    errors = []

    collection_id = get_field_value(collection, 'id')
    managing_unit_id = get_field_value(collection, 'managing_unit')

    # Skip if no managing unit
    if not managing_unit_id:
        return errors

    # Get managing unit
    managing_unit = organizational_units.get(managing_unit_id)
    if not managing_unit:
        return errors  # Already caught by temporal validator

    # Check inverse relationship on the normalized ID list, so a scalar
    # slot value is compared for equality instead of substring-matched.
    managed_collections = _referenced_ids(
        get_field_value(managing_unit, 'managed_collections')
    )
    if collection_id not in managed_collections:
        errors.append(ValidationError(
            rule_id="COLLECTION_UNIT_BIDIRECTIONAL",
            message=f"Collection references managing_unit {managing_unit_id} but unit does not list collection in managed_collections",
            entity_id=collection_id,
            entity_type="CustodianCollection",
            field="managing_unit",
            severity="ERROR"
        ))

    return errors


# ============================================================================
# Rule 4: Staff-Unit Temporal Consistency
# ============================================================================

def validate_staff_unit_temporal(
    person: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List["ValidationError"]:
    """
    Validate that staff employment dates fit within unit's validity period.

    Rule 4.1: PersonObservation.employment_start_date >= OrganizationalStructure.valid_from
    Rule 4.2: PersonObservation.employment_end_date <= OrganizationalStructure.valid_to (if unit dissolved)

    A person without a unit affiliation is not checked. An affiliation that
    cannot be resolved is reported as an error. Employment with no end date
    in a unit that has one yields a WARNING rather than an ERROR.

    Args:
        person: PersonObservation instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    errors = []

    person_id = get_field_value(person, 'id')
    unit_affiliation_id = get_field_value(person, 'unit_affiliation')

    # Skip if no unit affiliation
    if not unit_affiliation_id:
        return errors

    # Get unit
    unit = organizational_units.get(unit_affiliation_id)
    if not unit:
        errors.append(ValidationError(
            rule_id="STAFF_UNIT_TEMPORAL",
            message=f"Unit affiliation not found: {unit_affiliation_id}",
            entity_id=person_id,
            entity_type="PersonObservation",
            field="unit_affiliation",
            severity="ERROR"
        ))
        return errors

    # Parse dates; parse_date returns None for missing or unparseable
    # values, and a check is skipped when one of its operands is None.
    employment_start = parse_date(get_field_value(person, 'employment_start_date'))
    employment_end = parse_date(get_field_value(person, 'employment_end_date'))
    unit_start = parse_date(get_field_value(unit, 'valid_from'))
    unit_end = parse_date(get_field_value(unit, 'valid_to'))

    # Rule 4.1: Employment starts on or after unit founding
    if employment_start is not None and unit_start is not None:
        if employment_start < unit_start:
            errors.append(ValidationError(
                rule_id="STAFF_UNIT_TEMPORAL_START",
                message=f"Staff employment_start_date ({employment_start}) must be >= unit valid_from ({unit_start})",
                entity_id=person_id,
                entity_type="PersonObservation",
                field="employment_start_date",
                severity="ERROR"
            ))

    # Rule 4.2: Employment ends on or before unit dissolution (if unit dissolved)
    if unit_end is not None:
        if employment_end is None:
            # Warning: Employment ongoing but unit dissolved.
            # NOTE: an unparseable employment_end_date also ends up here,
            # because parse_date maps it to None.
            errors.append(ValidationError(
                rule_id="STAFF_UNIT_TEMPORAL_ONGOING",
                message=f"Staff has ongoing employment (no employment_end_date) but unit was dissolved on {unit_end}. Missing employment termination?",
                entity_id=person_id,
                entity_type="PersonObservation",
                field="employment_end_date",
                severity="WARNING"
            ))
        elif employment_end > unit_end:
            errors.append(ValidationError(
                rule_id="STAFF_UNIT_TEMPORAL_END",
                message=f"Staff employment_end_date ({employment_end}) must be <= unit valid_to ({unit_end}) when unit is dissolved",
                entity_id=person_id,
                entity_type="PersonObservation",
                field="employment_end_date",
                severity="ERROR"
            ))

    return errors


# ============================================================================
# Rule 5: Staff-Unit Bidirectional Relationships
# ============================================================================

def validate_staff_unit_bidirectional(
    person: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List["ValidationError"]:
    """
    Validate bidirectional relationship between person and unit.

    Rule: If person.unit_affiliation = unit, then unit.staff_members must
    include person.

    An unresolvable unit affiliation is deliberately not reported here; the
    temporal validator already reports it.

    Args:
        person: PersonObservation instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    errors = []

    person_id = get_field_value(person, 'id')
    unit_affiliation_id = get_field_value(person, 'unit_affiliation')

    # Skip if no unit affiliation
    if not unit_affiliation_id:
        return errors

    # Get unit
    unit = organizational_units.get(unit_affiliation_id)
    if not unit:
        return errors  # Already caught by temporal validator

    # Check inverse relationship on the normalized ID list, so a scalar
    # slot value is compared for equality instead of substring-matched.
    staff_members = _referenced_ids(get_field_value(unit, 'staff_members'))
    if person_id not in staff_members:
        errors.append(ValidationError(
            rule_id="STAFF_UNIT_BIDIRECTIONAL",
            message=f"Person references unit_affiliation {unit_affiliation_id} but unit does not list person in staff_members",
            entity_id=person_id,
            entity_type="PersonObservation",
            field="unit_affiliation",
            severity="ERROR"
        ))

    return errors
# ============================================================================
# Batch Validation
# ============================================================================

def _route_by_severity(findings, errors, warnings) -> None:
    """Append each finding to ``errors`` or ``warnings`` by its severity.

    Findings with any other severity (e.g. INFO) are not collected.
    """
    for finding in findings:
        if finding.severity == "ERROR":
            errors.append(finding)
        elif finding.severity == "WARNING":
            warnings.append(finding)


def validate_all(
    collections: List[Dict[str, Any]],
    persons: List[Dict[str, Any]],
    organizational_units: Dict[str, Dict[str, Any]]
) -> Tuple[List["ValidationError"], List["ValidationError"]]:
    """
    Validate all collections and persons against organizational units.

    Each entity is run through its temporal validator and then its
    bidirectional validator. Findings keep that order within each result
    list, collections before persons.

    Args:
        collections: List of CustodianCollection instance dicts (None is
            treated as empty)
        persons: List of PersonObservation instance dicts (None is treated
            as empty)
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        Tuple of (errors, warnings)
    """
    all_errors = []
    all_warnings = []

    # Validate collections
    for collection in collections or []:
        findings = validate_collection_unit_temporal(collection, organizational_units)
        findings += validate_collection_unit_bidirectional(collection, organizational_units)
        _route_by_severity(findings, all_errors, all_warnings)

    # Validate persons
    for person in persons or []:
        findings = validate_staff_unit_temporal(person, organizational_units)
        findings += validate_staff_unit_bidirectional(person, organizational_units)
        _route_by_severity(findings, all_errors, all_warnings)

    return all_errors, all_warnings


# ============================================================================
# CLI Interface (Optional)
# ============================================================================

def _iter_instances(documents):
    """Yield the dict instances contained in the loaded YAML documents.

    A document may be a single mapping or a list of mappings. Anything else
    (for example the None produced by an empty document) is skipped, since
    it cannot be an instance.
    """
    for document in documents:
        if isinstance(document, dict):
            yield document
        elif isinstance(document, list):
            for item in document:
                if isinstance(item, dict):
                    yield item


def main(argv: Optional[List[str]] = None) -> int:
    """Validate the YAML file named on the command line and print a report.

    Args:
        argv: Argument vector including the program name; defaults to
            ``sys.argv``.

    Returns:
        Process exit status: 0 when no errors were found, 1 on errors or
        when the file argument is missing.
    """
    import sys

    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        print("Usage: python linkml_validators.py YAML_FILE")
        return 1

    # PyYAML is needed only by the CLI, so it is imported here to keep the
    # validators importable without it.
    import yaml

    # Load YAML file. safe_load_all is lazy, so the documents must be
    # consumed while the file is still open.
    with open(argv[1], 'r', encoding='utf-8') as f:
        data = list(_iter_instances(yaml.safe_load_all(f)))

    # Separate by type, using a characteristic field of each class
    collections = [d for d in data if d.get('collection_name')]
    persons = [d for d in data if d.get('staff_role')]
    units = {}
    for d in data:
        if not d.get('unit_name'):
            continue
        if 'id' not in d:
            # A unit without an ID can never be referenced; report it
            # instead of failing with KeyError.
            print(f"Skipping unit without 'id': {d.get('unit_name')}", file=sys.stderr)
            continue
        units[d['id']] = d

    # Validate
    errors, warnings = validate_all(collections, persons, units)

    # Print results
    print("\nValidation Results:")
    print(f" Errors: {len(errors)}")
    print(f" Warnings: {len(warnings)}")

    if errors:
        print("\nErrors:")
        for error in errors:
            print(f" {error}")

    if warnings:
        print("\nWarnings:")
        for warning in warnings:
            print(f" {warning}")

    return 0 if not errors else 1


if __name__ == "__main__":
    raise SystemExit(main())