glam/scripts/fetch_and_extract_url.py
kempersc 55e2cd2340 feat: implement LLM-based extraction for Archives Lab content
- Introduced `llm_extract_archiveslab.py` script for entity and relationship extraction using LLMAnnotator with GLAM-NER v1.7.0.
- Replaced regex-based extraction with generative LLM inference.
- Added functions for loading markdown content, converting annotation sessions to dictionaries, and generating extraction statistics.
- Implemented comprehensive logging of extraction results, including counts of entities, relationships, and specific types like heritage institutions and persons.
- Results and statistics are saved in JSON format for further analysis.
2025-12-05 23:16:21 +01:00

518 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Fetch a URL using Playwright and extract GLAM claims.
This script:
1. Archives a webpage using Playwright (HTML + markdown)
2. Extracts entity claims using LLMAnnotator
3. Generates triples for relationships
4. Outputs structured claims with provenance
Usage:
PYTHONPATH=src python scripts/fetch_and_extract_url.py <URL> [--output OUTPUT_DIR]
Example:
PYTHONPATH=src python scripts/fetch_and_extract_url.py "https://www.archiveslab.org/events/..." --output data/extracted/archiveslab
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional
# Check dependencies: each optional third-party package is probed once at
# import time, and the matching HAS_* flag gates the features that need it,
# so the script degrades gracefully (with install hints) instead of crashing.
HAS_PLAYWRIGHT = False
HAS_MARKDOWNIFY = False
HAS_BS4 = False

try:
    # PlaywrightTimeout is the navigation-timeout exception raised by page.goto().
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
    HAS_PLAYWRIGHT = True
except ImportError:
    print("Warning: Playwright not available. Install with: pip install playwright")
    print("Then run: playwright install chromium")

try:
    from bs4 import BeautifulSoup
    HAS_BS4 = True
except ImportError:
    print("Warning: BeautifulSoup not available. Install with: pip install beautifulsoup4")

try:
    from markdownify import markdownify as md
    HAS_MARKDOWNIFY = True
except ImportError:
    print("Warning: markdownify not available. Install with: pip install markdownify")
# =============================================================================
# Simple dataclasses for claims (avoid complex imports)
# =============================================================================
@dataclass
class SimpleProvenance:
    """Lightweight record of where and how a claim was extracted."""
    source_url: str
    extraction_method: str
    extraction_date: str
    confidence: float = 0.85

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field into a plain dictionary."""
        return {
            'source_url': self.source_url,
            'extraction_method': self.extraction_method,
            'extraction_date': self.extraction_date,
            'confidence': self.confidence,
        }
@dataclass
class SimpleEntityClaim:
    """A single extracted entity plus optional context and provenance."""
    entity_id: str
    entity_type: str
    name: str
    context: Optional[str] = None
    provenance: Optional[SimpleProvenance] = None
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dict, omitting optional fields that are unset/empty."""
        payload: Dict[str, Any] = {
            'entity_id': self.entity_id,
            'entity_type': self.entity_type,
            'name': self.name,
        }
        optional_fields = (
            ('context', self.context),
            ('provenance', self.provenance.to_dict() if self.provenance else None),
            ('metadata', self.metadata),
        )
        for key, value in optional_fields:
            if value:
                payload[key] = value
        return payload
