glam/scripts/validate_geocoding_results.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

365 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Validate Geocoding Results
Analyzes the geocoded global dataset and generates:
- Coverage statistics by country and institution type
- Quality checks (valid coordinates, outliers, etc.)
- Visualization data for mapping
- Detailed validation report
Usage:
python scripts/validate_geocoding_results.py
"""
import json
import math
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple

import yaml
def validate_coordinates(lat: float, lon: float) -> Tuple[bool, str]:
    """
    Validate that a latitude/longitude pair is plausible.

    Checks numeric type, NaN, valid WGS84 ranges, and the common
    (0, 0) "Null Island" geocoding-failure placeholder.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        (is_valid, error_message) - error_message is '' when valid.
    """
    if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
        return False, "Non-numeric coordinates"
    # NaN compares False against every bound below, so without this
    # explicit check a NaN coordinate would slip through as "valid".
    if math.isnan(lat) or math.isnan(lon):
        return False, "NaN coordinates"
    if lat < -90 or lat > 90:
        return False, f"Invalid latitude: {lat} (must be -90 to 90)"
    if lon < -180 or lon > 180:
        return False, f"Invalid longitude: {lon} (must be -180 to 180)"
    # Check for common placeholder values
    if lat == 0 and lon == 0:
        return False, "Null Island (0,0) - likely geocoding failure"
    return True, ""
def analyze_geocoding_coverage(institutions: List[Dict]) -> Dict:
    """Analyze geocoding coverage and quality.

    Args:
        institutions: Institution dicts; each may carry a 'locations'
            list whose entries hold 'latitude'/'longitude', 'country'
            and optionally 'geonames_id'.

    Returns:
        Dict of counters. NOTE(review): 'total_institutions' and
        'with_locations' count per institution, while
        'with_coordinates', 'with_valid_coordinates' and 'geonames_ids'
        count per LOCATION - an institution with several geocoded
        locations contributes more than once to the latter group.
    """
    stats = {
        'total_institutions': len(institutions),
        'with_locations': 0,
        'with_coordinates': 0,
        'with_valid_coordinates': 0,
        'invalid_coordinates': [],  # offending records + error messages
        'by_country': defaultdict(lambda: {
            'total': 0,
            'geocoded': 0,
            'failed': 0,
            'invalid': 0
        }),
        'by_institution_type': defaultdict(lambda: {
            'total': 0,
            'geocoded': 0,
            'failed': 0
        }),
        # Inverted sentinels, narrowed by min/max below. They stay at
        # these defaults if no valid coordinate is ever seen.
        'coordinate_ranges': {
            'min_lat': 90,
            'max_lat': -90,
            'min_lon': 180,
            'max_lon': -180
        },
        'geonames_ids': 0
    }
    for inst in institutions:
        locations = inst.get('locations', [])
        inst_type = inst.get('institution_type', 'UNKNOWN')
        # Country comes from the FIRST location only; every location of
        # this institution is attributed to that single country bucket.
        country = locations[0].get('country', 'Unknown') if locations else 'Unknown'
        stats['by_country'][country]['total'] += 1
        stats['by_institution_type'][inst_type]['total'] += 1
        if locations:
            stats['with_locations'] += 1
        for loc in locations:
            lat = loc.get('latitude')
            lon = loc.get('longitude')
            if lat is not None and lon is not None:
                # Location has coordinates; counted as geocoded even if
                # they later fail validation (invalid is tracked separately).
                stats['with_coordinates'] += 1
                stats['by_country'][country]['geocoded'] += 1
                stats['by_institution_type'][inst_type]['geocoded'] += 1
                # Validate coordinates
                is_valid, error_msg = validate_coordinates(lat, lon)
                if is_valid:
                    stats['with_valid_coordinates'] += 1
                    # Track coordinate ranges
                    stats['coordinate_ranges']['min_lat'] = min(stats['coordinate_ranges']['min_lat'], lat)
                    stats['coordinate_ranges']['max_lat'] = max(stats['coordinate_ranges']['max_lat'], lat)
                    stats['coordinate_ranges']['min_lon'] = min(stats['coordinate_ranges']['min_lon'], lon)
                    stats['coordinate_ranges']['max_lon'] = max(stats['coordinate_ranges']['max_lon'], lon)
                    # Check for GeoNames ID (only counted for valid coords)
                    if loc.get('geonames_id'):
                        stats['geonames_ids'] += 1
                else:
                    stats['by_country'][country]['invalid'] += 1
                    stats['invalid_coordinates'].append({
                        'name': inst.get('name'),
                        'ghcid': inst.get('ghcid'),
                        'country': country,
                        'lat': lat,
                        'lon': lon,
                        'error': error_msg
                    })
            else:
                # Missing latitude and/or longitude: geocoding failure.
                stats['by_country'][country]['failed'] += 1
                stats['by_institution_type'][inst_type]['failed'] += 1
    return stats
def generate_geojson(institutions: List[Dict], output_file: Path):
    """Write a GeoJSON FeatureCollection of all validly geocoded locations.

    Args:
        institutions: Institution dicts with optional 'locations' lists.
        output_file: Destination path for the GeoJSON file.

    Returns:
        Number of point features written.
    """
    feature_list = []
    for record in institutions:
        for place in record.get('locations', []):
            latitude = place.get('latitude')
            longitude = place.get('longitude')
            # Skip locations without coordinates or with implausible ones.
            if latitude is None or longitude is None:
                continue
            ok, _ = validate_coordinates(latitude, longitude)
            if not ok:
                continue
            feature_list.append({
                'type': 'Feature',
                'geometry': {
                    'type': 'Point',
                    # GeoJSON mandates [longitude, latitude] order.
                    'coordinates': [longitude, latitude]
                },
                'properties': {
                    'name': record.get('name', 'Unknown'),
                    'ghcid': record.get('ghcid', 'Unknown'),
                    'institution_type': record.get('institution_type', 'UNKNOWN'),
                    'country': place.get('country', 'Unknown'),
                    'city': place.get('city', 'Unknown'),
                    'homepage': record.get('homepage', '')
                }
            })
    collection = {'type': 'FeatureCollection', 'features': feature_list}
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(collection, f, indent=2, ensure_ascii=False)
    return len(feature_list)
def generate_report(stats: Dict, output_file: Path):
    """Generate a detailed Markdown validation report.

    Args:
        stats: Statistics dict produced by analyze_geocoding_coverage().
        output_file: Path the Markdown report is written to.
    """
    total = stats['total_institutions']
    geocoded = stats['with_valid_coordinates']

    def _pct(part: int, whole: int) -> float:
        # Percentage helper that is safe against an empty denominator
        # (the original divided unguarded and crashed on empty datasets).
        return (part / whole * 100) if whole > 0 else 0.0

    coverage = _pct(geocoded, total)
    geonames_coverage = _pct(stats['geonames_ids'], geocoded)
    report = f"""# Geocoding Validation Report
