#!/usr/bin/env python3
"""
Validate Geocoding Results

Analyzes the geocoded global dataset and generates:
- Coverage statistics by country and institution type
- Quality checks (valid coordinates, outliers, etc.)
- Visualization data for mapping
- Detailed validation report

Usage: python scripts/validate_geocoding_results.py
"""

import json
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple


def validate_coordinates(lat: float, lon: float) -> Tuple[bool, str]:
    """
    Validate that a (lat, lon) pair is numeric, in range, and not a placeholder.

    Returns:
        (is_valid, error_message) -- error_message is "" when valid.
    """
    if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
        return False, "Non-numeric coordinates"

    if lat < -90 or lat > 90:
        return False, f"Invalid latitude: {lat} (must be -90 to 90)"

    if lon < -180 or lon > 180:
        return False, f"Invalid longitude: {lon} (must be -180 to 180)"

    # (0, 0) is the classic placeholder emitted by failed geocoders.
    if lat == 0 and lon == 0:
        return False, "Null Island (0,0) - likely geocoding failure"

    return True, ""


def analyze_geocoding_coverage(institutions: List[Dict]) -> Dict:
    """
    Analyze geocoding coverage and quality.

    Returns a stats dict with overall totals, per-country and per-type
    breakdowns, observed coordinate ranges, and a list of records whose
    coordinates failed validation.
    """
    stats = {
        'total_institutions': len(institutions),
        'with_locations': 0,
        'with_coordinates': 0,
        'with_valid_coordinates': 0,
        'invalid_coordinates': [],
        'by_country': defaultdict(lambda: {
            'total': 0, 'geocoded': 0, 'failed': 0, 'invalid': 0
        }),
        'by_institution_type': defaultdict(lambda: {
            'total': 0, 'geocoded': 0, 'failed': 0
        }),
        # Sentinels chosen so that any real coordinate narrows the range.
        'coordinate_ranges': {
            'min_lat': 90, 'max_lat': -90, 'min_lon': 180, 'max_lon': -180
        },
        'geonames_ids': 0
    }

    for inst in institutions:
        locations = inst.get('locations', [])
        inst_type = inst.get('institution_type', 'UNKNOWN')
        # Country attribution uses the first location only.
        country = locations[0].get('country', 'Unknown') if locations else 'Unknown'

        stats['by_country'][country]['total'] += 1
        stats['by_institution_type'][inst_type]['total'] += 1

        if locations:
            stats['with_locations'] += 1

            for loc in locations:
                lat = loc.get('latitude')
                lon = loc.get('longitude')

                if lat is not None and lon is not None:
                    stats['with_coordinates'] += 1
                    stats['by_country'][country]['geocoded'] += 1
                    stats['by_institution_type'][inst_type]['geocoded'] += 1

                    is_valid, error_msg = validate_coordinates(lat, lon)

                    if is_valid:
                        stats['with_valid_coordinates'] += 1

                        # Track coordinate ranges across all valid points.
                        ranges = stats['coordinate_ranges']
                        ranges['min_lat'] = min(ranges['min_lat'], lat)
                        ranges['max_lat'] = max(ranges['max_lat'], lat)
                        ranges['min_lon'] = min(ranges['min_lon'], lon)
                        ranges['max_lon'] = max(ranges['max_lon'], lon)

                        if loc.get('geonames_id'):
                            stats['geonames_ids'] += 1
                    else:
                        stats['by_country'][country]['invalid'] += 1
                        stats['invalid_coordinates'].append({
                            'name': inst.get('name'),
                            'ghcid': inst.get('ghcid'),
                            'country': country,
                            'lat': lat,
                            'lon': lon,
                            'error': error_msg
                        })
                else:
                    # Location record exists but was never geocoded.
                    stats['by_country'][country]['failed'] += 1
                    stats['by_institution_type'][inst_type]['failed'] += 1

    return stats


def generate_geojson(institutions: List[Dict], output_file: Path) -> int:
    """
    Write a GeoJSON FeatureCollection of all validly geocoded locations.

    Returns:
        The number of features written.
    """
    features = []

    for inst in institutions:
        locations = inst.get('locations', [])
        if not locations:
            continue

        for loc in locations:
            lat = loc.get('latitude')
            lon = loc.get('longitude')
            if lat is None or lon is None:
                continue

            is_valid, _ = validate_coordinates(lat, lon)
            if not is_valid:
                continue

            features.append({
                'type': 'Feature',
                'geometry': {
                    'type': 'Point',
                    'coordinates': [lon, lat]  # GeoJSON uses [lon, lat] order
                },
                'properties': {
                    'name': inst.get('name', 'Unknown'),
                    'ghcid': inst.get('ghcid', 'Unknown'),
                    'institution_type': inst.get('institution_type', 'UNKNOWN'),
                    'country': loc.get('country', 'Unknown'),
                    'city': loc.get('city', 'Unknown'),
                    'homepage': inst.get('homepage', '')
                }
            })

    geojson = {
        'type': 'FeatureCollection',
        'features': features
    }

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(geojson, f, indent=2, ensure_ascii=False)

    return len(features)


