#!/usr/bin/env python3
"""
Migrate web archives from /data/nde/enriched/entries/web/ to /data/custodian/{GHCID}/web/

This script:
1. Builds a mapping from entry_index -> GHCID by scanning custodian files
2. Moves (or symlinks) web archive folders to the appropriate custodian folder
3. Creates a DuckDB database with web archive metadata for DuckLake ingestion

Usage:
    python scripts/migrate_web_archives.py --dry-run         # Preview changes
    python scripts/migrate_web_archives.py --execute         # Actually migrate
    python scripts/migrate_web_archives.py --build-ducklake  # Create DuckDB tables
"""

import os
import sys
import re
import yaml
import shutil
import argparse
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, List, Any
import json

# Try to import duckdb for DuckLake ingestion; migration and mapping
# still work without it — only --build-ducklake is disabled.
try:
    import duckdb
    HAS_DUCKDB = True
except ImportError:
    HAS_DUCKDB = False
    print("Warning: duckdb not installed. DuckLake ingestion disabled.")

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Paths (machine-specific absolute base — adjust BASE_DIR when running elsewhere)
BASE_DIR = Path("/Users/kempersc/apps/glam")
CUSTODIAN_DIR = BASE_DIR / "data" / "custodian"
WEB_ARCHIVE_SOURCE = BASE_DIR / "data" / "nde" / "enriched" / "entries" / "web"
DUCKLAKE_DB = BASE_DIR / "data" / "ducklake" / "web_archives.duckdb"
# Pre-built "entry_index GHCID" lines (fast path for the mapping builder).
MAPPING_FILE = WEB_ARCHIVE_SOURCE / "_entry_to_ghcid.txt"
def build_entry_index_to_ghcid_mapping() -> Dict[int, str]:
    """
    Load mapping from pre-built file (created via ripgrep for speed).
    Falls back to scanning YAML files if file doesn't exist.

    Returns:
        Dict mapping entry_index (int) to GHCID (str, e.g., "NL-GE-GEN-S-HKG")
    """
    mapping: Dict[int, str] = {}

    # Fast path: each line of the pre-built file is "<index> <ghcid>".
    if MAPPING_FILE.exists():
        logger.info(f"Loading mapping from {MAPPING_FILE}")
        with open(MAPPING_FILE, 'r') as f:
            for raw_line in f:
                fields = raw_line.strip().split(' ', 1)
                if len(fields) == 2 and fields[0].isdigit():
                    mapping[int(fields[0])] = fields[1]
        logger.info(f"Loaded {len(mapping)} entries from mapping file")
        return mapping

    # Slow path: read entry_index out of every custodian YAML file.
    logger.info("Mapping file not found, scanning custodian files...")
    custodian_files = list(CUSTODIAN_DIR.glob("*.yaml"))
    logger.info(f"Scanning {len(custodian_files)} custodian files...")

    for filepath in custodian_files:
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and 'entry_index' in data:
                entry_index = data['entry_index']
                if isinstance(entry_index, int):
                    # The filename stem is the GHCID, e.g. "NL-GE-GEN-S-HKG".
                    mapping[entry_index] = filepath.stem
        except Exception as e:
            # Unreadable/malformed files are skipped, not fatal.
            logger.debug(f"Error reading {filepath}: {e}")
            continue

    logger.info(f"Built mapping for {len(mapping)} entries with entry_index")
    return mapping
def get_web_archive_folders() -> List[Path]:
    """Return the numeric entry folders under the source tree, sorted numerically."""
    numeric_dirs = [
        entry
        for entry in WEB_ARCHIVE_SOURCE.iterdir()
        if entry.is_dir() and entry.name.isdigit()
    ]
    # Sort by numeric value, not lexicographically ("0183" style names).
    return sorted(numeric_dirs, key=lambda p: int(p.name))
def parse_metadata(metadata_path: Path) -> Optional[Dict[str, Any]]:
    """Parse web archive metadata.yaml file; return None (and log) on any failure."""
    try:
        with open(metadata_path, 'r', encoding='utf-8') as handle:
            parsed = yaml.safe_load(handle)
    except Exception as e:
        logger.error(f"Failed to parse {metadata_path}: {e}")
        return None
    return parsed
