- Created the Country class with ISO 3166-1 alpha-2 and alpha-3 codes, ensuring minimal design without additional metadata. - Integrated the Country class into CustodianPlace and LegalForm schemas to support country-specific feature types and legal forms. - Removed duplicate keys in FeatureTypeEnum.yaml, resulting in 294 unique feature types. - Eliminated "Hypernyms:" text from FeatureTypeEnum descriptions, verifying that semantic relationships are now conveyed through ontology mappings. - Created example instance file demonstrating integration of Country with CustodianPlace and LegalForm. - Updated documentation to reflect the completion of the Country class implementation and hypernyms removal.
429 lines
15 KiB
Python
429 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
LinkML Custom Validators for Heritage Custodian Ontology
|
|
|
|
Implements validation rules as Python functions that can be called during
|
|
LinkML data loading. These complement SHACL shapes (Phase 7) by providing
|
|
validation at the YAML instance level before RDF conversion.
|
|
|
|
Usage:
|
|
from linkml_validators import validate_collection_unit_temporal
|
|
|
|
errors = validate_collection_unit_temporal(collection, organizational_units)
|
|
if errors:
|
|
print(f"Validation failed: {errors}")
|
|
|
|
Author: Heritage Custodian Ontology Project
|
|
Date: 2025-11-22
|
|
Schema Version: v0.7.0 (Phase 8: LinkML Constraints)
|
|
"""
|
|
|
|
from datetime import date, datetime
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
|
|
|
|
# ============================================================================
|
|
# Validation Error Class
|
|
# ============================================================================
|
|
|
|
@dataclass
class ValidationError:
    """One validation finding plus the entity it was raised against.

    ``str()`` renders the finding as two lines: a headline carrying the
    severity, rule id, message and (when set) the offending field, followed
    by a line naming the entity id and entity type.
    """

    rule_id: str  # identifier of the rule that produced this finding
    message: str  # human-readable explanation
    entity_id: str  # id of the entity the finding refers to
    entity_type: str  # class name of that entity
    field: Optional[str] = None  # offending field, when one can be named
    severity: str = "ERROR"  # ERROR, WARNING, INFO

    def __str__(self):
        headline = f"[{self.severity}] {self.rule_id}: {self.message}"
        # The field suffix is only shown when a (truthy) field name is set.
        if self.field:
            headline += f" (field: {self.field})"
        return f"{headline}\n Entity: {self.entity_id} ({self.entity_type})"
|
|
|
|
|
|
# ============================================================================
|
|
# Helper Functions
|
|
# ============================================================================
|
|
|
|
def parse_date(date_value: Any) -> Optional[date]:
    """Normalise a date-like value to a plain ``datetime.date``.

    Args:
        date_value: A ``datetime``, a ``date``, an ISO 8601 string accepted
            by ``datetime.fromisoformat``, or anything else.

    Returns:
        The corresponding ``date``; ``None`` when the value is an
        unparseable string or is of an unsupported type (including ``None``).
    """
    # datetime is a subclass of date, so it must be tested first; otherwise
    # a datetime would be returned unconverted and later comparisons against
    # plain dates would raise TypeError.
    if isinstance(date_value, datetime):
        return date_value.date()
    if isinstance(date_value, date):
        return date_value
    if isinstance(date_value, str):
        try:
            return datetime.fromisoformat(date_value).date()
        except ValueError:
            return None
    return None
|
|
|
|
|
|
def get_field_value(entity: Dict[str, Any], field: str) -> Any:
    """Look up ``field`` on an entity dict, yielding ``None`` when it is absent."""
    value = entity.get(field, None)
    return value
|
|
|
|
|
|
# ============================================================================
|
|
# Rule 1: Collection-Unit Temporal Consistency
|
|
# ============================================================================
|
|
|
|
def validate_collection_unit_temporal(
    collection: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List[ValidationError]:
    """
    Check a collection's custody period against its managing unit's validity period.

    Rule 1.1: Collection.valid_from >= OrganizationalStructure.valid_from
    Rule 1.2: Collection.valid_to <= OrganizationalStructure.valid_to (if unit dissolved)

    A collection without a ``managing_unit`` is not checked. A reference to a
    unit that is missing from ``organizational_units`` is reported as an
    error. A collection without ``valid_to`` whose unit has a ``valid_to``
    is reported as a WARNING.

    Args:
        collection: CustodianCollection instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    coll_id = get_field_value(collection, 'id')
    unit_ref = get_field_value(collection, 'managing_unit')

    # Nothing to compare against when the collection names no managing unit.
    if not unit_ref:
        return []

    def finding(rule_id, message, field, severity="ERROR"):
        # Every finding of this validator concerns the same collection.
        return ValidationError(
            rule_id=rule_id,
            message=message,
            entity_id=coll_id,
            entity_type="CustodianCollection",
            field=field,
            severity=severity
        )

    unit = organizational_units.get(unit_ref)
    if not unit:
        return [finding(
            "COLLECTION_UNIT_TEMPORAL",
            f"Managing unit not found: {unit_ref}",
            "managing_unit"
        )]

    coll_from = parse_date(get_field_value(collection, 'valid_from'))
    coll_to = parse_date(get_field_value(collection, 'valid_to'))
    unit_from = parse_date(get_field_value(unit, 'valid_from'))
    unit_to = parse_date(get_field_value(unit, 'valid_to'))

    findings = []

    # Rule 1.1: only checkable when both start dates are known.
    if coll_from and unit_from and coll_from < unit_from:
        findings.append(finding(
            "COLLECTION_UNIT_TEMPORAL_START",
            f"Collection valid_from ({coll_from}) must be >= managing unit valid_from ({unit_from})",
            "valid_from"
        ))

    # Rule 1.2 and the ongoing-custody warning only apply to dissolved units.
    if unit_to:
        if coll_to:
            if coll_to > unit_to:
                findings.append(finding(
                    "COLLECTION_UNIT_TEMPORAL_END",
                    f"Collection valid_to ({coll_to}) must be <= managing unit valid_to ({unit_to}) when unit is dissolved",
                    "valid_to"
                ))
        else:
            findings.append(finding(
                "COLLECTION_UNIT_TEMPORAL_ONGOING",
                f"Collection has ongoing custody (no valid_to) but managing unit was dissolved on {unit_to}. Missing custody transfer?",
                "valid_to",
                severity="WARNING"
            ))

    return findings
