- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive). - Added tests for extracted entities and result handling to validate the extraction process. - Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format. - Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns. - Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
247 lines
7.9 KiB
Python
Executable file
247 lines
7.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Add UUID identifiers to Latin American institutions YAML file.
|
|
|
|
This script adds three UUID formats to each institution record:
|
|
1. record_id: UUID v7 (time-ordered, random) - for database PKs
|
|
2. ghcid_uuid: UUID v5 (deterministic, SHA-1) - for interoperability
|
|
3. ghcid_uuid_sha256: UUID v8 (deterministic, SHA-256) - for consumers that require a SHA-256-based identifier instead of SHA-1
|
|
|
|
Usage:
|
|
python scripts/add_uuids_to_latin_american_institutions.py
|
|
|
|
# With custom input/output paths:
|
|
python scripts/add_uuids_to_latin_american_institutions.py \
|
|
--input data/instances/custom.yaml \
|
|
--output data/instances/custom_with_uuids.yaml
|
|
"""
|
|
|
|
import sys
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Any
|
|
import yaml
|
|
import shutil
|
|
|
|
# Add src to Python path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
|
|
|
from glam_extractor.identifiers.ghcid import GHCIDComponents
|
|
|
|
|
|
def parse_ghcid_string(ghcid_str: str) -> GHCIDComponents:
    """
    Break a hyphen-separated GHCID string into a GHCIDComponents object.

    Expected layout: CC-RR-CCC-T-III[-QID]
        - CC:  Country code (2 chars)
        - RR:  Region code (2-3 chars)
        - CCC: City code (3 chars, may be XXX for unknown)
        - T:   Institution type (1 char)
        - III: Institution abbreviation (2-4 chars)
        - QID: Optional sixth segment, forwarded as the Wikidata QID

    Only the number of segments is checked here; segment lengths are not
    validated by this function, and segments after the sixth are ignored.

    Args:
        ghcid_str: GHCID string to parse

    Returns:
        GHCIDComponents built from the individual segments

    Raises:
        ValueError: If the string has fewer than 5 hyphen-separated segments
    """
    segments = ghcid_str.split("-")
    if len(segments) < 5:
        raise ValueError(f"Invalid GHCID format: {ghcid_str} (expected at least 5 parts)")

    # The first five segments are mandatory and positional.
    country, region, city, inst_type, abbrev = segments[:5]

    # A sixth segment, when present, carries the Wikidata QID.
    qid = segments[5] if len(segments) > 5 else None

    return GHCIDComponents(
        country_code=country,
        region_code=region,
        city_locode=city,
        institution_type=inst_type,
        abbreviation=abbrev,
        wikidata_qid=qid,
    )
|
|
|
|
|
|
def add_uuids_to_institution(institution: Dict[str, Any]) -> Dict[str, Any]:
    """
    Add UUID fields to an institution record.

    The record is updated in place and also returned. Three top-level fields
    are set, and a matching entry is appended to the "identifiers" list for
    every scheme that is not already present there:

        - record_id          / RECORD_ID:         from GHCIDComponents.generate_uuid_v7()
        - ghcid_uuid         / GHCID_UUID:        from components.to_uuid()
        - ghcid_uuid_sha256  / GHCID_UUID_SHA256: from components.to_uuid_sha256()

    An existing record_id (top-level field, or else the value of an existing
    RECORD_ID identifier) is reused instead of being regenerated, so that
    re-running the script does not change the record's key or make the
    top-level field disagree with the identifiers list. Existing identifier
    entries are never modified.

    Records without a GHCID are returned unchanged. If anything fails while
    parsing the GHCID or generating UUIDs, the error is printed and the
    record is returned unchanged (no partial update).

    Args:
        institution: Institution dictionary from YAML

    Returns:
        Updated institution dictionary with UUID fields
    """
    name = institution.get("name", "Unknown")

    # Skip if no GHCID
    ghcid_str = institution.get("ghcid")
    if not ghcid_str:
        print(f" Skipping {name}: No GHCID")
        return institution

    try:
        # Parse GHCID string
        components = parse_ghcid_string(ghcid_str)

        # A YAML "identifiers:" key with no value loads as None; treat it as
        # empty. Work on a copy so a failure below leaves the record untouched.
        identifiers = list(institution.get("identifiers") or [])

        def has_scheme(scheme: str) -> bool:
            return any(entry.get("identifier_scheme") == scheme for entry in identifiers)

        # Reuse an existing record_id: it is random, so regenerating it on
        # every run would silently change the record's key.
        record_id = institution.get("record_id")
        if not record_id:
            record_id = next(
                (
                    entry.get("identifier_value")
                    for entry in identifiers
                    if entry.get("identifier_scheme") == "RECORD_ID"
                ),
                None,
            )
        if not record_id:
            record_id = GHCIDComponents.generate_uuid_v7()
        record_id = str(record_id)

        # Deterministic UUIDs derived from the GHCID components
        ghcid_uuid = str(components.to_uuid())
        ghcid_uuid_sha256 = str(components.to_uuid_sha256())

        # Append identifier entries only for schemes that are not yet present
        for scheme, value in (
            ("GHCID_UUID", ghcid_uuid),
            ("GHCID_UUID_SHA256", ghcid_uuid_sha256),
            ("RECORD_ID", record_id),
        ):
            if not has_scheme(scheme):
                identifiers.append({
                    "identifier_scheme": scheme,
                    "identifier_value": value,
                    "identifier_url": f"urn:uuid:{value}",
                })

        # Everything was computed successfully; now update the record.
        institution["record_id"] = record_id
        institution["ghcid_uuid"] = ghcid_uuid
        institution["ghcid_uuid_sha256"] = ghcid_uuid_sha256
        institution["identifiers"] = identifiers

        print(f" ✓ {name}: Added UUIDs")
        return institution

    except Exception as e:
        # Deliberate best-effort: one bad record must not abort the whole run.
        print(f" ✗ {name}: Error - {e}")
        return institution
|
|
|
|
|
|
def update_yaml_file(input_path: Path, output_path: Path, backup: bool = True) -> None:
    """
    Read institutions from a YAML file, add UUIDs, and write them back out.

    Each record is passed through add_uuids_to_institution(). The output
    starts with a generated comment header (timestamp and counts) followed
    by the YAML dump of all records. Progress is reported via print().

    Args:
        input_path: Path to input YAML file
        output_path: Path to output YAML file
        backup: Whether to copy the input file to a timestamped backup
            first; the copy is only made when output_path equals input_path

    Raises:
        ValueError: If the top-level YAML value is not a list
    """
    print(f"\nReading institutions from: {input_path}")

    # Load the source document
    with open(input_path, 'r', encoding='utf-8') as source:
        records = yaml.safe_load(source)

    if not isinstance(records, list):
        raise ValueError("YAML file must contain a list of institutions")

    print(f"Found {len(records)} institutions")

    # A backup is only needed when the input file itself is overwritten
    if backup and input_path == output_path:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = input_path.with_suffix(f".backup_{stamp}.yaml")
        print(f"\nCreating backup: {backup_path}")
        shutil.copy2(input_path, backup_path)

    # Enrich every record
    print("\nProcessing institutions:")
    processed = [add_uuids_to_institution(record) for record in records]

    total = len(processed)
    # A record counts as "with UUIDs" when it carries a record_id field
    with_uuids = sum(1 for record in processed if 'record_id' in record)
    updated_at = datetime.now(timezone.utc).isoformat()

    # Comment header placed ahead of the YAML body
    header = f"""---