def migrate_web_archive(source_folder: Path, ghcid: str, dry_run: bool = True) -> bool:
    """
    Migrate a web archive folder to the custodian's web/ folder.

    Args:
        source_folder: Path to source web archive (e.g., .../web/0183/historischekringgente.nl/)
        ghcid: Target GHCID (e.g., "NL-GE-GEN-S-HKG")
        dry_run: If True, only preview changes

    Returns:
        True if successful
    """
    destination_root = CUSTODIAN_DIR / ghcid / "web"

    # Each entry folder wraps one or more per-domain subfolders.
    domain_dirs = [child for child in source_folder.iterdir() if child.is_dir()]
    if not domain_dirs:
        logger.warning(f"No domain folders in {source_folder}")
        return False

    for domain_dir in domain_dirs:
        destination = destination_root / domain_dir.name

        if dry_run:
            logger.info(f"[DRY-RUN] Would migrate: {domain_dir} -> {destination}")
            continue

        try:
            destination_root.mkdir(parents=True, exist_ok=True)
            if destination.exists():
                # Already migrated (or a collision) — leave it untouched.
                logger.warning(f"Target already exists: {destination}")
                continue
            shutil.copytree(domain_dir, destination)
            logger.info(f"Migrated: {domain_dir} -> {destination}")
        except Exception as e:
            logger.error(f"Failed to migrate {domain_dir}: {e}")
            return False

    return True
def _create_schema(con) -> None:
    """Drop and recreate the three DuckLake tables so schema changes take effect."""
    # DROP instead of CREATE IF NOT EXISTS: IF NOT EXISTS would silently keep
    # an outdated schema from a previous run. Drop children before parent
    # because of the FOREIGN KEY references.
    con.execute("DROP TABLE IF EXISTS web_claims")
    con.execute("DROP TABLE IF EXISTS web_pages")
    con.execute("DROP TABLE IF EXISTS web_archives")

    con.execute("""
        CREATE TABLE web_archives (
            ghcid VARCHAR PRIMARY KEY,
            entry_index INTEGER,
            domain VARCHAR,
            url VARCHAR,
            archive_timestamp TIMESTAMP,
            archive_method VARCHAR,
            total_pages INTEGER,
            processed_pages INTEGER,
            warc_file VARCHAR,
            warc_size_bytes BIGINT,
            has_annotations BOOLEAN DEFAULT FALSE
        )
    """)

    con.execute("""
        CREATE TABLE web_pages (
            id INTEGER PRIMARY KEY,
            ghcid VARCHAR,
            page_title VARCHAR,
            source_path VARCHAR,
            archived_file VARCHAR,
            extractions_count INTEGER,
            FOREIGN KEY (ghcid) REFERENCES web_archives(ghcid)
        )
    """)

    con.execute("""
        CREATE TABLE web_claims (
            id INTEGER PRIMARY KEY,
            ghcid VARCHAR,
            claim_id VARCHAR,
            claim_type VARCHAR,
            text_content VARCHAR,
            hypernym VARCHAR,
            hyponym VARCHAR,
            class_uri VARCHAR,
            xpath VARCHAR,
            recognition_confidence FLOAT,
            linking_confidence FLOAT,
            wikidata_id VARCHAR,
            source_page VARCHAR,
            FOREIGN KEY (ghcid) REFERENCES web_archives(ghcid)
        )
    """)


