Backend:
- Attach web_archives.duckdb as a read-only database in DuckLake
- Create views for web_archives, web_pages, web_claims in the heritage schema

Scripts:
- enrich_cities_google.py: Add batch processing and retry logic
- migrate_web_archives.py: Improve schema handling and error recovery

Frontend:
- DuckLakePanel: Add web archives query support
- Database.css: Improve layout for query results display
595 lines
19 KiB
Python
Executable file
595 lines
19 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Enrich custodian files with city/region data using Google Places API.
|
|
|
|
This is a generic script that works for any country's XXX files.
|
|
|
|
Usage:
|
|
python scripts/enrich_cities_google.py --country KR [--dry-run] [--limit N]
|
|
python scripts/enrich_cities_google.py --country AR [--dry-run] [--limit N]
|
|
python scripts/enrich_cities_google.py --all [--dry-run] [--limit N]
|
|
|
|
Environment Variables:
|
|
GOOGLE_PLACES_TOKEN - Required: Google Cloud API key with Places API enabled
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import sqlite3
|
|
import re
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import yaml
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Configuration
# API key read from the environment (see module docstring); an empty string
# means "not configured" and main() aborts early with an error.
GOOGLE_PLACES_TOKEN = os.getenv("GOOGLE_PLACES_TOKEN", "")
# NOTE(review): absolute, machine-specific paths — assumes this exact host
# layout; consider making these configurable.
GEONAMES_DB = Path("/Users/kempersc/apps/glam/data/reference/geonames.db")
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")

# Google Places API (New) text-search endpoint.
TEXT_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
# Seconds slept between API requests (simple client-side rate limiting).
REQUEST_DELAY = 0.3

# Country name mapping for search queries: ISO 3166-1 alpha-2 code -> English
# country name appended to the institution name when querying Google Places.
COUNTRY_NAMES = {
    'KR': 'South Korea',
    'AR': 'Argentina',
    'US': 'United States',
    'IN': 'India',
    'JM': 'Jamaica',
    'UZ': 'Uzbekistan',
    'UA': 'Ukraine',
    'TJ': 'Tajikistan',
    'OM': 'Oman',
    'NL': 'Netherlands',
    'NA': 'Namibia',
    'ML': 'Mali',
    'LK': 'Sri Lanka',
    'LB': 'Lebanon',
    'IT': 'Italy',
    'IR': 'Iran',
    'EC': 'Ecuador',
    'DK': 'Denmark',
    'CU': 'Cuba',
    'CO': 'Colombia',
    'BR': 'Brazil',
    'MX': 'Mexico',
    'JP': 'Japan',
    'CZ': 'Czech Republic',
    'DE': 'Germany',
    'FR': 'France',
    'GB': 'United Kingdom',
    'EE': 'Estonia',
    'PH': 'Philippines',
    'CL': 'Chile',
    'CH': 'Switzerland',
}
|
|
|
|
|
|
def get_city_code(city_name: str) -> str:
    """Derive a short (up to 3-letter) uppercase city code from a city name.

    Well-known administrative suffixes (e.g. " City", "-shi") are stripped
    first; the code is then built from the remaining word(s):
    one word -> its first three letters, two words -> initial of the first
    plus first two letters of the second, three or more -> the initials of
    the first three words.
    """
    stripped = city_name.strip()
    # Drop common suffixes so e.g. "Osaka-shi" codes the same as "Osaka".
    for tail in (' City', ' Town', '-shi', '-ku', '-gun', '-cho', ' District'):
        if stripped.endswith(tail):
            stripped = stripped[:len(stripped) - len(tail)]

    parts = stripped.split()

    if len(parts) >= 3:
        return ''.join(p[0] for p in parts[:3]).upper()
    if len(parts) == 2:
        first, second = parts
        return (first[0] + second[:2]).upper()
    return stripped[:3].upper()
