"""
|
|
Apply Manual Validation Results to Danish Dataset
|
|
|
|
Reads the validated CSV review file and updates the denmark_complete_enriched.json
|
|
dataset by removing incorrect Wikidata links and updating provenance metadata.
|
|
"""

import ast
import csv
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set


def parse_identifier_string(identifier_str: str) -> Optional[Dict]:
    """Parse an identifier from its stringified-dict representation.

    The enriched dataset sometimes stores identifiers as Python-repr
    strings such as
    ``"{'identifier_scheme': 'Wikidata', 'identifier_value': 'Q42', ...}"``.

    Returns:
        A dict with 'scheme', 'value' and 'url' keys ('url' may be None),
        or None when the string cannot be parsed.
    """
    if not identifier_str or not isinstance(identifier_str, str):
        return None

    # Prefer a real literal parse: unlike the regex fallback below, this
    # correctly handles values that contain quotes or apostrophes.
    try:
        literal = ast.literal_eval(identifier_str)
    except (ValueError, SyntaxError):
        literal = None
    if (isinstance(literal, dict)
            and 'identifier_scheme' in literal
            and 'identifier_value' in literal):
        return {
            'scheme': literal['identifier_scheme'],
            'value': literal['identifier_value'],
            'url': literal.get('identifier_url')
        }

    # Fallback: extract the fields with regexes (original behaviour, works
    # on partial/non-literal strings).
    scheme_match = re.search(r"'identifier_scheme':\s*'([^']+)'", identifier_str)
    value_match = re.search(r"'identifier_value':\s*'([^']+)'", identifier_str)
    url_match = re.search(r"'identifier_url':\s*'([^']+)'", identifier_str)

    if scheme_match and value_match:
        return {
            'scheme': scheme_match.group(1),
            'value': value_match.group(1),
            'url': url_match.group(1) if url_match else None
        }
    return None
