glam/scripts/migrate_annotations_to_slots.py
kempersc aa763dab25 Migrate 94 archive class annotations to ontology-aligned slots
- Add migration script: scripts/migrate_annotations_to_slots.py
- Convert custodian_types, wikidata, skos_broader, specificity_* annotations
- Replace with proper slots mapped to SKOS, PROV-O, RiC-O predicates
- Add ../slots/class_metadata_slots import to all migrated files
- Remove AcademicArchive_refactored.yaml (main file now migrated)
- Sync changes to frontend/public/schemas/

Migration converts:
  - custodian_types → hc:custodianTypes slot
  - wikidata/wikidata_label → wikidata_alignment structured slot
  - skos_broader → skos:broader slot
  - specificity_* → specificity_annotation structured slot
  - dual_class_pattern → dual_class_link structured slot
  - template_specificity → template_specificity slot

All 94 migrated schemas pass linkml-validate.
2026-01-06 11:25:37 +01:00

445 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Migrate LinkML class annotations to proper slots.
This script converts annotations like:
- custodian_types, custodian_types_rationale
- wikidata, wikidata_label
- skos_broader, skos_broader_label
- specificity_score, specificity_rationale, etc.
- dual_class_pattern, linked_collection_type, etc.
To proper slots that map to ontology predicates (SKOS, PROV-O, RiC-O, etc.)
Usage:
python scripts/migrate_annotations_to_slots.py [--dry-run] [--file <path>]
Examples:
# Dry run on all archive classes
python scripts/migrate_annotations_to_slots.py --dry-run
# Migrate a single file
python scripts/migrate_annotations_to_slots.py --file schemas/20251121/linkml/modules/classes/MunicipalArchive.yaml
# Migrate all archive classes
python scripts/migrate_annotations_to_slots.py
Author: OpenCode Claude
Date: 2026-01-06
"""
import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
import yaml
# Preserve YAML formatting with custom representer
def str_representer(dumper, data):
    """Emit multi-line strings as YAML block scalars ('|'); others use the default style."""
    scalar_style = '|' if '\n' in data else None
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style=scalar_style)
yaml.add_representer(str, str_representer)
def parse_json_annotation(value: str) -> Any:
    """Decode a JSON-encoded annotation value.

    Non-string inputs and strings that are not valid JSON are returned
    unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value
def extract_annotations(class_def: dict) -> dict:
    """Return the 'annotations' mapping of a class definition ({} when absent)."""
    try:
        return class_def['annotations']
    except KeyError:
        return {}