Generated: {Path(__file__).name}
## Summary
- **Total Institutions**: {total:,}
- **With Location Data**: {stats['with_locations']:,} ({_pct(stats['with_locations'], total):.1f}%)
- **Successfully Geocoded**: {geocoded:,} ({coverage:.1f}%)
- **Failed Geocoding**: {total - geocoded:,} ({_pct(total - geocoded, total):.1f}%)
- **Invalid Coordinates**: {len(stats['invalid_coordinates'])}
- **With GeoNames IDs**: {stats['geonames_ids']:,} ({geonames_coverage:.1f}% of geocoded)
## Coordinate Ranges
- **Latitude**: {stats['coordinate_ranges']['min_lat']:.4f} to {stats['coordinate_ranges']['max_lat']:.4f}
- **Longitude**: {stats['coordinate_ranges']['min_lon']:.4f} to {stats['coordinate_ranges']['max_lon']:.4f}
## Coverage by Country
| Country | Total | Geocoded | Failed | Invalid | Coverage |
|---------|-------|----------|--------|---------|----------|
"""
    # Sort by total descending
    for country, data in sorted(stats['by_country'].items(), key=lambda x: x[1]['total'], reverse=True):
        cov = _pct(data['geocoded'], data['total'])
        report += f"| {country:<7} | {data['total']:>5} | {data['geocoded']:>8} | {data['failed']:>6} | {data['invalid']:>7} | {cov:>6.1f}% |\n"
    report += "\n## Coverage by Institution Type\n\n"
    report += "| Type | Total | Geocoded | Failed | Coverage |\n"
    report += "|------|-------|----------|--------|---------|\n"
    for inst_type, data in sorted(stats['by_institution_type'].items(), key=lambda x: x[1]['total'], reverse=True):
        cov = _pct(data['geocoded'], data['total'])
        report += f"| {inst_type:<20} | {data['total']:>5} | {data['geocoded']:>8} | {data['failed']:>6} | {cov:>6.1f}% |\n"
    # Invalid coordinates
    if stats['invalid_coordinates']:
        report += f"\n## Invalid Coordinates ({len(stats['invalid_coordinates'])})\n\n"
        report += "| Institution | GHCID | Country | Lat | Lon | Error |\n"
        report += "|-------------|-------|---------|-----|-----|-------|\n"
        for inv in stats['invalid_coordinates'][:50]:  # Limit to first 50
            report += f"| {inv['name'][:30]} | {inv['ghcid']} | {inv['country']} | {inv['lat']} | {inv['lon']} | {inv['error']} |\n"
        if len(stats['invalid_coordinates']) > 50:
            report += f"\n*... and {len(stats['invalid_coordinates']) - 50} more*\n"
    # Success indicators
    report += "\n## Quality Indicators\n\n"
    if coverage >= 95:
        report += "✅ **Excellent Coverage** (≥95%)\n\n"
    elif coverage >= 90:
        report += "✅ **Good Coverage** (≥90%)\n\n"
    elif coverage >= 80:
        report += "⚠️ **Moderate Coverage** (≥80%)\n\n"
    else:
        report += "❌ **Low Coverage** (<80%)\n\n"
    if len(stats['invalid_coordinates']) == 0:
        report += "✅ **No Invalid Coordinates**\n\n"
    elif len(stats['invalid_coordinates']) < 10:
        report += f"⚠️ **Few Invalid Coordinates** ({len(stats['invalid_coordinates'])} found)\n\n"
    else:
        report += f"❌ **Multiple Invalid Coordinates** ({len(stats['invalid_coordinates'])} found)\n\n"
    if geonames_coverage >= 50:
        report += f"✅ **Good GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    elif geonames_coverage >= 25:
        report += f"⚠️ **Moderate GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    else:
        report += f"❌ **Low GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    # Recommendations
    report += "## Recommendations\n\n"
    if coverage < 95:
        report += "1. **Improve Geocoding Coverage**:\n"
        report += "   - Review failed geocoding queries\n"
        report += "   - Try alternative query formats for low-coverage countries\n"
        report += "   - Consider manual geocoding for high-value institutions\n\n"
    if len(stats['invalid_coordinates']) > 0:
        report += "2. **Fix Invalid Coordinates**:\n"
        report += "   - Review institutions with (0,0) coordinates\n"
        report += "   - Verify coordinates are in correct country\n"
        report += "   - Check for coordinate swap (lat/lon reversed)\n\n"
    if geonames_coverage < 50:
        report += "3. **Enhance GeoNames IDs**:\n"
        report += "   - GeoNames IDs enable better geographic linking\n"
        report += "   - Consider querying GeoNames API directly\n"
        report += "   - Use reverse geocoding to find GeoNames IDs\n\n"
    report += "---\n\n"
    # Plain ISO timestamp: the original wrapped it in yaml.dump(), which
    # emitted a quoted YAML scalar plus trailing newline into the Markdown.
    report += f"**Generated**: {datetime.now(timezone.utc).isoformat()}\n"
    report += "**Source**: `data/instances/global/global_heritage_institutions.yaml`\n"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(report)
def main():
    """Entry point: load the geocoded dataset, analyze it, and write the
    validation report, GeoJSON map data and YAML statistics next to it.
    """
    # NOTE: the original re-imported datetime/timezone here, shadowing the
    # module-level import for no effect; removed.
    base_dir = Path(__file__).parent.parent
    global_dir = base_dir / 'data' / 'instances' / 'global'
    global_file = global_dir / 'global_heritage_institutions.yaml'
    report_file = global_dir / 'geocoding_validation_report.md'
    geojson_file = global_dir / 'global_institutions.geojson'
    stats_file = global_dir / 'geocoding_statistics.yaml'
    print("=" * 80)
    print("GEOCODING VALIDATION")
    print("=" * 80)
    print()
    # Load dataset
    print(f"Loading dataset from {global_file}")
    with open(global_file, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty file; normalize to [] so the
        # counting below doesn't crash.
        institutions = yaml.safe_load(f) or []
    print(f"Loaded {len(institutions):,} institutions")
    print()
    # Analyze coverage
    print("Analyzing geocoding coverage...")
    stats = analyze_geocoding_coverage(institutions)
    # Print summary
    total = stats['total_institutions']
    geocoded = stats['with_valid_coordinates']
    coverage = (geocoded / total * 100) if total > 0 else 0
    failed_pct = ((total - geocoded) / total * 100) if total > 0 else 0
    print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total institutions: {total:,}")
    print(f"Successfully geocoded: {geocoded:,} ({coverage:.1f}%)")
    print(f"Failed geocoding: {total - geocoded:,} ({failed_pct:.1f}%)")
    print(f"Invalid coordinates: {len(stats['invalid_coordinates'])}")
    print(f"With GeoNames IDs: {stats['geonames_ids']:,}")
    print("=" * 80)
    print()
    # Generate GeoJSON
    print("Generating GeoJSON for mapping...")
    feature_count = generate_geojson(institutions, geojson_file)
    print(f"✅ Wrote {feature_count:,} features to {geojson_file}")
    print()
    # Generate report
    print("Generating validation report...")
    generate_report(stats, report_file)
    print(f"✅ Wrote report to {report_file}")
    print()
    # Save statistics
    print("Saving statistics...")
    with open(stats_file, 'w', encoding='utf-8') as f:
        # Convert defaultdict to regular dict for YAML serialization
        stats_export = dict(stats)
        stats_export['by_country'] = {k: dict(v) for k, v in stats['by_country'].items()}
        stats_export['by_institution_type'] = {k: dict(v) for k, v in stats['by_institution_type'].items()}
        yaml.dump(stats_export, f, default_flow_style=False, allow_unicode=True)
    print(f"✅ Wrote statistics to {stats_file}")
    print()
    print("=" * 80)
    print("VALIDATION COMPLETE")
    print("=" * 80)
    print()
    print("Next steps:")
    print(f"1. Review validation report: {report_file}")
    print(f"2. Visualize on map: {geojson_file}")
    print("3. If coverage is good (≥95%), proceed with Wikidata enrichment")
    print("4. If coverage is low, review failed geocoding and retry")
    print()
if __name__ == '__main__':
    main()