#!/usr/bin/env python3
"""
Archive entire websites using wget with WARC format (Internet Archive standard).

RESUMABLE: This script maintains state and can be interrupted and resumed at any time.
State is tracked in data/nde/enriched/entries/web/_archive_state.yaml

PARALLEL: Uses ThreadPoolExecutor for concurrent website archiving.
- Level 1: Multiple websites archived simultaneously (--workers)
- Level 2: Deep recursive crawling (level 5) for comprehensive content

This script:
1. Uses wget to recursively download entire websites (5 levels deep)
2. Creates ISO 28500 WARC files (.warc.gz) for archival
3. Generates CDX index files for random access
4. Extracts content with XPath provenance for verification
5. Stores everything in data/nde/enriched/entries/web/{entry_number}/

Requirements:
- wget (with WARC support - wget 1.14+)
- beautifulsoup4

Usage:
    # Run continuously with 4 parallel workers (resumable)
    python scripts/archive_website_full.py --continuous --workers 4

    # Run with limit
    python scripts/archive_website_full.py --limit 50 --workers 8

    # Process specific entry
    python scripts/archive_website_full.py --entry 0423

    # Show current progress
    python scripts/archive_website_full.py --status

    # Reset state and start fresh
    python scripts/archive_website_full.py --reset
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
from threading import Lock
|
|
from urllib.parse import urlparse
|
|
import shutil
|
|
|
|
import yaml
|
|
|
|
# Optional dependency: BeautifulSoup is only needed for content extraction.
# HAS_DEPS gates the main entry point so --status/--reset still work without it.
try:
    from bs4 import BeautifulSoup
    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    print(f"Warning: Missing dependency: {e}")
    print("Install with: pip install beautifulsoup4")
|
|
|
|
# Directories (absolute paths into the GLAM data tree)
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'
STATE_FILE = WEB_DIR / '_archive_state.yaml'  # resumable progress state
LOG_FILE = WEB_DIR / '_archive_log.txt'       # append-only run log

# Thread-safe logging: guards writes to LOG_FILE and STATE_FILE
LOG_LOCK = Lock()

# Global flag for graceful shutdown; set by signal_handler, polled by workers
SHUTDOWN_REQUESTED = False
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Request a graceful shutdown on SIGINT/SIGTERM.

    Flips the module-level SHUTDOWN_REQUESTED flag; worker loops poll it
    and stop picking up new work so state can be saved before exit.
    """
    global SHUTDOWN_REQUESTED
    SHUTDOWN_REQUESTED = True
    print("\n\n⚠️ Shutdown requested. Finishing current entries and saving state...")
|
|
|
|
|
|
def load_state() -> dict:
    """Load the persisted archive state, or build a fresh skeleton.

    Returns the parsed YAML mapping from STATE_FILE when the file exists
    (an empty file yields {}); otherwise a default state dict with empty
    progress lists and zeroed counters.
    """
    if not STATE_FILE.exists():
        return {
            'completed_entries': [],
            'failed_entries': [],
            'skipped_entries': [],
            'in_progress': [],
            'total_pages_archived': 0,
            'total_sites_failed': 0,
            'started_at': None,
            'last_updated': None,
            'run_count': 0,
        }
    with open(STATE_FILE, 'r', encoding='utf-8') as fh:
        loaded = yaml.safe_load(fh)
    return loaded or {}
|
|
|
|
|
|
def save_state(state: dict):
    """Persist *state* to STATE_FILE atomically (thread-safe).

    Stamps state['last_updated'] with the current UTC time, writes to a
    temporary sibling file, then renames it over STATE_FILE so readers
    never observe a half-written state file.
    """
    with LOG_LOCK:
        state['last_updated'] = datetime.now(timezone.utc).isoformat()
        WEB_DIR.mkdir(parents=True, exist_ok=True)

        # Atomic write: dump to a temp file, then rename into place.
        tmp_path = STATE_FILE.with_suffix('.tmp')
        with open(tmp_path, 'w', encoding='utf-8') as fh:
            yaml.dump(state, fh, default_flow_style=False, allow_unicode=True, sort_keys=False)
        tmp_path.rename(STATE_FILE)
