#!/usr/bin/env python3
"""
Archive entire websites using wget with WARC format (Internet Archive standard).

RESUMABLE: This script maintains state and can be interrupted and resumed at any
time. State is tracked in data/nde/enriched/entries/web/_archive_state.yaml

PARALLEL: Uses ThreadPoolExecutor for concurrent website archiving.
- Level 1: Multiple websites archived simultaneously (--workers)
- Level 2: Deep recursive crawling (level 5) for comprehensive content

This script:
1. Uses wget to recursively download entire websites (5 levels deep)
2. Creates ISO 28500 WARC files (.warc.gz) for archival
3. Generates CDX index files for random access
4. Extracts content with XPath provenance for verification
5. Stores everything in data/nde/enriched/entries/web/{entry_number}/

Requirements:
- wget (with WARC support - wget 1.14+)
- beautifulsoup4

Usage:
    # Run continuously with 4 parallel workers (resumable)
    python scripts/archive_website_full.py --continuous --workers 4

    # Run with limit
    python scripts/archive_website_full.py --limit 50 --workers 8

    # Process specific entry
    python scripts/archive_website_full.py --entry 0423

    # Show current progress
    python scripts/archive_website_full.py --status

    # Reset state and start fresh
    python scripts/archive_website_full.py --reset
"""

import argparse
import os
import re
import signal
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from queue import Queue
from threading import Lock
from urllib.parse import urlparse
import shutil

import yaml

try:
    from bs4 import BeautifulSoup
    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    print(f"Warning: Missing dependency: {e}")
    print("Install with: pip install beautifulsoup4")

# Directories
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'
STATE_FILE = WEB_DIR / '_archive_state.yaml'
LOG_FILE = WEB_DIR / '_archive_log.txt'

# Thread-safe logging (also guards state-file writes)
LOG_LOCK = Lock()

# Global flag for graceful shutdown; set by the signal handler and polled
# by worker threads between units of work.
SHUTDOWN_REQUESTED = False


def signal_handler(signum, frame):
    """Handle interrupt signals gracefully by requesting a soft shutdown."""
    global SHUTDOWN_REQUESTED
    print("\n\nāš ļø Shutdown requested. Finishing current entries and saving state...")
    SHUTDOWN_REQUESTED = True


def load_state() -> dict:
    """Load archive state from file, or return a fresh default state."""
    if STATE_FILE.exists():
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f) or {}
    return {
        'completed_entries': [],
        'failed_entries': [],
        'skipped_entries': [],
        'in_progress': [],
        'total_pages_archived': 0,
        'total_sites_failed': 0,
        'started_at': None,
        'last_updated': None,
        'run_count': 0
    }


def save_state(state: dict):
    """Save archive state to file (thread-safe, atomic via temp-file rename)."""
    with LOG_LOCK:
        state['last_updated'] = datetime.now(timezone.utc).isoformat()
        WEB_DIR.mkdir(parents=True, exist_ok=True)
        # Write atomically so an interrupt never leaves a half-written state file
        temp_file = STATE_FILE.with_suffix('.tmp')
        with open(temp_file, 'w', encoding='utf-8') as f:
            yaml.dump(state, f, default_flow_style=False, allow_unicode=True,
                      sort_keys=False)
        temp_file.rename(STATE_FILE)


def log_message(message: str, also_print: bool = True):
    """Append message to log file (thread-safe), optionally echoing to stdout."""
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    with LOG_LOCK:
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] {message}\n")
    if also_print:
        print(message)


def sanitize_dirname(url: str) -> str:
    """Create a safe directory name from a URL (host with 'www.' stripped)."""
    parsed = urlparse(url)
    name = parsed.netloc.replace('www.', '')
    name = re.sub(r'[^\w\-.]', '_', name)
    return name