@dataclass
class SimpleTriple:
    """A minimal subject-predicate-object statement."""
    subject: str
    predicate: str
    object: str
    provenance: Optional[SimpleProvenance] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the triple, attaching provenance when present."""
        serialized: Dict[str, Any] = {
            'subject': self.subject,
            'predicate': self.predicate,
            'object': self.object,
        }
        if self.provenance is not None:
            serialized['provenance'] = self.provenance.to_dict()
        return serialized
def sanitize_dirname(url: str) -> str:
    """Create a filesystem-safe directory name from a URL.

    Uses the URL's netloc with any ``www.`` removed, replacing every
    character outside ``[A-Za-z0-9_.-]`` with ``_``.  For scheme-less
    URLs (empty netloc) it falls back to the path, and finally to
    ``"unknown"``, so callers never receive an empty directory name
    (which would make the archive land directly in the output dir).
    """
    parsed = urlparse(url)
    name = parsed.netloc or parsed.path or 'unknown'
    name = name.replace('www.', '')
    return re.sub(r'[^\w\-.]', '_', name) or 'unknown'
def clean_html_for_markdown(html: str) -> str:
    """Strip boilerplate markup from *html* before markdown conversion.

    Removes non-content tags (scripts, navigation, form controls, ...) and
    any element whose CSS classes suggest cookie/consent/ad/popup chrome.
    Returns the input unchanged when BeautifulSoup is unavailable.
    """
    if not HAS_BS4:
        return html

    soup = BeautifulSoup(html, 'html.parser')

    # Tags that never carry article content.
    noise_tags = ['script', 'style', 'nav', 'footer',
                  'aside', 'form', 'iframe', 'noscript', 'svg',
                  'button', 'input', 'select', 'textarea', 'meta',
                  'link']
    for node in soup.find_all(noise_tags):
        node.decompose()

    # Class-name fragments that mark cookie banners, trackers and overlays.
    blocked_terms = ('cookie', 'gdpr', 'consent', 'tracking',
                     'advertisement', 'popup', 'modal')
    for node in soup.find_all(attrs={'class': True}):
        raw_classes = node.get('class', [])
        if isinstance(raw_classes, list):
            joined = ' '.join(raw_classes)
        else:
            joined = str(raw_classes)
        if any(term in joined.lower() for term in blocked_terms):
            node.decompose()

    return str(soup)
def extract_text_with_xpaths(soup) -> List[Dict[str, Any]]:
    """Extract text content with XPath locations for provenance.

    Collects headings (text longer than 2 chars, untruncated), paragraphs
    (longer than 20 chars, truncated to 500) and list items (longer than
    10 chars, truncated to 300), each paired with a positional XPath-style
    locator so snippets can be traced back to their place in the page.

    Args:
        soup: A parsed BeautifulSoup document.

    Returns:
        List of dicts with 'text', 'xpath' and 'tag' keys, headings first,
        then paragraphs, then list items.
    """
    def get_xpath(element) -> str:
        """Build a positional XPath (e.g. /html[1]/body[1]/p[3]) for *element*."""
        parts = []
        while element is not None and hasattr(element, 'name') and element.name:
            # Stop at BeautifulSoup's synthetic '[document]' root: it is not
            # a real tag and previously leaked into the path as
            # '/[document][1]/...', which is not a valid XPath step.
            if element.name == '[document]':
                break
            if hasattr(element, 'find_previous_siblings'):
                # 1-based position among same-name siblings, XPath style.
                index = len(element.find_previous_siblings(element.name)) + 1
            else:
                index = 1
            parts.insert(0, f"{element.name}[{index}]")
            element = element.parent
        return '/' + '/'.join(parts) if parts else '/'

    # (tags, minimum text length, truncation limit) per content kind;
    # None means keep the full text.
    specs = [
        (['h1', 'h2', 'h3', 'h4', 'h5', 'h6'], 2, None),
        (['p'], 20, 500),
        (['li'], 10, 300),
    ]
    extractions: List[Dict[str, Any]] = []
    for tags, min_len, limit in specs:
        for tag in tags:
            for elem in soup.find_all(tag):
                text = elem.get_text(strip=True)
                if text and len(text) > min_len:
                    extractions.append({
                        'text': text if limit is None else text[:limit],
                        'xpath': get_xpath(elem),
                        'tag': tag,
                    })
    return extractions
def fetch_with_playwright(url: str, take_screenshot: bool = False, timeout: int = 30000) -> Dict[str, Any]:
    """Fetch *url* in headless Chromium and return the archived content.

    Args:
        url: The page to fetch.
        take_screenshot: When True, capture a full-page PNG screenshot.
        timeout: Navigation timeout in milliseconds.

    Returns:
        A dict with the initial and post-settle HTML, markdown (when
        markdownify is available), per-element text extractions, optional
        screenshot bytes, the fetch timestamp, and an 'error' message that
        is None on success.  Never raises: missing dependencies, HTTP
        failures, timeouts and unexpected errors are all reported via the
        'error' key.
    """
    result = {
        'url': url,
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'raw_html': None,
        'rendered_html': None,
        'markdown': None,
        'extractions': [],
        'screenshot': None,
        'error': None
    }
    if not HAS_PLAYWRIGHT:
        result['error'] = "Playwright not installed"
        return result
    if not HAS_BS4:
        result['error'] = "BeautifulSoup not installed"
        return result
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                context = browser.new_context(
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    viewport={'width': 1920, 'height': 1080}
                )
                page = context.new_page()
                response = page.goto(url, wait_until='networkidle', timeout=timeout)
                if not response or response.status >= 400:
                    result['error'] = f"HTTP {response.status if response else 'No response'}"
                    return result
                result['raw_html'] = page.content()
                # Give late-running JS (lazy images, embeds) a moment to settle
                # before snapshotting the "rendered" state.
                page.wait_for_timeout(2000)
                result['rendered_html'] = page.content()
                if take_screenshot:
                    result['screenshot'] = page.screenshot(full_page=True)
                soup = BeautifulSoup(result['rendered_html'], 'html.parser')
                result['extractions'] = extract_text_with_xpaths(soup)
                if HAS_MARKDOWNIFY:
                    cleaned = clean_html_for_markdown(result['rendered_html'])
                    markdown = md(cleaned, heading_style='atx', bullets='-')
                    # Collapse the runs of 3+ newlines left behind by tag removal.
                    result['markdown'] = re.sub(r'\n{3,}', '\n\n', markdown).strip()
            finally:
                # Always release the browser, even when navigation or
                # extraction raises (the original leaked it on exceptions).
                browser.close()
    except PlaywrightTimeout:
        # Use the imported exception type instead of the original's fragile
        # string match on the exception class name.
        result['error'] = f"Timeout loading {url}"
    except Exception as e:
        result['error'] = f"Error: {str(e)}"
    return result
def extract_entities_with_regex(text: str) -> List[Dict[str, Any]]:
    """Extract named entities from *text* using regex heuristics.

    Fallback when LLMAnnotator is not available.  Recognizes:
      - PER: title-prefixed names ("Dr. Jane Doe") or Title Case names
        followed by a verb ("Jane Doe presented ...")
      - ORG: Title Case phrases ending in an institutional suffix
        (Museum, Archive, Library, ...) or "The ... Project/Initiative"
      - LOC: capitalized words following "in" or "from"
      - URL and EMAIL literals

    Returns:
        List of dicts with 'text', 'type' and 'context' keys, deduplicated
        on (text, type) keeping the first occurrence; context is a ~100-char
        window around the match (None for URL/EMAIL).
    """
    entities: List[Dict[str, Any]] = []

    def context_window(match):
        # 50 chars either side of the match, for provenance/debugging.
        return text[max(0, match.start() - 50):match.end() + 50]

    # Person names (simple heuristic: Title Case words).
    person_patterns = [
        r'\b(?:Dr\.|Prof\.|Mr\.|Ms\.|Mrs\.)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',
        r'\b([A-Z][a-z]+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\b(?=\s+(?:is|was|will|has|presented|spoke))',
    ]
    for pattern in person_patterns:
        for match in re.finditer(pattern, text):
            name = match.group(1) if match.lastindex else match.group(0)
            entities.append({
                'text': name.strip(),
                'type': 'PER',
                'context': context_window(match),
            })

    # Organization names (look for common suffixes/patterns).
    org_patterns = [
        r'\b([A-Z][a-zA-Z\s]*(?:Archive|Archives|Museum|Library|Institute|University|Foundation|Center|Centre|Lab|Laboratory|Association|Society))\b',
        r'\b(The\s+[A-Z][a-zA-Z\s]+(?:Project|Initiative|Program|Programme))\b',
    ]
    for pattern in org_patterns:
        for match in re.finditer(pattern, text):
            entities.append({
                'text': match.group(1).strip(),
                'type': 'ORG',
                'context': context_window(match),
            })

    # Location names (cities, countries) introduced by "in"/"from".
    location_patterns = [
        r'\bin\s+([A-Z][a-z]+(?:,\s+[A-Z][a-z]+)?)\b',
        r'\bfrom\s+([A-Z][a-z]+(?:,\s+[A-Z][a-z]+)?)\b',
    ]
    for pattern in location_patterns:
        for match in re.finditer(pattern, text):
            entities.append({
                'text': match.group(1).strip(),
                'type': 'LOC',
                'context': context_window(match),
            })

    # URLs (trailing sentence punctuation is excluded from the match).
    url_pattern = r'https?://[^\s<>"\']+(?:\([^\s<>"\']*\)|[^\s<>"\'\.,;:!?\)])'
    for match in re.finditer(url_pattern, text):
        entities.append({'text': match.group(0), 'type': 'URL', 'context': None})

    # Email addresses.  Fixed char class: the original '[A-Z|a-z]' also
    # admitted a literal '|' inside the TLD.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    for match in re.finditer(email_pattern, text):
        entities.append({'text': match.group(0), 'type': 'EMAIL', 'context': None})

    # Deduplicate on (text, type), keeping the first occurrence.
    seen = set()
    unique_entities = []
    for entity in entities:
        key = (entity['text'], entity['type'])
        if key not in seen:
            seen.add(key)
            unique_entities.append(entity)
    return unique_entities
def extract_claims_from_markdown(markdown: str, source_url: str,
                                 event_name: str = "Resilient Communities Resilient Archives") -> Dict[str, Any]:
    """Extract GLAM claims from markdown content.

    Runs the regex entity extractor over *markdown*, wraps each hit in a
    SimpleEntityClaim, and emits relationship triples linking persons,
    organizations, locations and URLs to *event_name*.

    Args:
        markdown: Page content (markdown or plain text) to mine.
        source_url: URL the content came from; recorded in provenance.
        event_name: The event that entities are related to.  Previously a
            hard-coded constant; now a parameter whose default preserves
            the original behavior for existing callers.

    Returns:
        Dict with serialized 'entity_claims' and 'triples' plus the
        'raw_entities' list from the extractor.
    """
    # Use regex-based extraction (simpler, no LLM dependency).
    entities = extract_entities_with_regex(markdown)

    # One shared provenance record for every claim from this page.
    provenance = SimpleProvenance(
        source_url=source_url,
        extraction_method="Regex + Playwright",
        extraction_date=datetime.now(timezone.utc).isoformat(),
        confidence=0.75
    )

    entity_claims: List[SimpleEntityClaim] = []
    triples: List[SimpleTriple] = []
    for i, entity in enumerate(entities):
        claim = SimpleEntityClaim(
            entity_id=f"archiveslab_{i:04d}",
            entity_type=entity['type'],
            name=entity['text'],
            # Keep only the first 200 chars of context, or None when absent.
            context=entity.get('context', '')[:200] if entity.get('context') else None,
            provenance=provenance,
            metadata={}
        )
        entity_claims.append(claim)

        # Generate triples based on entity type.
        if entity['type'] == "PER":
            # Person - likely a speaker or organizer.
            triples.append(SimpleTriple(entity['text'], "REL.EVT.SPEAKS_AT", event_name, provenance))
        elif entity['type'] == "ORG":
            # Type assertion plus a participation link for organizations.
            triples.append(SimpleTriple(entity['text'], "REL.ONT.ISA", "Organization", provenance))
            triples.append(SimpleTriple(entity['text'], "REL.EVT.PARTICIPATES", event_name, provenance))
        elif entity['type'] == "LOC":
            triples.append(SimpleTriple(event_name, "REL.SPA.LOC", entity['text'], provenance))
        elif entity['type'] == "URL":
            triples.append(SimpleTriple(event_name, "REL.APP.URL", entity['text'], provenance))

    return {
        'entity_claims': [c.to_dict() for c in entity_claims],
        'triples': [t.to_dict() for t in triples],
        'raw_entities': entities
    }
def main():
    """CLI entry point: archive a URL with Playwright, then mine it for GLAM claims."""
    parser = argparse.ArgumentParser(description='Fetch URL and extract GLAM claims')
    parser.add_argument('url', help='URL to fetch and extract')
    parser.add_argument('--output', '-o', default='data/extracted/archiveslab',
                        help='Output directory')
    parser.add_argument('--screenshot', action='store_true', help='Take screenshot')
    args = parser.parse_args()

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    divider = "-" * 60
    print(f"Fetching: {args.url}")
    print(divider)

    # Step 1: archive the page with Playwright.
    result = fetch_with_playwright(args.url, take_screenshot=args.screenshot)
    if result['error']:
        print(f"Error fetching URL: {result['error']}")
        sys.exit(1)

    # Persist the archived artifacts under a per-domain directory.
    archive_dir = output_dir / sanitize_dirname(args.url)
    archive_dir.mkdir(parents=True, exist_ok=True)

    if result['rendered_html']:
        (archive_dir / 'rendered.html').write_text(result['rendered_html'], encoding='utf-8')
        print(f"Saved: {archive_dir / 'rendered.html'}")
    if result['markdown']:
        (archive_dir / 'content.md').write_text(result['markdown'], encoding='utf-8')
        print(f"Saved: {archive_dir / 'content.md'}")
    if result['extractions']:
        with open(archive_dir / 'extractions.json', 'w', encoding='utf-8') as f:
            json.dump(result['extractions'], f, indent=2, ensure_ascii=False)
        print(f"Saved: {archive_dir / 'extractions.json'}")
    if result['screenshot']:
        (archive_dir / 'screenshot.png').write_bytes(result['screenshot'])
        print(f"Saved: {archive_dir / 'screenshot.png'}")

    print(divider)
    print(f"Archived {len(result['extractions'])} text extractions with XPaths")

    # Step 2: extract claims (markdown preferred, raw HTML as fallback).
    text_content = result['markdown'] or result.get('rendered_html', '')
    if text_content:
        print("\nExtracting GLAM claims...")
        claims = extract_claims_from_markdown(text_content, args.url)

        claims_file = output_dir / 'archiveslab_claims.json'
        payload = {
            'source_url': args.url,
            'fetch_timestamp': result['fetch_timestamp'],
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'entity_claims': claims['entity_claims'],
            'triples': claims['triples'],
            'raw_entities': claims['raw_entities'],
            'statistics': {
                'total_entities': len(claims['raw_entities']),
                'entity_claims': len(claims['entity_claims']),
                'triples': len(claims['triples'])
            }
        }
        with open(claims_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        print(f"Saved: {claims_file}")

        print(divider)
        print(f"Extracted {len(claims['entity_claims'])} entity claims")
        print(f"Generated {len(claims['triples'])} triples")

        # Per-type frequency breakdown, most common first.
        type_counts = {}
        for raw in claims['raw_entities']:
            type_counts[raw['type']] = type_counts.get(raw['type'], 0) + 1
        print("\nEntity types:")
        for entity_type, count in sorted(type_counts.items(), key=lambda item: item[1], reverse=True):
            print(f" {entity_type}: {count}")
    print("\nDone!")


if __name__ == '__main__':
    main()