Summary: - Create 46 missing slot definition files with proper slot_uri values - Add slot imports to main schema (01_custodian_name_modular.yaml) - Fix YAML examples sections in 116+ class and slot files - Fix PersonObservation.yaml examples section (nested objects → string literals) Technical changes: - All slots now have explicit slot_uri mapping to base ontologies (RiC-O, Schema.org, SKOS) - Eliminates malformed URIs like 'custodian/:slot_name' in generated RDF - gen-owl now produces valid Turtle with 153,166 triples New slot files (46): - RiC-O slots: rico_note, rico_organizational_principle, rico_has_or_had_holder, etc. - Scope slots: scope_includes, scope_excludes, archive_scope - Organization slots: organization_type, governance_authority, area_served - Platform slots: platform_type_category, portal_type_category - Social media slots: social_media_platform_category, post_type_* - Type hierarchy slots: broader_type, narrower_types, custodian_type_broader - Wikidata slots: wikidata_equivalent, wikidata_mapping Generated output: - schemas/20251121/rdf/01_custodian_name_modular_20260107_134534_clean.owl.ttl (6.9MB) - Validated with rdflib: 153,166 triples, no malformed URIs
398 lines · 16 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Structuralize Slot Descriptions
|
|
|
|
Migrates unstructured content from slot description fields to proper LinkML elements:
|
|
- **Example**: → examples: list
|
|
- **INVERSE PROPERTY**: → comments: list item + inverse annotation
|
|
- **W3C ORG Alignment**:/etc → already in exact_mappings, remove from description
|
|
- **Navigation**: → comments: list item
|
|
- **Rationale**: → comments: list item
|
|
- YAML code blocks → examples: list
|
|
|
|
Usage:
|
|
python scripts/structuralize_slot_descriptions.py [--dry-run] [--verbose] [--file PATH]
|
|
"""
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from ruamel.yaml import YAML
|
|
|
|
# Round-trip YAML handler (ruamel.yaml): unlike a plain safe loader it
# preserves quoting style and comments, so rewritten slot files stay as
# close as possible to their hand-authored formatting.
yaml = YAML()
yaml.preserve_quotes = True
yaml.width = 120  # wrap long scalars at 120 columns instead of the 80 default
yaml.indent(mapping=2, sequence=2, offset=2)  # match the repo's 2-space YAML style
|
|
|
|
|
|
# Section patterns to extract from descriptions.
#
# Most removal patterns share the lookahead (?=\n\*\*|\n\n[A-Z]|\Z): the
# section ends at the next bold "**Header**", at a blank line followed by a
# capitalized paragraph, or at end of text.
SECTION_PATTERNS = {
    # Patterns that should become examples
    # Captures the body of a fenced ```yaml/```turtle (or untagged) code block.
    'yaml_code_block': re.compile(r'\*\*Example\*\*:\s*\n```(?:yaml|turtle)?\n(.*?)```', re.DOTALL | re.IGNORECASE),

    # Patterns that should become comments
    # ((?:- .*\n?)+) captures a run of "- " bullet lines following the header.
    'inverse_property': re.compile(r'\*\*INVERSE PROPERTY\*\*:\s*\n((?:- .*\n?)+)', re.IGNORECASE),
    'navigation': re.compile(r'\*\*Navigation\*\*:\s*\n((?:- .*\n?)+)', re.IGNORECASE),
    'rationale': re.compile(r'\*\*Rationale\*\*:\s*\n(.*?)(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'see_also': re.compile(r'\*\*See Also\*\*:\s*\n((?:- .*\n?)+)', re.IGNORECASE),
    'see': re.compile(r'\*\*See\*\*:\s*\n((?:- .*\n?)+)', re.IGNORECASE),

    # Patterns that should be REMOVED (already in structured elements)
    # Per-ontology alignment prose; the mappings live in exact_mappings etc.
    'ontology_alignment_w3c': re.compile(r'\*\*W3C ORG(?: Alignment)?\*\*:\s*\n.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'ontology_alignment_cidoc': re.compile(r'\*\*CIDOC-CRM(?: Alignment)?\*\*:\s*\n.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'ontology_alignment_prov': re.compile(r'\*\*PROV-O(?: Alignment)?\*\*:\s*\n.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'ontology_alignment_schema': re.compile(r'\*\*Schema\.org(?: Alignment)?\*\*:\s*\n.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'ontology_alignment_rico': re.compile(r'\*\*RiC-O(?: Alignment)?\*\*:\s*\n.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'ontology_alignment_generic': re.compile(r'\*\*Ontology Alignment\*\*:\s*\n.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    'three_layer_alignment': re.compile(r'\*\*Three-Layer Ontology Alignment\*\*:.*?(?=\n\*\*[A-Z]|\Z)', re.DOTALL | re.IGNORECASE),
    # "Why X is Primary/Secondary/Tertiary" justification prose.
    'why_primary': re.compile(r'\*\*Why .*? is Primary\*\*:.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    'why_secondary': re.compile(r'\*\*Why .*? is Secondary\*\*:.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    'why_tertiary': re.compile(r'\*\*Why .*? is Tertiary\*\*:.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    # "Property: `prefix:name`" blocks listing Domain/Range details.
    'property_details': re.compile(r'Property: `[a-z]+:[A-Za-z0-9_]+`\s*\n- Domain:.*?(?=\n\*\*|\n\n[A-Z]|\Z)', re.DOTALL),
    'rdf_serialization': re.compile(r'\*\*RDF Serialization(?: Example)?\*\*:\s*\n```.*?```', re.DOTALL | re.IGNORECASE),
    # Taxonomy legend: one "- **X** - ..." bullet per letter code.
    'glamorcubesfixphdnt': re.compile(r'\*\*GLAMORCUBESFIXPHDNT Taxonomy.*?\*\*:\s*\n(?:- \*\*[A-Z]\*\* - .*\n)+', re.IGNORECASE),
    'relationship_to': re.compile(r'\*\*Relationship to .*?\*\*:\s*\n.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    'data_population': re.compile(r'\*\*Data Population Strategy\*\*:\s*\n.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    'special_case': re.compile(r'\*\*Special Case.*?\*\*:\s*\n.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    'example_dutch': re.compile(r'\*\*Example - Dutch.*?\*\*:\s*\n```.*?```', re.DOTALL | re.IGNORECASE),
    'ghcid_code': re.compile(r'\*\*GHCID Code Derivation\*\*:\s*\n.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
    'migration_note': re.compile(r'\*\*Migration Note\*\*.*?:\s*\n.*?(?=\n\*\*|\Z)', re.DOTALL | re.IGNORECASE),
}
|
|
|
|
# Patterns to remove entirely (redundant with structured elements).
# Each name below keys into SECTION_PATTERNS; matching sections are deleted
# from the description without migrating their text anywhere.
REMOVE_PATTERNS = [
    'ontology_alignment_w3c',
    'ontology_alignment_cidoc',
    'ontology_alignment_prov',
    'ontology_alignment_schema',
    'ontology_alignment_rico',
    'ontology_alignment_generic',
    'three_layer_alignment',
    'why_primary',
    'why_secondary',
    'why_tertiary',
    'property_details',
    'rdf_serialization',
    'glamorcubesfixphdnt',
    'relationship_to',
    'data_population',
    'special_case',
    'example_dutch',
    'ghcid_code',
    'migration_note',
]

# Patterns to extract to comments (migrated into the slot's comments: list).
COMMENT_PATTERNS = [
    'inverse_property',
    'navigation',
    'rationale',
    'see_also',
    'see',
]

# Patterns to extract to examples (migrated into the slot's examples: list).
EXAMPLE_PATTERNS = [
    'yaml_code_block',
]
|
|
|
|
|
|
def extract_inverse_info(match_text: str) -> tuple[str | None, str]:
|
|
"""Extract inverse slot name and comment from inverse property section."""
|
|
inverse_name = None
|
|
comment_lines = []
|
|
|
|
for line in match_text.strip().split('\n'):
|
|
line = line.strip().lstrip('- ')
|
|
if line.startswith('**Inverse**:'):
|
|
# Extract inverse slot name: `slot_name` (predicate)
|
|
inverse_match = re.search(r'`([a-z_]+)`', line)
|
|
if inverse_match:
|
|
inverse_name = inverse_match.group(1)
|
|
comment_lines.append(f"Inverse: {line.split(':', 1)[1].strip()}")
|
|
elif line.startswith('Pattern:'):
|
|
comment_lines.append(line)
|
|
|
|
return inverse_name, ' | '.join(comment_lines) if comment_lines else match_text.strip()
|
|
|
|
|
|
def extract_yaml_example(match_text: str) -> dict:
    """Turn a captured YAML code block into a LinkML example mapping.

    Args:
        match_text: Raw body of the fenced code block.

    Returns:
        dict: ``{'value': ..., 'description': 'Usage example'}`` with the
        value capped at 200 characters (ellipsis appended when truncated).
    """
    snippet = match_text.strip()
    # Keep examples compact: long blocks are truncated rather than inlined.
    if len(snippet) > 200:
        value = snippet[:200] + '...'
    else:
        value = snippet
    return {'value': value, 'description': 'Usage example'}
|
|
|
|
|
|
def _append_comment(updates: dict, comment: str) -> None:
    """Append *comment* to ``updates['comments']``, creating the list if needed."""
    updates.setdefault('comments', []).append(comment)


def process_slot_description(description: str, slot_data: dict, verbose: bool = False) -> tuple[str, dict, list[str]]:
    """
    Process a slot description, extracting structured content.

    Sections matched by SECTION_PATTERNS are either dropped entirely
    (REMOVE_PATTERNS — their content already lives in structured elements
    such as exact_mappings) or migrated into proper LinkML elements
    (comments, annotations, examples).

    Args:
        description: Raw markdown-flavored description text.
        slot_data: The slot's current mapping; consulted so pre-existing
            ``examples`` are never overwritten.
        verbose: When True, print one line per extracted/removed section.

    Returns:
        tuple: (cleaned_description, updates_dict, removed_sections_list)
    """
    if not description:
        return description, {}, []

    cleaned = description
    updates: dict = {}
    removed_sections: list[str] = []

    # 1. Remove sections that duplicate existing structured elements.
    for pattern_name in REMOVE_PATTERNS:
        pattern = SECTION_PATTERNS.get(pattern_name)
        if pattern:
            matches = pattern.findall(cleaned)
            if matches:
                removed_sections.append(pattern_name)
                cleaned = pattern.sub('', cleaned)
                if verbose:
                    print(f"  Removed: {pattern_name} ({len(matches)} match(es))")

    # 2. Inverse property section -> comment (+ inverse_slot annotation).
    inverse_match = SECTION_PATTERNS['inverse_property'].search(cleaned)
    if inverse_match:
        inverse_name, comment = extract_inverse_info(inverse_match.group(1))
        _append_comment(updates, comment)
        if inverse_name:
            # Record the inverse slot as an annotation for downstream tooling.
            updates.setdefault('annotations', {})['inverse_slot'] = inverse_name
        cleaned = SECTION_PATTERNS['inverse_property'].sub('', cleaned)
        removed_sections.append('inverse_property')
        if verbose:
            print(f"  Extracted inverse property: {inverse_name}")

    # 3. Navigation bullets -> a single pipe-joined comment.
    nav_match = SECTION_PATTERNS['navigation'].search(cleaned)
    if nav_match:
        nav_lines = [line.strip().lstrip('- ')
                     for line in nav_match.group(1).strip().split('\n') if line.strip()]
        _append_comment(updates, f"Navigation: {' | '.join(nav_lines)}")
        cleaned = SECTION_PATTERNS['navigation'].sub('', cleaned)
        removed_sections.append('navigation')
        if verbose:
            print("  Extracted navigation")

    # 4. Rationale -> comment, truncated to 200 characters.
    rationale_match = SECTION_PATTERNS['rationale'].search(cleaned)
    if rationale_match:
        rationale_text = rationale_match.group(1).strip()
        if rationale_text:
            _append_comment(updates, f"Rationale: {rationale_text[:200]}")
        cleaned = SECTION_PATTERNS['rationale'].sub('', cleaned)
        removed_sections.append('rationale')
        if verbose:
            print("  Extracted rationale")

    # 5. "See Also"/"See" bullets -> comment.
    for see_pattern in ['see_also', 'see']:
        see_match = SECTION_PATTERNS[see_pattern].search(cleaned)
        if see_match:
            see_lines = [line.strip().lstrip('- ')
                         for line in see_match.group(1).strip().split('\n') if line.strip()]
            _append_comment(updates, f"See: {' | '.join(see_lines)}")
            cleaned = SECTION_PATTERNS[see_pattern].sub('', cleaned)
            removed_sections.append(see_pattern)
            if verbose:
                print(f"  Extracted {see_pattern}")

    # 6. YAML code block -> example, but only when the slot has no examples yet.
    if not slot_data.get('examples'):
        yaml_match = SECTION_PATTERNS['yaml_code_block'].search(cleaned)
        if yaml_match:
            updates['examples'] = [extract_yaml_example(yaml_match.group(1))]
            cleaned = SECTION_PATTERNS['yaml_code_block'].sub('', cleaned)
            removed_sections.append('yaml_code_block')
            if verbose:
                print("  Extracted YAML example")
    else:
        # Existing examples win; still strip the now-redundant code block.
        cleaned = SECTION_PATTERNS['yaml_code_block'].sub('', cleaned)

    # Collapse the blank-line runs left behind by the removals above.
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()

    return cleaned, updates, removed_sections
|
|
|
|
|
|
def process_file(file_path: Path, dry_run: bool = False, verbose: bool = False) -> dict:
    """Process a single slot YAML file.

    Loads the file with the round-trip ruamel handler, cleans the file-level
    description and every slot description via process_slot_description, and
    merges any extracted comments/examples/annotations back into each slot.
    Writes the file back in place unless dry_run is set.

    Args:
        file_path: Path to the slot YAML file.
        dry_run: When True, compute changes but never write the file.
        verbose: Passed through to process_slot_description for per-section logs.

    Returns:
        dict: Result record with keys 'file', 'modified',
        'file_description_cleaned', 'slots_processed', 'removed_sections',
        and 'errors' (stringified exceptions, if any).
    """
    result = {
        'file': str(file_path),
        'modified': False,
        'file_description_cleaned': False,
        'slots_processed': [],
        'removed_sections': [],
        'errors': []
    }

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        data = yaml.load(content)
        if not data:
            # Empty/blank YAML file: nothing to do.
            return result

        modified = False

        # Process file-level description (if exists).
        if 'description' in data and isinstance(data['description'], str):
            orig_len = len(data['description'])
            cleaned, updates, removed = process_slot_description(
                data['description'], {}, verbose
            )

            if removed:
                # For file-level description, just clean it (don't add updates).
                if len(cleaned) < orig_len * 0.5:  # If more than 50% was removed
                    # Keep only first paragraph — the rest was mostly boilerplate.
                    first_para = cleaned.split('\n\n')[0] if '\n\n' in cleaned else cleaned
                    data['description'] = first_para.strip()
                else:
                    data['description'] = cleaned
                result['file_description_cleaned'] = True
                result['removed_sections'].extend(removed)
                modified = True
                if verbose:
                    print(f"  Cleaned file-level description")

        # Process each slot's description and merge extracted elements.
        if 'slots' in data and isinstance(data['slots'], dict):
            for slot_name, slot_data in data['slots'].items():
                if not isinstance(slot_data, dict):
                    # Skip shorthand/non-mapping slot entries.
                    continue

                if 'description' in slot_data and isinstance(slot_data['description'], str):
                    cleaned, updates, removed = process_slot_description(
                        slot_data['description'], slot_data, verbose
                    )

                    if removed or updates:
                        slot_data['description'] = cleaned

                        # Merge updates into the slot, preserving existing content.
                        for key, value in updates.items():
                            if key == 'comments':
                                existing = slot_data.get('comments', [])
                                if not isinstance(existing, list):
                                    # Normalize a scalar comment to a list.
                                    existing = [existing] if existing else []
                                # Add new comments, avoiding duplicates.
                                for comment in value:
                                    if comment not in existing:
                                        existing.append(comment)
                                slot_data['comments'] = existing
                            elif key == 'examples':
                                # Never overwrite hand-authored examples.
                                if 'examples' not in slot_data:
                                    slot_data['examples'] = value
                            elif key == 'annotations':
                                existing = slot_data.get('annotations', {})
                                existing.update(value)
                                slot_data['annotations'] = existing
                            else:
                                slot_data[key] = value

                        result['slots_processed'].append(slot_name)
                        result['removed_sections'].extend(removed)
                        modified = True

        result['modified'] = modified

        if modified and not dry_run:
            # Round-trip dump preserves the file's original quoting/comments.
            with open(file_path, 'w', encoding='utf-8') as f:
                yaml.dump(data, f)

    except Exception as e:
        # Broad catch at the per-file boundary: record the error and keep
        # going so one bad file doesn't abort the whole batch.
        result['errors'].append(str(e))

    return result
|
|
|
|
|
|
def main():
    """CLI entry point: structuralize descriptions across slot YAML files.

    Processes either a single file (--file) or every ``*.yaml`` under the
    hardcoded slots directory, then prints a per-section summary.
    """
    parser = argparse.ArgumentParser(description='Structuralize slot descriptions')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without modifying files')
    parser.add_argument('--verbose', action='store_true', help='Show detailed output')
    parser.add_argument('--file', type=str, help='Process a single file')
    args = parser.parse_args()

    slots_dir = Path('schemas/20251121/linkml/modules/slots')

    if args.file:
        files = [Path(args.file)]
    else:
        files = sorted(slots_dir.glob('*.yaml'))

    print(f"Processing {len(files)} slot files...")
    if args.dry_run:
        print("DRY RUN - no files will be modified\n")

    stats = {
        'files_processed': 0,
        'files_modified': 0,
        'slots_processed': 0,
        'sections_removed': {},  # section name -> count across files
        'errors': []
    }

    for file_path in files:
        if args.verbose:
            print(f"\nProcessing: {file_path.name}")

        result = process_file(file_path, dry_run=args.dry_run, verbose=args.verbose)

        stats['files_processed'] += 1
        if result['modified']:
            stats['files_modified'] += 1
            if not args.verbose:
                print(f"  Modified: {file_path.name} ({len(result['slots_processed'])} slots)")

        stats['slots_processed'] += len(result['slots_processed'])

        for section in result['removed_sections']:
            stats['sections_removed'][section] = stats['sections_removed'].get(section, 0) + 1

        if result['errors']:
            stats['errors'].extend(result['errors'])
            print(f"  ERROR in {file_path.name}: {result['errors']}")

    # Summary report.
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print(f"{'=' * 60}")
    print(f"Files processed: {stats['files_processed']}")
    print(f"Files modified: {stats['files_modified']}")
    print(f"Slots processed: {stats['slots_processed']}")
    # FIX: these were placeholder-free f-strings (ruff F541) — output unchanged.
    print("\nSections removed by type:")
    for section, count in sorted(stats['sections_removed'].items(), key=lambda x: -x[1]):
        print(f"  {section}: {count}")

    if stats['errors']:
        print(f"\nErrors: {len(stats['errors'])}")
        for error in stats['errors'][:10]:  # cap the dump at 10 errors
            print(f"  - {error}")

    if args.dry_run:
        print("\nDRY RUN complete. Run without --dry-run to apply changes.")
|
|
|
|
|
|
# Script entry point: only run when executed directly, not when imported.
if __name__ == '__main__':
    main()
|