def generate_report(stats: Dict, output_file: Path):
    """Generate the detailed markdown validation report."""
    total = stats['total_institutions']
    geocoded = stats['with_valid_coordinates']

    # Guard every ratio: an empty dataset (total == 0) or zero geocoded
    # rows must not crash report generation.
    coverage = (geocoded / total * 100) if total > 0 else 0
    loc_pct = (stats['with_locations'] / total * 100) if total > 0 else 0
    failed_pct = ((total - geocoded) / total * 100) if total > 0 else 0
    geonames_coverage = (stats['geonames_ids'] / geocoded * 100) if geocoded > 0 else 0

    # Plain ISO-8601 timestamp; the previous yaml.dump() call wrapped the
    # string in quotes and appended YAML document noise to the report.
    generated_at = datetime.now(timezone.utc).isoformat()

    report = f"""# Geocoding Validation Report

Generated: {generated_at}

## Summary

- **Total Institutions**: {total:,}
- **With Location Data**: {stats['with_locations']:,} ({loc_pct:.1f}%)
- **Successfully Geocoded**: {geocoded:,} ({coverage:.1f}%)
- **Failed Geocoding**: {total - geocoded:,} ({failed_pct:.1f}%)
- **Invalid Coordinates**: {len(stats['invalid_coordinates'])}
- **With GeoNames IDs**: {stats['geonames_ids']:,} ({geonames_coverage:.1f}% of geocoded)

## Coordinate Ranges

- **Latitude**: {stats['coordinate_ranges']['min_lat']:.4f} to {stats['coordinate_ranges']['max_lat']:.4f}
- **Longitude**: {stats['coordinate_ranges']['min_lon']:.4f} to {stats['coordinate_ranges']['max_lon']:.4f}

## Coverage by Country

| Country | Total | Geocoded | Failed | Invalid | Coverage |
|---------|-------|----------|--------|---------|----------|
"""

    # Sort by total descending
    for country, data in sorted(stats['by_country'].items(),
                                key=lambda x: x[1]['total'], reverse=True):
        cov = (data['geocoded'] / data['total'] * 100) if data['total'] > 0 else 0
        report += (f"| {country:<7} | {data['total']:>5} | {data['geocoded']:>8} "
                   f"| {data['failed']:>6} | {data['invalid']:>7} | {cov:>6.1f}% |\n")

    report += "\n## Coverage by Institution Type\n\n"
    report += "| Type | Total | Geocoded | Failed | Coverage |\n"
    report += "|------|-------|----------|--------|---------|\n"

    for inst_type, data in sorted(stats['by_institution_type'].items(),
                                  key=lambda x: x[1]['total'], reverse=True):
        cov = (data['geocoded'] / data['total'] * 100) if data['total'] > 0 else 0
        report += (f"| {inst_type:<20} | {data['total']:>5} | {data['geocoded']:>8} "
                   f"| {data['failed']:>6} | {cov:>6.1f}% |\n")

    # Invalid coordinates
    invalid_count = len(stats['invalid_coordinates'])
    if stats['invalid_coordinates']:
        report += f"\n## Invalid Coordinates ({invalid_count})\n\n"
        report += "| Institution | GHCID | Country | Lat | Lon | Error |\n"
        report += "|-------------|-------|---------|-----|-----|-------|\n"
        for inv in stats['invalid_coordinates'][:50]:  # Limit to first 50
            report += (f"| {inv['name'][:30]} | {inv['ghcid']} | {inv['country']} "
                       f"| {inv['lat']} | {inv['lon']} | {inv['error']} |\n")
        if invalid_count > 50:
            report += f"\n*... and {invalid_count - 50} more*\n"

    # Success indicators
    report += "\n## Quality Indicators\n\n"
    if coverage >= 95:
        report += "✅ **Excellent Coverage** (≥95%)\n\n"
    elif coverage >= 90:
        report += "✅ **Good Coverage** (≥90%)\n\n"
    elif coverage >= 80:
        report += "⚠️ **Moderate Coverage** (≥80%)\n\n"
    else:
        report += "❌ **Low Coverage** (<80%)\n\n"

    if invalid_count == 0:
        report += "✅ **No Invalid Coordinates**\n\n"
    elif invalid_count < 10:
        report += f"⚠️ **Few Invalid Coordinates** ({invalid_count} found)\n\n"
    else:
        report += f"❌ **Multiple Invalid Coordinates** ({invalid_count} found)\n\n"

    if geonames_coverage >= 50:
        report += f"✅ **Good GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    elif geonames_coverage >= 25:
        report += f"⚠️ **Moderate GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    else:
        report += f"❌ **Low GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"

    # Recommendations
    report += "## Recommendations\n\n"

    if coverage < 95:
        report += "1. **Improve Geocoding Coverage**:\n"
        report += "   - Review failed geocoding queries\n"
        report += "   - Try alternative query formats for low-coverage countries\n"
        report += "   - Consider manual geocoding for high-value institutions\n\n"

    if invalid_count > 0:
        report += "2. **Fix Invalid Coordinates**:\n"
        report += "   - Review institutions with (0,0) coordinates\n"
        report += "   - Verify coordinates are in correct country\n"
        report += "   - Check for coordinate swap (lat/lon reversed)\n\n"

    if geonames_coverage < 50:
        report += "3. **Enhance GeoNames IDs**:\n"
        report += "   - GeoNames IDs enable better geographic linking\n"
        report += "   - Consider querying GeoNames API directly\n"
        report += "   - Use reverse geocoding to find GeoNames IDs\n\n"

    report += "---\n\n"
    report += f"**Generated**: {generated_at}\n"
    report += "**Source**: `data/instances/global/global_heritage_institutions.yaml`\n"

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(report)


