#!/usr/bin/env python3
"""
Migrate LinkML class annotations to proper slots.

This script converts annotations like:
- custodian_types, custodian_types_rationale
- wikidata, wikidata_label
- skos_broader, skos_broader_label
- specificity_score, specificity_rationale, etc.
- dual_class_pattern, linked_collection_type, etc.

To proper slots that map to ontology predicates (SKOS, PROV-O, RiC-O, etc.)

Usage:
    python scripts/migrate_annotations_to_slots.py [--dry-run] [--file <path>]

Examples:
    # Dry run on all archive classes
    python scripts/migrate_annotations_to_slots.py --dry-run

    # Migrate a single file
    python scripts/migrate_annotations_to_slots.py --file schemas/20251121/linkml/modules/classes/MunicipalArchive.yaml

    # Migrate all archive classes
    python scripts/migrate_annotations_to_slots.py

Author: OpenCode Claude
Date: 2026-01-06
"""

import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any

import yaml


# Preserve YAML formatting with custom representer
def str_representer(dumper, data):
    """Emit multi-line strings in YAML literal block style (|) for readability."""
    if '\n' in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


yaml.add_representer(str, str_representer)


def parse_json_annotation(value: str) -> Any:
    """Parse a JSON-encoded annotation value.

    Returns the decoded object, or the original value unchanged when it is
    not a string or not valid JSON.
    """
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value
    return value


def extract_annotations(class_def: dict) -> dict:
    """Extract the annotations mapping from a class definition."""
    return class_def.get('annotations', {})


def build_wikidata_alignment(annotations: dict) -> dict | None:
    """Build a WikidataAlignment structured object from annotations.

    Returns None when no 'wikidata' entity id is present.
    """
    entity_id = annotations.get('wikidata')
    if not entity_id:
        return None
    alignment = {
        'entity_id': str(entity_id),
        'entity_label': annotations.get('wikidata_label', ''),
    }
    # Default to an exact mapping unless the annotation says otherwise.
    mapping_type = annotations.get('wikidata_mapping_type', 'exact')
    alignment['mapping_type'] = mapping_type
    mapping_note = annotations.get('wikidata_mapping_note')
    if mapping_note:
        alignment['mapping_rationale'] = mapping_note
    return alignment


def build_dual_class_link(annotations: dict) -> dict | None:
    """Build a DualClassLink structured object from annotations.

    Returns None when no 'dual_class_pattern' annotation is present.
    """
    pattern = annotations.get('dual_class_pattern')
    if not pattern:
        return None
    link = {
        'role': pattern,  # custodian_type or collection_type
    }
    # Determine linked class based on role: a custodian links to its
    # collection counterpart and vice versa.
    if pattern == 'custodian_type':
        linked = annotations.get('linked_collection_type')
    else:
        linked = annotations.get('linked_custodian_type')
    if linked:
        link['linked_class'] = linked
    note = annotations.get('dual_class_pattern_note')
    if note:
        link['rationale'] = note
    return link


def build_specificity_annotation(annotations: dict) -> dict | None:
    """Build a SpecificityAnnotation structured object from annotations.

    Returns None when no 'specificity_score' annotation is present.
    """
    score = annotations.get('specificity_score')
    if score is None:
        return None
    annotation = {
        'score': float(score),
        'rationale': annotations.get('specificity_rationale', ''),
    }
    timestamp = annotations.get('specificity_annotation_timestamp')
    if timestamp:
        annotation['timestamp'] = timestamp
    agent = annotations.get('specificity_annotation_agent')
    if agent:
        annotation['agent'] = agent
    return annotation


def build_template_specificity(annotations: dict) -> dict | None:
    """Build TemplateSpecificityScores from annotations.

    The annotation may be stored as a JSON-encoded string; decode it if so.
    Returns None when absent or undecodable.
    """
    template_spec = annotations.get('template_specificity')
    if not template_spec:
        return None
    if isinstance(template_spec, str):
        try:
            template_spec = json.loads(template_spec)
        except json.JSONDecodeError:
            return None
    return template_spec


