glam/scripts/discover_websites_linkup.py
2025-12-27 02:15:17 +01:00

370 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Website Discovery for Custodians using Linkup MCP.

This script identifies custodian files without websites and generates
search queries for Linkup MCP. Results can then be processed to update
YAML files.

Usage:
    # List files needing website discovery:
    python scripts/discover_websites_linkup.py --list

    # Generate search queries for first N files:
    python scripts/discover_websites_linkup.py --generate-queries --limit 10

    # Update a file with discovered website:
    python scripts/discover_websites_linkup.py --update JP-01-ABU-L-K --url https://example.com

    # Batch update from results JSON:
    python scripts/discover_websites_linkup.py --batch-update results.json
"""
import argparse
import json
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
import yaml
# Logging: timestamped INFO-level messages to the root handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: custodian YAML files live in <repo>/data/custodian,
# resolved relative to this script's location (scripts/ -> repo root).
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"

# URL patterns to filter out (not useful as institutional websites):
# encyclopedias, social networks, and map/review/booking aggregators.
# Matched case-insensitively as regexes against the whole URL.
EXCLUDED_URL_PATTERNS = [
    r'wikipedia\.org',
    r'wikidata\.org',
    r'tripadvisor\.',
    r'google\.com/maps',
    r'maps\.google\.',
    r'facebook\.com',
    r'twitter\.com',
    r'instagram\.com',
    r'youtube\.com',
    r'linkedin\.com',
    r'amazon\.co',
    r'booking\.com',
    r'yelp\.',
]
def has_website(entry: dict) -> bool:
    """Return True if the custodian entry already carries a website in any known slot."""
    # A 'Website' identifier recorded in the original entry counts.
    identifiers = entry.get('original_entry', {}).get('identifiers', [])
    if any(
        isinstance(item, dict) and item.get('identifier_scheme') == 'Website'
        for item in identifiers
    ):
        return True
    # An official website pulled in during Wikidata enrichment counts.
    if entry.get('wikidata_enrichment', {}).get('wikidata_official_website'):
        return True
    # A URL found by a previous discovery run counts.
    return bool(entry.get('website_discovery', {}).get('website_url'))
def get_custodian_name(entry: dict) -> str | None:
"""Extract the best name for searching."""
# Try custodian_name first
name = entry.get('custodian_name', {}).get('claim_value')
if name:
return name
# Try original_entry.name
name = entry.get('original_entry', {}).get('name')
if name:
return name
# Try wikidata label (Japanese preferred for Japanese institutions)
wikidata = entry.get('wikidata_enrichment', {})
ja_label = wikidata.get('wikidata_label_ja')
if ja_label:
return ja_label
en_label = wikidata.get('wikidata_label_en')
if en_label:
return en_label
return None
def get_location_info(entry: dict) -> dict:
    """Merge top-level and original-entry location fields for search context."""
    primary = entry.get('location', {})
    # Fall back to the first original-entry location, when any are present.
    original_locations = entry.get('original_entry', {}).get('locations')
    fallback = original_locations[0] if original_locations else {}
    return {
        'city': primary.get('city') or fallback.get('city'),
        'region': primary.get('region') or fallback.get('region'),
        # Dataset is Japan-centric, so country defaults to 'JP'.
        'country': primary.get('country') or fallback.get('country') or 'JP',
    }
def generate_search_query(entry: dict) -> str | None:
    """Build a Linkup search query string for one custodian entry.

    Combines the institution name (plus the Japanese Wikidata label when it
    differs), city/region context, a bilingual institution-type hint, and a
    fixed tail steering results toward official Japanese sites. Returns None
    when no usable name can be determined.
    """
    name = get_custodian_name(entry)
    if not name:
        return None

    parts = [name]

    # Include the Japanese label only when it adds information.
    ja_label = entry.get('wikidata_enrichment', {}).get('wikidata_label_ja')
    if ja_label and ja_label != name:
        parts.append(ja_label)

    # City and region narrow the search geographically.
    location = get_location_info(entry)
    for key in ('city', 'region'):
        value = location.get(key)
        if value:
            parts.append(value)

    # Bilingual (English + Japanese) hint for recognized institution types.
    type_hints = {
        'LIBRARY': 'library 図書館',
        'MUSEUM': 'museum 博物館',
        'ARCHIVE': 'archive アーカイブ',
        'GALLERY': 'gallery ギャラリー',
    }
    inst_type = entry.get('original_entry', {}).get('institution_type', 'LIBRARY')
    hint = type_hints.get(inst_type)
    if hint:
        parts.append(hint)

    # Fixed country/intent keywords.
    parts.append('Japan website official')
    return ' '.join(parts)
def is_valid_website_url(url: str) -> bool:
    """Return True if *url* looks like a usable institutional website.

    Rejects empty values, anything matching the blocklist (social networks,
    maps, aggregators, ...), and URLs missing a scheme or host.
    """
    if not url:
        return False
    # Blocklisted services are never the institution's own site.
    if any(re.search(pattern, url, re.IGNORECASE) for pattern in EXCLUDED_URL_PATTERNS):
        return False
    # A usable URL must parse with both a scheme and a network location.
    try:
        parsed = urlparse(url)
    except Exception:
        return False
    return bool(parsed.scheme and parsed.netloc)
def score_url(url: str, entry: dict) -> float:
    """Score URL relevance for *entry* on a 0.0-1.0 scale.

    Heuristics: Japanese institutions get a bonus for .jp domains (extra for
    government .go.jp / .lg.jp), the domain gets credit for containing parts
    of the institution name, and shallow paths (likely homepages) score
    higher than deep links.

    Args:
        url: Candidate website URL.
        entry: Custodian entry dict providing name and location context.

    Returns:
        Relevance score, capped at 1.0.
    """
    score = 0.5  # Base score for any candidate URL.
    parsed = urlparse(url)
    # Use hostname (already lowercased, port stripped) so suffix checks see
    # only the host; fall back to netloc for unparsable hosts.
    domain = (parsed.hostname or parsed.netloc).lower()

    # Prefer .jp domains for Japanese institutions. BUG FIX: the original
    # substring tests ('.jp' in domain) also matched hosts like
    # 'jp.example.com' or any domain merely containing '.jp'; suffix
    # matching checks the actual TLD.
    if entry.get('location', {}).get('country') == 'JP':
        if domain.endswith('.jp'):
            score += 0.2
        if domain.endswith(('.go.jp', '.lg.jp')):
            score += 0.1  # Government domains are more authoritative.

    # Credit domains embedding parts of the institution name
    # (>3 chars to skip short/common tokens).
    name = get_custodian_name(entry) or ''
    name_parts = [p.lower() for p in re.split(r'\s+', name) if len(p) > 3]
    for part in name_parts:
        if part in domain:
            score += 0.1

    # Shallow paths usually point at the homepage rather than a deep link.
    path_depth = len([segment for segment in parsed.path.split('/') if segment])
    if path_depth <= 2:
        score += 0.1

    return min(score, 1.0)
def update_custodian_file(filepath: Path, website_url: str, discovery_method: str = 'linkup_search',
                          confidence: float = 0.9, search_query: str | None = None) -> bool:
    """Write a ``website_discovery`` block into a custodian YAML file.

    Args:
        filepath: Path to the custodian YAML file.
        website_url: Discovered institutional website URL.
        discovery_method: Provenance tag for how the URL was found.
        confidence: Confidence score (0.0-1.0) for the discovery.
        search_query: Optional query that produced the URL, stored for audit.

    Returns:
        True on success; False on any read/parse/write failure (logged).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)
        if not entry:
            logger.error(f"Invalid file: {filepath}")
            return False
        # Record discovery provenance alongside the URL itself.
        entry['website_discovery'] = {
            'website_url': website_url,
            'discovery_date': datetime.now(timezone.utc).isoformat(),
            'discovery_method': discovery_method,
            'confidence_score': confidence,
        }
        if search_query:
            entry['website_discovery']['search_query'] = search_query
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
        # BUG FIX: original log line concatenated filename and URL with no
        # separator, producing e.g. "Updated: JP-01.yamlhttps://example.com".
        logger.info(f"Updated: {filepath.name} -> {website_url}")
        return True
    except Exception as e:
        logger.error(f"Failed to update {filepath}: {e}")
        return False
