# glam/scripts/reenrich_with_xpath.py
# Snapshot captured 2025-12-21 22:12:34 +01:00 — 389 lines, 13 KiB, Python.
#!/usr/bin/env python3
"""
Re-enrich custodian files with proper xpath provenance.
Uses httpx to fetch and extract web claims.
Per Rule 6 in AGENTS.md: "Every claim extracted from a webpage MUST have an XPath
pointer to the exact location in archived HTML where that value appears."
"""
import os
import re
import json
import yaml
import subprocess
import asyncio
import httpx
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import time
# Absolute paths on the maintainer's machine: the JSON queue of entries to
# re-enrich, and the directory where fetched HTML snapshots are archived.
QUEUE_FILE = Path('/Users/kempersc/apps/glam/data/reenrich_queue.json')
ARCHIVE_DIR = Path('/Users/kempersc/apps/glam/data/custodian/web')
def get_xpath(element) -> str:
    """Generate an absolute XPath expression for a BeautifulSoup element.

    Walks from *element* up to the document root, emitting one path step per
    ancestor.  A positional predicate (``tag[n]``, 1-based) is added only
    when the element has same-named siblings.

    Args:
        element: A ``bs4.Tag`` (anything exposing ``.name``, ``.parent`` and
            ``.children``).

    Returns:
        An XPath string such as ``/html/body/div[2]``, or ``/`` when the
        element has no named ancestors.
    """
    parts = []
    child = element
    while child.parent and child.parent.name:
        siblings = list(child.parent.children)
        tag_siblings = [s for s in siblings if hasattr(s, 'name') and s.name == child.name]
        if len(tag_siblings) > 1:
            # BUG FIX: list.index() uses ==, and bs4 Tag equality compares
            # rendered *content*, so two identical sibling tags (e.g. two
            # equal <br/>) would both resolve to the first one's position.
            # Match by identity so each sibling gets its true 1-based index.
            index = next(i for i, s in enumerate(tag_siblings, 1) if s is child)
            parts.insert(0, f"{child.name}[{index}]")
        else:
            parts.insert(0, child.name)
        child = child.parent
    return '/' + '/'.join(parts) if parts else '/'
def extract_claims_with_xpath(html: str, source_url: str) -> list[dict]:
    """
    Extract claims from HTML with proper xpath provenance.

    Pulls the page title, meta/OpenGraph descriptions, the first three
    <h1> headings, mailto:/tel: contact links, the first social-media link
    per platform, and up to two likely address containers.  Every claim
    carries the XPath of the element it came from (per Rule 6: claims must
    point into the archived HTML).

    Args:
        html: Raw HTML document text.
        source_url: URL the HTML was fetched from (recorded on each claim).

    Returns:
        List of claim dicts with keys claim_type, claim_value, source_url,
        retrieved_on, xpath, xpath_match_score and extraction_method.
    """
    soup = BeautifulSoup(html, 'html.parser')
    claims: list[dict] = []
    timestamp = datetime.now(timezone.utc).isoformat()

    def _add(element, claim_type: str, value: str, method: str,
             score: float = 1.0) -> None:
        """Append one claim dict with the shared provenance fields filled in."""
        claims.append({
            'claim_type': claim_type,
            'claim_value': value,
            'source_url': source_url,
            'retrieved_on': timestamp,
            'xpath': get_xpath(element),
            'xpath_match_score': score,
            'extraction_method': method
        })

    def _meta_content(tag) -> Optional[str]:
        """Return a meta tag's stripped string 'content' attribute, or None."""
        if tag is None:
            return None
        content = tag.get('content')
        if content and isinstance(content, str):
            return content.strip()
        return None

    # 1. <title>
    title_tag = soup.find('title')
    if title_tag and title_tag.string:
        _add(title_tag, 'page_title', title_tag.string.strip(), 'title_tag')

    # 2. Meta description
    meta_desc = soup.find('meta', attrs={'name': 'description'})
    desc = _meta_content(meta_desc)
    if desc:
        _add(meta_desc, 'description', desc, 'meta_description')

    # 3. OpenGraph title and description
    og_title = soup.find('meta', property='og:title')
    og_title_val = _meta_content(og_title)
    if og_title_val:
        _add(og_title, 'og_title', og_title_val, 'og_title')
    og_desc = soup.find('meta', property='og:description')
    og_desc_val = _meta_content(og_desc)
    if og_desc_val:
        _add(og_desc, 'og_description', og_desc_val, 'og_description')

    # 4. First three <h1> headings (skip trivially short text)
    for h1 in soup.find_all('h1')[:3]:
        text = h1.get_text(strip=True)
        if text and len(text) > 2:
            _add(h1, 'heading_h1', text, 'h1_tag')

    # 5. Contact info: mailto/tel anchors
    for a in soup.find_all('a', href=re.compile(r'^mailto:')):
        href = a.get('href')
        if href and isinstance(href, str):
            email = href.replace('mailto:', '').split('?')[0]  # drop ?subject=...
            if email:
                _add(a, 'email', email, 'mailto_link')
    for a in soup.find_all('a', href=re.compile(r'^tel:')):
        href = a.get('href')
        if href and isinstance(href, str):
            phone = href.replace('tel:', '')
            if phone:
                _add(a, 'phone', phone, 'tel_link')

    # 6. Social media links: first valid occurrence per platform only
    social_patterns = {
        'facebook': r'facebook\.com',
        'twitter': r'twitter\.com|x\.com',
        'instagram': r'instagram\.com',
        'linkedin': r'linkedin\.com',
        'youtube': r'youtube\.com',
    }
    for platform, pattern in social_patterns.items():
        for a in soup.find_all('a', href=re.compile(pattern)):
            href = a.get('href')
            if href and isinstance(href, str):
                _add(a, f'social_{platform}', href, 'social_link')
                break  # Only first occurrence per platform

    # 7. Address-ish containers (heuristic, hence the lower match score)
    address_containers = soup.find_all(['address', 'div', 'p'],
                                       class_=re.compile(r'address|contact|location', re.I))
    for container in address_containers[:2]:
        text = container.get_text(strip=True)
        if 10 < len(text) < 300:
            _add(container, 'address_text', text, 'address_container', score=0.8)

    return claims
