#!/usr/bin/env python3
"""
Add UUID identifiers to Latin American institutions YAML file.

This script adds three UUID formats to each institution record:
1. record_id: UUID v7 (time-ordered, random) - for database PKs
2. ghcid_uuid: UUID v5 (deterministic, SHA-1) - for interoperability
3. ghcid_uuid_sha256: UUID v8 (deterministic, SHA-256) - for SOTA security

Usage:
    python scripts/add_uuids_to_latin_american_institutions.py

    # With custom input/output paths:
    python scripts/add_uuids_to_latin_american_institutions.py \
        --input data/instances/custom.yaml \
        --output data/instances/custom_with_uuids.yaml
"""

import sys
import argparse
from pathlib import Path
from datetime import datetime, timezone
from typing import List, Dict, Any
import yaml
import shutil

# Add src to Python path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from glam_extractor.identifiers.ghcid import GHCIDComponents


def parse_ghcid_string(ghcid_str: str) -> GHCIDComponents:
    """
    Parse GHCID string into components.

    Format: CC-RR-CCC-T-III
    - CC: Country code (2 chars)
    - RR: Region code (2-3 chars)
    - CCC: City code (3 chars, may be XXX for unknown)
    - T: Institution type (1 char)
    - III: Institution abbreviation (2-4 chars)

    An optional sixth dash-separated part is passed through as the Wikidata
    QID; any parts after the sixth are ignored.

    Args:
        ghcid_str: Dash-separated GHCID string

    Returns:
        GHCIDComponents built from the split parts

    Raises:
        ValueError: If the string has fewer than 5 dash-separated parts
    """
    parts = ghcid_str.split("-")

    if len(parts) < 5:
        raise ValueError(f"Invalid GHCID format: {ghcid_str} (expected at least 5 parts)")

    country_code, region_code, city_code, type_code, abbreviation = parts[:5]

    # Handle optional Wikidata QID (6th part)
    wikidata_qid = parts[5] if len(parts) > 5 else None

    return GHCIDComponents(
        country_code=country_code,
        region_code=region_code,
        city_locode=city_code,
        institution_type=type_code,
        abbreviation=abbreviation,
        wikidata_qid=wikidata_qid,
    )


def _upsert_identifier(identifiers: List[Any], scheme: str, value: str) -> None:
    """Set the entry for ``scheme`` in ``identifiers`` to ``value``, appending it if absent."""
    entry = {
        "identifier_scheme": scheme,
        "identifier_value": value,
        "identifier_url": f"urn:uuid:{value}",
    }
    for existing in identifiers:
        if isinstance(existing, dict) and existing.get("identifier_scheme") == scheme:
            # Refresh in place so a re-run never leaves a stale value behind.
            existing.update(entry)
            return
    identifiers.append(entry)


def add_uuids_to_institution(institution: Dict[str, Any]) -> Dict[str, Any]:
    """
    Add UUID fields to an institution record.

    Sets ``record_id``, ``ghcid_uuid`` and ``ghcid_uuid_sha256`` on the record
    and mirrors them into its ``identifiers`` list under the schemes
    ``GHCID_UUID``, ``GHCID_UUID_SHA256`` and ``RECORD_ID``.

    The function is safe to re-run on an already processed record: an existing
    ``record_id`` is kept (it is a primary key, so it must not change between
    runs) and existing identifier entries are updated rather than duplicated
    or left stale.

    Records without a ``ghcid``, and records for which UUID generation fails,
    are returned unmodified after printing a message.

    Args:
        institution: Institution dictionary from YAML (modified in place)

    Returns:
        The same institution dictionary, with UUID fields when generation succeeded
    """
    name = institution.get("name", "Unknown")

    # Skip if no GHCID
    ghcid_str = institution.get("ghcid")
    if not ghcid_str:
        print(f"  Skipping {name}: No GHCID")
        return institution

    # Compute everything that can fail BEFORE touching the record, so a
    # failure never leaves it half-updated.
    try:
        components = parse_ghcid_string(ghcid_str)

        existing_record_id = institution.get("record_id")
        if existing_record_id:
            record_id = str(existing_record_id)
        else:
            record_id = str(GHCIDComponents.generate_uuid_v7())
        ghcid_uuid = str(components.to_uuid())
        ghcid_uuid_sha256 = str(components.to_uuid_sha256())

        # ``identifiers:`` with no value loads as None, so ``.get(key, [])``
        # is not enough to guarantee a list here.
        identifiers = institution.get("identifiers") or []
        if not isinstance(identifiers, list):
            raise TypeError(
                f"'identifiers' must be a list, got {type(identifiers).__name__}"
            )
    except Exception as e:
        # Deliberately broad: this is the per-record boundary, and one bad
        # record must not abort the whole batch. The failure is reported.
        print(f"  ✗ {name}: Error - {e}")
        return institution

    # Add UUID fields
    institution["record_id"] = record_id
    institution["ghcid_uuid"] = ghcid_uuid
    institution["ghcid_uuid_sha256"] = ghcid_uuid_sha256

    # Also add to identifiers list
    _upsert_identifier(identifiers, "GHCID_UUID", ghcid_uuid)
    _upsert_identifier(identifiers, "GHCID_UUID_SHA256", ghcid_uuid_sha256)
    _upsert_identifier(identifiers, "RECORD_ID", record_id)
    institution["identifiers"] = identifiers

    print(f"  ✓ {name}: Added UUIDs")
    return institution


