glam/scripts/validate_temporal_consistency.py
kempersc 2761857b0d Add scripts for converting OWL/Turtle ontology to Mermaid and PlantUML diagrams
- Implemented `owl_to_mermaid.py` to convert OWL/Turtle files into Mermaid class diagrams.
- Implemented `owl_to_plantuml.py` to convert OWL/Turtle files into PlantUML class diagrams.
- Added two new PlantUML files for custodian multi-aspect diagrams.
2025-11-22 23:01:13 +01:00

636 lines
26 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Temporal Consistency Validator for Heritage Custodian Ontology (v0.7.0)
Validates temporal and bidirectional relationship constraints across:
- CustodianCollection ↔ OrganizationalStructure (Phase 4)
- PersonObservation ↔ OrganizationalStructure (Phase 3)
- OrganizationalChangeEvent impacts (Phase 2; change events are loaded and counted, but no rule in this script checks them)
Usage:
python scripts/validate_temporal_consistency.py <yaml_file>
python scripts/validate_temporal_consistency.py schemas/20251121/examples/*.yaml
Author: Heritage Custodian Ontology Project
Date: 2025-11-22
Schema Version: v0.7.0 (Phase 5: Validation Framework)
"""
import yaml
import sys
from datetime import datetime, date
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from pathlib import Path
# ============================================================================
# Validation Result Classes
# ============================================================================
@dataclass
class ValidationError:
    """One rule violation, together with the entity it was raised against.

    Fields are positional in the order ``rule``, ``message``, ``entity_id``,
    ``entity_type``, ``severity``.
    """

    rule: str
    message: str
    entity_id: str
    entity_type: str
    severity: str = "ERROR"  # "ERROR" or "WARNING"

    def __str__(self):
        # Two-line rendering: the finding itself, then the entity it concerns.
        headline = f"[{self.severity}] {self.rule}: {self.message}"
        origin = f" Entity: {self.entity_id} ({self.entity_type})"
        return "\n".join((headline, origin))
@dataclass
class ValidationResult:
    """Collects the findings and counters of one validation run.

    Errors and warnings are stored in separate lists. The two counters are
    plain integers that the code driving the run is expected to update.
    """

    errors: List[ValidationError] = field(default_factory=list)
    warnings: List[ValidationError] = field(default_factory=list)
    entities_validated: int = 0
    rules_checked: int = 0

    def add_error(self, rule: str, message: str, entity_id: str, entity_type: str):
        """Record a finding with severity ``ERROR``."""
        finding = ValidationError(rule, message, entity_id, entity_type, "ERROR")
        self.errors.append(finding)

    def add_warning(self, rule: str, message: str, entity_id: str, entity_type: str):
        """Record a finding with severity ``WARNING``."""
        finding = ValidationError(rule, message, entity_id, entity_type, "WARNING")
        self.warnings.append(finding)

    @property
    def is_valid(self) -> bool:
        """True when no errors were recorded; warnings do not fail a run."""
        return not self.errors

    @property
    def total_issues(self) -> int:
        """Number of errors and warnings combined."""
        error_count = len(self.errors)
        warning_count = len(self.warnings)
        return error_count + warning_count

    def print_summary(self):
        """Print the counters, the pass/fail status and every finding to stdout."""
        separator = "=" * 80
        status = '✅ PASS' if self.is_valid else '❌ FAIL'
        summary_lines = (
            "\n" + separator,
            "VALIDATION SUMMARY",
            separator,
            f"Entities validated: {self.entities_validated}",
            f"Rules checked: {self.rules_checked}",
            f"Errors: {len(self.errors)}",
            f"Warnings: {len(self.warnings)}",
            f"Status: {status}",
            separator,
        )
        for line in summary_lines:
            print(line)

        # Errors are listed before warnings; an empty section prints nothing.
        sections = (
            ("\n🔴 ERRORS:", self.errors),
            ("\n🟡 WARNINGS:", self.warnings),
        )
        for heading, findings in sections:
            if findings:
                print(heading)
                for finding in findings:
                    print(f"\n{finding}")

        if not self.total_issues:
            print("\n✅ All validation rules passed!")
# ============================================================================
# Data Loader
# ============================================================================
class DataLoader:
    """Read a YAML file and sort its documents into per-entity-type mappings.

    Each mapping is keyed by the document's ``id`` field (empty string when
    the field is absent).
    """

    def __init__(self, yaml_path: Path):
        self.yaml_path = yaml_path
        self.organizational_units: Dict[str, Dict] = {}
        self.collections: Dict[str, Dict] = {}
        self.person_observations: Dict[str, Dict] = {}
        self.change_events: Dict[str, Dict] = {}

    def _bucket_for(self, record):
        """Return the mapping a document belongs in, or None if unrecognised.

        The entity type is inferred from which fields are present; the first
        matching test wins.
        """
        if 'unit_name' in record or 'unit_type' in record:
            return self.organizational_units
        if 'collection_name' in record:
            return self.collections
        if 'person_name' in record or 'staff_role' in record:
            return self.person_observations
        if 'event_type' in record and 'event_date' in record:
            return self.change_events
        return None

    def load(self):
        """Parse every YAML document in the file and categorise it.

        Empty documents and documents that are not mappings are skipped.
        Returns ``self`` so that construction and loading can be chained.
        """
        with open(self.yaml_path, 'r', encoding='utf-8') as handle:
            parsed = [*yaml.safe_load_all(handle)]

        for record in parsed:
            if not isinstance(record, dict) or not record:
                continue
            bucket = self._bucket_for(record)
            if bucket is not None:
                bucket[record.get('id', '')] = record
        return self
# ============================================================================
# Date Utilities
# ============================================================================
def parse_date(date_str: Any) -> Optional[date]:
    """Normalise a date-like value to a plain ``datetime.date``.

    Args:
        date_str: ``None``, a ``date``, a ``datetime`` or an ISO-8601 string
            (a trailing ``Z`` is accepted as UTC).

    Returns:
        The corresponding ``date``, or ``None`` when the value is ``None``,
        is a string that cannot be parsed, or is of an unsupported type.
    """
    if date_str is None:
        return None
    # datetime is a subclass of date, so it has to be tested first. Otherwise
    # a datetime would be returned unchanged, and comparing it with a plain
    # date later on raises TypeError.
    if isinstance(date_str, datetime):
        return date_str.date()
    if isinstance(date_str, date):
        return date_str
    if isinstance(date_str, str):
        try:
            return datetime.fromisoformat(date_str.replace('Z', '+00:00')).date()
        except ValueError:
            return None
    return None
def date_within_range(check_date: Optional[date],
                      start_date: Optional[date],
                      end_date: Optional[date]) -> bool:
    """Tell whether ``check_date`` lies inside ``[start_date, end_date]``.

    Both bounds are inclusive, and a missing bound leaves that side
    open-ended. A missing ``check_date`` cannot be validated and is
    therefore reported as within range.
    """
    if check_date is None:
        return True
    # The upper bound is only examined when the lower bound did not already
    # rule the date out.
    return not (
        (start_date and check_date < start_date)
        or (end_date and check_date > end_date)
    )
# ============================================================================
# Validation Rules
# ============================================================================
class TemporalValidator:
    """Apply the temporal and bidirectional consistency rules to loaded data.

    The validator reads ``yaml_path`` (for the progress banner) and the
    ``organizational_units``, ``collections`` and ``person_observations``
    mappings (entity id -> entity dict) from ``data``. ``change_events`` is
    only counted in the banner. Every finding is recorded on ``self.result``.
    """

    def __init__(self, data: DataLoader):
        self.data = data
        self.result = ValidationResult()

    @staticmethod
    def _id_list(value: Any) -> List[Any]:
        """Normalise a reverse-reference field to a list of ids.

        A key that is present but empty in YAML is loaded as ``None``, and a
        single id may be written as a bare string. Without normalisation,
        ``None`` raises TypeError on ``in``/iteration and a string is treated
        as a sequence of characters.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    def validate_all(self) -> ValidationResult:
        """Run every rule once and return the accumulated result."""
        print(f"\n🔍 Validating {self.data.yaml_path.name}...")
        print(f" - Organizational units: {len(self.data.organizational_units)}")
        print(f" - Collections: {len(self.data.collections)}")
        print(f" - Person observations: {len(self.data.person_observations)}")
        print(f" - Change events: {len(self.data.change_events)}")

        rules = (
            self.validate_collection_unit_temporal,       # Phase 4
            self.validate_collection_unit_bidirectional,  # Phase 4
            self.validate_custody_continuity,             # Phase 4
            self.validate_staff_unit_temporal,            # Phase 3
            self.validate_staff_unit_bidirectional,       # Phase 3
        )
        for rule in rules:
            rule()
            self.result.rules_checked += 1

        # Change events are not included: no rule validates them.
        self.result.entities_validated = (
            len(self.data.organizational_units)
            + len(self.data.collections)
            + len(self.data.person_observations)
        )
        return self.result

    # ========================================================================
    # Rule 1: Collection-Unit Temporal Consistency (Phase 4)
    # ========================================================================
    def validate_collection_unit_temporal(self):
        """
        RULE: Collection custody dates must fit within managing unit validity period.

        Constraints:
            1. collection.valid_from >= unit.valid_from
            2. collection.valid_to <= unit.valid_to (if unit dissolved)

        Rationale: Collection cannot be managed by unit that doesn't exist.
        """
        for coll_id, collection in self.data.collections.items():
            managing_unit_id = collection.get('managing_unit')
            if not managing_unit_id:
                # No managing unit specified (might be optional in some contexts)
                continue

            unit = self.data.organizational_units.get(managing_unit_id)
            if not unit:
                self.result.add_error(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=f"Collection references non-existent managing unit: {managing_unit_id}",
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )
                continue

            coll_start = parse_date(collection.get('valid_from'))
            coll_end = parse_date(collection.get('valid_to'))
            unit_start = parse_date(unit.get('valid_from'))
            unit_end = parse_date(unit.get('valid_to'))

            # Start date constraint
            if coll_start and unit_start and coll_start < unit_start:
                self.result.add_error(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=(
                        f"Collection custody starts ({coll_start}) before managing unit exists ({unit_start}). "
                        f"Managing unit: {unit.get('unit_name', managing_unit_id)}"
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

            # End date constraint (only meaningful if the unit was dissolved)
            if unit_end and coll_end and coll_end > unit_end:
                self.result.add_error(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=(
                        f"Collection custody extends ({coll_end}) beyond managing unit validity ({unit_end}). "
                        f"Managing unit: {unit.get('unit_name', managing_unit_id)}"
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

            # Warning: collection still active after the unit was dissolved
            if unit_end and not coll_end:
                self.result.add_warning(
                    rule="COLLECTION_UNIT_TEMPORAL",
                    message=(
                        f"Collection custody ongoing but managing unit dissolved ({unit_end}). "
                        f"Missing custody transfer? Managing unit: {unit.get('unit_name', managing_unit_id)}"
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

    # ========================================================================
    # Rule 2: Collection-Unit Bidirectional Consistency (Phase 4)
    # ========================================================================
    def validate_collection_unit_bidirectional(self):
        """
        RULE: Bidirectional relationships must be consistent.

        Constraints:
            1. IF collection.managing_unit = unit_id
               THEN unit.managed_collections MUST include collection_id
            2. IF unit.managed_collections includes collection_id
               THEN collection.managing_unit MUST equal unit_id

        Rationale: Forward and reverse relationships must match.
        """
        # Forward direction (collection -> unit)
        for coll_id, collection in self.data.collections.items():
            managing_unit_id = collection.get('managing_unit')
            if not managing_unit_id:
                continue

            unit = self.data.organizational_units.get(managing_unit_id)
            if not unit:
                continue  # Reported by the temporal rule as a dangling reference

            managed_collections = self._id_list(unit.get('managed_collections'))
            if coll_id not in managed_collections:
                self.result.add_error(
                    rule="COLLECTION_UNIT_BIDIRECTIONAL",
                    message=(
                        f"Collection references unit '{unit.get('unit_name', managing_unit_id)}' as managing_unit, "
                        f"but unit does not list collection in managed_collections. "
                        f"Add collection to unit.managed_collections."
                    ),
                    entity_id=coll_id,
                    entity_type="CustodianCollection"
                )

        # Reverse direction (unit -> collections)
        for unit_id, unit in self.data.organizational_units.items():
            for coll_id in self._id_list(unit.get('managed_collections')):
                collection = self.data.collections.get(coll_id)
                if not collection:
                    self.result.add_error(
                        rule="COLLECTION_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit references non-existent collection: {coll_id}. "
                            f"Remove from unit.managed_collections or create collection."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )
                    continue

                managing_unit_id = collection.get('managing_unit')
                if managing_unit_id != unit_id:
                    self.result.add_error(
                        rule="COLLECTION_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit lists collection '{collection.get('collection_name', coll_id)}' "
                            f"in managed_collections, but collection's managing_unit is "
                            f"'{managing_unit_id}' (should be '{unit_id}'). "
                            f"Update collection.managing_unit."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )

    # ========================================================================
    # Rule 3: Custody Transfer Continuity (Phase 4)
    # ========================================================================
    def validate_custody_continuity(self):
        """
        RULE: Collection custody transfers must be continuous (no gaps).

        Constraints:
            1. IF collection version 1 ends (valid_to = T1)
               AND collection version 2 exists with same collection_name
               THEN version 2 must start at T1 or T1+1 day

        Rationale: Collections don't disappear; custody must transfer during org changes.

        Note: Versions are found by grouping collections on collection_name.
        An overlap is reported as an error, a gap of more than one day as a
        warning.
        """
        # Group collections by name; unnamed collections are ignored.
        collections_by_name: Dict[str, List[Tuple[str, Dict]]] = {}
        for coll_id, collection in self.data.collections.items():
            name = collection.get('collection_name', '')
            if name:
                collections_by_name.setdefault(name, []).append((coll_id, collection))

        for name, versions in collections_by_name.items():
            if len(versions) < 2:
                continue  # Single version, no continuity to check

            # Versions without a usable valid_from sort first.
            sorted_versions = sorted(
                versions,
                key=lambda v: parse_date(v[1].get('valid_from')) or date.min
            )

            # Compare each version with its immediate successor.
            for (current_id, current), (next_id, next_version) in zip(
                sorted_versions, sorted_versions[1:]
            ):
                current_end = parse_date(current.get('valid_to'))
                next_start = parse_date(next_version.get('valid_from'))
                if not (current_end and next_start):
                    continue

                gap_days = (next_start - current_end).days
                if gap_days < 0:
                    self.result.add_error(
                        rule="CUSTODY_CONTINUITY",
                        message=(
                            f"Collection '{name}' has overlapping custody periods: "
                            f"version ending {current_end} overlaps with version starting {next_start} "
                            f"(overlap: {abs(gap_days)} days). "
                            f"Current: {current_id}, Next: {next_id}"
                        ),
                        entity_id=current_id,
                        entity_type="CustodianCollection"
                    )
                elif gap_days > 1:
                    self.result.add_warning(
                        rule="CUSTODY_CONTINUITY",
                        message=(
                            f"Collection '{name}' has custody gap: "
                            f"version ending {current_end}, next version starting {next_start} "
                            f"(gap: {gap_days} days). "
                            f"Expected continuous custody transfer. "
                            f"Current: {current_id}, Next: {next_id}"
                        ),
                        entity_id=current_id,
                        entity_type="CustodianCollection"
                    )

    # ========================================================================
    # Rule 4: Staff-Unit Temporal Consistency (Phase 3)
    # ========================================================================
    def validate_staff_unit_temporal(self):
        """
        RULE: Staff role dates must fit within organizational unit validity period.

        Constraints:
            1. person_obs.role_start_date >= unit.valid_from
            2. person_obs.role_end_date <= unit.valid_to (if unit dissolved)

        Rationale: Person cannot work for unit that doesn't exist.
        """
        for person_id, person_obs in self.data.person_observations.items():
            unit_affiliation_id = person_obs.get('unit_affiliation')
            if not unit_affiliation_id:
                continue

            unit = self.data.organizational_units.get(unit_affiliation_id)
            if not unit:
                self.result.add_error(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=f"Person references non-existent unit: {unit_affiliation_id}",
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )
                continue

            # PersonObservation uses role_start_date / role_end_date
            role_start = parse_date(person_obs.get('role_start_date'))
            role_end = parse_date(person_obs.get('role_end_date'))
            unit_start = parse_date(unit.get('valid_from'))
            unit_end = parse_date(unit.get('valid_to'))

            # Start date constraint
            if role_start and unit_start and role_start < unit_start:
                self.result.add_error(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=(
                        f"Staff role starts ({role_start}) before unit exists ({unit_start}). "
                        f"Unit: {unit.get('unit_name', unit_affiliation_id)}, "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

            # End date constraint
            if unit_end and role_end and role_end > unit_end:
                self.result.add_error(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=(
                        f"Staff role extends ({role_end}) beyond unit validity ({unit_end}). "
                        f"Unit: {unit.get('unit_name', unit_affiliation_id)}, "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

            # Warning: role ongoing after the unit was dissolved
            if unit_end and not role_end:
                self.result.add_warning(
                    rule="STAFF_UNIT_TEMPORAL",
                    message=(
                        f"Staff role ongoing but unit dissolved ({unit_end}). "
                        f"Missing staff reassignment? "
                        f"Unit: {unit.get('unit_name', unit_affiliation_id)}, "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

    # ========================================================================
    # Rule 5: Staff-Unit Bidirectional Consistency (Phase 3)
    # ========================================================================
    def validate_staff_unit_bidirectional(self):
        """
        RULE: Bidirectional staff-unit relationships must be consistent.

        Constraints:
            1. IF person_obs.unit_affiliation = unit_id
               THEN unit.staff_members MUST include person_obs_id
            2. IF unit.staff_members includes person_obs_id
               THEN person_obs.unit_affiliation MUST equal unit_id

        Rationale: Forward and reverse relationships must match.
        """
        # Forward direction (person -> unit)
        for person_id, person_obs in self.data.person_observations.items():
            unit_affiliation_id = person_obs.get('unit_affiliation')
            if not unit_affiliation_id:
                continue

            unit = self.data.organizational_units.get(unit_affiliation_id)
            if not unit:
                continue  # Reported by the temporal rule as a dangling reference

            staff_members = self._id_list(unit.get('staff_members'))
            if person_id not in staff_members:
                self.result.add_error(
                    rule="STAFF_UNIT_BIDIRECTIONAL",
                    message=(
                        f"Person references unit '{unit.get('unit_name', unit_affiliation_id)}' as unit_affiliation, "
                        f"but unit does not list person in staff_members. "
                        f"Add person to unit.staff_members. "
                        f"Person: {person_obs.get('person_name', person_id)}"
                    ),
                    entity_id=person_id,
                    entity_type="PersonObservation"
                )

        # Reverse direction (unit -> staff)
        for unit_id, unit in self.data.organizational_units.items():
            for person_id in self._id_list(unit.get('staff_members')):
                person_obs = self.data.person_observations.get(person_id)
                if not person_obs:
                    self.result.add_error(
                        rule="STAFF_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit references non-existent person: {person_id}. "
                            f"Remove from unit.staff_members or create PersonObservation."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )
                    continue

                unit_affiliation_id = person_obs.get('unit_affiliation')
                if unit_affiliation_id != unit_id:
                    self.result.add_error(
                        rule="STAFF_UNIT_BIDIRECTIONAL",
                        message=(
                            f"Unit lists person '{person_obs.get('person_name', person_id)}' "
                            f"in staff_members, but person's unit_affiliation is "
                            f"'{unit_affiliation_id}' (should be '{unit_id}'). "
                            f"Update person_obs.unit_affiliation."
                        ),
                        entity_id=unit_id,
                        entity_type="OrganizationalStructure"
                    )
# ============================================================================
# CLI Interface
# ============================================================================
def main():
    """Validate every YAML file named on the command line and exit.

    Each existing file is loaded, validated and summarised; an overall
    summary follows. The process exits with status 0 only when every named
    file was found and produced no errors, and with status 1 otherwise
    (including when no file argument is given).
    """
    if len(sys.argv) < 2:
        print("Usage: python scripts/validate_temporal_consistency.py <yaml_file> [<yaml_file2> ...]")
        print("\nExample:")
        print(" python scripts/validate_temporal_consistency.py schemas/20251121/examples/collection_department_integration_examples.yaml")
        sys.exit(1)

    yaml_files = [Path(arg) for arg in sys.argv[1:]]

    print("\n" + "=" * 80)
    print("HERITAGE CUSTODIAN ONTOLOGY - TEMPORAL CONSISTENCY VALIDATOR")
    print("Schema Version: v0.7.0 (Phase 5)")
    print("=" * 80)

    all_results = []
    missing_files = []
    for yaml_file in yaml_files:
        if not yaml_file.exists():
            print(f"\n❌ File not found: {yaml_file}")
            # Remember the miss: a file that was never validated must not
            # let the run report success.
            missing_files.append(yaml_file)
            continue

        data = DataLoader(yaml_file).load()
        validator = TemporalValidator(data)
        result = validator.validate_all()
        all_results.append((yaml_file.name, result))
        result.print_summary()

    # Overall summary
    print("\n" + "=" * 80)
    print("OVERALL VALIDATION SUMMARY")
    print("=" * 80)
    print(f"Files validated: {len(all_results)}")
    if missing_files:
        print(f"Files not found: {len(missing_files)}")
    total_errors = sum(len(r.errors) for _, r in all_results)
    total_warnings = sum(len(r.warnings) for _, r in all_results)
    print(f"Total errors: {total_errors}")
    print(f"Total warnings: {total_warnings}")

    # all() over an empty list is True, so missing files are checked
    # explicitly to avoid passing when nothing was validated.
    all_valid = not missing_files and all(r.is_valid for _, r in all_results)
    print(f"Overall status: {'✅ ALL PASS' if all_valid else '❌ SOME FAILURES'}")
    print("=" * 80)

    # Exit with error code if validation failed
    sys.exit(0 if all_valid else 1)


if __name__ == "__main__":
    main()