|
|
|
|
|
|
def log_message(message: str, also_print: bool = True):
    """Append *message* to LOG_FILE with a UTC timestamp (thread-safe).

    Also echoes the message to stdout unless also_print is False.
    """
    stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    with LOG_LOCK:
        with open(LOG_FILE, 'a', encoding='utf-8') as fh:
            fh.write(f"[{stamp}] {message}\n")
        if also_print:
            print(message)
|
|
|
|
|
|
def sanitize_dirname(url: str) -> str:
    """Create a filesystem-safe directory name from *url*.

    Uses the URL's hostname with any *leading* 'www.' prefix removed;
    every character other than word characters, '-' and '.' becomes '_'.

    Fix: the previous implementation used str.replace('www.', ''), which
    deleted 'www.' anywhere in the host and mangled names such as
    'mywww.example.org' -> 'myexample.org'. Only a leading prefix is
    stripped now.

    Args:
        url: Absolute URL (scheme + host expected).

    Returns:
        Sanitized host string suitable as a directory name.
    """
    host = urlparse(url).netloc.removeprefix('www.')
    return re.sub(r'[^\w\-.]', '_', host)
|
|
|
|
|
|
def get_xpath(element) -> str:
    """Build an XPath-like locator for a BeautifulSoup element.

    Walks from *element* up toward the document root, recording each tag
    name with its 1-based position among preceding same-named siblings.
    Returns '/' when the element yields no named path segments.
    """
    segments = []
    node = element
    while node and node.name:
        position = 1 + len(node.find_previous_siblings(node.name))
        segments.append(f"{node.name}[{position}]")
        node = node.parent
    if not segments:
        return '/'
    return '/' + '/'.join(reversed(segments))
|
|
|
|
|
|
def extract_text_with_xpaths(soup, max_items: int = 300) -> list[dict]:
    """Harvest text snippets from *soup*, each with XPath provenance.

    Scans, in order: headings h1-h6, paragraphs, list items, table
    cells, anchors with an href, and container elements whose CSS class
    names look semantic (address/contact/description/...). Short texts
    are skipped, long ones truncated. At most *max_items* records are
    returned.
    """
    records: list[dict] = []

    # Headings: all h1s first, then all h2s, ... through h6.
    for level in range(1, 7):
        heading_tag = f'h{level}'
        for elem in soup.find_all(heading_tag):
            text = elem.get_text(strip=True)
            if len(text) > 2:
                records.append({
                    'text': text,
                    'xpath': get_xpath(elem),
                    'tag': heading_tag,
                    'classes': list(elem.get('class', [])),
                })

    # Paragraphs (truncated to 500 chars).
    for elem in soup.find_all('p'):
        text = elem.get_text(strip=True)
        if len(text) > 15:
            records.append({
                'text': text[:500],
                'xpath': get_xpath(elem),
                'tag': 'p',
                'classes': list(elem.get('class', [])),
            })

    # List items (truncated to 200 chars).
    for elem in soup.find_all('li'):
        text = elem.get_text(strip=True)
        if len(text) > 8:
            records.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': 'li',
                'classes': list(elem.get('class', [])),
            })

    # Table cells, both data and header cells.
    for elem in soup.find_all(['td', 'th']):
        text = elem.get_text(strip=True)
        if len(text) > 5:
            records.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': elem.name,
                'classes': list(elem.get('class', [])),
            })

    # Anchors that actually link somewhere.
    for elem in soup.find_all('a', href=True):
        text = elem.get_text(strip=True)
        if len(text) > 2:
            records.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': 'a',
                'href': elem.get('href', '')[:500],
                'classes': list(elem.get('class', [])),
            })

    # Containers whose class names hint at meaningful content.
    semantic_hints = ('address', 'contact', 'phone', 'email', 'location',
                      'description', 'content', 'main', 'body', 'text',
                      'info', 'detail', 'about', 'bio', 'summary')

    def has_semantic_class(class_value):
        # Mirrors the original lambda: a falsy class attribute stays falsy.
        return class_value and any(hint in str(class_value).lower()
                                   for hint in semantic_hints)

    for elem in soup.find_all(['address', 'span', 'div', 'section', 'article'],
                              class_=has_semantic_class):
        text = elem.get_text(strip=True)
        if len(text) > 10:
            records.append({
                'text': text[:500],
                'xpath': get_xpath(elem),
                'tag': elem.name,
                'classes': list(elem.get('class', [])),
                'type': 'semantic_content',
            })

    return records[:max_items]
