- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive). - Added tests for extracted entities and result handling to validate the extraction process. - Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format. - Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns. - Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
365 lines
14 KiB
Python
365 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Validate Geocoding Results
|
|
|
|
Analyzes the geocoded global dataset and generates:
|
|
- Coverage statistics by country and institution type
|
|
- Quality checks (valid coordinates, outliers, etc.)
|
|
- Visualization data for mapping
|
|
- Detailed validation report
|
|
|
|
Usage:
|
|
python scripts/validate_geocoding_results.py
|
|
"""
|
|
|
|
import json
import math
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple

import yaml
|
|
|
|
|
|
def validate_coordinates(lat: float, lon: float) -> Tuple[bool, str]:
    """
    Validate that coordinates are reasonable.

    Checks numeric type, NaN, latitude/longitude ranges, and the common
    (0, 0) "Null Island" placeholder that signals a geocoding failure.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        (is_valid, error_message) -- error_message is "" when valid.
    """
    # bool is a subclass of int, so reject it explicitly: True/False are
    # never legitimate coordinate values.
    if isinstance(lat, bool) or isinstance(lon, bool):
        return False, "Non-numeric coordinates"
    if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
        return False, "Non-numeric coordinates"

    # NaN compares False against every bound, so without this explicit
    # check it would sail through the range tests below as "valid".
    if math.isnan(lat) or math.isnan(lon):
        return False, "NaN coordinates"

    if lat < -90 or lat > 90:
        return False, f"Invalid latitude: {lat} (must be -90 to 90)"

    if lon < -180 or lon > 180:
        return False, f"Invalid longitude: {lon} (must be -180 to 180)"

    # Check for common placeholder values
    if lat == 0 and lon == 0:
        return False, "Null Island (0,0) - likely geocoding failure"

    return True, ""
|
|
|
|
|
|
def analyze_geocoding_coverage(institutions: List[Dict]) -> Dict:
    """Analyze geocoding coverage and quality.

    Walks every institution and each of its locations, tallying how many
    carry location data, coordinates, and *valid* coordinates (as judged by
    validate_coordinates), broken down by country and institution type.

    Args:
        institutions: Institution dicts; each may carry a 'locations' list
            of dicts with 'latitude'/'longitude'/'country' and optionally a
            'geonames_id'.

    Returns:
        Stats dict. NOTE: 'by_country' and 'by_institution_type' are
        defaultdicts -- convert to plain dicts before YAML-serializing.
        NOTE(review): per-location counters ('with_coordinates',
        'with_valid_coordinates') can exceed the per-institution totals
        when an institution has several locations -- confirm intended.
    """

    stats = {
        'total_institutions': len(institutions),
        'with_locations': 0,
        'with_coordinates': 0,
        'with_valid_coordinates': 0,
        'invalid_coordinates': [],
        'by_country': defaultdict(lambda: {
            'total': 0,
            'geocoded': 0,
            'failed': 0,
            'invalid': 0
        }),
        'by_institution_type': defaultdict(lambda: {
            'total': 0,
            'geocoded': 0,
            'failed': 0
        }),
        # Running min/max seeded with the opposite extremes, so any real
        # coordinate replaces the seed on the first comparison.
        'coordinate_ranges': {
            'min_lat': 90,
            'max_lat': -90,
            'min_lon': 180,
            'max_lon': -180
        },
        'geonames_ids': 0
    }

    for inst in institutions:
        locations = inst.get('locations', [])
        inst_type = inst.get('institution_type', 'UNKNOWN')
        # Country is taken from the first location only; every location of
        # this institution is then tallied under that country.
        country = locations[0].get('country', 'Unknown') if locations else 'Unknown'

        stats['by_country'][country]['total'] += 1
        stats['by_institution_type'][inst_type]['total'] += 1

        if locations:
            stats['with_locations'] += 1

        for loc in locations:
            lat = loc.get('latitude')
            lon = loc.get('longitude')

            if lat is not None and lon is not None:
                stats['with_coordinates'] += 1
                stats['by_country'][country]['geocoded'] += 1
                stats['by_institution_type'][inst_type]['geocoded'] += 1

                # Validate coordinates
                is_valid, error_msg = validate_coordinates(lat, lon)
                if is_valid:
                    stats['with_valid_coordinates'] += 1

                    # Track coordinate ranges
                    stats['coordinate_ranges']['min_lat'] = min(stats['coordinate_ranges']['min_lat'], lat)
                    stats['coordinate_ranges']['max_lat'] = max(stats['coordinate_ranges']['max_lat'], lat)
                    stats['coordinate_ranges']['min_lon'] = min(stats['coordinate_ranges']['min_lon'], lon)
                    stats['coordinate_ranges']['max_lon'] = max(stats['coordinate_ranges']['max_lon'], lon)

                    # Check for GeoNames ID (only counted for valid coordinates)
                    if loc.get('geonames_id'):
                        stats['geonames_ids'] += 1
                else:
                    stats['by_country'][country]['invalid'] += 1
                    stats['invalid_coordinates'].append({
                        'name': inst.get('name'),
                        'ghcid': inst.get('ghcid'),
                        'country': country,
                        'lat': lat,
                        'lon': lon,
                        'error': error_msg
                    })
            else:
                stats['by_country'][country]['failed'] += 1
                stats['by_institution_type'][inst_type]['failed'] += 1

    return stats
|
|
|
|
|
|
def generate_geojson(institutions: List[Dict], output_file: Path):
    """Write a GeoJSON FeatureCollection of validly geocoded institutions.

    Locations lacking coordinates or failing validate_coordinates are
    skipped. Returns the number of features written.
    """

    features = []

    for inst in institutions:
        for loc in inst.get('locations', []):
            lat = loc.get('latitude')
            lon = loc.get('longitude')

            if lat is None or lon is None:
                continue

            valid, _ = validate_coordinates(lat, lon)
            if not valid:
                continue

            features.append({
                'type': 'Feature',
                'geometry': {
                    'type': 'Point',
                    'coordinates': [lon, lat]  # GeoJSON uses [lon, lat] order
                },
                'properties': {
                    'name': inst.get('name', 'Unknown'),
                    'ghcid': inst.get('ghcid', 'Unknown'),
                    'institution_type': inst.get('institution_type', 'UNKNOWN'),
                    'country': loc.get('country', 'Unknown'),
                    'city': loc.get('city', 'Unknown'),
                    'homepage': inst.get('homepage', '')
                }
            })

    collection = {'type': 'FeatureCollection', 'features': features}

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(collection, f, indent=2, ensure_ascii=False)

    return len(features)
|
|
|
|
|
|
def generate_report(stats: Dict, output_file: Path):
    """Generate detailed validation report.

    Renders the statistics from analyze_geocoding_coverage as Markdown:
    summary, per-country and per-type coverage tables, invalid-coordinate
    listing, quality indicators, and recommendations.

    Args:
        stats: Statistics dict (see analyze_geocoding_coverage).
        output_file: Path of the Markdown file to write.
    """

    total = stats['total_institutions']
    geocoded = stats['with_valid_coordinates']

    # Guard every percentage against empty inputs so an empty or fully
    # un-geocoded dataset yields a report instead of a ZeroDivisionError.
    coverage = (geocoded / total * 100) if total > 0 else 0
    locations_pct = (stats['with_locations'] / total * 100) if total > 0 else 0
    failed_pct = ((total - geocoded) / total * 100) if total > 0 else 0
    geonames_coverage = (stats['geonames_ids'] / geocoded * 100) if geocoded > 0 else 0

    report = f"""# Geocoding Validation Report