def build_wikidata_alignment(annotations: dict) -> dict | None:
"""Build WikidataAlignment structured object from annotations."""
entity_id = annotations.get('wikidata')
if not entity_id:
return None
alignment = {
'entity_id': str(entity_id),
'entity_label': annotations.get('wikidata_label', ''),
}
mapping_type = annotations.get('wikidata_mapping_type', 'exact')
alignment['mapping_type'] = mapping_type
mapping_note = annotations.get('wikidata_mapping_note')
if mapping_note:
alignment['mapping_rationale'] = mapping_note
return alignment
def build_dual_class_link(annotations: dict) -> dict | None:
"""Build DualClassLink structured object from annotations."""
pattern = annotations.get('dual_class_pattern')
if not pattern:
return None
link = {
'role': pattern, # custodian_type or collection_type
}
# Determine linked class based on role
if pattern == 'custodian_type':
linked = annotations.get('linked_collection_type')
else:
linked = annotations.get('linked_custodian_type')
if linked:
link['linked_class'] = linked
note = annotations.get('dual_class_pattern_note')
if note:
link['rationale'] = note
return link
def build_specificity_annotation(annotations: dict) -> dict | None:
"""Build SpecificityAnnotation structured object from annotations."""
score = annotations.get('specificity_score')
if score is None:
return None
annotation = {
'score': float(score),
'rationale': annotations.get('specificity_rationale', ''),
}
timestamp = annotations.get('specificity_annotation_timestamp')
if timestamp:
annotation['timestamp'] = timestamp
agent = annotations.get('specificity_annotation_agent')
if agent:
annotation['agent'] = agent
return annotation
def build_template_specificity(annotations: dict) -> dict | None:
"""Build TemplateSpecificityScores from annotations."""
template_spec = annotations.get('template_specificity')
if not template_spec:
return None
if isinstance(template_spec, str):
try:
template_spec = json.loads(template_spec)
except json.JSONDecodeError:
return None
return template_spec
def migrate_class_annotations(class_def: dict, class_name: str) -> dict:
    """
    Migrate annotations to slots for a single class definition.

    Converts the legacy annotation keys (custodian_types*, wikidata*,
    skos_broader*, dual_class_pattern*, specificity_*, template_specificity)
    into entries in ``slots`` / ``slot_usage`` and strips the migrated
    annotations; any other annotations are preserved.

    Args:
        class_def: Class definition mapping from the LinkML schema.
        class_name: Name of the class (unused here; kept for API stability).

    Returns:
        A shallow copy of ``class_def`` with slots/slot_usage updated.
        Note the copy is shallow, so nested mappings may still be shared
        with the input.
    """
    annotations = extract_annotations(class_def)
    if not annotations:
        return class_def

    # Build slot usage section (handle None/empty slot_usage).
    slot_usage = class_def.get('slot_usage') or {}
    slots_to_add = set()

    # 1. Custodian types: annotation holds a JSON-encoded list.
    custodian_types = annotations.get('custodian_types')
    if custodian_types:
        parsed = parse_json_annotation(custodian_types)
        slots_to_add.add('custodian_types')
        slot_usage['custodian_types'] = {
            'equals_expression': json.dumps(parsed) if isinstance(parsed, list) else custodian_types
        }
        rationale = annotations.get('custodian_types_rationale')
        if rationale:
            slots_to_add.add('custodian_types_rationale')
            slot_usage['custodian_types_rationale'] = {'equals_string': rationale}

    # 2. Wikidata alignment -> structured WikidataAlignment slot.
    wikidata_alignment = build_wikidata_alignment(annotations)
    if wikidata_alignment:
        slots_to_add.add('wikidata_alignment')
        slot_usage['wikidata_alignment'] = {
            'range': 'WikidataAlignment',
            'inlined': True,
            'description': (
                f"Wikidata alignment: {wikidata_alignment.get('entity_id')} "
                f"({wikidata_alignment.get('entity_label', '')})"
            ),
        }

    # 3. SKOS broader: always emitted as a JSON list expression.
    skos_broader = annotations.get('skos_broader')
    if skos_broader:
        slots_to_add.add('skos_broader')
        if isinstance(skos_broader, list):
            expression = json.dumps(skos_broader)
        elif isinstance(skos_broader, str) and skos_broader.startswith('['):
            # Already a JSON list literal: pass through unchanged.  The old
            # code wrapped it again as f'["{value}"]', producing invalid
            # double-quoted JSON like '["["x"]"]'.
            expression = skos_broader
        else:
            # Single scalar -> one-element JSON list.
            expression = json.dumps([skos_broader])
        slot_usage['skos_broader'] = {'equals_expression': expression}
        skos_broader_label = annotations.get('skos_broader_label')
        if skos_broader_label:
            slots_to_add.add('skos_broader_label')
            slot_usage['skos_broader_label'] = {'equals_string': skos_broader_label}

    # 4. Dual-class pattern -> structured DualClassLink slot.
    if build_dual_class_link(annotations):
        slots_to_add.add('dual_class_link')
        slot_usage['dual_class_link'] = {'range': 'DualClassLink', 'inlined': True}

    # 5. Specificity annotation -> structured SpecificityAnnotation slot.
    if build_specificity_annotation(annotations):
        slots_to_add.add('specificity_annotation')
        slot_usage['specificity_annotation'] = {
            'range': 'SpecificityAnnotation',
            'inlined': True
        }

    # 6. Template specificity -> structured TemplateSpecificityScores slot.
    if build_template_specificity(annotations):
        slots_to_add.add('template_specificity')
        slot_usage['template_specificity'] = {
            'range': 'TemplateSpecificityScores',
            'inlined': True
        }

    new_class_def = class_def.copy()

    # Merge new slots with any already present; sort for stable output.
    merged_slots = set(new_class_def.get('slots', [])) | slots_to_add
    if merged_slots:
        new_class_def['slots'] = sorted(merged_slots)

    if slot_usage:
        new_class_def['slot_usage'] = slot_usage

    # Remove migrated annotations (keep non-migrated ones).
    migrated_keys = {
        'custodian_types', 'custodian_types_rationale',
        'wikidata', 'wikidata_label', 'wikidata_mapping_type', 'wikidata_mapping_note',
        'skos_broader', 'skos_broader_label',
        'dual_class_pattern', 'dual_class_pattern_note',
        'linked_collection_type', 'linked_custodian_type',
        'specificity_score', 'specificity_rationale',
        'specificity_annotation_timestamp', 'specificity_annotation_agent',
        'template_specificity',
    }
    remaining_annotations = {k: v for k, v in annotations.items() if k not in migrated_keys}
    if remaining_annotations:
        new_class_def['annotations'] = remaining_annotations
    elif 'annotations' in new_class_def:
        del new_class_def['annotations']
    return new_class_def
def add_class_metadata_import(schema: dict) -> dict:
    """Ensure the '../slots/class_metadata_slots' import is present.

    Mutates and returns *schema*.  A missing or null 'imports' key is
    treated as an empty list (matching the ``or {}`` pattern used for
    slot_usage elsewhere in this script); a scalar-string 'imports' value
    is normalized to a one-element list — previously this crashed on
    ``None`` and did a substring test instead of a membership test on a
    string value.
    """
    imports = schema.get('imports') or []
    if isinstance(imports, str):
        imports = [imports]  # normalize a single scalar import
    metadata_import = '../slots/class_metadata_slots'
    if metadata_import not in imports:
        imports.append(metadata_import)
    schema['imports'] = imports
    return schema