def list_files_without_websites(pattern: str = "JP-*.yaml", limit: int | None = None) -> list:
    """Collect custodian files matching *pattern* that lack a website.

    Returns up to *limit* dicts (filepath/filename/ghcid/name keys), in
    sorted filename order. Unreadable files are logged and skipped.
    """
    results = []
    for filepath in sorted(CUSTODIAN_DIR.glob(pattern)):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)
            if not entry or has_website(entry):
                continue
            results.append({
                'filepath': str(filepath),
                'filename': filepath.name,
                'ghcid': filepath.stem,
                'name': get_custodian_name(entry) or filepath.stem,
            })
            if limit and len(results) >= limit:
                break
        except Exception as e:
            logger.warning(f"Error reading {filepath}: {e}")
    return results
def generate_queries(pattern: str = "JP-*.yaml", limit: int | None = 10) -> list:
    """Produce search-query records for custodian files lacking websites.

    Returns dicts with ghcid/filename/name/search_query keys; entries for
    which no query can be built are skipped.
    """
    queries = []
    for item in list_files_without_websites(pattern, limit):
        # Re-read the file: the listing carries only summary fields.
        with open(Path(item['filepath']), 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)
        query = generate_search_query(entry)
        if not query:
            continue
        queries.append({
            'ghcid': item['ghcid'],
            'filename': item['filename'],
            'name': item['name'],
            'search_query': query,
        })
    return queries