def _safe_float(raw) -> float:
    """Convert *raw* to float, treating blank or invalid values as 0.0."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return 0.0


def load_validation_results(csv_path: Path) -> Dict[str, Dict]:
    """
    Load validation results from CSV.

    Only rows whose validation_status is CORRECT, INCORRECT or UNCERTAIN
    are returned; rows the reviewer has not filled in are skipped.
    Missing columns and blank/invalid match_score cells are tolerated
    (hand-edited review CSVs often contain them) instead of raising.

    Returns dict mapping institution_id -> validation info
    """
    validations: Dict[str, Dict] = {}

    with open(csv_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)

        for row in reader:
            inst_id = row.get('institution_id') or ''
            status = (row.get('validation_status') or '').strip().upper()

            # Only process rows with validation status
            if status in ('CORRECT', 'INCORRECT', 'UNCERTAIN'):
                validations[inst_id] = {
                    'status': status,
                    'notes': (row.get('validation_notes') or '').strip(),
                    'wikidata_qid': row.get('wikidata_qid', ''),
                    # Blank score cells become 0.0 rather than crashing.
                    'match_score': _safe_float(row.get('match_score'))
                }

    return validations
def apply_validations(
|
|
institutions: List[Dict],
|
|
validations: Dict[str, Dict]
|
|
) -> tuple[List[Dict], Dict]:
|
|
"""
|
|
Apply validation results to institutions.
|
|
|
|
Returns:
|
|
(updated_institutions, statistics)
|
|
"""
|
|
stats = {
|
|
'total_reviewed': len(validations),
|
|
'correct': 0,
|
|
'incorrect_removed': 0,
|
|
'uncertain_flagged': 0,
|
|
'not_found': 0
|
|
}
|
|
|
|
updated = []
|
|
|
|
for inst in institutions:
|
|
inst_id = inst.get('id', '')
|
|
|
|
if inst_id in validations:
|
|
validation = validations[inst_id]
|
|
status = validation['status']
|
|
|
|
if status == 'CORRECT':
|
|
# Keep Wikidata link, update provenance
|
|
stats['correct'] += 1
|
|
|
|
# Update enrichment history
|
|
if inst.get('enrichment_history'):
|
|
for enrichment in inst['enrichment_history']:
|
|
if enrichment.get('match_score') == validation['match_score']:
|
|
enrichment['manual_validation'] = {
|
|
'validated_date': datetime.now(timezone.utc).isoformat(),
|
|
'validation_status': 'CORRECT',
|
|
'validation_notes': validation['notes']
|
|
}
|
|
|
|
elif status == 'INCORRECT':
|
|
# Remove Wikidata identifier
|
|
stats['incorrect_removed'] += 1
|
|
|
|
# Filter out Wikidata identifier
|
|
if inst.get('identifiers'):
|
|
new_identifiers = []
|
|
for identifier_data in inst['identifiers']:
|
|
identifier = parse_identifier_string(identifier_data) if isinstance(identifier_data, str) else identifier_data
|
|
|
|
# Skip Wikidata identifier with matching QID
|
|
if identifier and isinstance(identifier, dict):
|
|
if identifier.get('scheme') == 'Wikidata' and identifier.get('value') == validation['wikidata_qid']:
|
|
continue
|
|
|
|
new_identifiers.append(identifier_data)
|
|
|
|
inst['identifiers'] = new_identifiers
|
|
|
|
# Update enrichment history
|
|
if inst.get('enrichment_history'):
|
|
for enrichment in inst['enrichment_history']:
|
|
if enrichment.get('match_score') == validation['match_score']:
|
|
enrichment['manual_validation'] = {
|
|
'validated_date': datetime.now(timezone.utc).isoformat(),
|
|
'validation_status': 'INCORRECT',
|
|
'validation_notes': validation['notes'],
|
|
'action_taken': 'Wikidata link removed'
|
|
}
|
|
|
|
elif status == 'UNCERTAIN':
|
|
# Keep link but flag for further review
|
|
stats['uncertain_flagged'] += 1
|
|
|
|
# Update enrichment history
|
|
if inst.get('enrichment_history'):
|
|
for enrichment in inst['enrichment_history']:
|
|
if enrichment.get('match_score') == validation['match_score']:
|
|
enrichment['manual_validation'] = {
|
|
'validated_date': datetime.now(timezone.utc).isoformat(),
|
|
'validation_status': 'UNCERTAIN',
|
|
'validation_notes': validation['notes'],
|
|
'requires_further_review': True
|
|
}
|
|
|
|
updated.append(inst)
|
|
|
|
# Check for validations that didn't match any institution
|
|
matched_ids = {inst.get('id', '') for inst in institutions}
|
|
for inst_id in validations:
|
|
if inst_id not in matched_ids:
|
|
stats['not_found'] += 1
|
|
|
|
return updated, stats
|
|
|
|
|
|
def main():
    """Apply manual validation results to the Danish enriched dataset.

    Loads the enriched institutions JSON and the reviewed CSV, applies
    the CORRECT/INCORRECT/UNCERTAIN decisions, writes the validated
    dataset to a new JSON file, and prints statistics and next steps.
    """
    print("=" * 70)
    print("Apply Wikidata Validation Results")
    print("=" * 70)

    # Load dataset
    input_path = Path('data/instances/denmark_complete_enriched.json')
    print(f"\nLoading dataset: {input_path}")

    # Fail with a readable message rather than an unhandled FileNotFoundError.
    if not input_path.exists():
        print(f" ❌ Dataset not found: {input_path}")
        return

    with open(input_path, 'r', encoding='utf-8') as f:
        institutions = json.load(f)

    print(f" ✅ Loaded {len(institutions)} institutions")

    # Load validation results
    csv_path = Path('data/review/denmark_wikidata_fuzzy_matches.csv')
    print(f"\nLoading validation results: {csv_path}")

    if not csv_path.exists():
        print(f" ❌ Validation CSV not found: {csv_path}")
        print(f" Please complete manual review first.")
        return

    validations = load_validation_results(csv_path)
    print(f" ✅ Loaded {len(validations)} validation results")

    # Nothing to do until the reviewer fills in the status column.
    if len(validations) == 0:
        print(f"\n⚠️ No validation results found in CSV.")
        print(f" Please fill in the 'validation_status' column with:")
        print(f" - CORRECT (keep Wikidata link)")
        print(f" - INCORRECT (remove Wikidata link)")
        print(f" - UNCERTAIN (flag for further review)")
        return

    # Apply validations
    print(f"\nApplying validation results...")
    updated_institutions, stats = apply_validations(institutions, validations)

    # Save updated dataset (new file; the enriched input is left untouched).
    output_path = Path('data/instances/denmark_complete_validated.json')
    print(f"\nSaving validated dataset: {output_path}")

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(updated_institutions, f, indent=2, ensure_ascii=False)

    size_mb = output_path.stat().st_size / (1024 * 1024)
    print(f" ✅ Saved ({size_mb:.2f} MB)")

    # Print statistics
    print("\n" + "=" * 70)
    print("Validation Statistics")
    print("=" * 70)
    print(f"Total reviewed: {stats['total_reviewed']}")
    print(f"Correct (kept): {stats['correct']}")
    print(f"Incorrect (removed): {stats['incorrect_removed']}")
    print(f"Uncertain (flagged): {stats['uncertain_flagged']}")

    if stats['not_found'] > 0:
        print(f"\n⚠️ Warning: {stats['not_found']} validation(s) did not match any institution")

    print("\n" + "=" * 70)
    print("Next Steps")
    print("=" * 70)
    print("""
1. Review the updated dataset: data/instances/denmark_complete_validated.json
2. Re-export RDF with corrected Wikidata links:
python scripts/export_denmark_rdf.py --input denmark_complete_validated.json
3. Update documentation with validation results
4. Commit changes to version control
""")

    print("=" * 70)
    print("✅ Validation Applied Successfully")
    print("=" * 70)


if __name__ == '__main__':
    main()
|