# Latin American GLAM Institutions - GHCID + UUID Enhanced
# Last updated: {updated_at}
# UUID generation: {with_uuids}/{total} institutions
#
# UUID Statistics:
# - Total institutions: {total}
# - UUIDs generated: {with_uuids}
# - UUID v7 (record_id): Time-ordered database PKs
# - UUID v5 (ghcid_uuid): SHA-1 interoperability PIDs
# - UUID v8 (ghcid_uuid_sha256): SHA-256 SOTA PIDs

"""

    # Write header followed by the serialized records
    print(f"\nWriting updated institutions to: {output_path}")
    dump_options = {
        "allow_unicode": True,
        "default_flow_style": False,
        "sort_keys": False,
        "width": 120,
    }
    with open(output_path, 'w', encoding='utf-8') as target:
        target.write(header)
        yaml.dump(processed, target, **dump_options)

    print(f"\n✓ Successfully updated {total} institutions")
    print(f"✓ Added UUIDs to {with_uuids} institutions")
|
|
|
|
|
|
def main():
    """
    Command-line entry point.

    Parses --input, --output and --no-backup, resolves both paths, and runs
    update_yaml_file(). When --output is omitted the input file is used as
    the output. Exits with status 1 if the input file does not exist or if
    the update raises; in the latter case the traceback is printed as well.
    """
    cli = argparse.ArgumentParser(
        description="Add UUID identifiers to Latin American institutions YAML file"
    )
    cli.add_argument(
        "--input",
        type=Path,
        default=Path("data/instances/latin_american_institutions_AUTHORITATIVE.yaml"),
        help="Input YAML file path",
    )
    cli.add_argument(
        "--output",
        type=Path,
        default=None,
        help="Output YAML file path (defaults to input path, creating a backup)",
    )
    cli.add_argument(
        "--no-backup",
        action="store_true",
        help="Don't create a backup when overwriting input file",
    )
    options = cli.parse_args()

    # Without an explicit --output the input file is rewritten in place
    source_arg = options.input
    target_arg = source_arg if options.output is None else options.output

    input_path = source_arg.resolve()
    output_path = target_arg.resolve()

    # Bail out early when there is nothing to read
    if not input_path.exists():
        print(f"Error: Input file not found: {input_path}")
        sys.exit(1)

    try:
        update_yaml_file(input_path, output_path, backup=not options.no_backup)
    except Exception as e:
        # Top-level boundary: report the failure with a traceback, then exit non-zero
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
|