glam/scripts/migrate_web_archives.py
kempersc 400b1c04c1 fix(scripts): force table recreation in web archives migration
Drop existing tables before creating to ensure schema updates are applied
properly instead of using IF NOT EXISTS which would skip schema changes.
2025-12-07 18:47:46 +01:00

463 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Migrate web archives from /data/nde/enriched/entries/web/ to /data/custodian/{GHCID}/web/
This script:
1. Builds a mapping from entry_index -> GHCID by scanning custodian files
2. Moves (or symlinks) web archive folders to the appropriate custodian folder
3. Creates a DuckDB database with web archive metadata for DuckLake ingestion
Usage:
python scripts/migrate_web_archives.py --dry-run # Preview changes
python scripts/migrate_web_archives.py --execute # Actually migrate
python scripts/migrate_web_archives.py --build-ducklake # Create DuckDB tables
"""
import os
import sys
import re
import yaml
import shutil
import argparse
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, List, Any
import json
# Optional dependency: duckdb is only needed for --build-ducklake mode;
# migration and mapping modes work without it.
try:
    import duckdb
    HAS_DUCKDB = True
except ImportError:
    HAS_DUCKDB = False
    print("Warning: duckdb not installed. DuckLake ingestion disabled.")

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Filesystem layout (absolute, machine-specific paths — NOTE(review): consider
# making these configurable via CLI flags or environment variables).
BASE_DIR = Path("/Users/kempersc/apps/glam")
CUSTODIAN_DIR = BASE_DIR / "data" / "custodian"  # one YAML per GHCID, plus web/ subfolders
WEB_ARCHIVE_SOURCE = BASE_DIR / "data" / "nde" / "enriched" / "entries" / "web"  # source archives keyed by entry index
DUCKLAKE_DB = BASE_DIR / "data" / "ducklake" / "web_archives.duckdb"  # output DuckDB file
MAPPING_FILE = WEB_ARCHIVE_SOURCE / "_entry_to_ghcid.txt"  # pre-built "entry_index GHCID" lines
def build_entry_index_to_ghcid_mapping() -> Dict[int, str]:
    """
    Load the entry_index -> GHCID mapping.

    Fast path: a pre-built mapping file (created via ripgrep for speed) with
    one "entry_index GHCID" pair per line. Fallback: scan every custodian
    YAML file (slow), where the GHCID is the filename stem.

    Returns:
        Dict mapping entry_index (int) to GHCID (str, e.g., "NL-GE-GEN-S-HKG")
    """
    mapping: Dict[int, str] = {}

    if MAPPING_FILE.exists():
        logger.info(f"Loading mapping from {MAPPING_FILE}")
        # encoding added for consistency with the other opens in this file
        with open(MAPPING_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split(' ', 1)
                # Silently skip malformed lines rather than failing the load.
                if len(parts) == 2 and parts[0].isdigit():
                    mapping[int(parts[0])] = parts[1]
        logger.info(f"Loaded {len(mapping)} entries from mapping file")
        return mapping

    # Fallback: scan YAML files (slow)
    logger.info("Mapping file not found, scanning custodian files...")
    custodian_files = list(CUSTODIAN_DIR.glob("*.yaml"))
    logger.info(f"Scanning {len(custodian_files)} custodian files...")
    for filepath in custodian_files:
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and 'entry_index' in data:
                entry_index = data['entry_index']
                if isinstance(entry_index, int):
                    # Filename stem is the GHCID, e.g. "NL-GE-GEN-S-HKG".
                    mapping[entry_index] = filepath.stem
        except Exception as e:
            # Best-effort scan: one unreadable file must not abort the run.
            logger.debug(f"Error reading {filepath}: {e}")
            continue
    logger.info(f"Built mapping for {len(mapping)} entries with entry_index")
    return mapping
def get_web_archive_folders() -> List[Path]:
    """Return the entry-numbered web archive folders, sorted numerically."""
    numbered = [
        entry for entry in WEB_ARCHIVE_SOURCE.iterdir()
        if entry.is_dir() and entry.name.isdigit()
    ]
    numbered.sort(key=lambda folder: int(folder.name))
    return numbered
def parse_metadata(metadata_path: Path) -> Optional[Dict[str, Any]]:
    """Read and parse a web archive metadata.yaml file.

    Returns the parsed mapping, or None if the file cannot be read or parsed.
    """
    try:
        text = metadata_path.read_text(encoding='utf-8')
        return yaml.safe_load(text)
    except Exception as e:
        logger.error(f"Failed to parse {metadata_path}: {e}")
        return None
def migrate_web_archive(source_folder: Path, ghcid: str, dry_run: bool = True) -> bool:
    """
    Copy a web archive's domain subfolders into the custodian's web/ folder.

    Args:
        source_folder: Source web archive folder (e.g., .../web/0183/)
        ghcid: Target GHCID (e.g., "NL-GE-GEN-S-HKG")
        dry_run: If True, only log what would happen.

    Returns:
        True on success (including dry-run); False when there is nothing to
        migrate or a copy fails.
    """
    destination_root = CUSTODIAN_DIR / ghcid / "web"

    subdirs = [entry for entry in source_folder.iterdir() if entry.is_dir()]
    if not subdirs:
        logger.warning(f"No domain folders in {source_folder}")
        return False

    for domain_dir in subdirs:
        destination = destination_root / domain_dir.name
        if dry_run:
            logger.info(f"[DRY-RUN] Would migrate: {domain_dir} -> {destination}")
            continue
        try:
            destination_root.mkdir(parents=True, exist_ok=True)
            if destination.exists():
                # Never overwrite an existing archive; skip this domain.
                logger.warning(f"Target already exists: {destination}")
                continue
            shutil.copytree(domain_dir, destination)
            logger.info(f"Migrated: {domain_dir} -> {destination}")
        except Exception as e:
            logger.error(f"Failed to migrate {domain_dir}: {e}")
            return False
    return True
def _create_schema(con) -> None:
    """Drop and recreate the DuckLake tables so schema changes always apply."""
    # Drop in foreign-key dependency order (children first).
    con.execute("DROP TABLE IF EXISTS web_claims")
    con.execute("DROP TABLE IF EXISTS web_pages")
    con.execute("DROP TABLE IF EXISTS web_archives")
    con.execute("""
        CREATE TABLE web_archives (
            ghcid VARCHAR PRIMARY KEY,
            entry_index INTEGER,
            domain VARCHAR,
            url VARCHAR,
            archive_timestamp TIMESTAMP,
            archive_method VARCHAR,
            total_pages INTEGER,
            processed_pages INTEGER,
            warc_file VARCHAR,
            warc_size_bytes BIGINT,
            has_annotations BOOLEAN DEFAULT FALSE
        )
    """)
    con.execute("""
        CREATE TABLE web_pages (
            id INTEGER PRIMARY KEY,
            ghcid VARCHAR,
            page_title VARCHAR,
            source_path VARCHAR,
            archived_file VARCHAR,
            extractions_count INTEGER,
            FOREIGN KEY (ghcid) REFERENCES web_archives(ghcid)
        )
    """)
    con.execute("""
        CREATE TABLE web_claims (
            id INTEGER PRIMARY KEY,
            ghcid VARCHAR,
            claim_id VARCHAR,
            claim_type VARCHAR,
            text_content VARCHAR,
            hypernym VARCHAR,
            hyponym VARCHAR,
            class_uri VARCHAR,
            xpath VARCHAR,
            recognition_confidence FLOAT,
            linking_confidence FLOAT,
            wikidata_id VARCHAR,
            source_page VARCHAR,
            FOREIGN KEY (ghcid) REFERENCES web_archives(ghcid)
        )
    """)


def _coerce_timestamp(value: Any) -> Optional[datetime]:
    """Normalize a YAML archive_timestamp to a datetime (or None).

    yaml.safe_load may return ISO timestamps already parsed as datetime, or
    as a plain string — the previous code called .replace('Z', ...) on the
    value unconditionally, which raised on datetimes and silently dropped
    those archives via the caller's broad except.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(str(value).replace('Z', '+00:00'))