def migrate_class_annotations(class_def: dict, class_name: str) -> dict:
    """
    Migrate annotations to slots for a single class definition.

    Collects every migratable annotation into `slots_to_add` / `slot_usage`
    first, then builds the updated class definition, so that slots added by
    any section are reflected in both 'slots' and 'slot_usage'.

    Returns the modified class definition (the input dict is shallow-copied;
    `class_name` is accepted for interface stability and logging symmetry).
    """
    annotations = extract_annotations(class_def)
    if not annotations:
        return class_def

    # Build slot usage section (handle None/empty slot_usage)
    slot_usage = class_def.get('slot_usage') or {}

    # Required slots to add
    slots_to_add = set()

    # 1. Custodian types
    custodian_types = annotations.get('custodian_types')
    if custodian_types:
        parsed = parse_json_annotation(custodian_types)
        slots_to_add.add('custodian_types')
        slot_usage['custodian_types'] = {
            'equals_expression': json.dumps(parsed) if isinstance(parsed, list) else custodian_types
        }
        rationale = annotations.get('custodian_types_rationale')
        if rationale:
            slots_to_add.add('custodian_types_rationale')
            slot_usage['custodian_types_rationale'] = {
                'equals_string': rationale
            }

    # 2. Wikidata alignment
    wikidata_alignment = build_wikidata_alignment(annotations)
    if wikidata_alignment:
        slots_to_add.add('wikidata_alignment')
        slot_usage['wikidata_alignment'] = {
            'range': 'WikidataAlignment',
            'inlined': True,
            'description': f"Wikidata alignment: {wikidata_alignment.get('entity_id')} ({wikidata_alignment.get('entity_label', '')})"
        }

    # 3. SKOS broader
    skos_broader = annotations.get('skos_broader')
    if skos_broader:
        slots_to_add.add('skos_broader')
        # equals_expression must be a JSON-encoded list.  A string starting
        # with '[' is assumed to be JSON already and is used as-is (the
        # previous code double-wrapped it, producing an invalid expression);
        # any other string is wrapped into a single-element list.
        if isinstance(skos_broader, list):
            broader_expr = json.dumps(skos_broader)
        elif isinstance(skos_broader, str) and skos_broader.startswith('['):
            broader_expr = skos_broader
        else:
            broader_expr = json.dumps([skos_broader])
        slot_usage['skos_broader'] = {
            'equals_expression': broader_expr
        }
        skos_broader_label = annotations.get('skos_broader_label')
        if skos_broader_label:
            slots_to_add.add('skos_broader_label')
            slot_usage['skos_broader_label'] = {
                'equals_string': skos_broader_label
            }

    # 4. Dual-class pattern
    dual_class_link = build_dual_class_link(annotations)
    if dual_class_link:
        slots_to_add.add('dual_class_link')
        slot_usage['dual_class_link'] = {
            'range': 'DualClassLink',
            'inlined': True
        }

    # 5. Specificity annotation
    specificity_annotation = build_specificity_annotation(annotations)
    if specificity_annotation:
        slots_to_add.add('specificity_annotation')
        slot_usage['specificity_annotation'] = {
            'range': 'SpecificityAnnotation',
            'inlined': True
        }

    # 6. Template specificity
    template_specificity = build_template_specificity(annotations)
    if template_specificity:
        slots_to_add.add('template_specificity')
        slot_usage['template_specificity'] = {
            'range': 'TemplateSpecificityScores',
            'inlined': True
        }

    # 7. RiC-O annotations
    rico_annotations = [
        'rico_record_set_type',
        'rico_organizational_principle',
        'rico_organizational_principle_uri',
        'rico_has_or_had_holder',
        'rico_has_or_had_holder_note',
        'rico_note'
    ]
    for rico_key in rico_annotations:
        if rico_key in annotations:
            slots_to_add.add(rico_key)
            slot_usage[rico_key] = {
                'equals_string': str(annotations[rico_key])
            }

    # 8. Multilingual labels
    label_keys = ['label_de', 'label_es', 'label_fr', 'label_nl',
                  'label_it', 'label_pt']
    for label_key in label_keys:
        if label_key in annotations:
            slots_to_add.add(label_key)
            slot_usage[label_key] = {
                'equals_string': str(annotations[label_key])
            }

    # 9. Scope and classification metadata
    scope_keys = ['scope_includes', 'scope_excludes', 'custodian_only',
                  'custodian_type', 'organizational_level',
                  'geographic_restriction']
    for scope_key in scope_keys:
        if scope_key in annotations:
            slots_to_add.add(scope_key)
            value = annotations[scope_key]
            if isinstance(value, bool):
                # Booleans become lowercase expressions ('true'/'false').
                slot_usage[scope_key] = {'equals_expression': str(value).lower()}
            else:
                slot_usage[scope_key] = {'equals_string': str(value)}

    # 10. Notes (including domain-specific *_note patterns)
    note_keys = ['privacy_note', 'preservation_note', 'legal_note']
    for note_key in note_keys:
        if note_key in annotations:
            slots_to_add.add(note_key)
            slot_usage[note_key] = {
                'equals_string': str(annotations[note_key])
            }

    # 11. Generic *_note annotations are deliberately left as annotations:
    # they are too varied to have dedicated slots yet, so they fall through
    # to `remaining_annotations` below.

    # Update class definition -- this must happen AFTER every section above,
    # otherwise slots added by sections 7-10 would be missing from 'slots'
    # (and 'slot_usage' could be skipped when it started out empty).
    new_class_def = class_def.copy()

    # Add slots if not already present
    existing_slots = set(new_class_def.get('slots', []))
    new_slots = existing_slots | slots_to_add
    if new_slots:
        new_class_def['slots'] = sorted(new_slots)

    # Update slot_usage
    if slot_usage:
        new_class_def['slot_usage'] = slot_usage

    # Remove migrated annotations (keep non-migrated ones)
    migrated_keys = {
        'custodian_types', 'custodian_types_rationale',
        'wikidata', 'wikidata_label', 'wikidata_mapping_type',
        'wikidata_mapping_note',
        'skos_broader', 'skos_broader_label',
        'dual_class_pattern', 'dual_class_pattern_note',
        'linked_collection_type', 'linked_custodian_type',
        'specificity_score', 'specificity_rationale',
        'specificity_annotation_timestamp', 'specificity_annotation_agent',
        'template_specificity',
        # RiC-O annotations
        'rico_record_set_type', 'rico_organizational_principle',
        'rico_organizational_principle_uri', 'rico_has_or_had_holder',
        'rico_has_or_had_holder_note', 'rico_note',
        # Multilingual labels
        'label_de', 'label_es', 'label_fr', 'label_nl', 'label_it',
        'label_pt',
        # Scope and classification
        'scope_includes', 'scope_excludes', 'custodian_only',
        'custodian_type', 'organizational_level', 'geographic_restriction',
        # Notes
        'privacy_note', 'preservation_note', 'legal_note'
    }
    remaining_annotations = {k: v for k, v in annotations.items()
                             if k not in migrated_keys}
    if remaining_annotations:
        new_class_def['annotations'] = remaining_annotations
    elif 'annotations' in new_class_def:
        del new_class_def['annotations']

    return new_class_def