def get_xpath(element) -> str:
    """Generate XPath for a BeautifulSoup element.

    Position index counts preceding siblings of the SAME tag name, matching
    XPath positional semantics (e.g. /html[1]/body[1]/div[2]).
    """
    parts = []
    while element and element.name:
        siblings = element.find_previous_siblings(element.name)
        index = len(siblings) + 1
        parts.insert(0, f"{element.name}[{index}]")
        element = element.parent
    return '/' + '/'.join(parts) if parts else '/'


def extract_text_with_xpaths(soup, max_items: int = 300) -> list[dict]:
    """Extract text content with XPath locations.

    Harvests headings, paragraphs, list items, table cells, links, and
    elements whose classes look semantic (address/contact/description/...).
    Returns at most ``max_items`` extraction dicts.
    """
    extractions = []

    # Extract headings
    for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        for elem in soup.find_all(tag):
            text = elem.get_text(strip=True)
            if text and len(text) > 2:
                extractions.append({
                    'text': text,
                    'xpath': get_xpath(elem),
                    'tag': tag,
                    'classes': list(elem.get('class', []))
                })

    # Extract paragraphs
    for elem in soup.find_all('p'):
        text = elem.get_text(strip=True)
        if text and len(text) > 15:
            extractions.append({
                'text': text[:500],
                'xpath': get_xpath(elem),
                'tag': 'p',
                'classes': list(elem.get('class', []))
            })

    # Extract list items
    for elem in soup.find_all('li'):
        text = elem.get_text(strip=True)
        if text and len(text) > 8:
            extractions.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': 'li',
                'classes': list(elem.get('class', []))
            })

    # Extract table cells
    for elem in soup.find_all(['td', 'th']):
        text = elem.get_text(strip=True)
        if text and len(text) > 5:
            extractions.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': elem.name,
                'classes': list(elem.get('class', []))
            })

    # Extract links with href
    for elem in soup.find_all('a', href=True):
        text = elem.get_text(strip=True)
        href = elem.get('href', '')
        if text and len(text) > 2:
            extractions.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': 'a',
                'href': href[:500],
                'classes': list(elem.get('class', []))
            })

    # Extract spans/divs with semantic classes
    for elem in soup.find_all(
        ['address', 'span', 'div', 'section', 'article'],
        class_=lambda x: x and any(
            t in str(x).lower()
            for t in ['address', 'contact', 'phone', 'email', 'location',
                      'description', 'content', 'main', 'body', 'text',
                      'info', 'detail', 'about', 'bio', 'summary']
        )
    ):
        text = elem.get_text(strip=True)
        if text and len(text) > 10:
            extractions.append({
                'text': text[:500],
                'xpath': get_xpath(elem),
                'tag': elem.name,
                'classes': list(elem.get('class', [])),
                'type': 'semantic_content'
            })

    return extractions[:max_items]