|
|
|
|
|
|
def search_google_places(query: str, api_key: str) -> Optional[dict]:
    """Search the Google Places (New) Text Search API for a location.

    Args:
        query: Free-text search string (institution name + country name).
        api_key: Google Cloud API key with the Places API enabled.

    Returns:
        The first matching place dict, or None when nothing matched or the
        request failed. Errors are logged rather than raised — callers treat
        "no place" and "request failed" identically.
    """
    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": api_key,
        # Field mask restricts the response to exactly the fields that
        # extract_location_from_google() reads.
        "X-Goog-FieldMask": "places.displayName,places.formattedAddress,places.location,places.addressComponents,places.types,places.id,places.websiteUri"
    }

    payload = {
        "textQuery": query,
        "languageCode": "en"
    }

    try:
        response = httpx.post(TEXT_SEARCH_URL, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
    # FIX: catch only the expected failure modes (transport/HTTP errors from
    # httpx, malformed JSON -> ValueError) instead of a blanket
    # `except Exception`, so genuine programming errors surface.
    except (httpx.HTTPError, ValueError) as e:
        print(f" Error searching Google Places: {e}")
        return None

    places = data.get("places")
    if places:
        return places[0]
    return None
|
|
|
|
|
|
def extract_location_from_google(place: dict) -> dict:
    """Pull city/region/coordinate/address fields out of a Places result.

    Returns a dict with keys: city, region, latitude, longitude,
    formatted_address, place_id, website — each None when the corresponding
    field is absent, or all None when *place* itself is falsy.
    """
    info = {
        'city': None,
        'region': None,
        'latitude': None,
        'longitude': None,
        'formatted_address': None,
        'place_id': None,
        'website': None,
    }

    if not place:
        return info

    info['place_id'] = place.get('id')
    info['formatted_address'] = place.get('formattedAddress')
    info['website'] = place.get('websiteUri')

    coords = place.get('location', {})
    info['latitude'] = coords.get('latitude')
    info['longitude'] = coords.get('longitude')

    # Walk the address components: a "locality" always wins for the city;
    # a sublocality only fills in when no locality has been seen yet.
    for component in place.get('addressComponents', []):
        kinds = component.get('types', [])
        text = component.get('longText', '')

        if 'locality' in kinds:
            info['city'] = text
        elif 'administrative_area_level_1' in kinds:
            info['region'] = text
        elif 'sublocality_level_1' in kinds and not info['city']:
            info['city'] = text

    return info
|
|
|
|
|
|
def lookup_city_geonames(conn: sqlite3.Connection, lat: float, lon: float, country_code: str) -> Optional[dict]:
    """Reverse-geocode to the nearest populated place in GeoNames.

    Nearest is approximated by squared degree differences (no geodesic
    correction) — adequate for picking the closest city within one country.
    Only populated-place feature codes are considered.

    Returns a dict of the matched row's fields, or None when the country
    has no matching rows.
    """
    sql = """
        SELECT name, ascii_name, admin1_code, admin1_name, geonames_id,
               latitude, longitude, population, feature_code,
               ((latitude - ?) * (latitude - ?) + (longitude - ?) * (longitude - ?)) as dist_sq
        FROM cities
        WHERE country_code = ?
          AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
        ORDER BY dist_sq
        LIMIT 1
    """
    row = conn.execute(sql, (lat, lat, lon, lon, country_code)).fetchone()

    if row is None:
        return None

    fields = ('name', 'ascii_name', 'admin1_code', 'admin1_name', 'geonames_id',
              'latitude', 'longitude', 'population', 'feature_code')
    return dict(zip(fields, row))
|
|
|
|
|
|
# Brazil: GeoNames admin1_code → ISO 3166-2:BR state code
# NOTE(review): several numeric codes are absent ('09', '10', '12', '19');
# presumably unused/legacy GeoNames codes — confirm against the current
# GeoNames admin1CodesASCII data before relying on full coverage.
BRAZIL_STATE_CODES = {
    '01': 'AC',  # Acre
    '02': 'AL',  # Alagoas
    '03': 'AP',  # Amapá
    '04': 'AM',  # Amazonas
    '05': 'BA',  # Bahia
    '06': 'CE',  # Ceará
    '07': 'DF',  # Federal District (Distrito Federal)
    '08': 'ES',  # Espírito Santo
    '11': 'MS',  # Mato Grosso do Sul
    '13': 'MA',  # Maranhão
    '14': 'MT',  # Mato Grosso
    '15': 'MG',  # Minas Gerais
    '16': 'PA',  # Pará
    '17': 'PB',  # Paraíba
    '18': 'PR',  # Paraná
    '20': 'PI',  # Piauí
    '21': 'RJ',  # Rio de Janeiro
    '22': 'RN',  # Rio Grande do Norte
    '23': 'RS',  # Rio Grande do Sul
    '24': 'RO',  # Rondônia
    '25': 'RR',  # Roraima
    '26': 'SC',  # Santa Catarina
    '27': 'SP',  # São Paulo
    '28': 'SE',  # Sergipe
    '29': 'GO',  # Goiás
    '30': 'PE',  # Pernambuco
    '31': 'TO',  # Tocantins
}
|
|
|
|
# Switzerland: GeoNames admin1_code → ISO 3166-2:CH canton code
# Every key maps to itself: GeoNames already uses the ISO canton letters for
# CH. The table is kept for symmetry with the other country mappings (and so
# unknown codes still fall through to the generic logic in get_region_code).
SWITZERLAND_CANTON_CODES = {
    'AG': 'AG',  # Aargau
    'AI': 'AI',  # Appenzell Innerrhoden
    'AR': 'AR',  # Appenzell Ausserrhoden
    'BE': 'BE',  # Bern
    'BL': 'BL',  # Basel-Landschaft
    'BS': 'BS',  # Basel-Stadt
    'FR': 'FR',  # Fribourg
    'GE': 'GE',  # Geneva
    'GL': 'GL',  # Glarus
    'GR': 'GR',  # Graubünden
    'JU': 'JU',  # Jura
    'LU': 'LU',  # Lucerne
    'NE': 'NE',  # Neuchâtel
    'NW': 'NW',  # Nidwalden
    'OW': 'OW',  # Obwalden
    'SG': 'SG',  # St. Gallen
    'SH': 'SH',  # Schaffhausen
    'SO': 'SO',  # Solothurn
    'SZ': 'SZ',  # Schwyz
    'TG': 'TG',  # Thurgau
    'TI': 'TI',  # Ticino
    'UR': 'UR',  # Uri
    'VD': 'VD',  # Vaud
    'VS': 'VS',  # Valais
    'ZG': 'ZG',  # Zug
    'ZH': 'ZH',  # Zürich
}
|
|
|
|
# Mexico: GeoNames admin1_code → ISO 3166-2:MX state code
# GeoNames uses two-digit numeric codes for Mexican states; ISO uses
# three-letter codes, so the generic pass-through in get_region_code would
# produce the wrong (numeric) value without this table.
MEXICO_STATE_CODES = {
    '01': 'AGU',  # Aguascalientes
    '02': 'BCN',  # Baja California
    '03': 'BCS',  # Baja California Sur
    '04': 'CAM',  # Campeche
    '05': 'COA',  # Coahuila
    '06': 'COL',  # Colima
    '07': 'CHP',  # Chiapas
    '08': 'CHH',  # Chihuahua
    '09': 'CMX',  # Ciudad de México (CDMX)
    '10': 'DUR',  # Durango
    '11': 'GUA',  # Guanajuato
    '12': 'GRO',  # Guerrero
    '13': 'HID',  # Hidalgo
    '14': 'JAL',  # Jalisco
    '15': 'MEX',  # México (State of Mexico)
    '16': 'MIC',  # Michoacán
    '17': 'MOR',  # Morelos
    '18': 'NAY',  # Nayarit
    '19': 'NLE',  # Nuevo León
    '20': 'OAX',  # Oaxaca
    '21': 'PUE',  # Puebla
    '22': 'QUE',  # Querétaro
    '23': 'ROO',  # Quintana Roo
    '24': 'SLP',  # San Luis Potosí
    '25': 'SIN',  # Sinaloa
    '26': 'SON',  # Sonora
    '27': 'TAB',  # Tabasco
    '28': 'TAM',  # Tamaulipas
    '29': 'TLA',  # Tlaxcala
    '30': 'VER',  # Veracruz
    '31': 'YUC',  # Yucatán
    '32': 'ZAC',  # Zacatecas
}
|
|
|
|
# Chile: GeoNames admin1_code → ISO 3166-2:CL region code
# GeoNames uses two-digit numeric codes for Chilean regions; ISO uses
# two-letter codes, so an explicit translation table is required.
CHILE_REGION_CODES = {
    '01': 'TA',  # Tarapacá
    '02': 'AN',  # Antofagasta
    '03': 'AT',  # Atacama
    '04': 'CO',  # Coquimbo
    '05': 'VS',  # Valparaíso
    '06': 'LI',  # Libertador General Bernardo O'Higgins
    '07': 'ML',  # Maule
    '08': 'BI',  # Biobío
    '09': 'AR',  # La Araucanía
    '10': 'LL',  # Los Lagos
    '11': 'AI',  # Aisén del General Carlos Ibáñez del Campo
    '12': 'MA',  # Magallanes y de la Antártica Chilena
    '13': 'RM',  # Región Metropolitana de Santiago
    '14': 'LR',  # Los Ríos
    '15': 'AP',  # Arica y Parinacota
    '16': 'NB',  # Ñuble
}
|
|
|
|
|
|
def get_region_code(admin1_code: str, country_code: str, admin1_name: str) -> str:
    """Map a GeoNames admin1_code to an ISO-style region code.

    BR/CH/MX/CL have explicit translation tables. For other countries a
    2-3 character admin1_code passes through verbatim; longer codes fall
    back to an abbreviation derived from the region name, and finally to
    the first two characters of the code itself.
    """
    if not admin1_code:
        # No admin1 information at all — placeholder region.
        return 'XX'

    # Country-specific GeoNames -> ISO 3166-2 translations. Guarded per
    # country so the tables are only consulted when relevant.
    if country_code == 'BR':
        mapped = BRAZIL_STATE_CODES.get(admin1_code)
        if mapped is not None:
            return mapped
    elif country_code == 'CH':
        mapped = SWITZERLAND_CANTON_CODES.get(admin1_code)
        if mapped is not None:
            return mapped
    elif country_code == 'MX':
        mapped = MEXICO_STATE_CODES.get(admin1_code)
        if mapped is not None:
            return mapped
    elif country_code == 'CL':
        mapped = CHILE_REGION_CODES.get(admin1_code)
        if mapped is not None:
            return mapped

    # Short GeoNames codes (e.g. 'ENG', numeric '11') are used as-is.
    if len(admin1_code) <= 3:
        return admin1_code.upper()

    # Otherwise abbreviate the human-readable region name.
    if admin1_name:
        name_words = admin1_name.split()
        if len(name_words) > 1:
            return (name_words[0][0] + name_words[1][0]).upper()
        return admin1_name[:2].upper()

    return admin1_code[:2].upper()
|
|
|
|
|
|
def process_file(filepath: Path, conn: sqlite3.Connection, api_key: str,
                 country_code: str, country_name: str, dry_run: bool = False) -> dict:
    """Resolve the city/region for one custodian YAML file and update its GHCID.

    Pipeline: load the YAML → confirm the current GHCID still carries the
    XXX city placeholder → geocode the institution via Google Places →
    reverse geocode the coordinates against GeoNames → rewrite the GHCID,
    append history/enrichment metadata, save, and rename the file to match
    the new GHCID.

    Args:
        filepath: Custodian YAML file to process.
        conn: Open SQLite connection to the GeoNames database.
        api_key: Google Places API key.
        country_code: ISO 3166-1 alpha-2 country code of the file.
        country_name: English country name appended to the search query.
        dry_run: When True, compute the new GHCID but write nothing.

    Returns:
        Result dict with 'status' in {'updated', 'would_update', 'skipped',
        'error', 'collision'} plus old/new GHCID, city, region and error info.
    """
    result = {
        'file': str(filepath),
        'status': 'skipped',
        'old_ghcid': None,
        'new_ghcid': None,
        'city': None,
        'region': None,
        'error': None,
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        result['status'] = 'error'
        result['error'] = f'Failed to load YAML: {e}'
        return result

    if not data:
        result['status'] = 'error'
        result['error'] = 'Empty YAML file'
        return result

    ghcid_data = data.get('ghcid', {})
    old_ghcid = ghcid_data.get('ghcid_current', '')
    result['old_ghcid'] = old_ghcid

    # Match patterns with XXX city code:
    # - {country}-XX-XXX-... (2-letter region like XX, BE, GE)
    # - {country}-10-XXX-... (2-digit region like 10, 52, 37)
    # - {country}-UKM-XXX-... (3-letter region like UKM, IDF, CMX)
    xxx_pattern = re.compile(rf'^{country_code}-[A-Z0-9]{{2,3}}-XXX-')
    if not xxx_pattern.match(old_ghcid):
        result['status'] = 'skipped'
        result['error'] = f'Not a {country_code}-*-XXX file'
        return result

    # Institution name: prefer the claimed custodian name, fall back to the
    # original source entry.
    name = data.get('custodian_name', {}).get('claim_value', '')
    if not name:
        name = data.get('original_entry', {}).get('name', '')

    if not name:
        result['status'] = 'error'
        result['error'] = 'No institution name found'
        return result

    # Geocode via Google Places (rate-limited between calls).
    search_query = f"{name} {country_name}"
    print(f" Searching: {name[:50]}...")
    place = search_google_places(search_query, api_key)
    time.sleep(REQUEST_DELAY)

    if not place:
        result['status'] = 'error'
        result['error'] = 'Not found in Google Places'
        return result

    location_info = extract_location_from_google(place)

    if not location_info['latitude'] or not location_info['longitude']:
        result['status'] = 'error'
        result['error'] = 'No coordinates from Google'
        return result

    # Reverse geocode the coordinates against GeoNames.
    city_info = lookup_city_geonames(conn, location_info['latitude'],
                                     location_info['longitude'], country_code)

    if not city_info:
        result['status'] = 'error'
        result['error'] = 'City not found in GeoNames'
        return result

    region_code = get_region_code(city_info['admin1_code'], country_code, city_info['admin1_name'])
    city_code = get_city_code(city_info['ascii_name'])

    result['city'] = city_info['ascii_name']
    result['region'] = city_info['admin1_name']

    # Build the new GHCID, keeping the institution-type and abbreviation
    # segments from the old one.
    parts = old_ghcid.split('-')
    if len(parts) >= 5:
        inst_type = parts[3]
        abbreviation = '-'.join(parts[4:])
    else:
        result['status'] = 'error'
        result['error'] = f'Invalid GHCID format: {old_ghcid}'
        return result

    new_ghcid = f'{country_code}-{region_code}-{city_code}-{inst_type}-{abbreviation}'
    result['new_ghcid'] = new_ghcid

    if dry_run:
        result['status'] = 'would_update'
        return result

    # FIX: detect a filename collision BEFORE writing anything. Previously
    # the YAML was rewritten first, so a collision left updated content
    # stranded under the old filename (inconsistent file name vs GHCID).
    new_filepath = filepath.parent / f'{new_ghcid}.yaml'
    if new_filepath != filepath and new_filepath.exists():
        result['status'] = 'collision'
        result['error'] = f'Target file exists: {new_filepath.name}'
        return result

    # Update the data
    timestamp = datetime.now(timezone.utc).isoformat()

    data['ghcid']['ghcid_current'] = new_ghcid
    data['ghcid']['location_resolution'] = {
        'method': 'GOOGLE_PLACES_GEONAMES',
        'country_code': country_code,
        'region_code': region_code,
        'region_name': city_info['admin1_name'],
        'city_code': city_code,
        'city_name': city_info['ascii_name'],
        'geonames_id': city_info['geonames_id'],
        'feature_code': city_info['feature_code'],
        'google_place_id': location_info.get('place_id'),
        'latitude': location_info['latitude'],
        'longitude': location_info['longitude'],
        'resolution_date': timestamp,
    }

    data['google_maps_enrichment'] = {
        'place_id': location_info.get('place_id'),
        'formatted_address': location_info.get('formatted_address'),
        'website': location_info.get('website'),
        'latitude': location_info['latitude'],
        'longitude': location_info['longitude'],
        'enriched_at': timestamp,
        'source': 'Google Places API (New)',
    }

    # Close out the still-open history entry for the old GHCID, then append
    # the new one.
    if 'ghcid_history' not in data['ghcid']:
        data['ghcid']['ghcid_history'] = []

    for entry in data['ghcid']['ghcid_history']:
        if entry.get('ghcid') == old_ghcid and not entry.get('valid_to'):
            entry['valid_to'] = timestamp

    data['ghcid']['ghcid_history'].append({
        'ghcid': new_ghcid,
        'ghcid_numeric': data['ghcid'].get('ghcid_numeric'),
        'valid_from': timestamp,
        'reason': f'Location resolved via Google Places + GeoNames: {city_info["ascii_name"]} ({region_code})',
    })

    # Keep any mirrored GHCID identifier entries in sync.
    if 'identifiers' in data:
        for identifier in data['identifiers']:
            if identifier.get('identifier_scheme') == 'GHCID':
                identifier['identifier_value'] = new_ghcid

    # Write the updated YAML, then rename the file to match the new GHCID.
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    if filepath != new_filepath:
        filepath.rename(new_filepath)
        result['new_file'] = str(new_filepath)

    result['status'] = 'updated'
    return result
|
|
|
|
|
|
def main():
    """CLI entry point: enrich custodian XXX files for one or all countries."""
    parser = argparse.ArgumentParser(description='Enrich custodian files with Google Places data')
    parser.add_argument('--country', type=str, help='Country code (e.g., KR, AR, US)')
    parser.add_argument('--all', action='store_true', help='Process all countries with XXX files')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, help='Limit number of files per country')
    args = parser.parse_args()

    # Fail fast on missing prerequisites.
    if not GOOGLE_PLACES_TOKEN:
        print("ERROR: GOOGLE_PLACES_TOKEN environment variable is required")
        sys.exit(1)

    if not GEONAMES_DB.exists():
        print(f"ERROR: GeoNames database not found: {GEONAMES_DB}")
        sys.exit(1)

    # Work out which country codes to process.
    if args.all:
        # Every known country that still has XXX (city-unresolved) files.
        discovered = {f.name[:2] for f in CUSTODIAN_DIR.glob('*-*-XXX-*.yaml')}
        countries = sorted(cc for cc in discovered if cc in COUNTRY_NAMES)
    elif args.country:
        countries = [args.country.upper()]
    else:
        print("ERROR: Specify --country CODE or --all")
        sys.exit(1)

    conn = sqlite3.connect(str(GEONAMES_DB))

    total_stats = {'updated': 0, 'error': 0, 'skipped': 0, 'would_update': 0, 'collision': 0}

    for country_code in countries:
        country_name = COUNTRY_NAMES.get(country_code, country_code)
        targets = sorted(CUSTODIAN_DIR.glob(f'{country_code}-*-XXX-*.yaml'))

        if args.limit:
            targets = targets[:args.limit]
        if not targets:
            continue

        print(f"\n{'='*60}")
        print(f"Processing {country_code} ({country_name}): {len(targets)} files")
        print('='*60)

        stats = {'updated': 0, 'error': 0, 'skipped': 0, 'would_update': 0, 'collision': 0}

        for filepath in targets:
            print(f"Processing: {filepath.name}")
            outcome = process_file(filepath, conn, GOOGLE_PLACES_TOKEN,
                                   country_code, country_name, dry_run=args.dry_run)
            status = outcome['status']
            stats[status] = stats.get(status, 0) + 1

            if status in ('updated', 'would_update'):
                print(f" ✓ {outcome['city']} ({outcome['region']}): {outcome['old_ghcid']} → {outcome['new_ghcid']}")
            elif status == 'error':
                print(f" ✗ {outcome['error']}")
            elif status == 'collision':
                print(f" ⚠ {outcome['error']}")

        print(f"\n{country_code} Summary: Updated={stats.get('updated', 0)}, "
              f"Would update={stats.get('would_update', 0)}, "
              f"Errors={stats.get('error', 0)}")

        # Roll per-country counts into the grand totals.
        for key, count in stats.items():
            total_stats[key] = total_stats.get(key, 0) + count

    conn.close()

    print()
    print('='*60)
    print('TOTAL Summary:')
    print(f" Updated: {total_stats.get('updated', 0)}")
    print(f" Would update: {total_stats.get('would_update', 0)}")
    print(f" Errors: {total_stats.get('error', 0)}")
    print(f" Collisions: {total_stats.get('collision', 0)}")
    print(f" Skipped: {total_stats.get('skipped', 0)}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|