def migrate_file(file_path: Path, dry_run: bool = False) -> bool:
    """
    Migrate a single LinkML schema file.

    Reads the YAML schema, migrates legacy annotations on each class to
    slots, ensures the class_metadata_slots import, and (unless dry_run)
    rewrites the file in place.

    Args:
        file_path: YAML schema file to process.
        dry_run: When True, report what would change without writing.

    Returns:
        True if changes were made (or would be, in dry-run mode),
        False otherwise.
    """
    print(f"\n{'[DRY RUN] ' if dry_run else ''}Processing: {file_path.name}")
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    try:
        schema = yaml.safe_load(content)
    except yaml.YAMLError as e:
        print(f" ERROR: Failed to parse YAML: {e}")
        return False
    if not schema:
        print(" SKIP: Empty file")  # dropped extraneous f-string prefix
        return False
    # Check if file has classes with annotations.
    classes = schema.get('classes', {})
    if not classes:
        print(" SKIP: No classes found")
        return False
    changes_made = False
    # Annotation keys that trigger migration; invariant, so hoisted out of
    # the per-class loop.
    migratable = {
        'custodian_types', 'wikidata', 'skos_broader',
        'specificity_score', 'dual_class_pattern'
    }
    # Process each class.
    for class_name, class_def in classes.items():
        if not isinstance(class_def, dict):
            continue
        annotations = class_def.get('annotations', {})
        if not annotations:
            continue
        if not any(key in annotations for key in migratable):
            continue
        print(f" Migrating class: {class_name}")
        print(f" Found annotations: {list(annotations.keys())}")
        migrated_class = migrate_class_annotations(class_def, class_name)
        classes[class_name] = migrated_class
        changes_made = True
        # Report changes.
        new_slots = migrated_class.get('slots', [])
        print(f" Added slots: {new_slots}")
        remaining = migrated_class.get('annotations', {})
        if remaining:
            print(f" Remaining annotations: {list(remaining.keys())}")
    if not changes_made:
        print(" SKIP: No migratable annotations found")
        return False
    # Add import for class_metadata_slots.
    schema = add_class_metadata_import(schema)
    schema['classes'] = classes
    if dry_run:
        print(f" [DRY RUN] Would write changes to {file_path}")
        return True
    # Write updated schema.
    with open(file_path, 'w', encoding='utf-8') as f:
        yaml.dump(schema, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    print(f" DONE: Updated {file_path}")
    return True
def find_archive_classes(base_path: Path) -> list[Path]:
    """Find all archive class files that need migration, sorted by path."""
    classes_dir = base_path / 'schemas/20251121/linkml/modules/classes'
    # Match *Archive*.yaml but skip _refactored variants and RecordSetTypes.
    return sorted(
        candidate
        for candidate in classes_dir.glob('*Archive*.yaml')
        if '_refactored' not in candidate.name
        and 'RecordSetTypes' not in candidate.name
    )
def main():
    """CLI entry point: parse arguments, select target files, run the migration.

    Exits normally even if individual files fail — errors are counted and
    reported in the summary rather than aborting the batch.
    """
    parser = argparse.ArgumentParser(
        description='Migrate LinkML class annotations to proper slots'
    )
    parser.add_argument(
        '--dry-run', '-n',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--file', '-f',
        type=Path,
        help='Migrate a specific file instead of all archive classes'
    )
    parser.add_argument(
        '--all-classes',
        action='store_true',
        help='Migrate all class files, not just archive classes'
    )
    args = parser.parse_args()

    # Determine base path: this script lives in scripts/ under the project root.
    script_path = Path(__file__).resolve()
    base_path = script_path.parent.parent

    if args.file:
        files = [args.file]
    elif args.all_classes:
        classes_dir = base_path / 'schemas/20251121/linkml/modules/classes'
        # Exclude _refactored files.
        files = sorted(
            f for f in classes_dir.glob('*.yaml') if '_refactored' not in f.name
        )
    else:
        files = find_archive_classes(base_path)

    print(f"Found {len(files)} files to process")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print("=" * 60)

    migrated_count = 0
    skipped_count = 0
    error_count = 0
    for file_path in files:
        try:
            if migrate_file(file_path, args.dry_run):
                migrated_count += 1
            else:
                skipped_count += 1
        except Exception as e:
            # Keep going: one bad schema must not abort the whole batch.
            print(f" ERROR: {e}")
            error_count += 1

    print("\n" + "=" * 60)
    print("Summary:")  # dropped extraneous f-string prefix
    print(f" Migrated: {migrated_count}")
    print(f" Skipped: {skipped_count}")
    print(f" Errors: {error_count}")
    if args.dry_run:
        print("\nThis was a dry run. No files were modified.")
        print("Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()