"""
Apply Manual Validation Results to Danish Dataset

Reads the validated CSV review file and updates the denmark_complete_enriched.json
dataset by removing incorrect Wikidata links and updating provenance metadata.
"""

import json
import csv
import re
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime, timezone

# Patterns for identifiers that were serialized as the repr() of a dict,
# e.g. "{'identifier_scheme': 'Wikidata', 'identifier_value': 'Q42', ...}".
_SCHEME_RE = re.compile(r"'identifier_scheme':\s*'([^']+)'")
_VALUE_RE = re.compile(r"'identifier_value':\s*'([^']+)'")
_URL_RE = re.compile(r"'identifier_url':\s*'([^']+)'")

# Validation statuses a reviewer may enter; rows with anything else are ignored.
_KNOWN_STATUSES = ('CORRECT', 'INCORRECT', 'UNCERTAIN')


def parse_identifier_string(identifier_str: str) -> Optional[Dict]:
    """Parse identifier from string representation.

    Args:
        identifier_str: Text containing ``'identifier_scheme': '...'`` and
            ``'identifier_value': '...'`` pairs (single-quoted).

    Returns:
        A dict with keys ``scheme``, ``value`` and ``url`` (``url`` is None
        when absent), or None when the input is empty, not a string, or
        lacks a scheme or a value.
    """
    if not identifier_str or not isinstance(identifier_str, str):
        return None

    scheme_match = _SCHEME_RE.search(identifier_str)
    value_match = _VALUE_RE.search(identifier_str)
    if not (scheme_match and value_match):
        return None

    url_match = _URL_RE.search(identifier_str)
    return {
        'scheme': scheme_match.group(1),
        'value': value_match.group(1),
        'url': url_match.group(1) if url_match else None,
    }