def _insert_claims(con, annotations_path: Path, ghcid: str, next_id: int) -> int:
    """
    Insert entity and aggregate claims from one annotations YAML file.

    Args:
        con: Open DuckDB connection.
        annotations_path: Path to annotations_v1.7.0.yaml.
        ghcid: Owning archive's GHCID.
        next_id: Last claim id used so far.

    Returns:
        The updated claim id counter.

    Raises:
        Any yaml/IO/DB error — the caller decides how to handle it.
    """
    with open(annotations_path, 'r', encoding='utf-8') as f:
        annotations = yaml.safe_load(f)

    html_file = annotations.get('html_file', '')
    # Extract just the filename part (e.g., "pages/index.html" -> "index.html")
    source_page = html_file.split('/')[-1] if html_file else 'index.html'

    claims = annotations.get('session', {}).get('claims', {})

    # Entity claims carry the full taxonomy/linking columns.
    for claim in claims.get('entity', []):
        next_id += 1
        provenance = claim.get('provenance', {})
        con.execute(
            "INSERT INTO web_claims VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            [
                next_id,
                ghcid,
                claim.get('claim_id'),
                claim.get('claim_type'),
                claim.get('text_content'),
                claim.get('hypernym'),
                claim.get('hyponym'),
                claim.get('class_uri'),
                provenance.get('path'),
                claim.get('recognition_confidence', 0),
                claim.get('linking_confidence', 0),
                claim.get('wikidata_id'),
                source_page,
            ])

    # Aggregate claims have no taxonomy/linking info — those columns are NULL.
    for claim in claims.get('aggregate', []):
        next_id += 1
        provenance = claim.get('provenance', {})
        con.execute(
            "INSERT INTO web_claims VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            [
                next_id,
                ghcid,
                claim.get('claim_id'),
                claim.get('claim_type'),
                claim.get('text_content'),
                None,
                None,
                None,
                provenance.get('path'),
                provenance.get('confidence', 0),
                0,
                None,
                source_page,
            ])

    return next_id


def _create_indices(con) -> None:
    """Create query indices after the bulk load."""
    con.execute("CREATE INDEX IF NOT EXISTS idx_pages_ghcid ON web_pages(ghcid)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_ghcid ON web_claims(ghcid)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_type ON web_claims(claim_type)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_hypernym ON web_claims(hypernym)")
    con.execute("CREATE INDEX IF NOT EXISTS idx_claims_source_page ON web_claims(source_page)")


def build_ducklake_database(mapping: Dict[int, str]):
    """
    Create DuckDB database with web archive metadata for DuckLake.

    Tables:
        - web_archives: Archive metadata (ghcid, url, timestamp, stats)
        - web_pages: Individual pages with extraction counts
        - web_claims: Extracted claims/entities from annotations

    Args:
        mapping: entry_index -> GHCID, as produced by
            build_entry_index_to_ghcid_mapping().

    Notes (fixes vs. previous version):
        - Removed the dead `con.execute("-- Removed: DELETE FROM ...")` calls:
          executing a comment-only string is an error in DuckDB, and the
          deletes are redundant because the tables are recreated from scratch.
        - The connection is now closed in a try/finally so a failure mid-way
          through ingestion no longer leaks it.
    """
    if not HAS_DUCKDB:
        logger.error("DuckDB not installed. Cannot build DuckLake database.")
        return

    DUCKLAKE_DB.parent.mkdir(parents=True, exist_ok=True)

    con = duckdb.connect(str(DUCKLAKE_DB))
    try:
        _create_schema(con)

        page_id = 0
        claim_id_counter = 0

        web_folders = get_web_archive_folders()
        total_folders = len(web_folders)
        logger.info(f"Processing {total_folders} web archive folders for DuckLake...")

        for idx, folder in enumerate(web_folders):
            if idx % 100 == 0:
                logger.info(f"Progress: {idx}/{total_folders} folders processed ({idx*100//total_folders}%)")
            entry_index = int(folder.name)
            ghcid = mapping.get(entry_index)

            if not ghcid:
                logger.debug(f"No GHCID mapping for entry {entry_index}")
                continue

            # Each entry folder contains one or more per-domain subfolders.
            domain_folders = [d for d in folder.iterdir() if d.is_dir()]

            for domain_folder in domain_folders:
                metadata_path = domain_folder / "metadata.yaml"
                if not metadata_path.exists():
                    continue

                metadata = parse_metadata(metadata_path)
                if not metadata:
                    continue

                annotations_path = domain_folder / "annotations_v1.7.0.yaml"
                has_annotations = annotations_path.exists()
                warc_info = metadata.get('warc', {})

                # Insert the archive record first; skip the whole domain on
                # failure so we don't create orphaned pages/claims.
                try:
                    archive_ts = metadata.get('archive_timestamp')
                    if archive_ts:
                        archive_ts = datetime.fromisoformat(archive_ts.replace('Z', '+00:00'))

                    con.execute(
                        "INSERT INTO web_archives VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                        [
                            ghcid,
                            entry_index,
                            domain_folder.name,
                            metadata.get('url'),
                            archive_ts,
                            metadata.get('archive_method'),
                            metadata.get('total_pages', 0),
                            metadata.get('processed_pages', 0),
                            warc_info.get('warc_file'),
                            warc_info.get('warc_size_bytes', 0),
                            has_annotations,
                        ])
                except Exception as e:
                    logger.debug(f"Error inserting archive {ghcid}: {e}")
                    continue

                pages = metadata.get('pages', [])

                # Handle single-page archives (older format with 'files' key):
                # synthesize one page entry from the single-page fetch.
                if not pages and 'files' in metadata:
                    rendered_html = metadata.get('files', {}).get('rendered_html')
                    if rendered_html:
                        pages = [{
                            'title': domain_folder.name,  # Use domain as title
                            'source_path': 'index.html',
                            'archived_file': rendered_html,
                            'extractions_count': 0,
                        }]
                        # Keep the archive's page counts consistent.
                        con.execute(
                            "UPDATE web_archives SET total_pages = 1, processed_pages = 1 WHERE ghcid = ?",
                            [ghcid])

                for page in pages:
                    page_id += 1
                    try:
                        con.execute(
                            "INSERT INTO web_pages VALUES (?, ?, ?, ?, ?, ?)",
                            [
                                page_id,
                                ghcid,
                                page.get('title'),
                                page.get('source_path'),
                                page.get('archived_file'),
                                page.get('extractions_count', 0),
                            ])
                    except Exception as e:
                        logger.debug(f"Error inserting page: {e}")

                if has_annotations:
                    try:
                        claim_id_counter = _insert_claims(
                            con, annotations_path, ghcid, claim_id_counter)
                    except Exception as e:
                        logger.debug(f"Error processing annotations for {ghcid}: {e}")

        _create_indices(con)

        # Gather stats before releasing the connection.
        archive_count = con.execute("SELECT COUNT(*) FROM web_archives").fetchone()[0]
        page_count = con.execute("SELECT COUNT(*) FROM web_pages").fetchone()[0]
        claim_count = con.execute("SELECT COUNT(*) FROM web_claims").fetchone()[0]
    finally:
        # Always release the connection, even if ingestion fails mid-way.
        con.close()

    logger.info(f"DuckLake database created at: {DUCKLAKE_DB}")
    logger.info(f" - Archives: {archive_count}")
    logger.info(f" - Pages: {page_count}")
    logger.info(f" - Claims: {claim_count}")