def remove_web_enrichment_block(content: str) -> str:
    """Strip the top-level ``web_enrichment:`` section from raw YAML text.

    The section spans from the ``web_enrichment:`` line up to (but not
    including) the next top-level key, or to end-of-file.  Any runs of
    three or more newlines left behind are collapsed to one blank line.
    """
    block_re = re.compile(r'^web_enrichment:.*?(?=^[a-z_]+:|\Z)',
                          re.MULTILINE | re.DOTALL)
    stripped = block_re.sub('', content)
    return re.sub(r'\n{3,}', '\n\n', stripped)
def add_web_enrichment_block(content: str, enrichment: dict) -> str:
    """Append *enrichment* to YAML *content* as a ``web_enrichment:`` block.

    The dict is serialized in block style with unicode preserved and key
    order kept as-is, then appended after a separating blank line.
    """
    rendered = yaml.dump({'web_enrichment': enrichment},
                         default_flow_style=False,
                         allow_unicode=True,
                         sort_keys=False)
    # Guarantee the existing content ends with a newline before appending.
    body = content if content.endswith('\n') else content + '\n'
    return body + '\n' + rendered
async def fetch_and_enrich(entry: dict, session: httpx.AsyncClient) -> Optional[dict]:
    """
    Fetch the entry's URL, archive the HTML, and extract xpath-backed claims.

    Args:
        entry: Queue record with at least 'url' and 'ghcid' keys.
        session: Shared httpx client used for the request.

    Returns:
        Enrichment dict with 'archive_metadata' and 'claims', or None when
        the fetch fails or yields no claims (errors are printed, not raised).
    """
    url = entry['url']
    ghcid = entry['ghcid']
    custodian_root = Path('/Users/kempersc/apps/glam/data/custodian')
    try:
        response = await session.get(url, follow_redirects=True, timeout=30.0)
        response.raise_for_status()
        html = response.text

        # Archive the raw HTML under data/custodian/web/<ghcid path>/
        netloc = urlparse(url).netloc
        archive_subdir = ARCHIVE_DIR / ghcid.replace('-', '/')
        archive_subdir.mkdir(parents=True, exist_ok=True)
        html_file = archive_subdir / f"{netloc}_index.html"
        html_file.write_text(html, encoding='utf-8')

        claims = extract_claims_with_xpath(html, url)
        if not claims:
            return None

        # Record the archive path (relative to data/custodian) on each claim
        rel_html_path = str(html_file.relative_to(custodian_root))
        for claim in claims:
            claim['html_file'] = rel_html_path

        return {
            'archive_metadata': {
                'archive_method': 'httpx_fetch',
                'archive_timestamp': datetime.now(timezone.utc).isoformat(),
                'archive_location': str(archive_subdir.relative_to(custodian_root)),
                'source_url': url,
                'html_file': rel_html_path
            },
            'claims': claims
        }
    except Exception as e:
        # Best-effort: report and skip; the caller counts this entry as skipped.
        print(f" Error fetching {url}: {e}")
        return None