def add_class_metadata_import(schema: dict) -> dict:
    """Ensure the class_metadata_slots import is present in the schema."""
    imports = schema.get('imports', [])
    metadata_import = '../slots/class_metadata_slots'
    if metadata_import not in imports:
        imports.append(metadata_import)
        schema['imports'] = imports
    return schema


def migrate_file(file_path: Path, dry_run: bool = False) -> bool:
    """
    Migrate a single LinkML schema file.

    Returns True if changes were made (or would be, in dry-run mode),
    False otherwise.
    """
    print(f"\n{'[DRY RUN] ' if dry_run else ''}Processing: {file_path.name}")

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    try:
        schema = yaml.safe_load(content)
    except yaml.YAMLError as e:
        print(f"  ERROR: Failed to parse YAML: {e}")
        return False

    if not schema:
        print("  SKIP: Empty file")
        return False

    # Check if file has classes with annotations
    classes = schema.get('classes', {})
    if not classes:
        print("  SKIP: No classes found")
        return False

    changes_made = False

    # Process each class
    for class_name, class_def in classes.items():
        if not isinstance(class_def, dict):
            continue

        annotations = class_def.get('annotations', {})
        if not annotations:
            continue

        # Check for annotations we can migrate; a class must carry at least
        # one of these "anchor" annotations to be worth rewriting.
        migratable = {
            'custodian_types', 'wikidata', 'skos_broader',
            'specificity_score', 'dual_class_pattern'
        }
        if not any(key in annotations for key in migratable):
            continue

        print(f"  Migrating class: {class_name}")
        print(f"    Found annotations: {list(annotations.keys())}")

        migrated_class = migrate_class_annotations(class_def, class_name)
        classes[class_name] = migrated_class
        changes_made = True

        # Report changes
        new_slots = migrated_class.get('slots', [])
        print(f"    Added slots: {new_slots}")
        remaining = migrated_class.get('annotations', {})
        if remaining:
            print(f"    Remaining annotations: {list(remaining.keys())}")

    if not changes_made:
        print("  SKIP: No migratable annotations found")
        return False

    # Add import for class_metadata_slots
    schema = add_class_metadata_import(schema)
    schema['classes'] = classes

    if dry_run:
        print(f"  [DRY RUN] Would write changes to {file_path}")
        return True

    # Write updated schema
    with open(file_path, 'w', encoding='utf-8') as f:
        yaml.dump(schema, f, default_flow_style=False, allow_unicode=True,
                  sort_keys=False)
    print(f"  DONE: Updated {file_path}")
    return True


