glam/scripts/discover_custodian_websites.py
2025-12-26 14:30:31 +01:00

561 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Discover website URLs for custodian YAML files that are missing them.
This script uses web search (via DuckDuckGo or Google) to find official websites
for heritage institutions based on their name and location.
Search strategy:
1. Search for institution name + city + country
2. Search for institution name + "official website"
3. Search for institution name + institution type (museum, library, archive)
Output:
- Updates custodian YAML files with discovered website URLs
- Stores provenance for discovered URLs
Usage:
python scripts/discover_custodian_websites.py [options]
Options:
--dry-run Show what would be discovered without modifying files
--limit N Process only first N files (for testing)
--file PATH Process a single specific file
--country CODE Filter by country code (e.g., JP, CZ)
--resume Resume from last checkpoint
Requirements:
pip install duckduckgo-search pyyaml httpx
"""
import argparse
import asyncio
import json
import logging
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
import yaml
# Fail fast with an actionable install hint if a third-party dependency is
# missing, instead of a bare ImportError mid-run.
try:
    from duckduckgo_search import DDGS
except ImportError:
    print("Please install duckduckgo-search: pip install duckduckgo-search")
    sys.exit(1)
try:
    import httpx
except ImportError:
    print("Please install httpx: pip install httpx")
    sys.exit(1)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
# Custodian YAML files live in <repo>/data/custodian (relative to this script).
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
# JSON progress checkpoint so long runs can be resumed with --resume.
CHECKPOINT_FILE = CUSTODIAN_DIR / ".website_discovery_checkpoint.json"
REQUEST_DELAY = 3.0  # seconds between searches (be nice to search engines)

# Domain blacklist (not actual institution websites)
DOMAIN_BLACKLIST = {
    'wikipedia.org', 'wikidata.org', 'wikimedia.org',
    'facebook.com', 'twitter.com', 'instagram.com', 'linkedin.com',
    'youtube.com', 'tiktok.com', 'pinterest.com',
    'tripadvisor.com', 'tripadvisor.jp', 'yelp.com',
    'google.com', 'google.co.jp', 'maps.google.com',
    'amazon.com', 'amazon.co.jp', 'ebay.com',
    'booking.com', 'expedia.com', 'hotels.com',
    'foursquare.com', 'bing.com', 'yahoo.com',
    'findagrave.com', 'ancestry.com', 'familysearch.org',
    'academia.edu', 'researchgate.net',
    'timeanddate.com', 'weather.com',
}

# Domain preferences (prefer these TLDs for official sites; per country code,
# listed from most to least preferred — order matters for score_website)
PREFERRED_TLDS = {
    'JP': ['.go.jp', '.lg.jp', '.ac.jp', '.or.jp', '.jp'],
    'CZ': ['.cz', '.gov.cz'],
    'NL': ['.nl', '.gov.nl'],
    'BE': ['.be', '.gov.be'],
    'DE': ['.de', '.gov.de'],
    'AT': ['.at', '.gv.at'],
    'CH': ['.ch', '.admin.ch'],
}
def get_custodian_name(entry: dict) -> str | None:
"""Extract institution name from entry."""
# Priority 1: Emic name (native language official name)
if entry.get('custodian_name', {}).get('emic_name'):
return entry['custodian_name']['emic_name']
# Priority 2: Wikidata native language label (ja, zh, ko, etc.)
wikidata = entry.get('wikidata_enrichment', {})
country = get_country_from_entry(entry)
# Map country to preferred label language
country_lang_map = {
'JP': 'ja',
'CN': 'zh',
'KR': 'ko',
'TW': 'zh',
'TH': 'th',
'VN': 'vi',
'RU': 'ru',
'GR': 'el',
'IL': 'he',
'SA': 'ar',
'IR': 'fa',
}
if country in country_lang_map:
lang = country_lang_map[country]
native_label = wikidata.get(f'wikidata_label_{lang}') or wikidata.get('wikidata_labels', {}).get(lang)
if native_label:
return native_label
# Priority 3: Claim value
if entry.get('custodian_name', {}).get('claim_value'):
return entry['custodian_name']['claim_value']
# Priority 4: Original entry name
if entry.get('original_entry', {}).get('name'):
return entry['original_entry']['name']
# Priority 5: Organisatie (Dutch)
if entry.get('original_entry', {}).get('organisatie'):
return entry['original_entry']['organisatie']
return None
def get_country_from_entry(entry: dict) -> str | None:
"""Extract country code from entry."""
# Check location.country
if entry.get('location', {}).get('country'):
return entry['location']['country']
# Check original_entry.locations
if entry.get('original_entry', {}).get('locations'):
loc = entry['original_entry']['locations'][0]
if loc.get('country'):
return loc['country']
return None
def get_location_info(entry: dict) -> dict:
    """Extract location information (city/region/country/street) from entry.

    Primary source is `original_entry.locations[0]`; when no city is found
    there, fall back to flat fields on `original_entry` (including the Dutch
    `plaats`).

    BUG FIX: the fallback branch previously overwrote `country`
    unconditionally with `original_entry.country` (possibly None), clobbering
    a country already extracted from `locations[0]` whenever the city was
    missing. The country is now only filled in when not already set.
    """
    location = {}
    orig = entry.get('original_entry', {})

    # Primary: first structured location record
    locations = orig.get('locations')
    if locations:
        loc = locations[0]
        location['city'] = loc.get('city')
        location['region'] = loc.get('region')
        location['country'] = loc.get('country')
        location['street_address'] = loc.get('street_address')

    # Fallback: flat fields on the original entry
    if not location.get('city'):
        location['city'] = orig.get('city') or orig.get('plaats')
        if not location.get('country'):
            location['country'] = orig.get('country')
    return location
def get_institution_type(entry: dict) -> str | None:
"""Get institution type for search refinement."""
inst_type = entry.get('original_entry', {}).get('institution_type')
if inst_type:
type_map = {
'LIBRARY': 'library',
'MUSEUM': 'museum',
'ARCHIVE': 'archive',
'GALLERY': 'gallery',
'RESEARCH_CENTER': 'research center',
'EDUCATION_PROVIDER': 'university',
}
return type_map.get(inst_type)
return None
def has_website(entry: dict) -> bool:
    """Return True if the entry already records a website anywhere.

    Checks the original record (`webadres_organisatie`, Website identifiers)
    and all enrichment sections (website_discovery, wikidata, google maps).
    """
    orig = entry.get('original_entry', {})
    if orig.get('webadres_organisatie'):
        return True
    if any(ident.get('identifier_scheme') == 'Website'
           for ident in orig.get('identifiers', [])):
        return True
    return bool(
        entry.get('website_discovery', {}).get('website_url')
        or entry.get('wikidata_enrichment', {}).get('wikidata_official_website')
        or entry.get('google_maps_enrichment', {}).get('website')
    )
def is_valid_website(url: str, country: str | None = None) -> bool:
"""Check if URL is a valid institutional website."""
if not url:
return False
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Remove www prefix
if domain.startswith('www.'):
domain = domain[4:]
# Check blacklist
for blacklisted in DOMAIN_BLACKLIST:
if blacklisted in domain:
return False
return True
except Exception:
return False
def score_website(url: str, country: str, name: str) -> int:
    """Score a URL by how likely it is to be the institution's official site.

    Higher is better: a country-preferred TLD (earlier entries in the
    preference list earn more), HTTPS, a shallow URL path, and overlap
    between institution-name words and the domain all contribute. Any
    parsing error yields whatever score accumulated so far.
    """
    total = 0
    try:
        parts = urlparse(url)
        host = parts.netloc.lower()

        # Country-specific TLD preference: index 0 is the most preferred.
        tlds = PREFERRED_TLDS.get(country, [])
        for rank, tld in enumerate(tlds):
            if host.endswith(tld):
                total += (len(tlds) - rank) * 10
                break

        # Prefer HTTPS.
        if parts.scheme == 'https':
            total += 5

        # Penalize deep links; homepages are preferred.
        total -= 2 * sum(1 for segment in parts.path.split('/') if segment)

        # Reward institution-name words that also appear in the domain.
        overlap = (set(re.findall(r'\w+', name.lower()))
                   & set(re.findall(r'\w+', host)))
        total += 5 * len(overlap)
    except Exception:
        pass
    return total
def search_for_website(name: str, location: dict, inst_type: str | None = None) -> list[dict]:
    """Search for an institution's website using DuckDuckGo.

    Builds up to three queries (name+city, name+country+type, name+"official
    website"), runs the first two, filters results through the domain
    blacklist, scores them, and returns up to three unique-domain results
    sorted by descending score.

    BUG FIX: `location.get('city', '')` returns None (not '') when the key
    exists with a None value, so the literal string "None" could leak into
    queries; values are now normalized with `or ''`.
    """
    results = []
    # Normalize missing/None values so "None" never appears in a query.
    city = location.get('city') or ''
    country = location.get('country') or ''

    queries = []
    # Primary query: name + city
    if city:
        queries.append(f'"{name}" {city}')
    # Secondary query: name + country + institution type
    if inst_type:
        queries.append(f'"{name}" {country} {inst_type} official')
    # Tertiary: just the name with "official website"
    queries.append(f'"{name}" official website')

    ddgs = DDGS()
    for query in queries[:2]:  # Limit to 2 queries per institution
        try:
            for r in ddgs.text(query, max_results=5):
                url = r.get('href') or r.get('url')
                if url and is_valid_website(url, country):
                    results.append({
                        'url': url,
                        'title': r.get('title', ''),
                        'snippet': r.get('body', ''),
                        'query': query,
                        'score': score_website(url, country, name),
                    })
            time.sleep(1)  # Rate limit between queries
        except Exception as e:
            logger.warning(f"Search error for '{query}': {e}")
            time.sleep(2)

    # Sort by score (descending) and deduplicate by domain.
    seen_domains = set()
    unique_results = []
    for r in sorted(results, key=lambda x: -x['score']):
        domain = urlparse(r['url']).netloc.lower()
        if domain not in seen_domains:
            seen_domains.add(domain)
            unique_results.append(r)
    return unique_results[:3]  # Return top 3 unique results
async def verify_website(url: str) -> dict:
    """Check that a website responds and collect basic page info.

    Returns a dict with `accessible` (True only on HTTP 200), the
    `final_url` after redirects, the `status_code`, and the page `title`
    (when accessible and a <title> tag is present). Any network or parsing
    failure leaves the defaults in place.
    """
    info = {
        'accessible': False,
        'final_url': url,
        'status_code': None,
        'title': None,
    }
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=15.0) as client:
            response = await client.get(url)
            info['status_code'] = response.status_code
            info['final_url'] = str(response.url)
            info['accessible'] = response.status_code == 200
            if info['accessible']:
                # Cheap regex title extraction; good enough for provenance.
                title_match = re.search(r'<title[^>]*>([^<]+)</title>', response.text, re.I)
                if title_match:
                    info['title'] = title_match.group(1).strip()
    except Exception as exc:
        logger.debug(f"Failed to verify {url}: {exc}")
    return info
def load_checkpoint() -> dict:
    """Load the progress checkpoint, or a fresh one if none exists."""
    if not CHECKPOINT_FILE.exists():
        return {'processed_files': [], 'found_count': 0, 'not_found_count': 0}
    with open(CHECKPOINT_FILE, 'r') as f:
        return json.load(f)
def save_checkpoint(checkpoint: dict):
    """Persist the progress checkpoint to disk as pretty-printed JSON."""
    with open(CHECKPOINT_FILE, 'w') as handle:
        handle.write(json.dumps(checkpoint, indent=2))
def update_custodian_file(filepath: Path, website_url: str, discovery_info: dict) -> bool:
    """Write the discovered website and its provenance into a custodian YAML file.

    Adds/replaces a `website_discovery` section containing the URL, discovery
    date/method, the search query used, a normalized confidence score, and
    verification details. Returns True on success, False when the file is
    empty or the update fails (failures are logged, not raised).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            data = yaml.safe_load(handle)
        if not data:
            return False

        verification = discovery_info.get('verification', {})
        data['website_discovery'] = {
            'website_url': website_url,
            'discovery_date': datetime.now(timezone.utc).isoformat(),
            'discovery_method': 'duckduckgo_search',
            'search_query': discovery_info.get('query', ''),
            # Raw score normalized into a 0-1 confidence value.
            'confidence_score': min(discovery_info.get('score', 0) / 50, 1.0),
            'verification': {
                'accessible': verification.get('accessible', False),
                'page_title': verification.get('title'),
                'final_url': verification.get('final_url'),
            },
        }

        with open(filepath, 'w', encoding='utf-8') as handle:
            yaml.dump(data, handle, default_flow_style=False, allow_unicode=True, sort_keys=False)
        return True
    except Exception as exc:
        logger.error(f"Failed to update {filepath}: {exc}")
        return False
async def process_file(filepath: Path, dry_run: bool = False) -> dict:
    """Process a single custodian file: search for and record its website.

    Returns a result dict with `filename`, `status` (one of: skipped, empty,
    has_website, no_name, not_found, found, inaccessible, error), and, on
    success, `website` plus `discovery_info`. Unless `dry_run` is set, a
    found website is written back to the YAML file.

    IMPROVEMENT: the first- and second-candidate verification paths were
    duplicated near-verbatim; they are now a single loop over the top two
    search results, accepting the first that is reachable.
    """
    result = {
        'filename': filepath.name,
        'status': 'skipped',
        'website': None,
    }
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)
        if not entry:
            result['status'] = 'empty'
            return result

        # Skip if already has a website recorded anywhere.
        if has_website(entry):
            result['status'] = 'has_website'
            return result

        # Gather institution info for the search.
        name = get_custodian_name(entry)
        if not name:
            result['status'] = 'no_name'
            return result
        location = get_location_info(entry)
        inst_type = get_institution_type(entry)
        # Fall back to the filename's country prefix (e.g. "JP-...").
        country = location.get('country', filepath.name[:2])
        logger.info(f"Searching for: {name} ({location.get('city', 'unknown city')}, {country})")

        search_results = search_for_website(name, location, inst_type)
        if not search_results:
            result['status'] = 'not_found'
            return result

        # Try the top two candidates in score order; accept the first one
        # that is actually reachable.
        result['status'] = 'inaccessible'
        for candidate in search_results[:2]:
            verification = await verify_website(candidate['url'])
            if not verification['accessible']:
                continue
            candidate['verification'] = verification
            result['website'] = verification['final_url']
            result['status'] = 'found'
            result['discovery_info'] = candidate
            if not dry_run:
                update_custodian_file(filepath, verification['final_url'], candidate)
            logger.info(f" → Found: {verification['final_url']}")
            break
    except Exception as e:
        result['status'] = 'error'
        result['error'] = str(e)
        logger.error(f"Error processing {filepath}: {e}")
    return result