|
|
|
|
|
|
# ============================================================================
|
|
# Rule 2: Collection-Unit Bidirectional Relationships
|
|
# ============================================================================
|
|
|
|
def validate_collection_unit_bidirectional(
    collection: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List[ValidationError]:
    """
    Check that the collection -> managing unit link has its inverse.

    Rule: If collection.managing_unit = unit, then unit.managed_collections must include collection.

    Collections without a ``managing_unit``, or whose unit is missing from
    ``organizational_units``, yield no findings here.

    Args:
        collection: CustodianCollection instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    coll_id = get_field_value(collection, 'id')
    unit_ref = get_field_value(collection, 'managing_unit')

    # Resolve the unit only when one is referenced; a missing unit is left
    # to the temporal validator to report.
    unit = organizational_units.get(unit_ref) if unit_ref else None
    if not unit:
        return []

    # A unit without managed_collections is treated as listing nothing.
    listed = get_field_value(unit, 'managed_collections') or []
    if coll_id in listed:
        return []

    return [ValidationError(
        rule_id="COLLECTION_UNIT_BIDIRECTIONAL",
        message=f"Collection references managing_unit {unit_ref} but unit does not list collection in managed_collections",
        entity_id=coll_id,
        entity_type="CustodianCollection",
        field="managing_unit",
        severity="ERROR"
    )]
|
|
|
|
|
|
# ============================================================================
|
|
# Rule 4: Staff-Unit Temporal Consistency
|
|
# ============================================================================
|
|
|
|
def validate_staff_unit_temporal(
    person: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List[ValidationError]:
    """
    Check a staff member's employment period against the unit's validity period.

    Rule 4.1: PersonObservation.employment_start_date >= OrganizationalStructure.valid_from
    Rule 4.2: PersonObservation.employment_end_date <= OrganizationalStructure.valid_to (if unit dissolved)

    A person without a ``unit_affiliation`` is not checked. A reference to a
    unit that is missing from ``organizational_units`` is reported as an
    error. A person without ``employment_end_date`` whose unit has a
    ``valid_to`` is reported as a WARNING.

    Args:
        person: PersonObservation instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    subject_id = get_field_value(person, 'id')
    unit_ref = get_field_value(person, 'unit_affiliation')

    # Nothing to compare against when the person names no unit.
    if not unit_ref:
        return []

    def finding(rule_id, message, field, severity="ERROR"):
        # Every finding of this validator concerns the same person.
        return ValidationError(
            rule_id=rule_id,
            message=message,
            entity_id=subject_id,
            entity_type="PersonObservation",
            field=field,
            severity=severity
        )

    unit = organizational_units.get(unit_ref)
    if not unit:
        return [finding(
            "STAFF_UNIT_TEMPORAL",
            f"Unit affiliation not found: {unit_ref}",
            "unit_affiliation"
        )]

    job_from = parse_date(get_field_value(person, 'employment_start_date'))
    job_to = parse_date(get_field_value(person, 'employment_end_date'))
    unit_from = parse_date(get_field_value(unit, 'valid_from'))
    unit_to = parse_date(get_field_value(unit, 'valid_to'))

    findings = []

    # Rule 4.1: only checkable when both start dates are known.
    if job_from and unit_from and job_from < unit_from:
        findings.append(finding(
            "STAFF_UNIT_TEMPORAL_START",
            f"Staff employment_start_date ({job_from}) must be >= unit valid_from ({unit_from})",
            "employment_start_date"
        ))

    # Rule 4.2 and the ongoing-employment warning only apply to dissolved units.
    if unit_to:
        if job_to:
            if job_to > unit_to:
                findings.append(finding(
                    "STAFF_UNIT_TEMPORAL_END",
                    f"Staff employment_end_date ({job_to}) must be <= unit valid_to ({unit_to}) when unit is dissolved",
                    "employment_end_date"
                ))
        else:
            findings.append(finding(
                "STAFF_UNIT_TEMPORAL_ONGOING",
                f"Staff has ongoing employment (no employment_end_date) but unit was dissolved on {unit_to}. Missing employment termination?",
                "employment_end_date",
                severity="WARNING"
            ))

    return findings
|
|
|
|
|
|
# ============================================================================
|
|
# Rule 5: Staff-Unit Bidirectional Relationships
|
|
# ============================================================================
|
|
|
|
def validate_staff_unit_bidirectional(
    person: Dict[str, Any],
    organizational_units: Dict[str, Dict[str, Any]]
) -> List[ValidationError]:
    """
    Check that the person -> unit link has its inverse.

    Rule: If person.unit_affiliation = unit, then unit.staff_members must include person.

    Persons without a ``unit_affiliation``, or whose unit is missing from
    ``organizational_units``, yield no findings here.

    Args:
        person: PersonObservation instance dict
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        List of ValidationError instances (empty if valid)
    """
    subject_id = get_field_value(person, 'id')
    unit_ref = get_field_value(person, 'unit_affiliation')

    # Resolve the unit only when one is referenced; a missing unit is left
    # to the temporal validator to report.
    unit = organizational_units.get(unit_ref) if unit_ref else None
    if not unit:
        return []

    # A unit without staff_members is treated as listing nobody.
    roster = get_field_value(unit, 'staff_members') or []
    if subject_id in roster:
        return []

    return [ValidationError(
        rule_id="STAFF_UNIT_BIDIRECTIONAL",
        message=f"Person references unit_affiliation {unit_ref} but unit does not list person in staff_members",
        entity_id=subject_id,
        entity_type="PersonObservation",
        field="unit_affiliation",
        severity="ERROR"
    )]
|
|
|
|
|
|
# ============================================================================
|
|
# Batch Validation
|
|
# ============================================================================
|
|
|
|
def validate_all(
    collections: List[Dict[str, Any]],
    persons: List[Dict[str, Any]],
    organizational_units: Dict[str, Dict[str, Any]]
) -> Tuple[List[ValidationError], List[ValidationError]]:
    """
    Run every collection and staff rule and split the findings by severity.

    Collections are processed first, then persons; for each entity the
    temporal rule runs before the bidirectional rule. Findings whose
    severity is neither "ERROR" nor "WARNING" are not returned.

    Args:
        collections: List of CustodianCollection instance dicts
        persons: List of PersonObservation instance dicts
        organizational_units: Dict mapping unit IDs to OrganizationalStructure dicts

    Returns:
        Tuple of (errors, warnings)
    """
    all_errors = []
    all_warnings = []

    # Each entity group is paired with the validators that apply to it.
    plan = (
        (collections, (validate_collection_unit_temporal,
                       validate_collection_unit_bidirectional)),
        (persons, (validate_staff_unit_temporal,
                   validate_staff_unit_bidirectional)),
    )

    for entities, validators in plan:
        for entity in entities:
            for validator in validators:
                for finding in validator(entity, organizational_units):
                    level = finding.severity
                    if level == "ERROR":
                        all_errors.append(finding)
                    elif level == "WARNING":
                        all_warnings.append(finding)

    return all_errors, all_warnings
|
|
|
|
|
|
# ============================================================================
|
|
# CLI Interface (Optional)
|
|
# ============================================================================
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: validate the documents of one YAML file and
    # exit with status 1 when any ERROR-level finding is reported.
    import sys
    import yaml

    if len(sys.argv) < 2:
        print("Usage: python linkml_validators.py <yaml_file>")
        sys.exit(1)

    # Load every document of the (possibly multi-document) YAML stream.
    # The encoding is pinned so the result does not depend on the platform
    # default.
    with open(sys.argv[1], 'r', encoding='utf-8') as f:
        documents = list(yaml.safe_load_all(f))

    # Empty documents load as None and a document may also be a list or a
    # scalar; only mappings can be entity instances, and calling .get() on
    # anything else would crash.
    data = [d for d in documents if isinstance(d, dict)]

    # Separate by type, using the field that is characteristic of each class
    collections = [d for d in data if d.get('collection_name')]
    persons = [d for d in data if d.get('staff_role')]
    units = {d['id']: d for d in data if d.get('unit_name')}

    # Validate
    errors, warnings = validate_all(collections, persons, units)

    # Print results
    print("\nValidation Results:")
    print(f" Errors: {len(errors)}")
    print(f" Warnings: {len(warnings)}")

    if errors:
        print("\nErrors:")
        for error in errors:
            print(f" {error}")

    if warnings:
        print("\nWarnings:")
        for warning in warnings:
            print(f" {warning}")

    # Warnings alone do not fail the run.
    sys.exit(0 if not errors else 1)
|