#!/usr/bin/env python3
"""
Structuralize Class Descriptions

Migrates unstructured content from class description fields to proper LinkML slots.

For each class, this script:
1. Extracts sections like **Scope**:, **Notable Examples**:, etc. from descriptions
2. Creates slot_usage entries for the corresponding slots from description_sections.yaml
3. Removes the extracted sections from the description
4. Keeps only the core definition paragraph and **Wikidata**: reference

Target slots (from description_sections.yaml):
- scope_description: **Scope**:
- notable_examples: **Notable Examples**:
- historical_significance: **Historical Significance**:
- typical_contents: **Typical Contents**:
- related_types: **Related Types**:
- research_value: **Research Value**:
- dutch_context: **Dutch Context**:
- key_distinction: **Key Distinction**:, **Key Distinctions from Other Types**:
- administrative_context: **Administrative Context**:
- temporal_dynamics: **Temporal Dynamics**:
- use_cases: **Use Cases**:, **USE CASES**:
- heritage_sector_usage: **Heritage Sector Usage**:, **Heritage use cases**:
- characteristics: **Characteristics**:, **CHARACTERISTICS**:
- purpose: **Purpose**:, **PURPOSE**:
- class_definition: **Definition**:, **DEFINITION**:
- privacy_note: **Privacy Considerations**:
- preservation_note: **Preservation Considerations**:

Usage:
    python scripts/structuralize_class_descriptions.py [--dry-run] [--verbose] [--file PATH]
"""

import argparse
import json
import re
import sys
import traceback
from collections import Counter
from pathlib import Path

from ruamel.yaml import YAML

# Round-trip loader: preserves quotes and comments so rewritten files stay as
# close as possible to their original formatting.
yaml = YAML()
yaml.preserve_quotes = True
yaml.width = 120
yaml.indent(mapping=2, sequence=2, offset=2)

