370 lines
12 KiB
Python
370 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Website Discovery for Custodians using Linkup MCP.
|
|
|
|
This script identifies custodian files without websites and generates
|
|
search queries for Linkup MCP. Results can then be processed to update
|
|
YAML files.
|
|
|
|
Usage:
|
|
# List files needing website discovery:
|
|
python scripts/discover_websites_linkup.py --list
|
|
|
|
# Generate search queries for first N files:
|
|
python scripts/discover_websites_linkup.py --generate-queries --limit 10
|
|
|
|
# Update a file with discovered website:
|
|
python scripts/discover_websites_linkup.py --update JP-01-ABU-L-K --url https://example.com
|
|
|
|
# Batch update from results JSON:
|
|
python scripts/discover_websites_linkup.py --batch-update results.json
|
|
"""
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
import yaml
|
|
|
|
# Logging
# Module-wide logging setup: timestamped INFO-level messages in the form
# "<time> - <LEVEL> - <message>". basicConfig is a no-op if the root logger
# was already configured by an importing application.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration
# Directory holding the custodian YAML records, resolved relative to this
# script's location (scripts/ -> repo root -> data/custodian).
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"

# URL patterns to filter out (not useful as institutional websites):
# encyclopedias, map services, social networks, and commerce/review sites.
# Each entry is a regex fragment matched case-insensitively with re.search
# against the full candidate URL (see is_valid_website_url).
EXCLUDED_URL_PATTERNS = [
    r'wikipedia\.org',
    r'wikidata\.org',
    r'tripadvisor\.',
    r'google\.com/maps',
    r'maps\.google\.',
    r'facebook\.com',
    r'twitter\.com',
    r'instagram\.com',
    r'youtube\.com',
    r'linkedin\.com',
    r'amazon\.co',
    r'booking\.com',
    r'yelp\.',
]
|
|
|
|
|
|
def has_website(entry: dict) -> bool:
    """Return True when the custodian entry already records a website.

    Three sources are consulted, in order: a 'Website' identifier inside
    ``original_entry.identifiers``, the Wikidata official-website field,
    and a previously discovered ``website_discovery.website_url``.
    """
    # 1) Explicit Website identifier on the original entry.
    identifiers = entry.get('original_entry', {}).get('identifiers', [])
    if any(
        isinstance(item, dict) and item.get('identifier_scheme') == 'Website'
        for item in identifiers
    ):
        return True

    # 2) Official website from Wikidata enrichment.
    if entry.get('wikidata_enrichment', {}).get('wikidata_official_website'):
        return True

    # 3) Result of a previous discovery run.
    return bool(entry.get('website_discovery', {}).get('website_url'))
|
|
|
|
|
|
def get_custodian_name(entry: dict) -> str | None:
|
|
"""Extract the best name for searching."""
|
|
# Try custodian_name first
|
|
name = entry.get('custodian_name', {}).get('claim_value')
|
|
if name:
|
|
return name
|
|
|
|
# Try original_entry.name
|
|
name = entry.get('original_entry', {}).get('name')
|
|
if name:
|
|
return name
|
|
|
|
# Try wikidata label (Japanese preferred for Japanese institutions)
|
|
wikidata = entry.get('wikidata_enrichment', {})
|
|
ja_label = wikidata.get('wikidata_label_ja')
|
|
if ja_label:
|
|
return ja_label
|
|
en_label = wikidata.get('wikidata_label_en')
|
|
if en_label:
|
|
return en_label
|
|
|
|
return None
|
|
|
|
|
|
def get_location_info(entry: dict) -> dict:
    """Extract location information for search context.

    Prefers the top-level ``location`` mapping; each missing field falls
    back to the first element of ``original_entry.locations``. The
    country defaults to 'JP' when neither source provides one.

    Returns:
        dict with keys 'city', 'region', 'country' (values may be None
        except 'country', which is always set).
    """
    location = entry.get('location', {})
    # Evaluate the nested lookup once instead of twice; `or []` also
    # guards against an explicit None value for 'locations'.
    orig_locations = entry.get('original_entry', {}).get('locations') or []
    orig_loc = orig_locations[0] if orig_locations else {}

    return {
        'city': location.get('city') or orig_loc.get('city'),
        'region': location.get('region') or orig_loc.get('region'),
        'country': location.get('country') or orig_loc.get('country') or 'JP',
    }
|
|
|
|
|
|
def generate_search_query(entry: dict) -> str | None:
    """Generate optimal search query for Linkup.

    The query is assembled from: the institution name, an alternate
    Japanese Wikidata label (when different), city and region, a
    bilingual institution-type hint, and a fixed country/intent suffix.
    Returns None when no usable name can be extracted.
    """
    name = get_custodian_name(entry)
    if not name:
        return None

    terms = [name]

    # Alternate Japanese label, only when it adds new information.
    ja_label = entry.get('wikidata_enrichment', {}).get('wikidata_label_ja')
    if ja_label and ja_label != name:
        terms.append(ja_label)

    # Location context narrows the search.
    location = get_location_info(entry)
    for field in ('city', 'region'):
        value = location.get(field)
        if value:
            terms.append(value)

    # Bilingual hint for the institution type.
    type_hints = {
        'LIBRARY': 'library 図書館',
        'MUSEUM': 'museum 博物館',
        'ARCHIVE': 'archive アーカイブ',
        'GALLERY': 'gallery ギャラリー',
    }
    inst_type = entry.get('original_entry', {}).get('institution_type', 'LIBRARY')
    hint = type_hints.get(inst_type)
    if hint:
        terms.append(hint)

    # Country and intent suffix.
    terms.append('Japan website official')

    return ' '.join(terms)
|
|
|
|
|
|
def is_valid_website_url(url: str) -> bool:
    """Check if URL is a valid institutional website.

    Rejects empty values, URLs matching any EXCLUDED_URL_PATTERNS entry
    (social media, map services, aggregators, ...), and anything that
    does not parse into a scheme plus network location.
    """
    if not url:
        return False

    # Reject known non-institutional hosts (case-insensitive).
    if any(re.search(pattern, url, re.IGNORECASE) for pattern in EXCLUDED_URL_PATTERNS):
        return False

    # Structural check: must have both a scheme and a host.
    try:
        parts = urlparse(url)
    except Exception:
        return False
    return bool(parts.scheme and parts.netloc)
|
|
|
|
|
|
def score_url(url: str, entry: dict) -> float:
    """Score URL relevance (0.0-1.0).

    Heuristics: a .jp host (and government .go.jp/.lg.jp hosts) for
    Japanese institutions, a host containing institution-name tokens,
    and a shallow path (homepage rather than a deep link) each raise
    the score above the 0.5 base.

    Args:
        url: Candidate website URL.
        entry: Custodian entry supplying location and name context.

    Returns:
        Relevance score, clamped to at most 1.0.
    """
    score = 0.5  # Base score

    parsed = urlparse(url)
    # Use .hostname (already lowercased, port and userinfo stripped) so
    # suffix checks see only the host itself.
    host = parsed.hostname or ''

    # Prefer .jp domains for Japanese institutions. Suffix matching, not
    # substring matching: '.jp' in netloc would wrongly match hosts like
    # 'www.jpmorgan.com'.
    if entry.get('location', {}).get('country') == 'JP':
        if host.endswith('.jp'):
            score += 0.2
        if host.endswith(('.go.jp', '.lg.jp')):
            score += 0.1  # Government domains

    # Prefer hosts containing institution name parts (tokens > 3 chars).
    name = get_custodian_name(entry) or ''
    name_parts = [p.lower() for p in re.split(r'\s+', name) if len(p) > 3]
    for part in name_parts:
        if part in host:
            score += 0.1

    # Prefer shorter paths (homepage vs deep link).
    path_depth = len([p for p in parsed.path.split('/') if p])
    if path_depth <= 2:
        score += 0.1

    return min(score, 1.0)
|
|
|
|
|
|
def update_custodian_file(filepath: Path, website_url: str, discovery_method: str = 'linkup_search',
                          confidence: float = 0.9, search_query: str | None = None) -> bool:
    """Update custodian YAML file with discovered website.

    Loads the YAML, attaches a ``website_discovery`` mapping (URL, UTC
    timestamp, method, confidence score, and optionally the search query
    used), then writes the file back in place.

    Returns:
        True on success, False on any read/parse/write failure (logged).
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)
        if not entry:
            logger.error(f"Invalid file: {filepath}")
            return False

        # Build the discovery record, then attach it in one assignment.
        discovery = {
            'website_url': website_url,
            'discovery_date': datetime.now(timezone.utc).isoformat(),
            'discovery_method': discovery_method,
            'confidence_score': confidence,
        }
        if search_query:
            discovery['search_query'] = search_query
        entry['website_discovery'] = discovery

        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        logger.info(f"Updated: {filepath.name} → {website_url}")
        return True
    except Exception as e:
        logger.error(f"Failed to update {filepath}: {e}")
        return False