|
|
|
|
|
|
def wget_warc_archive(url: str, output_dir: Path, timeout: int = 180) -> tuple[bool, str, Path | None]:
    """
    Use wget to create a comprehensive WARC archive of a website.

    Deep crawling with level=5 for maximum content extraction.
    Optimized for speed with reduced wait times.

    Args:
        url: Starting URL
        output_dir: Directory to store archive
        timeout: Total timeout in seconds (default 180 = 3 minutes)

    Returns:
        Tuple of (success, error_message, warc_file_path)
    """
    # Mirrored pages land here; the WARC is written next to it.
    mirror_dir = output_dir / 'mirror'
    mirror_dir.mkdir(parents=True, exist_ok=True)

    # wget appends '.warc.gz' (or '.warc' when uncompressed) to this base.
    warc_base = output_dir / 'archive'

    # Optimized wget command for deep, fast crawling
    cmd = [
        'wget',
        # WARC options (Internet Archive standard)
        f'--warc-file={warc_base}',
        '--warc-cdx',  # also emit a CDX index for random access
        f'--warc-header=operator: GLAM Heritage Custodian Project',
        f'--warc-header=source: {url}',
        f'--warc-header=format: WARC/1.1',
        f'--warc-header=date: {datetime.now(timezone.utc).isoformat()}',
        # Deep recursive crawling (5 levels for comprehensive capture)
        '--recursive',
        '--level=5',
        '--page-requisites',
        '--no-parent',  # never climb above the start URL's directory
        '--convert-links',
        '--adjust-extension',
        '--restrict-file-names=windows',
        # Speed optimizations
        '--wait=0',  # No wait between requests (fast!)
        '--quota=100m',  # Allow up to 100MB per site
        '--timeout=10',  # Quick timeout per request
        '--tries=1',  # Single try per resource
        '--no-verbose',
        '--no-check-certificate',  # Handle SSL certificate issues
        # Accept more content types
        '--accept=html,htm,php,asp,aspx,jsp,xhtml,shtml',
        '--ignore-tags=img,script,link,style',  # Skip heavy assets for speed
        # Output
        '--directory-prefix=' + str(mirror_dir),
        # User agent
        '--user-agent=Mozilla/5.0 (compatible; GLAMBot/1.0; Heritage Archiver)',
        # DNS and connection optimization
        '--dns-timeout=5',
        '--connect-timeout=10',
        '--read-timeout=15',
        url
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )

        warc_file = Path(str(warc_base) + '.warc.gz')

        # wget returns 0 (success) or 8 (some errors but got content)
        if result.returncode in [0, 8]:
            # Require a minimally non-trivial file (>500 bytes) to count.
            if warc_file.exists() and warc_file.stat().st_size > 500:
                return True, "", warc_file
            # Check for uncompressed
            warc_uncompressed = Path(str(warc_base) + '.warc')
            if warc_uncompressed.exists():
                return True, "", warc_uncompressed
            return False, "WARC file not created or too small", None
        else:
            # Check if partial WARC exists — a partial capture is still useful.
            if warc_file.exists() and warc_file.stat().st_size > 500:
                return True, f"partial (exit {result.returncode})", warc_file
            return False, f"wget exit {result.returncode}", None

    except subprocess.TimeoutExpired:
        # Check for partial WARC on timeout
        warc_file = Path(str(warc_base) + '.warc.gz')
        if warc_file.exists() and warc_file.stat().st_size > 500:
            return True, "timeout but partial saved", warc_file
        return False, f"Timeout after {timeout}s", None
    except Exception as e:
        # Catch-all so one bad site never crashes a worker thread.
        return False, str(e), None