async def process_batch(entries: list[dict], batch_size: int = 5) -> dict:
    """Fetch and enrich *entries* in concurrent slices of *batch_size*.

    A single httpx client is shared across all requests; each slice is
    awaited together and followed by a one-second pause.

    Returns:
        Mapping of ghcid -> enrichment dict (or None when that fetch failed).
    """
    results: dict = {}
    client_headers = {'User-Agent': 'GLAM Heritage Custodian Enrichment Bot/1.0'}
    async with httpx.AsyncClient(headers=client_headers,
                                 follow_redirects=True) as session:
        slices = [entries[start:start + batch_size]
                  for start in range(0, len(entries), batch_size)]
        for batch_no, batch in enumerate(slices, 1):
            print(f"Processing batch {batch_no} ({len(batch)} entries)...")
            outcomes = await asyncio.gather(
                *(fetch_and_enrich(item, session) for item in batch),
                return_exceptions=True)
            for item, outcome in zip(batch, outcomes):
                if isinstance(outcome, Exception):
                    print(f" Exception for {item['ghcid']}: {outcome}")
                    results[item['ghcid']] = None
                else:
                    results[item['ghcid']] = outcome
            # Rate limiting between slices
            await asyncio.sleep(1)
    return results
def update_custodian_file(filepath: str, enrichment: dict) -> bool:
    """
    Replace a custodian YAML file's web_enrichment with xpath-backed data.

    Reads the file, strips any existing (fabricated) web_enrichment block,
    appends the new block, and writes the result back in place.

    Returns:
        True on success, False if any step raised (the error is printed).
    """
    try:
        path = Path(filepath)
        text = path.read_text(encoding='utf-8')
        text = remove_web_enrichment_block(text)
        text = add_web_enrichment_block(text, enrichment)
        path.write_text(text, encoding='utf-8')
        return True
    except Exception as e:
        print(f" Error updating {filepath}: {e}")
        return False
async def main():
    """CLI entry point: re-enrich queued custodian files with xpath claims."""
    import argparse
    parser = argparse.ArgumentParser(description='Re-enrich files with proper xpath provenance')
    parser.add_argument('--limit', '-l', type=int, default=10, help='Max files to process')
    parser.add_argument('--dry-run', '-n', action='store_true', help='Do not modify files')
    parser.add_argument('--batch-size', '-b', type=int, default=5, help='Concurrent requests')
    args = parser.parse_args()

    # Take at most --limit entries off the front of the queue
    with open(QUEUE_FILE) as f:
        queue_data = json.load(f)
    entries = queue_data['queue'][:args.limit]
    print(f"Re-enriching {len(entries)} files with proper xpath provenance...")
    print(f"Started at: {datetime.now().isoformat()}")
    print()

    results = await process_batch(entries, batch_size=args.batch_size)

    # Apply results to the custodian files and tally outcomes
    success = failed = skipped = 0
    for entry in entries:
        ghcid = entry['ghcid']
        enrichment = results.get(ghcid)
        if enrichment is None:
            print(f" SKIP {ghcid}: No enrichment data")
            skipped += 1
        elif args.dry_run:
            print(f" DRY-RUN {ghcid}: Would update with {len(enrichment['claims'])} claims")
            success += 1
        elif update_custodian_file(entry['filepath'], enrichment):
            print(f" OK {ghcid}: Updated with {len(enrichment['claims'])} claims")
            success += 1
        else:
            failed += 1

    print()
    print("=" * 60)
    print("RESULTS")
    print("=" * 60)
    print(f"Successfully enriched: {success}")
    print(f"Skipped (fetch failed): {skipped}")
    print(f"Failed (update error): {failed}")
    print(f"Completed at: {datetime.now().isoformat()}")
if __name__ == '__main__':
    asyncio.run(main())