# Section patterns mapping to slot names
# Format: (section_name, slot_name, regex_pattern, is_list)
# Non-list patterns capture free text up to the next "**X" header or end of
# string; list patterns capture a contiguous run of "- " bullet lines.
SECTION_MAPPINGS = [
    ('scope', 'scope_description',
     r'\*\*Scope\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('definition', 'class_definition',
     r'\*\*(?:DEFINITION|Definition)\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('key_distinction', 'key_distinction',
     r'\*\*Key Distinction(?:s from Other Types)?\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('notable_examples', 'notable_examples',
     r'\*\*Notable Examples\*\*:\s*\n((?:- .*\n?)+)', True),
    ('related_types', 'related_types',
     r'\*\*(?:RELATED TYPES|Related Types)\*\*:\s*\n((?:- .*\n?)+)', True),
    ('typical_contents', 'typical_contents',
     r'\*\*Typical Contents\*\*:\s*\n((?:- .*\n?)+)', True),
    ('historical_significance', 'historical_significance',
     r'\*\*Historical Significance\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('research_value', 'research_value',
     r'\*\*Research Value\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('dutch_context', 'dutch_context',
     r'\*\*Dutch Context\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('administrative_context', 'administrative_context',
     r'\*\*Administrative Context\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('temporal_dynamics', 'temporal_dynamics',
     r'\*\*Temporal Dynamics\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('use_cases', 'use_cases',
     r'\*\*(?:USE CASES|Use Cases)\*\*:\s*\n((?:- .*\n?)+)', True),
    ('heritage_sector_usage', 'heritage_sector_usage',
     r'\*\*(?:Heritage Sector Usage|Heritage use cases)\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('characteristics', 'characteristics',
     r'\*\*(?:CHARACTERISTICS|Characteristics)\*\*:\s*\n((?:- .*\n?)+)', True),
    ('purpose', 'purpose',
     r'\*\*(?:PURPOSE|Purpose)\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('privacy_note', 'privacy_note',
     r'\*\*Privacy Considerations\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('preservation_note', 'preservation_note',
     r'\*\*Preservation(?:\s+Considerations)?\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
    ('geographic_restriction', 'geographic_restriction',
     r'\*\*Geographic Restriction\*\*:\s*\n(.*?)(?=\n\*\*[A-Z]|\Z)', False),
]

# Sections to REMOVE entirely (already structured elsewhere or redundant)
REMOVE_PATTERNS = [
    (r'\*\*Dual-Class Pattern\*\*:\s*\n.*?(?=\n\*\*[A-Z]|\Z)', 'dual_class_pattern'),
    (r'\*\*Ontological Alignment\*\*:\s*\n.*?(?=\n\*\*[A-Z]|\Z)', 'ontological_alignment'),
    (r'\*\*Multilingual Labels\*\*:\s*\n(?:- [a-z]{2}: .*\n)+', 'multilingual_labels'),
    (r'\*\*SKOS\*\*:\s*\n.*?(?=\n\*\*[A-Z]|\Z)', 'skos_alignment'),
    (r'\*\*Dublin Core\*\*:\s*\n.*?(?=\n\*\*[A-Z]|\Z)', 'dublin_core'),
    (r'\*\*RDF Serialization(?: Example)?\*\*:\s*\n```.*?```', 'rdf_serialization'),
    (r'\*\*Example(?: JSON)? Structure\*\*:\s*\n```.*?```', 'example_structure'),
    (r'\*\*ONTOLOGY ALIGNMENT\*\*:\s*\n.*?(?=\n\*\*[A-Z]|\Z)', 'ontology_alignment_upper'),
    (r'\*\*Primary GLAMORCUBESFIXPHDNT Category\*\*:\s*\n.*?(?=\n\*\*[A-Z]|\Z)', 'glamorcubes_category'),
]


def parse_list_content(content: str) -> list[str]:
    """Parse bullet list content into a list of strings.

    Lines starting with "- " begin a new item; any other non-empty line is
    treated as a wrapped continuation of the previous item.
    """
    items: list[str] = []
    for line in content.strip().split('\n'):
        line = line.strip()
        if line.startswith('- '):
            items.append(line[2:].strip())
        elif line and items:
            # Continuation of previous item
            items[-1] += ' ' + line
    return items


def parse_notable_examples(content: str) -> list[dict]:
    """Parse notable examples into structured format.

    Each "- Name (Location)" bullet becomes
    {'example_name': ..., 'example_location': ...}; a bullet without a
    trailing parenthesized part yields only 'example_name'.
    """
    examples: list[dict] = []
    for line in content.strip().split('\n'):
        line = line.strip()
        if line.startswith('- '):
            example_text = line[2:].strip()
            example = {'example_name': example_text}
            # Try to extract location from parentheses at the end of the line
            location_match = re.search(r'\(([^)]+)\)$', example_text)
            if location_match:
                example['example_location'] = location_match.group(1)
                example['example_name'] = example_text[:location_match.start()].strip()
            examples.append(example)
    return examples


def parse_related_types(content: str) -> list[dict]:
    """Parse related types into structured format.

    Recognized bullet shapes:
    - "Name (Q123) - note" -> name, related_type_wikidata, related_type_note
    - "Name (Q123)"        -> name, related_type_wikidata
    - "Name - note"        -> name, related_type_note
    - "Name"               -> name only
    """
    related: list[dict] = []
    for line in content.strip().split('\n'):
        line = line.strip()
        if line.startswith('- '):
            type_text = line[2:].strip()
            rel = {'related_type_name': type_text}
            # Try to extract Wikidata ID
            wikidata_match = re.search(r'\(Q(\d+)\)', type_text)
            if wikidata_match:
                rel['related_type_wikidata'] = f"Q{wikidata_match.group(1)}"
                rel['related_type_name'] = type_text[:wikidata_match.start()].strip()
            # Try to extract note after dash
            note_match = re.search(r'\)\s*-\s*(.+)$', type_text)
            if note_match:
                rel['related_type_note'] = note_match.group(1).strip()
            elif ' - ' in type_text and not wikidata_match:
                parts = type_text.split(' - ', 1)
                rel['related_type_name'] = parts[0].strip()
                rel['related_type_note'] = parts[1].strip()
            related.append(rel)
    return related


def extract_sections(description: str, verbose: bool = False) -> tuple[str, dict, list[str]]:
    """
    Extract structured sections from a class description.

    First strips sections matching REMOVE_PATTERNS entirely, then pulls the
    SECTION_MAPPINGS sections out into a dict keyed by slot name.

    Returns:
        tuple: (cleaned_description, extracted_data, removed_sections)
    """
    if not description:
        return description, {}, []

    cleaned = description
    extracted: dict = {}
    removed_sections: list[str] = []

    # First, remove patterns that should be deleted entirely
    for pattern, section_name in REMOVE_PATTERNS:
        regex = re.compile(pattern, re.DOTALL | re.IGNORECASE)
        if regex.search(cleaned):
            cleaned = regex.sub('', cleaned)
            removed_sections.append(section_name)
            if verbose:
                print(f"  Removed: {section_name}")

    # Extract sections to slots.
    # NOTE(review): only the FIRST occurrence of a section is captured, but
    # regex.sub removes ALL occurrences — a duplicated section's second body
    # is silently dropped. Presumed acceptable for this data; confirm.
    for section_name, slot_name, pattern, is_list in SECTION_MAPPINGS:
        regex = re.compile(pattern, re.DOTALL | re.IGNORECASE)
        match = regex.search(cleaned)
        if match:
            content = match.group(1).strip()
            if slot_name == 'notable_examples':
                extracted[slot_name] = parse_notable_examples(content)
            elif slot_name == 'related_types':
                extracted[slot_name] = parse_related_types(content)
            elif is_list:
                extracted[slot_name] = parse_list_content(content)
            else:
                # For non-list content, clean up and store as string
                extracted[slot_name] = content
            cleaned = regex.sub('', cleaned)
            removed_sections.append(section_name)
            if verbose:
                print(f"  Extracted: {section_name} -> {slot_name}")

    # Clean up extra whitespace left behind by the removals
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
    cleaned = cleaned.strip()

    return cleaned, extracted, removed_sections


def process_class(class_name: str, class_data: dict, verbose: bool = False) -> tuple[bool, list[str]]:
    """
    Process a single class, extracting structured content from its description.

    Mutates class_data in place: rewrites 'description' and populates
    'slot_usage' entries for extracted sections.

    Returns:
        tuple: (was_modified, list_of_extracted_sections)
    """
    if not isinstance(class_data, dict):
        return False, []
    if 'description' not in class_data or not isinstance(class_data['description'], str):
        return False, []

    cleaned, extracted, removed_sections = extract_sections(
        class_data['description'], verbose
    )
    if not removed_sections:
        return False, []

    # Update description
    class_data['description'] = cleaned

    # Add extracted data to slot_usage or annotations
    if extracted:
        # Covers both a missing key and an explicit YAML null
        if class_data.get('slot_usage') is None:
            class_data['slot_usage'] = {}
        for slot_name, value in extracted.items():
            if isinstance(value, list) and value:
                if slot_name in ('notable_examples', 'related_types'):
                    # Complex nested structures - store as JSON string to
                    # avoid YAML formatting issues
                    class_data['slot_usage'][slot_name] = {
                        'range': 'NotableExample' if slot_name == 'notable_examples' else 'RelatedType',
                        'multivalued': True,
                        'inlined_as_list': True,
                        'annotations': {
                            'extracted_values': json.dumps(value, ensure_ascii=False)
                        }
                    }
                else:
                    # Simple list of strings - store as JSON array string
                    class_data['slot_usage'][slot_name] = {
                        'annotations': {
                            'default_values': json.dumps(value, ensure_ascii=False)
                        }
                    }
            elif isinstance(value, str) and value:
                class_data['slot_usage'][slot_name] = {
                    'annotations': {
                        'default_value': value
                    }
                }

    return True, removed_sections


def process_file(file_path: Path, dry_run: bool = False, verbose: bool = False) -> dict:
    """Process a single class YAML file.

    Returns a result dict with keys: file, modified, classes_processed,
    removed_sections, errors. Exceptions are caught and reported via
    'errors' rather than raised.
    """
    result = {
        'file': str(file_path),
        'modified': False,
        'classes_processed': [],
        'removed_sections': [],
        'errors': []
    }

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        data = yaml.load(content)
        if not data:
            return result

        modified = False

        # Process classes
        if 'classes' in data and isinstance(data['classes'], dict):
            for class_name, class_data in data['classes'].items():
                was_modified, removed = process_class(class_name, class_data, verbose)
                if was_modified:
                    result['classes_processed'].append(class_name)
                    result['removed_sections'].extend(removed)
                    modified = True

        result['modified'] = modified

        if modified and not dry_run:
            with open(file_path, 'w', encoding='utf-8') as f:
                yaml.dump(data, f)

    except Exception as e:
        result['errors'].append(str(e))
        if verbose:
            traceback.print_exc()

    return result


def main():
    """CLI entry point: process all class YAML files (or one via --file)."""
    parser = argparse.ArgumentParser(description='Structuralize class descriptions')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview changes without modifying files')
    parser.add_argument('--verbose', action='store_true',
                        help='Show detailed output')
    parser.add_argument('--file', type=str, help='Process a single file')
    args = parser.parse_args()

    classes_dir = Path('schemas/20251121/linkml/modules/classes')

    if args.file:
        files = [Path(args.file)]
    else:
        files = sorted(classes_dir.glob('*.yaml'))

    print(f"Processing {len(files)} class files...")
    if args.dry_run:
        print("DRY RUN - no files will be modified\n")

    stats = {
        'files_processed': 0,
        'files_modified': 0,
        'classes_processed': 0,
        'sections_removed': Counter(),
        'errors': []
    }

    for file_path in files:
        if args.verbose:
            print(f"\nProcessing: {file_path.name}")

        result = process_file(file_path, dry_run=args.dry_run, verbose=args.verbose)
        stats['files_processed'] += 1

        if result['modified']:
            stats['files_modified'] += 1
            if not args.verbose:
                print(f"  Modified: {file_path.name} ({len(result['classes_processed'])} classes)")

        stats['classes_processed'] += len(result['classes_processed'])
        for section in result['removed_sections']:
            stats['sections_removed'][section] += 1

        if result['errors']:
            stats['errors'].extend(result['errors'])
            print(f"  ERROR in {file_path.name}: {result['errors']}")

    # Summary
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print(f"{'=' * 60}")
    print(f"Files processed: {stats['files_processed']}")
    print(f"Files modified: {stats['files_modified']}")
    print(f"Classes processed: {stats['classes_processed']}")
    print("\nSections removed/extracted by type:")
    for section, count in stats['sections_removed'].most_common():
        print(f"  {section}: {count}")

    if stats['errors']:
        print(f"\nErrors: {len(stats['errors'])}")
        for error in stats['errors'][:10]:
            print(f"  - {error}")

    if args.dry_run:
        print("\nDRY RUN complete. Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()