|
|
|
|
|
|
def process_mirror(mirror_dir: Path, output_dir: Path, base_url: str) -> dict:
    """Walk a wget mirror, flatten HTML pages into output_dir/pages,
    and extract text-with-XPath records from each page.

    Processes at most 200 HTML files; files that cannot be read or
    parsed are skipped silently (deliberate best-effort). Extractions
    are capped at 1000 records. (base_url is currently unused.)

    Returns:
        dict with 'pages', 'extractions', 'total_pages',
        'processed_pages'.
    """
    pages_dir = output_dir / 'pages'
    pages_dir.mkdir(parents=True, exist_ok=True)

    page_records: list[dict] = []
    extraction_records: list[dict] = []

    if not mirror_dir.exists():
        return {'pages': [], 'extractions': [], 'total_pages': 0, 'processed_pages': 0}

    # Gather every mirrored HTML file (both extensions wget may produce).
    html_files = list(mirror_dir.rglob('*.html')) + list(mirror_dir.rglob('*.htm'))

    for html_file in html_files[:200]:  # Process up to 200 pages
        try:
            content = html_file.read_text(encoding='utf-8', errors='ignore')
            soup = BeautifulSoup(content, 'html.parser')

            title_node = soup.find('title')
            title = title_node.get_text(strip=True) if title_node else html_file.stem

            extractions = extract_text_with_xpaths(soup)

            # Flatten the mirror-relative path into a single filename.
            rel_path = html_file.relative_to(mirror_dir)
            flat_name = str(rel_path).replace('/', '_').replace('\\', '_')
            dest_file = pages_dir / flat_name
            shutil.copy2(html_file, dest_file)

            page_records.append({
                'title': title[:200],
                'source_path': str(rel_path),
                'archived_file': str(dest_file.relative_to(output_dir)),
                'extractions_count': len(extractions),
            })

            # Tag each extraction with its page of origin.
            for ext in extractions:
                ext['page'] = str(rel_path)
                ext['html_file'] = str(dest_file.relative_to(ENTRIES_DIR))
            extraction_records.extend(extractions)

        except Exception:
            # Deliberate best-effort: one bad page must not abort the site.
            continue

    return {
        'pages': page_records,
        'extractions': extraction_records[:1000],
        'total_pages': len(html_files),
        'processed_pages': len(page_records),
    }
|
|
|
|
|
|
def get_urls_from_entry(data: dict) -> list[str]:
    """Extract all source URLs from an entry, deduplicated and ordered.

    Checks, in priority order:
      1. web_enrichment.source_url and web_enrichment.raw_sources[*].url
      2. original_entry.webadres_organisatie and reference[*] (when URL)
      3. wikidata_enrichment.wikidata_official_website
      4. contact.website
      5. digital_presence.website
      6. nde_metadata.original_entry.webadres_organisatie

    Fixes over the previous set-based version:
    - Discovery order is preserved, so the result is deterministic.
      Callers slice `urls[:2]`; with a set, Python's per-process string
      hash randomization meant DIFFERENT sites could be archived on each
      run. First-seen (priority) order now wins.
    - A section present with a null value (e.g. `contact: null` in YAML)
      no longer raises AttributeError.

    Args:
        data: Parsed entry YAML mapping.

    Returns:
        Ordered list of unique URLs that start with 'http'.
    """
    candidates: list[str] = []

    # 1. Web enrichment sources
    we = data.get('web_enrichment') or {}
    if we.get('source_url'):
        candidates.append(we['source_url'])
    for source in we.get('raw_sources', []):
        if source.get('url'):
            candidates.append(source['url'])

    # 2. Original entry sources
    oe = data.get('original_entry') or {}
    if oe.get('webadres_organisatie'):
        candidates.append(oe['webadres_organisatie'])
    # Reference entries may be dicts with a 'label' or plain URL strings.
    for ref in oe.get('reference', []):
        if isinstance(ref, dict) and ref.get('label', '').startswith('http'):
            candidates.append(ref['label'])
        elif isinstance(ref, str) and ref.startswith('http'):
            candidates.append(ref)

    # 3. Wikidata official website
    wd = data.get('wikidata_enrichment') or {}
    if wd.get('wikidata_official_website'):
        candidates.append(wd['wikidata_official_website'])

    # 4. Contact section
    contact = data.get('contact') or {}
    if contact.get('website'):
        candidates.append(contact['website'])

    # 5. Digital presence section
    dp = data.get('digital_presence') or {}
    if dp.get('website'):
        candidates.append(dp['website'])

    # 6. NDE metadata (nested original_entry)
    nde_oe = (data.get('nde_metadata') or {}).get('original_entry') or {}
    if nde_oe.get('webadres_organisatie'):
        candidates.append(nde_oe['webadres_organisatie'])

    # Dedupe while preserving first-seen order; keep only http(s) URLs.
    seen: dict[str, None] = {}
    for url in candidates:
        if url and url.startswith('http'):
            seen.setdefault(url, None)
    return list(seen)
