#!/usr/bin/env python3
"""
Process Mexican GLAM institutions from conversation files.
Cleans, merges, deduplicates, and converts to LinkML YAML format.
"""

import json
import re
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Set

# Non-institution terms to filter out (metadata standards, technical terms, statistics)
NON_INSTITUTION_TERMS = {
    # Metadata standards
    "dublin core", "mods", "mets", "vra core", "object id", "marc21",
    "marc 21", "handle system", "orcid", "doi",
    # Technical/infrastructure terms
    "master formats", "documents", "audio/video", "storage", "climate control",
    "digital objects", "photography", "tiff", "jpeg", "pdf/a", "api",
    "oai-pmh", "rest", "iiif", "tainacan", "google arts & culture",
    "opportunities",
    # Aggregate statistics
    "current status", "museums", "archives", "libraries",
    "archaeological sites", "inah museums", "pueblos mágicos",
    # Generic descriptors
    "access", "platform", "network", "system", "standard", "format",
    "preservation", "digitization", "metadata",
}

# Patterns hoisted to module level: compiled once instead of on every scanned line.
_SECTION_RE = re.compile(r'^###\s+\d+\.\s+(.+?)(?:\s*\(([A-Z]+)\))?$')
_FULL_NAME_RE = re.compile(r'\*\*Full Name\*\*:\s*(.+)')
_URL_RE = re.compile(r'https?://[^\s\)]+')
# FIX: original class was [A-Z|a-z], which admits a literal '|' inside the TLD.
_EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')


def is_real_institution(name: str) -> bool:
    """Check if extracted name is a real institution vs metadata/technical term.

    Returns False for direct matches against NON_INSTITUTION_TERMS, for names
    composed entirely of generic terms, and for very short names lacking a
    known institutional acronym (inah/unam/agn).
    """
    name_lower = name.lower().strip()

    # Direct term match
    if name_lower in NON_INSTITUTION_TERMS:
        return False

    # Contains only generic terms (subset check; empty name also fails here)
    words = set(name_lower.split())
    if words <= NON_INSTITUTION_TERMS:
        return False

    # Very short names without clear institutional indicators
    if len(name_lower) < 5 and not any(
        keyword in name_lower for keyword in ('inah', 'unam', 'agn')
    ):
        return False

    return True