def _insert_annotation_claims(con, ghcid: str, annotations_path: Path, next_id: int) -> int:
    """Insert entity and aggregate claims from an annotations YAML file.

    Args:
        con: open DuckDB connection.
        ghcid: archive key the claims belong to.
        annotations_path: path to annotations_v1.7.0.yaml.
        next_id: last claim id already used.

    Returns:
        The last claim id used (advanced past every inserted row, even when a
        later claim fails, so future ids never collide with the primary key).
    """
    try:
        with open(annotations_path, 'r', encoding='utf-8') as f:
            annotations = yaml.safe_load(f)
        html_file = annotations.get('html_file', '')
        # Keep only the filename ("pages/index.html" -> "index.html").
        source_page = html_file.split('/')[-1] if html_file else 'index.html'
        claims = annotations.get('session', {}).get('claims', {})
        for claim in claims.get('entity', []):
            next_id += 1
            provenance = claim.get('provenance', {})
            con.execute(
                "INSERT INTO web_claims VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                [
                    next_id,
                    ghcid,
                    claim.get('claim_id'),
                    claim.get('claim_type'),
                    claim.get('text_content'),
                    claim.get('hypernym'),
                    claim.get('hyponym'),
                    claim.get('class_uri'),
                    provenance.get('path'),
                    claim.get('recognition_confidence', 0),
                    claim.get('linking_confidence', 0),
                    claim.get('wikidata_id'),
                    source_page,
                ])
        for claim in claims.get('aggregate', []):
            next_id += 1
            provenance = claim.get('provenance', {})
            con.execute(
                "INSERT INTO web_claims VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                [
                    next_id,
                    ghcid,
                    claim.get('claim_id'),
                    claim.get('claim_type'),
                    claim.get('text_content'),
                    None,  # aggregate claims carry no hypernym/hyponym/class_uri
                    None,
                    None,
                    provenance.get('path'),
                    provenance.get('confidence', 0),
                    0,
                    None,
                    source_page,
                ])
    except Exception as e:
        logger.debug(f"Error processing annotations for {ghcid}: {e}")
    return next_id