def load_validation_results(csv_path: Path) -> Dict[str, Dict]:
    """Load validation results from CSV.

    Only rows whose ``validation_status`` is CORRECT, INCORRECT or UNCERTAIN
    (case-insensitive, surrounding whitespace ignored) are returned.

    Args:
        csv_path: Path to the reviewed CSV. It must have the columns
            ``institution_id``, ``validation_status``, ``validation_notes``,
            ``wikidata_qid`` and ``match_score``.

    Returns:
        Dict mapping institution_id -> validation info with keys ``status``,
        ``notes``, ``wikidata_qid`` and ``match_score`` (float).

    Raises:
        KeyError: If a required column is missing from the CSV.
        ValueError: If a validated row has a non-numeric ``match_score``.
    """
    validations: Dict[str, Dict] = {}

    with open(csv_path, 'r', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            inst_id = row['institution_id']
            # DictReader fills missing cells of short rows with None, so
            # coalesce to '' before calling str methods.
            status = (row['validation_status'] or '').strip().upper()

            # Only process rows with validation status
            if status not in _KNOWN_STATUSES:
                continue

            try:
                match_score = float(row['match_score'])
            except ValueError as err:
                raise ValueError(
                    f"Invalid match_score {row['match_score']!r} "
                    f"for institution {inst_id!r}"
                ) from err

            validations[inst_id] = {
                'status': status,
                'notes': (row['validation_notes'] or '').strip(),
                # Stripped so stray spaces in the CSV do not prevent the
                # QID from matching the identifier stored in the dataset.
                'wikidata_qid': (row['wikidata_qid'] or '').strip(),
                'match_score': match_score,
            }

    return validations


def _identifier_scheme_and_value(identifier_data) -> tuple[Optional[str], Optional[str]]:
    """Return (scheme, value) for an identifier stored as a dict or a repr string."""
    if isinstance(identifier_data, str):
        parsed = parse_identifier_string(identifier_data)
        if parsed is None:
            return None, None
        return parsed['scheme'], parsed['value']

    if isinstance(identifier_data, dict):
        # Accept both the 'identifier_*' keys (the spelling the string form
        # uses) and the short 'scheme'/'value' keys.
        scheme = identifier_data.get('identifier_scheme', identifier_data.get('scheme'))
        value = identifier_data.get('identifier_value', identifier_data.get('value'))
        return scheme, value

    return None, None


def _record_manual_validation(inst: Dict, validation: Dict, extra: Optional[Dict] = None) -> None:
    """Attach a manual_validation record to enrichments whose match_score equals the validation's."""
    for enrichment in inst.get('enrichment_history') or []:
        if enrichment.get('match_score') != validation['match_score']:
            continue
        record = {
            'validated_date': datetime.now(timezone.utc).isoformat(),
            'validation_status': validation['status'],
            'validation_notes': validation['notes'],
        }
        if extra:
            record.update(extra)
        enrichment['manual_validation'] = record


def apply_validations(
    institutions: List[Dict],
    validations: Dict[str, Dict]
) -> tuple[List[Dict], Dict]:
    """Apply validation results to institutions.

    Institution dicts are modified in place:
      * CORRECT   - link kept; matching enrichment entries are annotated.
      * INCORRECT - the Wikidata identifier with the validated QID is removed
                    and matching enrichment entries are annotated.
      * UNCERTAIN - link kept; matching enrichment entries are flagged for
                    further review.

    Args:
        institutions: Institution records; each is looked up by its ``id``.
        validations: Mapping of institution id -> validation info, as
            produced by ``load_validation_results``.

    Returns:
        (updated_institutions, statistics)
    """
    stats = {
        'total_reviewed': len(validations),
        'correct': 0,
        'incorrect_removed': 0,
        'uncertain_flagged': 0,
        'not_found': 0
    }

    updated = []

    for inst in institutions:
        validation = validations.get(inst.get('id', ''))

        if validation is not None:
            status = validation['status']

            if status == 'CORRECT':
                # Keep Wikidata link, update provenance
                stats['correct'] += 1
                _record_manual_validation(inst, validation)

            elif status == 'INCORRECT':
                # Remove Wikidata identifier
                stats['incorrect_removed'] += 1

                if inst.get('identifiers'):
                    rejected = ('Wikidata', validation['wikidata_qid'])
                    # Anything unparseable or of another scheme/QID is kept.
                    inst['identifiers'] = [
                        identifier_data
                        for identifier_data in inst['identifiers']
                        if _identifier_scheme_and_value(identifier_data) != rejected
                    ]

                _record_manual_validation(
                    inst, validation, {'action_taken': 'Wikidata link removed'}
                )

            elif status == 'UNCERTAIN':
                # Keep link but flag for further review
                stats['uncertain_flagged'] += 1
                _record_manual_validation(
                    inst, validation, {'requires_further_review': True}
                )

        updated.append(inst)

    # Check for validations that didn't match any institution
    known_ids = {inst.get('id', '') for inst in institutions}
    stats['not_found'] = len(set(validations) - known_ids)

    return updated, stats


def main():
    """Load the dataset and the reviewed CSV, apply validations, save, and print a summary."""
    print("=" * 70)
    print("Apply Wikidata Validation Results")
    print("=" * 70)

    # Load dataset
    input_path = Path('data/instances/denmark_complete_enriched.json')
    print(f"\nLoading dataset: {input_path}")

    with open(input_path, 'r', encoding='utf-8') as f:
        institutions = json.load(f)

    print(f" ✅ Loaded {len(institutions)} institutions")

    # Load validation results
    csv_path = Path('data/review/denmark_wikidata_fuzzy_matches.csv')
    print(f"\nLoading validation results: {csv_path}")

    if not csv_path.exists():
        print(f" ❌ Validation CSV not found: {csv_path}")
        print(" Please complete manual review first.")
        return

    validations = load_validation_results(csv_path)
    print(f" ✅ Loaded {len(validations)} validation results")

    if not validations:
        print("\n⚠️ No validation results found in CSV.")
        print(" Please fill in the 'validation_status' column with:")
        print(" - CORRECT (keep Wikidata link)")
        print(" - INCORRECT (remove Wikidata link)")
        print(" - UNCERTAIN (flag for further review)")
        return

    # Apply validations
    print("\nApplying validation results...")
    updated_institutions, stats = apply_validations(institutions, validations)

    # Save updated dataset
    output_path = Path('data/instances/denmark_complete_validated.json')
    print(f"\nSaving validated dataset: {output_path}")

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(updated_institutions, f, indent=2, ensure_ascii=False)

    size_mb = output_path.stat().st_size / (1024 * 1024)
    print(f" ✅ Saved ({size_mb:.2f} MB)")

    # Print statistics
    print("\n" + "=" * 70)
    print("Validation Statistics")
    print("=" * 70)
    print(f"Total reviewed: {stats['total_reviewed']}")
    print(f"Correct (kept): {stats['correct']}")
    print(f"Incorrect (removed): {stats['incorrect_removed']}")
    print(f"Uncertain (flagged): {stats['uncertain_flagged']}")

    if stats['not_found'] > 0:
        print(f"\n⚠️ Warning: {stats['not_found']} validation(s) did not match any institution")

    print("\n" + "=" * 70)
    print("Next Steps")
    print("=" * 70)
    print("""
1. Review the updated dataset: data/instances/denmark_complete_validated.json
2. Re-export RDF with corrected Wikidata links: python scripts/export_denmark_rdf.py --input denmark_complete_validated.json
3. Update documentation with validation results
4. Commit changes to version control
""")

    print("=" * 70)
    print("✅ Validation Applied Successfully")
    print("=" * 70)


if __name__ == '__main__':
    main()