async def main():
    """CLI entry point: discover websites for custodian YAML files.

    Parses options, selects the target files (optionally filtered by country
    or limited in number), processes each file with rate limiting, and
    maintains a resumable checkpoint unless --dry-run is given.

    BUG FIX: the inter-file delay used blocking `time.sleep()` inside this
    coroutine, which stalls the entire event loop; it now uses
    `await asyncio.sleep()` so the loop stays responsive.
    """
    parser = argparse.ArgumentParser(description='Discover websites for custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be discovered')
    parser.add_argument('--limit', type=int, help='Process only first N files')
    parser.add_argument('--file', type=str, help='Process a single specific file')
    parser.add_argument('--country', type=str, help='Filter by country code (e.g., JP, CZ)')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()

    # Select files to process.
    if args.file:
        files = [Path(args.file)]
    else:
        pattern = f"{args.country}-*.yaml" if args.country else "*.yaml"
        files = sorted(CUSTODIAN_DIR.glob(pattern))
        # Custodian files are named like "JP-xxxx.yaml"; skip anything else.
        files = [f for f in files if f.name[0].isupper() and '-' in f.name]

    # Load checkpoint (only honored with --resume).
    checkpoint = load_checkpoint() if args.resume else {'processed_files': [], 'found_count': 0, 'not_found_count': 0}
    processed_set = set(checkpoint['processed_files'])
    if args.resume:
        files = [f for f in files if f.name not in processed_set]
        logger.info(f"Resuming: {len(processed_set)} files already processed, {len(files)} remaining")

    # Apply limit.
    if args.limit:
        files = files[:args.limit]
    logger.info(f"Processing {len(files)} custodian files...")

    # Process files.
    found_count = checkpoint.get('found_count', 0)
    not_found_count = checkpoint.get('not_found_count', 0)
    for i, filepath in enumerate(files):
        result = await process_file(filepath, args.dry_run)

        # Update counts.
        if result['status'] == 'found':
            found_count += 1
        elif result['status'] in ('not_found', 'inaccessible'):
            not_found_count += 1

        # Update checkpoint (persisted every 10 files).
        if not args.dry_run:
            checkpoint['processed_files'].append(filepath.name)
            checkpoint['found_count'] = found_count
            checkpoint['not_found_count'] = not_found_count
            if (i + 1) % 10 == 0:
                save_checkpoint(checkpoint)

        # Progress update.
        if (i + 1) % 10 == 0:
            logger.info(f"Progress: {i + 1}/{len(files)} - Found: {found_count}, Not found: {not_found_count}")

        # Rate limiting — non-blocking so the event loop stays responsive.
        await asyncio.sleep(REQUEST_DELAY)

    # Final checkpoint save.
    if not args.dry_run:
        save_checkpoint(checkpoint)

    # Summary.
    logger.info(f"\n{'='*50}")
    logger.info(f"Discovery complete!")
    logger.info(f" Files processed: {len(files)}")
    logger.info(f" Websites found: {found_count}")
    logger.info(f" Not found: {not_found_count}")
    logger.info(f"{'='*50}")
# Script entry point: run the async main() under the asyncio event loop.
if __name__ == '__main__':
    asyncio.run(main())