def update_yaml_file(input_path: Path, output_path: Path, backup: bool = True) -> None:
    """
    Update YAML file with UUID identifiers.

    Reads a list of institutions from ``input_path``, runs
    ``add_uuids_to_institution`` on each, and writes the result to
    ``output_path`` preceded by a generated comment header. Comments present
    in the input file are not carried over, because the data is round-tripped
    through ``yaml.safe_load`` / ``yaml.dump``.

    Args:
        input_path: Path to input YAML file
        output_path: Path to output YAML file
        backup: Whether to create a backup of the input file (only done when
            ``output_path`` equals ``input_path``)

    Raises:
        ValueError: If the YAML document is not a list
    """
    print(f"\nReading institutions from: {input_path}")

    # Read input YAML
    with open(input_path, 'r', encoding='utf-8') as f:
        institutions = yaml.safe_load(f)

    if not isinstance(institutions, list):
        raise ValueError("YAML file must contain a list of institutions")

    print(f"Found {len(institutions)} institutions")

    # Create backup if requested
    if backup and input_path == output_path:
        backup_path = input_path.with_suffix(
            f".backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.yaml"
        )
        print(f"\nCreating backup: {backup_path}")
        shutil.copy2(input_path, backup_path)

    # Process each institution
    print("\nProcessing institutions:")
    updated_institutions = [
        add_uuids_to_institution(institution) for institution in institutions
    ]

    total = len(updated_institutions)
    with_uuids = sum(1 for inst in updated_institutions if 'record_id' in inst)

    # Update header comment
    header = f"""---
# Latin American GLAM Institutions - GHCID + UUID Enhanced
# Last updated: {datetime.now(timezone.utc).isoformat()}
# UUID generation: {with_uuids}/{total} institutions
#
# UUID Statistics:
# - Total institutions: {total}
# - UUIDs generated: {with_uuids}
# - UUID v7 (record_id): Time-ordered database PKs
# - UUID v5 (ghcid_uuid): SHA-1 interoperability PIDs
# - UUID v8 (ghcid_uuid_sha256): SHA-256 SOTA PIDs

"""

    # Serialize BEFORE opening the output for writing: opening truncates the
    # file, so a serialization error after that point would destroy the data
    # when overwriting the input in place.
    body = yaml.dump(
        updated_institutions,
        allow_unicode=True,
        default_flow_style=False,
        sort_keys=False,
        width=120,
    )

    # Write output YAML
    print(f"\nWriting updated institutions to: {output_path}")
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(header)
        f.write(body)

    print(f"\n✓ Successfully updated {total} institutions")
    print(f"✓ Added UUIDs to {with_uuids} institutions")


def main() -> None:
    """Parse command-line arguments and run the UUID update, exiting with status 1 on failure."""
    parser = argparse.ArgumentParser(
        description="Add UUID identifiers to Latin American institutions YAML file"
    )
    parser.add_argument(
        "--input",
        type=Path,
        default=Path("data/instances/latin_american_institutions_AUTHORITATIVE.yaml"),
        help="Input YAML file path",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output YAML file path (defaults to input path, creating a backup)",
    )
    parser.add_argument(
        "--no-backup",
        action="store_true",
        help="Don't create a backup when overwriting input file",
    )

    args = parser.parse_args()

    # Default output to input if not specified
    if args.output is None:
        args.output = args.input

    # Resolve paths so the in-place check in update_yaml_file compares
    # absolute paths rather than differently spelled relative ones.
    input_path = args.input.resolve()
    output_path = args.output.resolve()

    # Check input exists
    if not input_path.exists():
        print(f"Error: Input file not found: {input_path}", file=sys.stderr)
        sys.exit(1)

    # Update file
    try:
        update_yaml_file(input_path, output_path, backup=not args.no_backup)
    except Exception as e:
        # Top-level boundary: report on stderr, alongside the traceback, and
        # signal failure through the exit status.
        print(f"\nError: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()