def extract_institutions_from_report(report_text: str) -> List[Dict]:
    """Extract institution names and metadata from the comprehensive report.

    Recognizes two markdown patterns:
      1. Numbered section headers: ``### 1. Name (ACRONYM)`` — metadata is
         gathered from the next 30 lines (Full Name, URLs, emails).
      2. Standalone bold names: ``**Name**`` — kept only if a URL is found
         in the next 10 lines.

    Returns a list of dicts with keys: name, urls, emails, description,
    institution_type.
    """
    institutions = []
    lines = report_text.split('\n')
    i = 0
    while i < len(lines):
        line = lines[i].strip()

        # Pattern 1: "### 1. Instituto Nacional de Antropología e Historia (INAH)"
        section_match = _SECTION_RE.match(line)
        if section_match:
            institution_name = section_match.group(1).strip()
            # group(2) would be the acronym; it is captured but currently unused.

            # Skip if not a real institution
            if not is_real_institution(institution_name):
                i += 1
                continue

            current_inst = {
                'name': institution_name,
                'urls': [],
                'emails': [],
                'description': None,
                'institution_type': 'MIXED',
            }

            # Look ahead for metadata in following lines (next 30 lines)
            for j in range(i + 1, min(i + 30, len(lines))):
                next_line = lines[j]
                # Stop at next section header
                if next_line.startswith('###'):
                    break

                # Extract **Full Name**: pattern
                full_name_match = _FULL_NAME_RE.match(next_line)
                if full_name_match:
                    full_name = full_name_match.group(1).strip()
                    if '/' in full_name:  # Has bilingual name
                        # Take the first part (usually Spanish)
                        current_inst['name'] = full_name.split('/')[0].strip()

                # Extract URLs (Main Website, Main Portal, Digital Repository, etc.)
                url_match = _URL_RE.search(next_line)
                if url_match:
                    url = url_match.group().rstrip(',.;:')
                    if url not in current_inst['urls']:
                        current_inst['urls'].append(url)

                # Extract emails
                email_match = _EMAIL_RE.search(next_line)
                if email_match:
                    email = email_match.group()
                    if email not in current_inst['emails']:
                        current_inst['emails'].append(email)

            # Classify institution type from the (possibly bilingual-split) name
            name_lower = current_inst['name'].lower()
            if 'museo' in name_lower or 'museum' in name_lower or 'mediateca' in name_lower:
                current_inst['institution_type'] = 'MUSEUM'
            elif 'archivo' in name_lower or 'archive' in name_lower:
                current_inst['institution_type'] = 'ARCHIVE'
            elif 'biblioteca' in name_lower or 'library' in name_lower:
                current_inst['institution_type'] = 'LIBRARY'
            elif 'universidad' in name_lower or 'university' in name_lower or 'college' in name_lower:
                current_inst['institution_type'] = 'EDUCATION_PROVIDER'
            elif 'secretar' in name_lower or 'instituto nacional' in name_lower or 'ministry' in name_lower:
                current_inst['institution_type'] = 'OFFICIAL_INSTITUTION'

            institutions.append(current_inst)

        # Pattern 2: Standalone bold platform names like
        # "**Mexicana - Repositorio del Patrimonio Cultural**"
        elif line.startswith('**') and line.endswith('**') and ':' not in line:
            name = line.strip('*').strip()
            if is_real_institution(name) and len(name) > 10:  # Avoid short generic terms
                current_inst = {
                    'name': name,
                    'urls': [],
                    'emails': [],
                    'description': None,
                    'institution_type': 'MIXED',
                }

                # Look ahead for URLs
                for j in range(i + 1, min(i + 10, len(lines))):
                    next_line = lines[j]
                    if next_line.startswith('###') or (
                        next_line.startswith('**') and next_line.endswith('**')
                    ):
                        break
                    url_match = _URL_RE.search(next_line)
                    if url_match:
                        url = url_match.group().rstrip(',.;:')
                        if url not in current_inst['urls']:
                            current_inst['urls'].append(url)

                # Classify (note: this pattern also maps 'repositorio' to LIBRARY)
                name_lower = name.lower()
                if 'museo' in name_lower or 'museum' in name_lower:
                    current_inst['institution_type'] = 'MUSEUM'
                elif 'archivo' in name_lower or 'archive' in name_lower:
                    current_inst['institution_type'] = 'ARCHIVE'
                elif 'biblioteca' in name_lower or 'library' in name_lower or 'repositorio' in name_lower:
                    current_inst['institution_type'] = 'LIBRARY'

                if current_inst['urls']:  # Only add if we found at least a URL
                    institutions.append(current_inst)

        i += 1

    return institutions


def clean_file1_data(file1_data: Dict) -> List[Dict]:
    """Remove non-institution entries from File 1 parsed data."""
    cleaned = []
    for inst in file1_data['institutions']:
        if is_real_institution(inst['name']):
            cleaned.append(inst)
        else:
            print(f" Filtered out: {inst['name']}")
    return cleaned


def normalize_name(name: str) -> str:
    """Normalize institution name for comparison."""
    # Remove parenthetical info, lowercase, remove extra spaces
    name = re.sub(r'\([^)]*\)', '', name)
    name = name.lower().strip()
    name = re.sub(r'\s+', ' ', name)
    return name


def merge_institutions(inst1: Dict, inst2: Dict) -> Dict:
    """Merge two institution records, preferring more complete data.

    URLs and emails are deduplicated with dict.fromkeys so their order is
    deterministic (first occurrence wins); the original list(set(...)) made
    the YAML output order vary between runs.
    """
    merged = inst1.copy()

    # Merge URLs (deduplicate, order-preserving)
    merged['urls'] = list(dict.fromkeys(merged.get('urls', []) + inst2.get('urls', [])))

    # Merge emails (deduplicate, order-preserving)
    merged['emails'] = list(dict.fromkeys(merged.get('emails', []) + inst2.get('emails', [])))

    # Prefer non-null description
    if not merged.get('description') and inst2.get('description'):
        merged['description'] = inst2['description']

    # Prefer more specific institution type (not MIXED)
    if merged.get('institution_type') == 'MIXED' and inst2.get('institution_type') != 'MIXED':
        merged['institution_type'] = inst2['institution_type']

    # Keep state from first record if present
    if not merged.get('state') and inst2.get('state'):
        merged['state'] = inst2['state']

    return merged


def deduplicate_institutions(institutions: List[Dict]) -> List[Dict]:
    """Deduplicate institutions by normalized name."""
    name_map = {}
    for inst in institutions:
        norm_name = normalize_name(inst['name'])
        if norm_name in name_map:
            # Merge with existing
            name_map[norm_name] = merge_institutions(name_map[norm_name], inst)
        else:
            name_map[norm_name] = inst
    return list(name_map.values())