def wget_warc_archive(url: str, output_dir: Path,
                      timeout: int = 180) -> tuple[bool, str, Path | None]:
    """
    Use wget to create a comprehensive WARC archive of a website.

    Deep crawling with level=5 for maximum content extraction.
    Optimized for speed with reduced wait times.

    Args:
        url: Starting URL
        output_dir: Directory to store archive
        timeout: Total timeout in seconds (default 180 = 3 minutes)

    Returns:
        Tuple of (success, error_message, warc_file_path)
    """
    mirror_dir = output_dir / 'mirror'
    mirror_dir.mkdir(parents=True, exist_ok=True)
    warc_base = output_dir / 'archive'

    # Optimized wget command for deep, fast crawling
    cmd = [
        'wget',
        # WARC options (Internet Archive standard)
        f'--warc-file={warc_base}',
        '--warc-cdx',
        '--warc-header=operator: GLAM Heritage Custodian Project',
        f'--warc-header=source: {url}',
        '--warc-header=format: WARC/1.1',
        f'--warc-header=date: {datetime.now(timezone.utc).isoformat()}',
        # Deep recursive crawling (5 levels for comprehensive capture)
        '--recursive',
        '--level=5',
        '--page-requisites',
        '--no-parent',
        '--convert-links',
        '--adjust-extension',
        '--restrict-file-names=windows',
        # Speed optimizations
        '--wait=0',  # No wait between requests (fast!)
        '--quota=100m',  # Allow up to 100MB per site
        '--timeout=10',  # Quick timeout per request
        '--tries=1',  # Single try per resource
        '--no-verbose',
        '--no-check-certificate',  # Handle SSL certificate issues
        # Accept more content types
        '--accept=html,htm,php,asp,aspx,jsp,xhtml,shtml',
        '--ignore-tags=img,script,link,style',  # Skip heavy assets for speed
        # Output
        '--directory-prefix=' + str(mirror_dir),
        # User agent
        '--user-agent=Mozilla/5.0 (compatible; GLAMBot/1.0; Heritage Archiver)',
        # DNS and connection optimization
        '--dns-timeout=5',
        '--connect-timeout=10',
        '--read-timeout=15',
        url
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )

        warc_file = Path(str(warc_base) + '.warc.gz')

        # wget returns 0 (success) or 8 (some errors but got content)
        if result.returncode in [0, 8]:
            if warc_file.exists() and warc_file.stat().st_size > 500:
                return True, "", warc_file
            # Check for uncompressed warc
            warc_uncompressed = Path(str(warc_base) + '.warc')
            if warc_uncompressed.exists():
                return True, "", warc_uncompressed
            return False, "WARC file not created or too small", None
        else:
            # Check if partial WARC exists
            if warc_file.exists() and warc_file.stat().st_size > 500:
                return True, f"partial (exit {result.returncode})", warc_file
            return False, f"wget exit {result.returncode}", None

    except subprocess.TimeoutExpired:
        # Check for partial WARC on timeout
        warc_file = Path(str(warc_base) + '.warc.gz')
        if warc_file.exists() and warc_file.stat().st_size > 500:
            return True, "timeout but partial saved", warc_file
        return False, f"Timeout after {timeout}s", None
    except Exception as e:
        return False, str(e), None