def build_ducklake_database(mapping: Dict[int, str]):
    """
    Create DuckDB database with web archive metadata for DuckLake.

    Tables:
        - web_archives: Archive metadata (ghcid, url, timestamp, stats)
        - web_pages: Individual pages with extraction counts
        - web_claims: Extracted claims/entities from annotations

    Args:
        mapping: entry_index -> GHCID, from build_entry_index_to_ghcid_mapping().
    """
    if not HAS_DUCKDB:
        logger.error("DuckDB not installed. Cannot build DuckLake database.")
        return

    DUCKLAKE_DB.parent.mkdir(parents=True, exist_ok=True)
    con = duckdb.connect(str(DUCKLAKE_DB))
    # Tables are dropped and recreated so schema changes always apply.
    # (A previous revision also executed comment-only "-- Removed: DELETE ..."
    # strings here; those were dead statements and have been removed.)
    _create_schema(con)

    page_id = 0
    claim_id_counter = 0
    web_folders = get_web_archive_folders()
    total_folders = len(web_folders)
    logger.info(f"Processing {total_folders} web archive folders for DuckLake...")

    for idx, folder in enumerate(web_folders):
        if idx % 100 == 0:
            logger.info(f"Progress: {idx}/{total_folders} folders processed ({idx*100//total_folders}%)")
        entry_index = int(folder.name)
        ghcid = mapping.get(entry_index)
        if not ghcid:
            logger.debug(f"No GHCID mapping for entry {entry_index}")
            continue

        for domain_folder in (d for d in folder.iterdir() if d.is_dir()):
            metadata_path = domain_folder / "metadata.yaml"
            if not metadata_path.exists():
                continue
            metadata = parse_metadata(metadata_path)
            if not metadata:
                continue

            annotations_path = domain_folder / "annotations_v1.7.0.yaml"
            has_annotations = annotations_path.exists()
            warc_info = metadata.get('warc', {})

            # Insert the archive record first; skip the whole domain on
            # failure so pages/claims never exist without their parent row.
            try:
                con.execute(
                    "INSERT INTO web_archives VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    [
                        ghcid,
                        entry_index,
                        domain_folder.name,
                        metadata.get('url'),
                        _coerce_timestamp(metadata.get('archive_timestamp')),
                        metadata.get('archive_method'),
                        metadata.get('total_pages', 0),
                        metadata.get('processed_pages', 0),
                        warc_info.get('warc_file'),
                        warc_info.get('warc_size_bytes', 0),
                        has_annotations,
                    ])
            except Exception as e:
                logger.debug(f"Error inserting archive {ghcid}: {e}")
                continue

            pages = metadata.get('pages', [])
            # Older single-page format stores a 'files' dict instead of a
            # 'pages' list: synthesize one page entry from it.
            if not pages and 'files' in metadata:
                rendered_html = metadata.get('files', {}).get('rendered_html')
                if rendered_html:
                    pages = [{
                        'title': domain_folder.name,  # no title recorded; use domain
                        'source_path': 'index.html',
                        'archived_file': rendered_html,
                        'extractions_count': 0,
                    }]
                    con.execute(
                        "UPDATE web_archives SET total_pages = 1, processed_pages = 1 WHERE ghcid = ?",
                        [ghcid])

            for page in pages:
                page_id += 1
                try:
                    con.execute(
                        "INSERT INTO web_pages VALUES (?, ?, ?, ?, ?, ?)",
                        [
                            page_id,
                            ghcid,
                            page.get('title'),
                            page.get('source_path'),
                            page.get('archived_file'),
                            page.get('extractions_count', 0),
                        ])
                except Exception as e:
                    logger.debug(f"Error inserting page: {e}")

            if has_annotations:
                claim_id_counter = _insert_annotation_claims(
                    con, ghcid, annotations_path, claim_id_counter)

    # Indices for the common DuckLake query patterns.
    con.execute("CREATE INDEX IF NOT EXISTS idx_pages_ghcid ON web_pages(ghcid)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_ghcid ON web_claims(ghcid)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_type ON web_claims(claim_type)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_hypernym ON web_claims(hypernym)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_source_page ON web_claims(source_page)")

    archive_count = con.execute("SELECT COUNT(*) FROM web_archives").fetchone()[0]
    page_count = con.execute("SELECT COUNT(*) FROM web_pages").fetchone()[0]
    claim_count = con.execute("SELECT COUNT(*) FROM web_claims").fetchone()[0]
    con.close()

    logger.info(f"DuckLake database created at: {DUCKLAKE_DB}")
    logger.info(f" - Archives: {archive_count}")
    logger.info(f" - Pages: {page_count}")
    logger.info(f" - Claims: {claim_count}")