Generated by: {Path(__file__).name}

## Summary

- **Total Institutions**: {total:,}
- **With Location Data**: {stats['with_locations']:,} ({locations_pct:.1f}%)
- **Successfully Geocoded**: {geocoded:,} ({coverage:.1f}%)
- **Failed Geocoding**: {total - geocoded:,} ({failed_pct:.1f}%)
- **Invalid Coordinates**: {len(stats['invalid_coordinates'])}
- **With GeoNames IDs**: {stats['geonames_ids']:,} ({geonames_coverage:.1f}% of geocoded)

## Coordinate Ranges

- **Latitude**: {stats['coordinate_ranges']['min_lat']:.4f} to {stats['coordinate_ranges']['max_lat']:.4f}
- **Longitude**: {stats['coordinate_ranges']['min_lon']:.4f} to {stats['coordinate_ranges']['max_lon']:.4f}

## Coverage by Country

| Country | Total | Geocoded | Failed | Invalid | Coverage |
|---------|-------|----------|--------|---------|----------|
"""

    # Sort by total descending
    for country, data in sorted(stats['by_country'].items(), key=lambda x: x[1]['total'], reverse=True):
        cov = (data['geocoded'] / data['total'] * 100) if data['total'] > 0 else 0
        report += f"| {country:<7} | {data['total']:>5} | {data['geocoded']:>8} | {data['failed']:>6} | {data['invalid']:>7} | {cov:>6.1f}% |\n"

    report += "\n## Coverage by Institution Type\n\n"
    report += "| Type | Total | Geocoded | Failed | Coverage |\n"
    report += "|------|-------|----------|--------|---------|\n"

    for inst_type, data in sorted(stats['by_institution_type'].items(), key=lambda x: x[1]['total'], reverse=True):
        cov = (data['geocoded'] / data['total'] * 100) if data['total'] > 0 else 0
        report += f"| {inst_type:<20} | {data['total']:>5} | {data['geocoded']:>8} | {data['failed']:>6} | {cov:>6.1f}% |\n"

    # Invalid coordinates
    if stats['invalid_coordinates']:
        report += f"\n## Invalid Coordinates ({len(stats['invalid_coordinates'])})\n\n"
        report += "| Institution | GHCID | Country | Lat | Lon | Error |\n"
        report += "|-------------|-------|---------|-----|-----|-------|\n"

        for inv in stats['invalid_coordinates'][:50]:  # Limit to first 50
            report += f"| {inv['name'][:30]} | {inv['ghcid']} | {inv['country']} | {inv['lat']} | {inv['lon']} | {inv['error']} |\n"

        if len(stats['invalid_coordinates']) > 50:
            report += f"\n*... and {len(stats['invalid_coordinates']) - 50} more*\n"

    # Success indicators
    report += "\n## Quality Indicators\n\n"

    if coverage >= 95:
        report += "✅ **Excellent Coverage** (≥95%)\n\n"
    elif coverage >= 90:
        report += "✅ **Good Coverage** (≥90%)\n\n"
    elif coverage >= 80:
        report += "⚠️ **Moderate Coverage** (≥80%)\n\n"
    else:
        report += "❌ **Low Coverage** (<80%)\n\n"

    if len(stats['invalid_coordinates']) == 0:
        report += "✅ **No Invalid Coordinates**\n\n"
    elif len(stats['invalid_coordinates']) < 10:
        report += f"⚠️ **Few Invalid Coordinates** ({len(stats['invalid_coordinates'])} found)\n\n"
    else:
        report += f"❌ **Multiple Invalid Coordinates** ({len(stats['invalid_coordinates'])} found)\n\n"

    if geonames_coverage >= 50:
        report += f"✅ **Good GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    elif geonames_coverage >= 25:
        report += f"⚠️ **Moderate GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"
    else:
        report += f"❌ **Low GeoNames Coverage** ({geonames_coverage:.1f}%)\n\n"

    # Recommendations
    report += "## Recommendations\n\n"

    if coverage < 95:
        report += "1. **Improve Geocoding Coverage**:\n"
        report += "   - Review failed geocoding queries\n"
        report += "   - Try alternative query formats for low-coverage countries\n"
        report += "   - Consider manual geocoding for high-value institutions\n\n"

    if len(stats['invalid_coordinates']) > 0:
        report += "2. **Fix Invalid Coordinates**:\n"
        report += "   - Review institutions with (0,0) coordinates\n"
        report += "   - Verify coordinates are in correct country\n"
        report += "   - Check for coordinate swap (lat/lon reversed)\n\n"

    if geonames_coverage < 50:
        report += "3. **Enhance GeoNames IDs**:\n"
        report += "   - GeoNames IDs enable better geographic linking\n"
        report += "   - Consider querying GeoNames API directly\n"
        report += "   - Use reverse geocoding to find GeoNames IDs\n\n"

    report += "---\n\n"
    # Plain ISO-8601 timestamp; the previous yaml.dump(...) wrapped the
    # string in quote/newline artifacts inside the Markdown line.
    report += f"**Generated**: {datetime.now(timezone.utc).isoformat()}\n"
    report += "**Source**: `data/instances/global/global_heritage_institutions.yaml`\n"

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(report)
|
|
|
|
|
|
def main():
    """Run the full geocoding validation pipeline.

    Loads the global heritage-institutions YAML dataset, analyzes geocoding
    coverage, and writes a Markdown report, a GeoJSON file for mapping, and
    a YAML statistics dump alongside the dataset.
    """
    # All inputs/outputs live in the same dataset directory.
    base_dir = Path(__file__).parent.parent
    global_dir = base_dir / 'data' / 'instances' / 'global'
    global_file = global_dir / 'global_heritage_institutions.yaml'
    report_file = global_dir / 'geocoding_validation_report.md'
    geojson_file = global_dir / 'global_institutions.geojson'
    stats_file = global_dir / 'geocoding_statistics.yaml'

    print("=" * 80)
    print("GEOCODING VALIDATION")
    print("=" * 80)
    print()

    # Load dataset
    print(f"Loading dataset from {global_file}")
    with open(global_file, 'r', encoding='utf-8') as f:
        institutions = yaml.safe_load(f)

    print(f"Loaded {len(institutions):,} institutions")
    print()

    # Analyze coverage
    print("Analyzing geocoding coverage...")
    stats = analyze_geocoding_coverage(institutions)

    # Print summary (guard divisions so an empty dataset doesn't crash here)
    total = stats['total_institutions']
    geocoded = stats['with_valid_coordinates']
    coverage = (geocoded / total * 100) if total > 0 else 0
    failed_pct = ((total - geocoded) / total * 100) if total > 0 else 0

    print()
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total institutions: {total:,}")
    print(f"Successfully geocoded: {geocoded:,} ({coverage:.1f}%)")
    print(f"Failed geocoding: {total - geocoded:,} ({failed_pct:.1f}%)")
    print(f"Invalid coordinates: {len(stats['invalid_coordinates'])}")
    print(f"With GeoNames IDs: {stats['geonames_ids']:,}")
    print("=" * 80)
    print()

    # Generate GeoJSON
    print("Generating GeoJSON for mapping...")
    feature_count = generate_geojson(institutions, geojson_file)
    print(f"✅ Wrote {feature_count:,} features to {geojson_file}")
    print()

    # Generate report
    print("Generating validation report...")
    generate_report(stats, report_file)
    print(f"✅ Wrote report to {report_file}")
    print()

    # Save statistics
    print("Saving statistics...")
    with open(stats_file, 'w', encoding='utf-8') as f:
        # Convert defaultdict to regular dict for YAML serialization
        stats_export = dict(stats)
        stats_export['by_country'] = {k: dict(v) for k, v in stats['by_country'].items()}
        stats_export['by_institution_type'] = {k: dict(v) for k, v in stats['by_institution_type'].items()}
        yaml.dump(stats_export, f, default_flow_style=False, allow_unicode=True)
    print(f"✅ Wrote statistics to {stats_file}")
    print()

    print("=" * 80)
    print("VALIDATION COMPLETE")
    print("=" * 80)
    print()
    print("Next steps:")
    print(f"1. Review validation report: {report_file}")
    print(f"2. Visualize on map: {geojson_file}")
    print("3. If coverage is good (≥95%), proceed with Wikidata enrichment")
    print("4. If coverage is low, review failed geocoding and retry")
    print()
|
|
|
|
|
|
# Script entry point: run the validation pipeline when executed directly.
if __name__ == '__main__':
    main()
|