def _cmd_list(args) -> None:
    """Handle --list: print custodian files still needing website discovery."""
    files = list_files_without_websites(args.pattern, args.limit)
    print(f"\n=== Files Without Websites ({len(files)} found) ===\n")
    for item in files:
        print(f" {item['ghcid']}: {item['name'][:60]}")
    print(f"\nTotal: {len(files)} files need website discovery")


def _cmd_generate_queries(args) -> None:
    """Handle --generate-queries: print queries and optionally save JSON."""
    queries = generate_queries(args.pattern, args.limit)
    print(f"\n=== Search Queries ({len(queries)} generated) ===\n")
    for q in queries:
        print(f"GHCID: {q['ghcid']}")
        print(f"Name: {q['name'][:60]}")
        print(f"Query: {q['search_query']}")
        print()
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(queries, f, indent=2, ensure_ascii=False)
        print(f"Saved to: {args.output}")


def _cmd_update(args) -> None:
    """Handle --update/--url: add a website to one custodian file by GHCID."""
    matches = list(CUSTODIAN_DIR.glob(f"{args.update}*.yaml"))
    if not matches:
        print(f"Error: No file found matching {args.update}")
        sys.exit(1)
    filepath = matches[0]
    # Warn but still proceed — the operator may deliberately override.
    if not is_valid_website_url(args.url):
        print(f"Warning: URL may not be a valid institutional website: {args.url}")
    if update_custodian_file(filepath, args.url):
        print(f"✅ Updated {filepath.name} with {args.url}")
    else:
        print(f"❌ Failed to update {filepath.name}")
        sys.exit(1)


def _cmd_batch_update(args) -> None:
    """Handle --batch-update: apply ghcid/website_url records from a JSON file."""
    with open(args.batch_update, 'r', encoding='utf-8') as f:
        updates = json.load(f)
    success = 0
    failed = 0
    for item in updates:
        ghcid = item.get('ghcid')
        url = item.get('website_url')
        if not ghcid or not url:
            continue  # Skip malformed records rather than aborting the batch.
        matches = list(CUSTODIAN_DIR.glob(f"{ghcid}*.yaml"))
        if not matches:
            logger.warning(f"File not found: {ghcid}")
            failed += 1
            continue
        if update_custodian_file(matches[0], url,
                                 search_query=item.get('search_query'),
                                 confidence=item.get('confidence', 0.9)):
            success += 1
        else:
            failed += 1
    # Fixed: was an f-string with no placeholders.
    print("\n=== Batch Update Complete ===")
    print(f"Success: {success}")
    print(f"Failed: {failed}")


def main():
    """CLI entry point: parse arguments and dispatch to a subcommand handler."""
    parser = argparse.ArgumentParser(description='Website Discovery using Linkup MCP')
    parser.add_argument('--list', action='store_true', help='List files without websites')
    parser.add_argument('--generate-queries', action='store_true', help='Generate search queries')
    parser.add_argument('--limit', type=int, default=10, help='Limit number of files to process')
    parser.add_argument('--pattern', type=str, default='JP-*.yaml', help='File pattern to match')
    parser.add_argument('--update', type=str, help='Update specific file (GHCID)')
    parser.add_argument('--url', type=str, help='Website URL to add')
    parser.add_argument('--batch-update', type=str, help='Batch update from JSON file')
    parser.add_argument('--output', type=str, help='Output file for queries JSON')
    args = parser.parse_args()

    if args.list:
        _cmd_list(args)
    elif args.generate_queries:
        _cmd_generate_queries(args)
    elif args.update and args.url:
        _cmd_update(args)
    elif args.update:
        # Previously --update without --url silently fell through to help.
        parser.error('--update requires --url')
    elif args.batch_update:
        _cmd_batch_update(args)
    else:
        parser.print_help()
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()