def generate_ghcid(country_code: str, inst_type: str, name: str, index: int) -> str:
    """Generate GHCID identifier following schema spec.

    Format: https://w3id.org/heritage/custodian/<cc>/<type>-<slug>-<NNNN>
    """
    # Type code mapping
    type_codes = {
        'MUSEUM': 'M',
        'ARCHIVE': 'A',
        'LIBRARY': 'L',
        'GALLERY': 'G',
        'OFFICIAL_INSTITUTION': 'O',
        'RESEARCH_CENTER': 'R',
        'EDUCATION_PROVIDER': 'E',
        'MIXED': 'M',
        'UNKNOWN': 'U',
    }
    type_code = type_codes.get(inst_type, 'U')

    # Create slug from name
    slug = re.sub(r'[^a-z0-9]+', '-', name.lower())
    slug = slug.strip('-')[:30]  # Max 30 chars

    return (
        f"https://w3id.org/heritage/custodian/"
        f"{country_code.lower()}/{type_code.lower()}-{slug}-{index:04d}"
    )


def convert_to_linkml_yaml(institutions: List[Dict], file1_path: str, file2_path: str) -> str:
    """Convert institutions to LinkML YAML format with source file references.

    Args:
        institutions: List of institution dictionaries (each may have 'source_file' key)
        file1_path: Path to Mexican GLAM File 1
        file2_path: Path to Mexican GLAM File 2

    Returns:
        The full YAML document as a single string.
    """
    yaml_lines = [
        "---",
        "# Mexican GLAM Institutions",
        "# Extracted from 2 conversation files:",
        f"# File 1: {file1_path}",
        f"# File 2: {file2_path}",
        "",
    ]

    for i, inst in enumerate(institutions, 1):
        # Generate GHCID
        ghcid = generate_ghcid('MX', inst.get('institution_type', 'MIXED'), inst['name'], i)

        # Escape double quotes in name by using single quotes if name contains double quotes
        name = inst['name']
        if '"' in name:
            # Use single quotes and escape any single quotes in the name
            # (YAML single-quote escaping: double the apostrophe)
            name_escaped = name.replace("'", "''")
            name_field = f" name: '{name_escaped}'"
        else:
            name_field = f" name: \"{name}\""

        yaml_lines.append(f"- id: {ghcid}")
        yaml_lines.append(name_field)
        yaml_lines.append(f" institution_type: {inst.get('institution_type', 'MIXED')}")

        # Alternative names (if any)
        # (Could extract from parentheticals in future enhancement)

        # Description
        if inst.get('description'):
            yaml_lines.append(" description: >-")
            yaml_lines.append(f" {inst['description']}")

        # Locations
        if inst.get('state'):
            yaml_lines.append(" locations:")
            yaml_lines.append(f" - region: {inst['state']}")
            yaml_lines.append(" country: MX")

        # Identifiers (URLs and emails)
        if inst.get('urls') or inst.get('emails'):
            yaml_lines.append(" identifiers:")
            for url in inst.get('urls', []):
                yaml_lines.append(" - identifier_scheme: Website")
                yaml_lines.append(f" identifier_value: {url}")
                yaml_lines.append(f" identifier_url: {url}")
            for email in inst.get('emails', []):
                yaml_lines.append(" - identifier_scheme: Email")
                yaml_lines.append(f" identifier_value: {email}")

        # Provenance with source file reference
        extraction_date = datetime.now(timezone.utc).isoformat()
        source_file = inst.get('source_file', file1_path)  # Default to file1
        yaml_lines.append(" provenance:")
        yaml_lines.append(" data_source: CONVERSATION_NLP")
        yaml_lines.append(" data_tier: TIER_4_INFERRED")
        yaml_lines.append(f" extraction_date: \"{extraction_date}\"")
        yaml_lines.append(" extraction_method: \"Multi-file NLP extraction with deduplication\"")
        yaml_lines.append(" confidence_score: 0.85")
        yaml_lines.append(" conversation_id: \"mixed\"")
        yaml_lines.append(f" source_url: \"file://{source_file}\"")

        yaml_lines.append("")  # Blank line between records

    return '\n'.join(yaml_lines)


