glam/src/glam_extractor/geocoding/city_code_disambiguation.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

337 lines
12 KiB
Python

"""
City code disambiguation for GHCID generation.
Resolves conflicts when multiple cities in the same country generate identical
3-letter codes. Uses progressive disambiguation:
1. Try 3-letter code (e.g., HAR)
2. If collision, extend to 4 letters (HARD, HARL, HARE)
3. If still collision, the most populous city keeps the 4-letter code and the others get a numeric suffix (HARD, HARD1, HARD2)
Disambiguation table is built per-country and persisted for consistency.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from collections import defaultdict
import unicodedata
@dataclass
class CityCodeEntry:
    """
    One row of the city code disambiguation table.

    Attributes:
        city_name: City name as supplied by the caller.
        ascii_name: ASCII form of the name; the codes are derived from it.
        geonames_id: GeoNames identifier of the city.
        country_code: Country code the city is filed under.
        code_3: Original 3-letter code.
        code_4: 4-letter extension.
        final_code: Disambiguated code (may be 3, 4, or 4+digit).
        population: Population, or None when it is not known.
    """

    city_name: str
    ascii_name: str
    geonames_id: int
    country_code: str
    code_3: str
    code_4: str
    final_code: str
    population: Optional[int] = None
class CityCodeDisambiguator:
    """
    Builds disambiguation tables for city codes within countries.

    Strategy:
    1. Group cities by 3-letter code
    2. A city whose 3-letter code is unique in its country keeps that code
    3. All other cities of the country are regrouped by 4-letter code; a
       4-letter code held by a single city is used as-is
    4. For remaining collisions, the most populous city keeps the 4-letter
       code and the others get a numeric suffix by population rank
    5. Store final mappings for consistent lookups

    Final codes are unique within a country: every assigned code is recorded
    and a candidate that is already taken falls through to the next free
    numeric suffix.

    Example:
        >>> disambiguator = CityCodeDisambiguator()
        >>> disambiguator.add_city("Hardenberg", "Hardenberg", 123, "NL", 50000)
        >>> disambiguator.add_city("Harlingen", "Harlingen", 456, "NL", 16000)
        >>> disambiguator.add_city("Haren", "Haren", 789, "NL", 19000)
        >>> disambiguator.build()
        >>> disambiguator.get_code("Hardenberg", "NL")
        'HARD'
        >>> disambiguator.get_code("Harlingen", "NL")
        'HARL'
        >>> disambiguator.get_code("Haren", "NL")
        'HARE'
    """

    # Leading words treated as articles when deriving the 3-letter code.
    _LEADING_ARTICLES = frozenset(
        {'la', 'el', 'los', 'las', 'le', 'the', 'o', 'a'}
    )

    def __init__(self) -> None:
        """Initialize empty disambiguation tables."""
        # Cities by country: country_code -> list of CityCodeEntry
        self.cities_by_country: Dict[str, List[CityCodeEntry]] = defaultdict(list)
        # Final lookup: (city_name, country_code) -> code
        self.code_map: Dict[tuple, str] = {}
        # Set by build(), cleared again by add_city()
        self.is_built = False

    @staticmethod
    def _strip_accents(text: str) -> str:
        """Return *text* NFD-decomposed with all combining marks removed."""
        decomposed = unicodedata.normalize('NFD', text)
        return ''.join(
            ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
        )

    @staticmethod
    def _get_3_letter_code(ascii_name: str) -> str:
        """
        Get 3-letter code from ASCII city name.

        For a multi-word name that starts with an article, the code is the
        first letter of the article plus the first two characters of the
        next word. Otherwise it is the first three alphanumeric characters
        of the name. The result is upper-cased and may be shorter than three
        characters for very short names.
        """
        normalized = CityCodeDisambiguator._strip_accents(ascii_name)
        words = normalized.split()
        starts_with_article = (
            len(words) > 1
            and words[0].lower() in CityCodeDisambiguator._LEADING_ARTICLES
        )
        if starts_with_article:
            return (words[0][0] + words[1][:2]).upper()
        cleaned = ''.join(c for c in normalized if c.isalnum())
        return cleaned[:3].upper()

    @staticmethod
    def _get_4_letter_code(ascii_name: str) -> str:
        """
        Get 4-letter code from ASCII city name.

        Takes the first four alphanumeric characters of the accent-stripped
        name, upper-cased. No article handling is applied here, so for names
        such as "La Paz" the result (LAPA) is not an extension of the
        3-letter code (LPA).
        """
        normalized = CityCodeDisambiguator._strip_accents(ascii_name)
        cleaned = ''.join(c for c in normalized if c.isalnum())
        return cleaned[:4].upper()

    def add_city(
        self,
        city_name: str,
        ascii_name: str,
        geonames_id: int,
        country_code: str,
        population: Optional[int] = None
    ) -> None:
        """
        Add a city to the disambiguation table.

        Marks the table as not built, so build() must be called again before
        any lookup.

        Args:
            city_name: Original city name
            ascii_name: ASCII-normalized name
            geonames_id: GeoNames ID
            country_code: ISO 3166-1 alpha-2 country code (stored upper-cased)
            population: Population (used for ranking in tie-breaks)
        """
        country = country_code.upper()
        code_3 = self._get_3_letter_code(ascii_name)
        entry = CityCodeEntry(
            city_name=city_name,
            ascii_name=ascii_name,
            geonames_id=geonames_id,
            country_code=country,
            code_3=code_3,
            code_4=self._get_4_letter_code(ascii_name),
            final_code=code_3,  # Default to 3-letter, will be updated in build()
            population=population
        )
        self.cities_by_country[country].append(entry)
        self.is_built = False

    def build(self) -> None:
        """
        Build disambiguation tables for all countries.

        The lookup table is rebuilt from scratch on every call. Per country:
        1. Cities whose 3-letter code is unique keep it
        2. All remaining cities are disambiguated together, so that equal
           4-letter codes coming from different 3-letter groups are also
           told apart
        """
        self.code_map.clear()
        for country_code, cities in self.cities_by_country.items():
            by_code_3: Dict[str, List[CityCodeEntry]] = defaultdict(list)
            for city in cities:
                by_code_3[city.code_3].append(city)

            used_codes: Set[str] = set()
            colliding: List[CityCodeEntry] = []
            for city in cities:
                if len(by_code_3[city.code_3]) > 1:
                    colliding.append(city)
                    continue
                # No collision - use 3-letter code
                city.final_code = city.code_3
                used_codes.add(city.code_3)
                self.code_map.setdefault(
                    (city.city_name, country_code), city.code_3
                )

            if colliding:
                self._disambiguate_group(colliding, country_code, used_codes)
        self.is_built = True

    def _disambiguate_group(
        self,
        group: "List[CityCodeEntry]",
        country_code: str,
        used_codes: "Optional[Set[str]]" = None
    ) -> None:
        """
        Disambiguate colliding cities using 4-letter codes and numeric suffixes.

        Cities are grouped by 4-letter code. Inside each group they are
        ranked by population (descending, unknown population last; ties keep
        insertion order). The first city gets the 4-letter code and the
        following ones get that code plus 1, 2, ... Any candidate already in
        used_codes is skipped in favour of the next free suffix.

        When several cities share the same name, the name lookup keeps the
        code that was assigned first, i.e. the highest-ranked city's code.

        Args:
            group: Cities whose 3-letter code is not unique
            country_code: Country code
            used_codes: Codes already assigned in this country; updated in
                place. A fresh set is used when omitted.
        """
        if used_codes is None:
            used_codes = set()

        by_code_4: Dict[str, List[CityCodeEntry]] = defaultdict(list)
        for city in group:
            by_code_4[city.code_4].append(city)

        for code_4, subgroup in by_code_4.items():
            ranked = sorted(
                subgroup,
                key=lambda c: (c.population is None, -(c.population or 0))
            )
            suffix = 0  # 0 means "bare 4-letter code"
            for city in ranked:
                candidate = code_4 if suffix == 0 else f"{code_4}{suffix}"
                while candidate in used_codes:
                    suffix += 1
                    candidate = f"{code_4}{suffix}"
                used_codes.add(candidate)
                city.final_code = candidate
                suffix += 1
                # First assignment wins so a shared name resolves to the
                # most populous city rather than to the last one processed.
                self.code_map.setdefault(
                    (city.city_name, country_code), candidate
                )

    def get_code(self, city_name: str, country_code: str) -> Optional[str]:
        """
        Get disambiguated code for a city.

        Args:
            city_name: City name
            country_code: ISO 3166-1 alpha-2 country code (case-insensitive)

        Returns:
            Disambiguated code or None if city not in table

        Raises:
            ValueError: If build() hasn't been called yet
        """
        if not self.is_built:
            raise ValueError("Must call build() before get_code()")
        return self.code_map.get((city_name, country_code.upper()))

    def get_all_codes(self, country_code: str) -> Dict[str, str]:
        """
        Get all city codes for a country.

        Args:
            country_code: ISO 3166-1 alpha-2 country code (case-insensitive)

        Returns:
            Dictionary mapping city_name -> code

        Raises:
            ValueError: If build() hasn't been called yet
        """
        if not self.is_built:
            raise ValueError("Must call build() before get_all_codes()")
        country = country_code.upper()
        return {
            city_name: code
            for (city_name, cc), code in self.code_map.items()
            if cc == country
        }

    def get_collision_report(self, country_code: str) -> str:
        """
        Generate a human-readable collision report for a country.

        Args:
            country_code: ISO 3166-1 alpha-2 country code (case-insensitive;
                shown upper-cased in the report)

        Returns:
            Formatted report showing collision groups and resolutions

        Raises:
            ValueError: If build() hasn't been called yet
        """
        if not self.is_built:
            raise ValueError("Must call build() before get_collision_report()")
        country = country_code.upper()
        cities = self.cities_by_country.get(country, [])

        # Group by original 3-letter code
        by_code_3: Dict[str, List[CityCodeEntry]] = defaultdict(list)
        for city in cities:
            by_code_3[city.code_3].append(city)

        # Filter to only collision groups
        collision_groups = {
            code_3: group
            for code_3, group in by_code_3.items()
            if len(group) > 1
        }
        if not collision_groups:
            return f"No collisions found in {country}"

        affected = sum(len(g) for g in collision_groups.values())
        report = [
            f"City Code Collision Report - {country}",
            "=" * 60,
            f"Total collision groups: {len(collision_groups)}",
            f"Total cities affected: {affected}",
            "",
        ]
        for code_3, group in sorted(collision_groups.items()):
            report.append(f"3-letter code: {code_3} ({len(group)} cities)")
            for city in sorted(group, key=lambda c: c.final_code):
                # A population of 0 is a known value, only None is unknown.
                if city.population is not None:
                    pop_str = f"pop: {city.population:,}"
                else:
                    pop_str = "pop: unknown"
                report.append(f" {city.city_name:25s}{city.final_code:6s} ({pop_str})")
            report.append("")
        return "\n".join(report)

    def get_statistics(self, country_code: str) -> dict:
        """
        Get disambiguation statistics for a country.

        Cities are classified by which code they ended up with rather than by
        code length, so short names and suffixed short codes are counted
        correctly and the three counters always add up to total_cities:
        - code_3_final: final code equals the 3-letter code
        - code_4_final: final code equals the 4-letter code
        - code_numeric_final: anything else (a numeric suffix was added)

        Args:
            country_code: ISO 3166-1 alpha-2 country code (case-insensitive)

        Returns:
            Dictionary with statistics

        Raises:
            ValueError: If build() hasn't been called yet
        """
        if not self.is_built:
            raise ValueError("Must call build() before get_statistics()")
        country = country_code.upper()
        cities = self.cities_by_country.get(country, [])

        code_3_count = 0
        code_4_count = 0
        code_numeric_count = 0
        group_sizes: Dict[str, int] = defaultdict(int)
        for city in cities:
            group_sizes[city.code_3] += 1
            if city.final_code == city.code_3:
                code_3_count += 1
            elif city.final_code == city.code_4:
                code_4_count += 1
            else:
                code_numeric_count += 1

        # Only 3-letter codes shared by more than one city are collisions
        colliding_sizes = [size for size in group_sizes.values() if size > 1]
        cities_in_collisions = sum(colliding_sizes)
        return {
            'country_code': country,
            'total_cities': len(cities),
            'code_3_final': code_3_count,
            'code_4_final': code_4_count,
            'code_numeric_final': code_numeric_count,
            'collision_groups': len(colliding_sizes),
            'cities_in_collisions': cities_in_collisions,
            'collision_rate': cities_in_collisions / len(cities) if cities else 0.0
        }