def find_archive_classes(base_path: Path) -> list[Path]:
    """Find all archive class files that need migration.

    Matches *Archive*.yaml under the classes module directory, excluding
    '_refactored' copies and RecordSetTypes files.
    """
    classes_dir = base_path / 'schemas/20251121/linkml/modules/classes'
    archive_files = []
    for f in classes_dir.glob('*Archive*.yaml'):
        if '_refactored' not in f.name and 'RecordSetTypes' not in f.name:
            archive_files.append(f)
    return sorted(archive_files)


def main():
    """CLI entry point: parse arguments, select files, run the migration."""
    parser = argparse.ArgumentParser(
        description='Migrate LinkML class annotations to proper slots'
    )
    parser.add_argument(
        '--dry-run', '-n',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--file', '-f',
        type=Path,
        help='Migrate a specific file instead of all archive classes'
    )
    parser.add_argument(
        '--all-classes',
        action='store_true',
        help='Migrate all class files, not just archive classes'
    )
    args = parser.parse_args()

    # Determine base path: go up from scripts/ to project root.
    script_path = Path(__file__).resolve()
    base_path = script_path.parent.parent

    if args.file:
        files = [args.file]
    elif args.all_classes:
        classes_dir = base_path / 'schemas/20251121/linkml/modules/classes'
        files = sorted(classes_dir.glob('*.yaml'))
        # Exclude _refactored files
        files = [f for f in files if '_refactored' not in f.name]
    else:
        files = find_archive_classes(base_path)

    print(f"Found {len(files)} files to process")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print("=" * 60)

    migrated_count = 0
    skipped_count = 0
    error_count = 0

    for file_path in files:
        try:
            if migrate_file(file_path, args.dry_run):
                migrated_count += 1
            else:
                skipped_count += 1
        except Exception as e:
            # Keep going: one bad file should not abort the whole run.
            print(f"  ERROR: {e}")
            error_count += 1

    print("\n" + "=" * 60)
    print("Summary:")
    print(f"  Migrated: {migrated_count}")
    print(f"  Skipped: {skipped_count}")
    print(f"  Errors: {error_count}")

    if args.dry_run:
        print("\nThis was a dry run. No files were modified.")
        print("Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()