def process_mirror(mirror_dir: Path, output_dir: Path, base_url: str) -> dict:
    """Process mirrored files to extract content and XPaths.

    Copies each HTML page (up to 200) into ``output_dir/pages`` with a
    flattened name and collects per-page extraction metadata.
    """
    pages_dir = output_dir / 'pages'
    pages_dir.mkdir(parents=True, exist_ok=True)

    pages = []
    all_extractions = []

    if not mirror_dir.exists():
        return {'pages': [], 'extractions': [], 'total_pages': 0,
                'processed_pages': 0}

    # Find all HTML files
    html_files = list(mirror_dir.rglob('*.html')) + list(mirror_dir.rglob('*.htm'))

    for html_file in html_files[:200]:  # Process up to 200 pages
        try:
            with open(html_file, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            soup = BeautifulSoup(content, 'html.parser')

            title_elem = soup.find('title')
            title = title_elem.get_text(strip=True) if title_elem else html_file.stem

            extractions = extract_text_with_xpaths(soup)

            rel_path = html_file.relative_to(mirror_dir)
            flat_name = str(rel_path).replace('/', '_').replace('\\', '_')
            dest_file = pages_dir / flat_name
            shutil.copy2(html_file, dest_file)

            page_info = {
                'title': title[:200],
                'source_path': str(rel_path),
                'archived_file': str(dest_file.relative_to(output_dir)),
                'extractions_count': len(extractions)
            }
            pages.append(page_info)

            for ext in extractions:
                ext['page'] = str(rel_path)
                ext['html_file'] = str(dest_file.relative_to(ENTRIES_DIR))
            all_extractions.extend(extractions)

        except Exception:
            pass  # Skip problematic files silently (best-effort processing)

    return {
        'pages': pages,
        'extractions': all_extractions[:1000],  # Keep more extractions
        'total_pages': len(html_files),
        'processed_pages': len(pages)
    }


def get_urls_from_entry(data: dict) -> list[str]:
    """Extract all source URLs from an entry.

    Checks multiple sources for URLs:
    - web_enrichment.source_url and raw_sources
    - original_entry.webadres_organisatie
    - original_entry.reference[*].label (when URL)
    - wikidata_enrichment.wikidata_official_website
    - contact.website
    - digital_presence.website
    - nde_metadata.original_entry.webadres_organisatie
    """
    urls = set()

    # 1. Web enrichment sources
    if 'web_enrichment' in data:
        we = data['web_enrichment']
        if we.get('source_url'):
            urls.add(we['source_url'])
        for source in we.get('raw_sources', []):
            if source.get('url'):
                urls.add(source['url'])

    # 2. Original entry sources
    if 'original_entry' in data:
        oe = data['original_entry']
        if oe.get('webadres_organisatie'):
            urls.add(oe['webadres_organisatie'])
        # Check reference array for URLs
        for ref in oe.get('reference', []):
            if isinstance(ref, dict) and ref.get('label', '').startswith('http'):
                urls.add(ref['label'])
            elif isinstance(ref, str) and ref.startswith('http'):
                urls.add(ref)

    # 3. Wikidata official website
    if 'wikidata_enrichment' in data:
        wd = data['wikidata_enrichment']
        if wd.get('wikidata_official_website'):
            urls.add(wd['wikidata_official_website'])

    # 4. Contact section
    if 'contact' in data:
        contact = data['contact']
        if contact.get('website'):
            urls.add(contact['website'])

    # 5. Digital presence section
    if 'digital_presence' in data:
        dp = data['digital_presence']
        if dp.get('website'):
            urls.add(dp['website'])

    # 6. NDE metadata (nested original_entry)
    if 'nde_metadata' in data:
        nde = data['nde_metadata']
        if 'original_entry' in nde:
            nde_oe = nde['original_entry']
            if nde_oe.get('webadres_organisatie'):
                urls.add(nde_oe['webadres_organisatie'])

    # 7. Google Maps enrichment website
    if 'google_maps_enrichment' in data:
        gm = data['google_maps_enrichment']
        if gm.get('website'):
            urls.add(gm['website'])

    return [u for u in urls if u and u.startswith('http')]


def extract_entry_number(filename: str) -> str:
    """Extract leading numeric entry number from filename."""
    match = re.match(r'^(\d+)', filename)
    return match.group(1) if match else filename.replace('.yaml', '')


def has_valid_warc(entry_num: str) -> bool:
    """Check if entry already has a valid (>1KB) WARC archive."""
    entry_web_dir = WEB_DIR / entry_num
    if not entry_web_dir.exists():
        return False
    for warc in entry_web_dir.rglob('*.warc.gz'):
        if warc.stat().st_size > 1000:
            return True
    return False


def archive_single_entry(filepath: Path, force: bool = False) -> dict:
    """
    Archive websites for a single entry.

    Returns dict with results for thread-safe processing. Possible statuses:
    'completed', 'failed', 'skipped', or 'interrupted' (shutdown requested
    mid-entry; the entry is left unrecorded so it will be retried on resume).
    """
    result = {
        'filepath': filepath,
        'entry_name': filepath.name,
        'pages_archived': 0,
        'sites_failed': 0,
        'errors': [],
        'status': 'pending'  # pending, completed, failed, skipped, interrupted
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        result['status'] = 'failed'
        result['errors'].append(f"Read error: {e}")
        return result

    if not data:
        result['status'] = 'skipped'
        return result

    urls = get_urls_from_entry(data)
    if not urls:
        result['status'] = 'skipped'
        return result

    entry_num = extract_entry_number(filepath.name)

    # Check existing WARC
    if not force and has_valid_warc(entry_num):
        result['status'] = 'completed'
        return result

    web_archives = []
    interrupted = False

    for url in urls[:2]:
        if SHUTDOWN_REQUESTED:
            # Don't mark this entry done; leave it for the next run.
            interrupted = True
            break

        dirname = sanitize_dirname(url)
        site_dir = WEB_DIR / entry_num / dirname

        # Skip if already has WARC
        if not force and (site_dir / 'archive.warc.gz').exists():
            if (site_dir / 'archive.warc.gz').stat().st_size > 1000:
                continue

        log_message(f"  [{entry_num}] Archiving: {url}", also_print=True)

        # Clean and create directory
        if site_dir.exists():
            shutil.rmtree(site_dir)
        site_dir.mkdir(parents=True, exist_ok=True)

        # Create WARC archive
        success, error, warc_file = wget_warc_archive(url, site_dir)

        if not success:
            # Check for partial WARC
            partial = site_dir / 'archive.warc.gz'
            if partial.exists() and partial.stat().st_size > 1000:
                warc_file = partial
                success = True
                log_message(f"  [{entry_num}] Partial WARC saved "
                            f"({partial.stat().st_size} bytes)")
            else:
                result['errors'].append(f"{url}: {error}")
                result['sites_failed'] += 1
                continue

        mirror_dir = site_dir / 'mirror'

        # Find content directory (wget nests mirrored files under the hostname)
        content_dirs = ([d for d in mirror_dir.iterdir() if d.is_dir()]
                        if mirror_dir.exists() else [])
        actual_mirror = content_dirs[0] if content_dirs else mirror_dir

        # Process mirror
        proc_result = process_mirror(actual_mirror, site_dir, url)

        # WARC metadata
        warc_info = {}
        if warc_file and warc_file.exists():
            warc_info = {
                'warc_file': str(warc_file.relative_to(site_dir)),
                'warc_size_bytes': warc_file.stat().st_size,
                'warc_format': 'WARC/1.1',
                'warc_standard': 'ISO 28500'
            }
            cdx_file = site_dir / 'archive.cdx'
            if cdx_file.exists():
                warc_info['cdx_index'] = str(cdx_file.relative_to(site_dir))

        # Save metadata
        metadata = {
            'url': url,
            'archive_timestamp': datetime.now(timezone.utc).isoformat(),
            'archive_method': 'wget_warc_deep',
            'archive_standard': 'Internet Archive WARC/1.1 (ISO 28500)',
            'crawl_depth': 5,
            'entry_file': filepath.name,
            'warc': warc_info,
            'total_pages': proc_result['total_pages'],
            'processed_pages': proc_result['processed_pages'],
            'pages': proc_result['pages'],
            'extractions': proc_result['extractions']
        }

        with open(site_dir / 'metadata.yaml', 'w', encoding='utf-8') as f:
            yaml.dump(metadata, f, default_flow_style=False, allow_unicode=True,
                      sort_keys=False)

        web_archives.append({
            'url': url,
            'directory': str(site_dir.relative_to(ENTRIES_DIR)),
            'pages_archived': proc_result['processed_pages'],
            'archive_method': 'wget_warc_deep',
            'warc_file': warc_info.get('warc_file'),
            'warc_size_bytes': warc_info.get('warc_size_bytes'),
            'warc_format': 'ISO 28500'
        })

        result['pages_archived'] += proc_result['processed_pages']
        log_message(f"  [{entry_num}] āœ“ {proc_result['processed_pages']} pages "
                    f"from {proc_result['total_pages']} found")

    # Update entry YAML
    if web_archives:
        if 'web_enrichment' not in data:
            data['web_enrichment'] = {}
        data['web_enrichment']['web_archives'] = web_archives
        data['web_enrichment']['full_site_archive_timestamp'] = \
            datetime.now(timezone.utc).isoformat()
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True,
                      sort_keys=False)

    if interrupted:
        # Shutdown hit before all URLs were attempted: leave unrecorded so the
        # entry is retried on the next run instead of being marked completed.
        result['status'] = 'interrupted'
    else:
        result['status'] = 'failed' if result['sites_failed'] > 0 else 'completed'
    return result


def get_entries_to_process(state: dict, force: bool = False) -> list[Path]:
    """Get list of entry files that need processing.

    Failed entries are intentionally NOT excluded, so they are retried on
    subsequent runs.
    """
    all_files = sorted([f for f in ENTRIES_DIR.glob('*.yaml')
                        if not f.name.startswith('.')])

    if force:
        return all_files

    completed = set(state.get('completed_entries', []))
    skipped = set(state.get('skipped_entries', []))

    return [f for f in all_files
            if f.name not in completed and f.name not in skipped]


def print_status(state: dict):
    """Print current archiving status."""
    print("\n" + "="*60)
    print("šŸ“Š WARC ARCHIVING STATUS (Deep Parallel Mode)")
    print("="*60)

    completed = len(state.get('completed_entries', []))
    failed = len(state.get('failed_entries', []))
    skipped = len(state.get('skipped_entries', []))

    # Count total entries with URLs
    total_with_urls = 0
    total_entries = 0
    for f in ENTRIES_DIR.glob('*.yaml'):
        if f.name.startswith('.'):
            continue
        total_entries += 1
        try:
            with open(f, 'r', encoding='utf-8') as fh:
                data = yaml.safe_load(fh)
            if data and get_urls_from_entry(data):
                total_with_urls += 1
        except Exception:
            # Best-effort count; unreadable entries are simply not counted
            pass

    # Count WARC files and total size
    warc_files = list(WEB_DIR.rglob('*.warc.gz'))
    warc_count = len(warc_files)
    total_size = sum(f.stat().st_size for f in warc_files)

    print(f"\nšŸ“ Total entries: {total_entries}")
    print(f"🌐 Entries with URLs: {total_with_urls}")
    print(f"āœ… Completed: {completed}")
    print(f"āŒ Failed: {failed}")
    print(f"ā­ļø Skipped (no URL): {skipped}")
    print(f"šŸ“¦ WARC files: {warc_count}")
    print(f"šŸ’¾ Total WARC size: {total_size / (1024*1024):.1f} MB")
    print(f"šŸ“„ Pages archived: {state.get('total_pages_archived', 0)}")

    remaining = total_with_urls - completed - failed
    print(f"\nā³ Remaining: {remaining}")

    in_progress = state.get('in_progress', [])
    if in_progress:
        print(f"šŸ”„ In progress: {len(in_progress)} entries")

    if state.get('last_updated'):
        print(f"šŸ• Last updated: {state['last_updated']}")
    if state.get('run_count'):
        print(f"šŸ” Run count: {state['run_count']}")

    print("\n" + "="*60)


def main():
    global SHUTDOWN_REQUESTED

    parser = argparse.ArgumentParser(
        description='Archive websites with WARC format (deep parallel crawling)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --continuous --workers 4    Run with 4 parallel workers
  %(prog)s --limit 50 --workers 8      Process 50 entries with 8 workers
  %(prog)s --entry 0423                Process specific entry
  %(prog)s --status                    Show current progress
  %(prog)s --reset                     Reset state and start fresh
"""
    )
    parser.add_argument('--continuous', action='store_true',
                        help='Run continuously until all entries are processed')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of entries to process')
    parser.add_argument('--entry', type=str, default=None,
                        help='Process specific entry number')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--force', action='store_true',
                        help='Re-archive even if exists')
    parser.add_argument('--status', action='store_true',
                        help='Show current archiving status')
    parser.add_argument('--reset', action='store_true',
                        help='Reset state and start fresh')
    args = parser.parse_args()

    # Load state
    state = load_state()

    if args.status:
        print_status(state)
        return 0

    if args.reset:
        if STATE_FILE.exists():
            STATE_FILE.unlink()
        if LOG_FILE.exists():
            LOG_FILE.unlink()
        print("āœ… State reset. Run again to start fresh.")
        return 0

    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install beautifulsoup4")
        return 1

    # Check wget
    try:
        subprocess.run(['wget', '--version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: wget not found. Please install wget.")
        return 1

    # Set up signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Update run count
    state['run_count'] = state.get('run_count', 0) + 1
    if not state.get('started_at'):
        state['started_at'] = datetime.now(timezone.utc).isoformat()

    # Get files to process
    if args.entry:
        files = sorted(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = get_entries_to_process(state, args.force)

    # Use `is not None` so `--limit 0` is honored rather than ignored
    if args.limit is not None:
        files = files[:args.limit]

    if not files:
        log_message("āœ… No entries to process. All done!")
        print_status(state)
        return 0

    num_workers = min(args.workers, len(files))

    log_message(f"\nšŸš€ Starting DEEP WARC archiving (Run #{state['run_count']})")
    log_message(f"   Entries to process: {len(files)}")
    log_message(f"   Parallel workers: {num_workers}")
    log_message("   Crawl depth: 5 levels")

    session_pages = 0
    session_failed = 0
    session_completed = 0

    # Process entries in parallel
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(archive_single_entry, f, args.force): f
            for f in files
        }
        state['in_progress'] = [f.name for f in files[:num_workers]]
        save_state(state)

        for future in as_completed(future_to_file):
            if SHUTDOWN_REQUESTED:
                log_message("šŸ›‘ Shutdown requested, waiting for current tasks...")
                executor.shutdown(wait=True, cancel_futures=True)
                break

            filepath = future_to_file[future]
            try:
                result = future.result()
                session_pages += result['pages_archived']
                state['total_pages_archived'] = \
                    state.get('total_pages_archived', 0) + result['pages_archived']

                # setdefault guards against hand-edited state files missing keys
                if result['status'] == 'completed':
                    state.setdefault('completed_entries', []).append(
                        result['entry_name'])
                    session_completed += 1
                elif result['status'] == 'failed':
                    state.setdefault('failed_entries', []).append(
                        result['entry_name'])
                    session_failed += result['sites_failed']
                    state['total_sites_failed'] = \
                        state.get('total_sites_failed', 0) + result['sites_failed']
                    for e in result['errors']:
                        log_message(f"   āŒ {result['entry_name']}: {e}")
                elif result['status'] == 'skipped':
                    state.setdefault('skipped_entries', []).append(
                        result['entry_name'])
                # 'interrupted' results are intentionally not recorded so the
                # entry is retried on the next run.

                # Update in_progress
                if result['entry_name'] in state.get('in_progress', []):
                    state['in_progress'].remove(result['entry_name'])

                save_state(state)

            except Exception as e:
                log_message(f"āŒ Exception processing {filepath.name}: {e}")
                state.setdefault('failed_entries', []).append(filepath.name)
                save_state(state)

    state['in_progress'] = []
    save_state(state)

    # Final summary
    log_message(f"\n{'='*60}")
    log_message("šŸ“Š SESSION SUMMARY")
    log_message(f"{'='*60}")
    log_message(f"   Entries completed: {session_completed}")
    log_message(f"   Pages archived: {session_pages}")
    log_message(f"   Sites failed: {session_failed}")

    if SHUTDOWN_REQUESTED:
        log_message("\nšŸ’¾ State saved. Run again to resume.")
    else:
        remaining = len(get_entries_to_process(state, False))
        if remaining > 0:
            log_message(f"\nā³ {remaining} entries remaining. Run again to continue.")
        else:
            log_message("\nšŸŽ‰ All entries processed!")

    print_status(state)
    return 0 if session_failed == 0 else 1


if __name__ == '__main__':
    sys.exit(main())