def main():
    # PyYAML (third-party) is only needed for dataset I/O; importing it here
    # keeps the analysis/report helpers importable without it installed.
    import yaml

    base_dir = Path(__file__).parent.parent
    global_dir = base_dir / 'data' / 'instances' / 'global'
    global_file = global_dir / 'global_heritage_institutions.yaml'
    report_file = global_dir / 'geocoding_validation_report.md'
    geojson_file = global_dir / 'global_institutions.geojson'
    stats_file = global_dir / 'geocoding_statistics.yaml'

    print("=" * 80)
    print("GEOCODING VALIDATION")
    print("=" * 80)
    print()

    # Load dataset
    print(f"Loading dataset from {global_file}")
    with open(global_file, 'r', encoding='utf-8') as f:
        institutions = yaml.safe_load(f)
    print(f"Loaded {len(institutions):,} institutions")
    print()

    # Analyze coverage
    print("Analyzing geocoding coverage...")
    stats = analyze_geocoding_coverage(institutions)

    # Print summary (ratios guarded against an empty dataset)
    total = stats['total_institutions']
    geocoded = stats['with_valid_coordinates']
    coverage = (geocoded / total * 100) if total > 0 else 0
    failed_pct = ((total - geocoded) / total * 100) if total > 0 else 0

    print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total institutions: {total:,}")
    print(f"Successfully geocoded: {geocoded:,} ({coverage:.1f}%)")
    print(f"Failed geocoding: {total - geocoded:,} ({failed_pct:.1f}%)")
    print(f"Invalid coordinates: {len(stats['invalid_coordinates'])}")
    print(f"With GeoNames IDs: {stats['geonames_ids']:,}")
    print("=" * 80)
    print()

    # Generate GeoJSON
    print("Generating GeoJSON for mapping...")
    feature_count = generate_geojson(institutions, geojson_file)
    print(f"✅ Wrote {feature_count:,} features to {geojson_file}")
    print()

    # Generate report
    print("Generating validation report...")
    generate_report(stats, report_file)
    print(f"✅ Wrote report to {report_file}")
    print()

    # Save statistics
    print("Saving statistics...")
    with open(stats_file, 'w', encoding='utf-8') as f:
        # Convert defaultdicts to regular dicts for YAML serialization
        stats_export = dict(stats)
        stats_export['by_country'] = {
            k: dict(v) for k, v in stats['by_country'].items()
        }
        stats_export['by_institution_type'] = {
            k: dict(v) for k, v in stats['by_institution_type'].items()
        }
        yaml.dump(stats_export, f, default_flow_style=False, allow_unicode=True)
    print(f"✅ Wrote statistics to {stats_file}")
    print()

    print("=" * 80)
    print("VALIDATION COMPLETE")
    print("=" * 80)
    print()
    print("Next steps:")
    print(f"1. Review validation report: {report_file}")
    print(f"2. Visualize on map: {geojson_file}")
    print("3. If coverage is good (≥95%), proceed with Wikidata enrichment")
    print("4. If coverage is low, review failed geocoding and retry")
    print()


if __name__ == '__main__':
    main()