def main():
    """CLI entry point: dispatch to mapping preview, DuckLake build, or migration."""
    parser = argparse.ArgumentParser(description="Migrate web archives to custodian folders")
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without executing')
    parser.add_argument('--execute', action='store_true', help='Actually migrate files')
    parser.add_argument('--build-ducklake', action='store_true', help='Build DuckDB database only')
    parser.add_argument('--build-mapping', action='store_true', help='Just build and show mapping')
    args = parser.parse_args()

    # At least one mode flag is required.
    if not any((args.dry_run, args.execute, args.build_ducklake, args.build_mapping)):
        parser.print_help()
        sys.exit(1)

    # Every mode needs the entry_index -> GHCID mapping.
    mapping = build_entry_index_to_ghcid_mapping()

    if args.build_mapping:
        print(f"\nMapping has {len(mapping)} entries")
        print("\nSample entries:")
        for entry_idx, ghcid in sorted(mapping.items())[:20]:
            print(f" {entry_idx:04d} -> {ghcid}")
        return

    if args.build_ducklake:
        build_ducklake_database(mapping)
        return

    # Migration mode (dry-run or execute)
    web_folders = get_web_archive_folders()
    logger.info(f"Found {len(web_folders)} web archive folders")

    migrated = skipped = no_mapping = 0

    for folder in web_folders:
        entry_index = int(folder.name)
        ghcid = mapping.get(entry_index)

        if not ghcid:
            logger.debug(f"No GHCID for entry {entry_index}")
            no_mapping += 1
            continue

        if migrate_web_archive(folder, ghcid, dry_run=not args.execute):
            migrated += 1
        else:
            skipped += 1

    print(f"\n{'[DRY-RUN] ' if args.dry_run else ''}Migration summary:")
    print(f" - Migrated: {migrated}")
    print(f" - Skipped: {skipped}")
    print(f" - No mapping: {no_mapping}")


if __name__ == '__main__':
    main()