|
|
|
|
|
|
def extract_entry_number(filename: str) -> str:
    """Return the leading digit run of *filename*.

    '0423_museum.yaml' -> '0423'. Filenames with no leading digits fall
    back to the name with any '.yaml' suffix removed.
    """
    m = re.match(r'^(\d+)', filename)
    if m:
        return m.group(1)
    return filename.replace('.yaml', '')
|
|
|
|
|
|
def has_valid_warc(entry_num: str) -> bool:
    """Report whether entry *entry_num* already has a non-trivial WARC.

    A WARC counts as valid when any *.warc.gz anywhere under the
    entry's web directory exceeds 1000 bytes.
    """
    entry_dir = WEB_DIR / entry_num
    if not entry_dir.exists():
        return False
    return any(w.stat().st_size > 1000 for w in entry_dir.rglob('*.warc.gz'))
|
|
|
|
|
|
def archive_single_entry(filepath: Path, force: bool = False) -> dict:
    """
    Archive websites for a single entry.

    Reads the entry YAML, archives up to two of its URLs with
    wget/WARC, extracts page content via process_mirror, writes a
    per-site metadata.yaml, and records the archives back into the
    entry's web_enrichment section.

    Args:
        filepath: Path to the entry YAML file.
        force: Re-archive even when a valid WARC already exists.

    Returns dict with results for thread-safe processing: filepath,
    entry_name, pages_archived, sites_failed, errors, and status
    ('pending' | 'completed' | 'failed' | 'skipped').
    """
    result = {
        'filepath': filepath,
        'entry_name': filepath.name,
        'pages_archived': 0,
        'sites_failed': 0,
        'errors': [],
        'status': 'pending'  # pending, completed, failed, skipped
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        result['status'] = 'failed'
        result['errors'].append(f"Read error: {e}")
        return result

    # Empty/unparseable YAML: nothing to archive.
    if not data:
        result['status'] = 'skipped'
        return result

    urls = get_urls_from_entry(data)
    if not urls:
        result['status'] = 'skipped'
        return result

    entry_num = extract_entry_number(filepath.name)

    # Check existing WARC — unless forced, a valid archive means done.
    if not force and has_valid_warc(entry_num):
        result['status'] = 'completed'
        return result

    web_archives = []

    # Archive at most the first two URLs per entry.
    for url in urls[:2]:
        if SHUTDOWN_REQUESTED:
            break

        dirname = sanitize_dirname(url)
        site_dir = WEB_DIR / entry_num / dirname

        # Skip if already has WARC (per-site check, > 1000 bytes)
        if not force and (site_dir / 'archive.warc.gz').exists():
            if (site_dir / 'archive.warc.gz').stat().st_size > 1000:
                continue

        log_message(f"  [{entry_num}] Archiving: {url}", also_print=True)

        # Clean and create directory — any stale partial mirror is removed.
        if site_dir.exists():
            shutil.rmtree(site_dir)
        site_dir.mkdir(parents=True, exist_ok=True)

        # Create WARC archive
        success, error, warc_file = wget_warc_archive(url, site_dir)

        if not success:
            # Check for partial WARC — a truncated capture is still kept.
            partial = site_dir / 'archive.warc.gz'
            if partial.exists() and partial.stat().st_size > 1000:
                warc_file = partial
                success = True
                log_message(f"    [{entry_num}] Partial WARC saved ({partial.stat().st_size} bytes)")
            else:
                result['errors'].append(f"{url}: {error}")
                result['sites_failed'] += 1
                continue

        mirror_dir = site_dir / 'mirror'

        # Find content directory (wget nests the mirror under the hostname)
        content_dirs = [d for d in mirror_dir.iterdir() if d.is_dir()] if mirror_dir.exists() else []
        actual_mirror = content_dirs[0] if content_dirs else mirror_dir

        # Process mirror: copy pages + extract text with XPaths
        proc_result = process_mirror(actual_mirror, site_dir, url)

        # WARC metadata
        warc_info = {}
        if warc_file and warc_file.exists():
            warc_info = {
                'warc_file': str(warc_file.relative_to(site_dir)),
                'warc_size_bytes': warc_file.stat().st_size,
                'warc_format': 'WARC/1.1',
                'warc_standard': 'ISO 28500'
            }
            cdx_file = site_dir / 'archive.cdx'
            if cdx_file.exists():
                warc_info['cdx_index'] = str(cdx_file.relative_to(site_dir))

        # Save metadata alongside the archive for later verification.
        metadata = {
            'url': url,
            'archive_timestamp': datetime.now(timezone.utc).isoformat(),
            'archive_method': 'wget_warc_deep',
            'archive_standard': 'Internet Archive WARC/1.1 (ISO 28500)',
            'crawl_depth': 5,
            'entry_file': filepath.name,
            'warc': warc_info,
            'total_pages': proc_result['total_pages'],
            'processed_pages': proc_result['processed_pages'],
            'pages': proc_result['pages'],
            'extractions': proc_result['extractions']
        }

        with open(site_dir / 'metadata.yaml', 'w', encoding='utf-8') as f:
            yaml.dump(metadata, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        web_archives.append({
            'url': url,
            'directory': str(site_dir.relative_to(ENTRIES_DIR)),
            'pages_archived': proc_result['processed_pages'],
            'archive_method': 'wget_warc_deep',
            'warc_file': warc_info.get('warc_file'),
            'warc_size_bytes': warc_info.get('warc_size_bytes'),
            'warc_format': 'ISO 28500'
        })

        result['pages_archived'] += proc_result['processed_pages']
        log_message(f"    [{entry_num}] ✓ {proc_result['processed_pages']} pages from {proc_result['total_pages']} found")

    # Update entry YAML with a record of what was archived.
    if web_archives:
        if 'web_enrichment' not in data:
            data['web_enrichment'] = {}

        data['web_enrichment']['web_archives'] = web_archives
        data['web_enrichment']['full_site_archive_timestamp'] = datetime.now(timezone.utc).isoformat()

        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    # Any per-site failure marks the whole entry failed (it will be retried).
    result['status'] = 'failed' if result['sites_failed'] > 0 else 'completed'
    return result
|
|
|
|
|
|
def get_entries_to_process(state: dict, force: bool = False) -> list[Path]:
    """Return entry YAML files still needing work, sorted by name.

    Hidden files (dot-prefixed) are ignored. With force=True every
    entry is returned; otherwise entries already recorded in *state* as
    completed or skipped are filtered out.
    """
    candidates = sorted(p for p in ENTRIES_DIR.glob('*.yaml')
                        if not p.name.startswith('.'))
    if force:
        return candidates

    done = set(state.get('completed_entries', [])) | set(state.get('skipped_entries', []))
    return [p for p in candidates if p.name not in done]
|
|
|
|
|
|
def print_status(state: dict):
    """Print current archiving status.

    Scans every entry YAML to count how many have archivable URLs,
    tallies WARC files on disk, and prints a summary of the persisted
    *state* (completed/failed/skipped counts, sizes, run metadata).

    Fix: the per-entry scan used a bare `except:`, which also swallowed
    KeyboardInterrupt/SystemExit and made Ctrl-C unreliable during the
    scan; narrowed to `except Exception`.
    """
    print("\n" + "="*60)
    print("📊 WARC ARCHIVING STATUS (Deep Parallel Mode)")
    print("="*60)

    completed = len(state.get('completed_entries', []))
    failed = len(state.get('failed_entries', []))
    skipped = len(state.get('skipped_entries', []))

    # Count total entries with URLs
    total_with_urls = 0
    total_entries = 0
    for f in ENTRIES_DIR.glob('*.yaml'):
        if f.name.startswith('.'):
            continue
        total_entries += 1
        try:
            with open(f, 'r', encoding='utf-8') as fh:
                data = yaml.safe_load(fh)
                if data and get_urls_from_entry(data):
                    total_with_urls += 1
        except Exception:
            # Best-effort scan: unreadable entries simply aren't counted.
            pass

    # Count WARC files and total size
    warc_files = list(WEB_DIR.rglob('*.warc.gz'))
    warc_count = len(warc_files)
    total_size = sum(f.stat().st_size for f in warc_files)

    print(f"\n📁 Total entries: {total_entries}")
    print(f"🌐 Entries with URLs: {total_with_urls}")
    print(f"✅ Completed: {completed}")
    print(f"❌ Failed: {failed}")
    print(f"⏭️ Skipped (no URL): {skipped}")
    print(f"📦 WARC files: {warc_count}")
    print(f"💾 Total WARC size: {total_size / (1024*1024):.1f} MB")
    print(f"📄 Pages archived: {state.get('total_pages_archived', 0)}")

    remaining = total_with_urls - completed - failed
    print(f"\n⏳ Remaining: {remaining}")

    in_progress = state.get('in_progress', [])
    if in_progress:
        print(f"🔄 In progress: {len(in_progress)} entries")
    if state.get('last_updated'):
        print(f"🕐 Last updated: {state['last_updated']}")
    if state.get('run_count'):
        print(f"🔁 Run count: {state['run_count']}")

    print("\n" + "="*60)
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, then archive entries in parallel.

    Handles --status/--reset fast paths, verifies dependencies (bs4,
    wget), installs signal handlers for graceful shutdown, fans the
    remaining entries out over a thread pool, and persists state after
    every completed future so the run is resumable.

    Returns 0 on success (no site failures this session), 1 otherwise.
    """
    global SHUTDOWN_REQUESTED

    parser = argparse.ArgumentParser(
        description='Archive websites with WARC format (deep parallel crawling)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --continuous --workers 4    Run with 4 parallel workers
  %(prog)s --limit 50 --workers 8      Process 50 entries with 8 workers
  %(prog)s --entry 0423                Process specific entry
  %(prog)s --status                    Show current progress
  %(prog)s --reset                     Reset state and start fresh
"""
    )
    # NOTE(review): --continuous is parsed but never read below — a single
    # pass runs regardless; confirm whether looping was intended.
    parser.add_argument('--continuous', action='store_true',
                        help='Run continuously until all entries are processed')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of entries to process')
    parser.add_argument('--entry', type=str, default=None,
                        help='Process specific entry number')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--force', action='store_true',
                        help='Re-archive even if exists')
    parser.add_argument('--status', action='store_true',
                        help='Show current archiving status')
    parser.add_argument('--reset', action='store_true',
                        help='Reset state and start fresh')
    args = parser.parse_args()

    # Load state
    state = load_state()

    if args.status:
        print_status(state)
        return 0

    if args.reset:
        # Delete persisted state and log; archives on disk are kept.
        if STATE_FILE.exists():
            STATE_FILE.unlink()
        if LOG_FILE.exists():
            LOG_FILE.unlink()
        print("✅ State reset. Run again to start fresh.")
        return 0

    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install beautifulsoup4")
        return 1

    # Check wget is installed and runnable before submitting any work.
    try:
        subprocess.run(['wget', '--version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Error: wget not found. Please install wget.")
        return 1

    # Set up signal handlers for graceful Ctrl-C / SIGTERM shutdown.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Update run count
    state['run_count'] = state.get('run_count', 0) + 1
    if not state.get('started_at'):
        state['started_at'] = datetime.now(timezone.utc).isoformat()

    # Get files to process (a specific entry bypasses the state filter).
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = get_entries_to_process(state, args.force)

    if args.limit:
        files = files[:args.limit]

    if not files:
        log_message("✅ No entries to process. All done!")
        print_status(state)
        return 0

    num_workers = min(args.workers, len(files))

    log_message(f"\n🚀 Starting DEEP WARC archiving (Run #{state['run_count']})")
    log_message(f"   Entries to process: {len(files)}")
    log_message(f"   Parallel workers: {num_workers}")
    log_message(f"   Crawl depth: 5 levels")

    session_pages = 0
    session_failed = 0
    session_completed = 0

    # Process entries in parallel
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(archive_single_entry, f, args.force): f
            for f in files
        }

        # Approximation: mark the first worker-sized batch as in-progress.
        state['in_progress'] = [f.name for f in files[:num_workers]]
        save_state(state)

        for future in as_completed(future_to_file):
            if SHUTDOWN_REQUESTED:
                log_message("🛑 Shutdown requested, waiting for current tasks...")
                # Let running tasks finish; cancel everything still queued.
                executor.shutdown(wait=True, cancel_futures=True)
                break

            filepath = future_to_file[future]

            try:
                result = future.result()

                session_pages += result['pages_archived']
                state['total_pages_archived'] = state.get('total_pages_archived', 0) + result['pages_archived']

                if result['status'] == 'completed':
                    state['completed_entries'].append(result['entry_name'])
                    session_completed += 1
                elif result['status'] == 'failed':
                    # Failed entries are NOT added to completed/skipped, so
                    # they are retried on the next run.
                    state['failed_entries'].append(result['entry_name'])
                    session_failed += result['sites_failed']
                    state['total_sites_failed'] = state.get('total_sites_failed', 0) + result['sites_failed']
                    for e in result['errors']:
                        log_message(f"  ❌ {result['entry_name']}: {e}")
                elif result['status'] == 'skipped':
                    state['skipped_entries'].append(result['entry_name'])

                # Update in_progress
                if result['entry_name'] in state.get('in_progress', []):
                    state['in_progress'].remove(result['entry_name'])

                # Persist after every future so an interrupt loses nothing.
                save_state(state)

            except Exception as e:
                log_message(f"❌ Exception processing {filepath.name}: {e}")
                state['failed_entries'].append(filepath.name)
                save_state(state)

    state['in_progress'] = []
    save_state(state)

    # Final summary
    log_message(f"\n{'='*60}")
    log_message(f"📊 SESSION SUMMARY")
    log_message(f"{'='*60}")
    log_message(f"   Entries completed: {session_completed}")
    log_message(f"   Pages archived: {session_pages}")
    log_message(f"   Sites failed: {session_failed}")

    if SHUTDOWN_REQUESTED:
        log_message(f"\n💾 State saved. Run again to resume.")
    else:
        remaining = len(get_entries_to_process(state, False))
        if remaining > 0:
            log_message(f"\n⏳ {remaining} entries remaining. Run again to continue.")
        else:
            log_message(f"\n🎉 All entries processed!")

    print_status(state)

    return 0 if session_failed == 0 else 1
|
|
|
|
|
|
if __name__ == '__main__':
    # Exit with main()'s status code (0 = success, 1 = failures occurred).
    sys.exit(main())
|