def main():
    """CLI entry point: parse flags and dispatch to the requested mode."""
    parser = argparse.ArgumentParser(description="Migrate web archives to custodian folders")
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without executing')
    parser.add_argument('--execute', action='store_true', help='Actually migrate files')
    parser.add_argument('--build-ducklake', action='store_true', help='Build DuckDB database only')
    parser.add_argument('--build-mapping', action='store_true', help='Just build and show mapping')
    args = parser.parse_args()

    requested = (args.dry_run, args.execute, args.build_ducklake, args.build_mapping)
    if not any(requested):
        parser.print_help()
        sys.exit(1)

    # Every mode needs the entry_index -> GHCID mapping.
    mapping = build_entry_index_to_ghcid_mapping()

    if args.build_mapping:
        print(f"\nMapping has {len(mapping)} entries")
        print("\nSample entries:")
        for entry_idx, ghcid in sorted(mapping.items())[:20]:
            print(f" {entry_idx:04d} -> {ghcid}")
        return

    if args.build_ducklake:
        build_ducklake_database(mapping)
        return

    # Migration mode (--dry-run previews, --execute copies).
    web_folders = get_web_archive_folders()
    logger.info(f"Found {len(web_folders)} web archive folders")

    migrated = skipped = no_mapping = 0
    for folder in web_folders:
        entry_number = int(folder.name)
        ghcid = mapping.get(entry_number)
        if not ghcid:
            logger.debug(f"No GHCID for entry {entry_number}")
            no_mapping += 1
            continue
        if migrate_web_archive(folder, ghcid, dry_run=not args.execute):
            migrated += 1
        else:
            skipped += 1

    prefix = '[DRY-RUN] ' if args.dry_run else ''
    print(f"\n{prefix}Migration summary:")
    print(f" - Migrated: {migrated}")
    print(f" - Skipped: {skipped}")
    print(f" - No mapping: {no_mapping}")
# Entry-point guard: lets the module be imported (e.g. for testing) without
# running the migration.
if __name__ == '__main__':
    main()