|
|
|
|
|
|
def list_files_without_websites(pattern: str = "JP-*.yaml", limit: int | None = None) -> list:
    """List custodian files that don't have websites.

    Scans CUSTODIAN_DIR for files matching *pattern* (sorted), skipping
    entries that already record a website. Stops once *limit* results
    are collected. Unreadable or unparsable files are logged and skipped.

    Returns:
        List of dicts with keys 'filepath', 'filename', 'ghcid', 'name'.
    """
    found = []

    for filepath in sorted(CUSTODIAN_DIR.glob(pattern)):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)

            if entry and not has_website(entry):
                found.append({
                    'filepath': str(filepath),
                    'filename': filepath.name,
                    'ghcid': filepath.stem,
                    'name': get_custodian_name(entry) or filepath.stem,
                })

            # limit=None (or 0) means "no limit".
            if limit and len(found) >= limit:
                break
        except Exception as e:
            logger.warning(f"Error reading {filepath}: {e}")

    return found
|
|
|
|
|
|
def generate_queries(pattern: str = "JP-*.yaml", limit: int | None = 10) -> list:
    """Generate search queries for files without websites.

    Re-reads each candidate file reported by list_files_without_websites
    and builds a Linkup query via generate_search_query. Files that
    yield no query (no usable name) are skipped.

    Returns:
        List of dicts with keys 'ghcid', 'filename', 'name',
        'search_query'.
    """
    queries = []

    for item in list_files_without_websites(pattern, limit):
        with open(Path(item['filepath']), 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        query = generate_search_query(entry)
        if not query:
            continue
        queries.append({
            'ghcid': item['ghcid'],
            'filename': item['filename'],
            'name': item['name'],
            'search_query': query,
        })

    return queries
|
|
|
|
|
|
def main():
    """CLI entry point.

    Modes (mutually exclusive, checked in order): --list, --generate-queries
    (optionally with --output), --update GHCID --url URL, --batch-update
    FILE. With no mode flag, prints help.
    """
    parser = argparse.ArgumentParser(description='Website Discovery using Linkup MCP')
    parser.add_argument('--list', action='store_true', help='List files without websites')
    parser.add_argument('--generate-queries', action='store_true', help='Generate search queries')
    parser.add_argument('--limit', type=int, default=10, help='Limit number of files to process')
    parser.add_argument('--pattern', type=str, default='JP-*.yaml', help='File pattern to match')
    parser.add_argument('--update', type=str, help='Update specific file (GHCID)')
    parser.add_argument('--url', type=str, help='Website URL to add')
    parser.add_argument('--batch-update', type=str, help='Batch update from JSON file')
    parser.add_argument('--output', type=str, help='Output file for queries JSON')

    args = parser.parse_args()

    if args.list:
        files = list_files_without_websites(args.pattern, args.limit)
        print(f"\n=== Files Without Websites ({len(files)} found) ===\n")
        for item in files:
            print(f"  {item['ghcid']}: {item['name'][:60]}")
        print(f"\nTotal: {len(files)} files need website discovery")

    elif args.generate_queries:
        queries = generate_queries(args.pattern, args.limit)
        print(f"\n=== Search Queries ({len(queries)} generated) ===\n")
        for q in queries:
            print(f"GHCID: {q['ghcid']}")
            print(f"Name: {q['name'][:60]}")
            print(f"Query: {q['search_query']}")
            print()

        if args.output:
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(queries, f, indent=2, ensure_ascii=False)
            print(f"Saved to: {args.output}")

    elif args.update or args.url:
        # The two flags only make sense together; previously a lone
        # --update or --url silently fell through to the help text.
        if not (args.update and args.url):
            parser.error('--update and --url must be used together')

        # Find the file; glob may match several — take the first.
        matches = list(CUSTODIAN_DIR.glob(f"{args.update}*.yaml"))
        if not matches:
            print(f"Error: No file found matching {args.update}")
            sys.exit(1)
        filepath = matches[0]

        # Warn (but proceed) when the URL looks like a non-institutional site.
        if not is_valid_website_url(args.url):
            print(f"Warning: URL may not be a valid institutional website: {args.url}")

        if update_custodian_file(filepath, args.url):
            print(f"✅ Updated {filepath.name} with {args.url}")
        else:
            print(f"❌ Failed to update {filepath.name}")
            sys.exit(1)

    elif args.batch_update:
        with open(args.batch_update, 'r', encoding='utf-8') as f:
            updates = json.load(f)

        success = 0
        failed = 0
        for item in updates:
            ghcid = item.get('ghcid')
            url = item.get('website_url')
            if not ghcid or not url:
                # Skip incomplete records without counting them as failures.
                continue

            matches = list(CUSTODIAN_DIR.glob(f"{ghcid}*.yaml"))
            if matches:
                if update_custodian_file(matches[0], url,
                                         search_query=item.get('search_query'),
                                         confidence=item.get('confidence', 0.9)):
                    success += 1
                else:
                    failed += 1
            else:
                logger.warning(f"File not found: {ghcid}")
                failed += 1

        print("\n=== Batch Update Complete ===")
        print(f"Success: {success}")
        print(f"Failed: {failed}")

    else:
        parser.print_help()
|
|
|
|
|
|
# Run the CLI only when executed as a script (not on import).
if __name__ == '__main__':
    main()
|