def main():
    """Run the full pipeline: load, clean, merge, dedupe, report, write YAML+JSON."""
    # Define source file paths
    file1_path = "/Users/kempersc/Documents/claude/data-2025-11-02-18-13-26-batch-0000/conversations/2025-09-22T14-44-06-c5c5529d-1405-47ff-bee8-16aaa6f97b7e-Mexican_GLAM_inventories_and_catalogues.json"
    file2_path = "/Users/kempersc/Documents/claude/data-2025-11-02-18-13-26-batch-0000/conversations/2025-09-23T09-59-53-3957d339-37cb-4944-8693-81f6db76bde8-Mexican_GLAM_resources_inventory.json"

    # Load File 1 parsed data
    print("Loading File 1 parsed data...")
    with open('/tmp/mexican_institutions_parsed.json', 'r', encoding='utf-8') as f:
        file1_data = json.load(f)
    print(f"File 1: {len(file1_data['institutions'])} raw entries")

    # Clean File 1 data
    print("\nCleaning File 1 data (removing non-institutions)...")
    file1_institutions = clean_file1_data(file1_data)
    print(f"File 1: {len(file1_institutions)} valid institutions after cleaning")

    # Load File 2
    print("\nLoading File 2...")
    with open(file2_path, 'r', encoding='utf-8') as f:
        file2_data = json.load(f)

    # Extract report text from File 2 (first artifacts tool_use payload found)
    report_text = None
    for msg in file2_data.get('chat_messages', []):
        for content in msg.get('content', []):
            if content.get('type') == 'tool_use' and content.get('name') == 'artifacts':
                report_text = content.get('input', {}).get('content', '')
                break
        if report_text:
            break

    # Extract institutions from File 2
    print("Extracting institutions from File 2 report...")
    file2_institutions = []
    if report_text:
        file2_institutions = extract_institutions_from_report(report_text)
    print(f"File 2: {len(file2_institutions)} institutions extracted")

    # Merge all institutions
    print("\nMerging institutions from both files...")
    all_institutions = file1_institutions + file2_institutions
    print(f"Combined: {len(all_institutions)} total entries")

    # Deduplicate
    print("\nDeduplicating by normalized name...")
    final_institutions = deduplicate_institutions(all_institutions)
    print(f"Final: {len(final_institutions)} unique institutions")

    # Statistics
    print("\n" + "=" * 60)
    print("FINAL STATISTICS")
    print("=" * 60)

    type_counts = defaultdict(int)
    state_counts = defaultdict(int)
    with_urls = 0
    with_emails = 0

    for inst in final_institutions:
        type_counts[inst.get('institution_type', 'MIXED')] += 1
        if inst.get('state'):
            state_counts[inst['state']] += 1
        if inst.get('urls'):
            with_urls += 1
        if inst.get('emails'):
            with_emails += 1

    total = len(final_institutions)
    print(f"\nTotal institutions: {total}")
    print("\nBy type:")
    for itype, count in sorted(type_counts.items(), key=lambda x: -x[1]):
        print(f" {itype}: {count}")

    print("\nTop 10 states by institution count:")
    for state, count in sorted(state_counts.items(), key=lambda x: -x[1])[:10]:
        print(f" {state}: {count}")

    print("\nIdentifiers:")
    # FIX: guard against ZeroDivisionError when no institutions were extracted
    if total:
        print(f" Institutions with URLs: {with_urls} ({with_urls / total * 100:.1f}%)")
        print(f" Institutions with emails: {with_emails} ({with_emails / total * 100:.1f}%)")
    else:
        print(" Institutions with URLs: 0 (0.0%)")
        print(" Institutions with emails: 0 (0.0%)")

    # Convert to LinkML YAML
    print("\nConverting to LinkML YAML format...")
    yaml_output = convert_to_linkml_yaml(final_institutions, file1_path, file2_path)

    # Write output
    output_path = Path('/Users/kempersc/apps/glam/data/instances/mexican_institutions.yaml')
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(yaml_output)
    print(f"\nOutput written to: {output_path}")
    print(f"Total records: {len(final_institutions)}")

    # Save JSON version for analysis
    json_output_path = '/tmp/mexican_institutions_final.json'
    with open(json_output_path, 'w', encoding='utf-8') as f:
        json.dump({
            'institutions': final_institutions,
            'statistics': {
                'total': len(final_institutions),
                'by_type': dict(type_counts),
                'by_state': dict(state_counts),
                'with_urls': with_urls,
                'with_emails': with_emails,
            },
        }, f, indent=2, ensure_ascii=False)
    print(f"JSON version saved to: {json_output_path}")


if __name__ == '__main__':
    main()