#!/usr/bin/env python3
"""
Fetch a URL using Playwright and extract GLAM claims.

This script:
1. Archives a webpage using Playwright (HTML + markdown)
2. Extracts entity claims using LLMAnnotator
3. Generates triples for relationships
4. Outputs structured claims with provenance

Usage:
    PYTHONPATH=src python scripts/fetch_and_extract_url.py [--output OUTPUT_DIR]

Example:
    PYTHONPATH=src python scripts/fetch_and_extract_url.py "https://www.archiveslab.org/events/..." --output data/extracted/archiveslab
"""

import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional

# Optional dependencies: the script degrades gracefully when any are missing
# (fetching requires playwright + bs4; markdown conversion requires markdownify).
HAS_PLAYWRIGHT = False
HAS_MARKDOWNIFY = False
HAS_BS4 = False

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
    HAS_PLAYWRIGHT = True
except ImportError:
    print("Warning: Playwright not available. Install with: pip install playwright")
    print("Then run: playwright install chromium")

try:
    from bs4 import BeautifulSoup
    HAS_BS4 = True
except ImportError:
    print("Warning: BeautifulSoup not available. Install with: pip install beautifulsoup4")

try:
    from markdownify import markdownify as md
    HAS_MARKDOWNIFY = True
except ImportError:
    print("Warning: markdownify not available. Install with: pip install markdownify")


# =============================================================================
# Simple dataclasses for claims (avoid complex imports)
# =============================================================================

@dataclass
class SimpleProvenance:
    """Simple provenance tracking for an extraction (source + method + date)."""
    source_url: str
    extraction_method: str
    extraction_date: str
    confidence: float = 0.85

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


@dataclass
class SimpleEntityClaim:
    """Simple entity claim: a typed, named entity with optional context/provenance."""
    entity_id: str
    entity_type: str
    name: str
    context: Optional[str] = None
    provenance: Optional[SimpleProvenance] = None
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dict, omitting empty optional fields."""
        result = {
            'entity_id': self.entity_id,
            'entity_type': self.entity_type,
            'name': self.name,
        }
        if self.context:
            result['context'] = self.context
        if self.provenance:
            result['provenance'] = self.provenance.to_dict()
        if self.metadata:
            result['metadata'] = self.metadata
        return result


@dataclass
class SimpleTriple:
    """Simple triple (subject-predicate-object) with optional provenance."""
    subject: str
    predicate: str
    object: str
    provenance: Optional[SimpleProvenance] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dict, omitting provenance when absent."""
        result = {
            'subject': self.subject,
            'predicate': self.predicate,
            'object': self.object,
        }
        if self.provenance:
            result['provenance'] = self.provenance.to_dict()
        return result


def sanitize_dirname(url: str) -> str:
    """Create a safe directory name from a URL's host (strips 'www.', keeps port).

    Non-word characters other than '-' and '.' are replaced with '_'.
    """
    parsed = urlparse(url)
    name = parsed.netloc.replace('www.', '')
    name = re.sub(r'[^\w\-.]', '_', name)
    return name


def clean_html_for_markdown(html: str) -> str:
    """Clean HTML before markdown conversion.

    Removes script/style/chrome elements and anything whose class list looks
    like cookie/consent/tracking/popup boilerplate. Returns the input
    unchanged when BeautifulSoup is unavailable.
    """
    if not HAS_BS4:
        return html
    soup = BeautifulSoup(html, 'html.parser')
    # Remove unwanted elements
    for element in soup.find_all(['script', 'style', 'nav', 'footer', 'aside',
                                  'form', 'iframe', 'noscript', 'svg', 'button',
                                  'input', 'select', 'textarea', 'meta', 'link']):
        element.decompose()
    # Remove cookie/tracking elements by checking class attributes
    for element in soup.find_all(attrs={'class': True}):
        classes = element.get('class', [])
        if isinstance(classes, list):
            class_str = ' '.join(classes).lower()
        else:
            class_str = str(classes).lower()
        if any(term in class_str for term in ['cookie', 'gdpr', 'consent',
                                              'tracking', 'advertisement',
                                              'popup', 'modal']):
            element.decompose()
    return str(soup)


def extract_text_with_xpaths(soup) -> List[Dict[str, Any]]:
    """Extract text content with XPath-like locations for provenance.

    Collects headings (any length > 2), paragraphs (> 20 chars, truncated to
    500) and list items (> 10 chars, truncated to 300), each tagged with a
    positional path built from element names and sibling indices.
    """
    extractions = []

    def get_xpath(element) -> str:
        # Walk up the parent chain, indexing each element among same-name siblings.
        parts = []
        while element and hasattr(element, 'name') and element.name:
            if hasattr(element, 'find_previous_siblings'):
                siblings = element.find_previous_siblings(element.name)
                index = len(siblings) + 1
            else:
                index = 1
            parts.insert(0, f"{element.name}[{index}]")
            element = element.parent
        return '/' + '/'.join(parts) if parts else '/'

    # Extract headings
    for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        for elem in soup.find_all(tag):
            text = elem.get_text(strip=True)
            if text and len(text) > 2:
                extractions.append({'text': text, 'xpath': get_xpath(elem), 'tag': tag})
    # Extract paragraphs
    for elem in soup.find_all('p'):
        text = elem.get_text(strip=True)
        if text and len(text) > 20:
            extractions.append({'text': text[:500], 'xpath': get_xpath(elem), 'tag': 'p'})
    # Extract list items
    for elem in soup.find_all('li'):
        text = elem.get_text(strip=True)
        if text and len(text) > 10:
            extractions.append({'text': text[:300], 'xpath': get_xpath(elem), 'tag': 'li'})
    return extractions


def fetch_with_playwright(url: str, take_screenshot: bool = False,
                          timeout: int = 30000) -> Dict[str, Any]:
    """Fetch URL using Playwright.

    Returns a dict with raw/rendered HTML, markdown (when markdownify is
    available), text extractions with XPaths, an optional screenshot, and an
    'error' string when anything went wrong (never raises).
    """
    result = {
        'url': url,
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'raw_html': None,
        'rendered_html': None,
        'markdown': None,
        'extractions': [],
        'screenshot': None,
        'error': None,
    }
    if not HAS_PLAYWRIGHT:
        result['error'] = "Playwright not installed"
        return result
    if not HAS_BS4:
        result['error'] = "BeautifulSoup not installed"
        return result
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            # try/finally guarantees the browser is closed on every path,
            # including early returns and exceptions (fixes a leak in the
            # original, which skipped close() when an error was raised).
            try:
                context = browser.new_context(
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    viewport={'width': 1920, 'height': 1080}
                )
                page = context.new_page()
                response = page.goto(url, wait_until='networkidle', timeout=timeout)
                if not response or response.status >= 400:
                    result['error'] = f"HTTP {response.status if response else 'No response'}"
                    return result
                result['raw_html'] = page.content()
                # Give late JS a moment to settle before the rendered snapshot.
                page.wait_for_timeout(2000)
                result['rendered_html'] = page.content()
                if take_screenshot:
                    result['screenshot'] = page.screenshot(full_page=True)
                soup = BeautifulSoup(result['rendered_html'], 'html.parser')
                result['extractions'] = extract_text_with_xpaths(soup)
                if HAS_MARKDOWNIFY:
                    cleaned = clean_html_for_markdown(result['rendered_html'])
                    markdown = md(cleaned, heading_style='atx', bullets='-')
                    # Collapse runs of 3+ newlines to a single blank line.
                    result['markdown'] = re.sub(r'\n{3,}', '\n\n', markdown).strip()
            finally:
                browser.close()
    except PlaywrightTimeout:
        # Catch the concrete exception type instead of the original's fragile
        # string match on the exception class name.
        result['error'] = f"Timeout loading {url}"
    except Exception as e:
        result['error'] = f"Error: {str(e)}"
    return result


def extract_entities_with_regex(text: str) -> List[Dict[str, Any]]:
    """
    Extract entities using regex patterns.

    Fallback when LLMAnnotator is not available. Returns dicts with keys
    'text', 'type' (PER/ORG/LOC/URL/EMAIL) and 'context' (±50 chars around
    the match, or None for URL/EMAIL), deduplicated on (text, type).
    """
    entities = []

    # Person names (simple heuristic: Title Case words)
    # Look for patterns like "Dr. John Smith" or "Prof. Jane Doe"
    person_patterns = [
        r'\b(?:Dr\.|Prof\.|Mr\.|Ms\.|Mrs\.)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',
        r'\b([A-Z][a-z]+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\b(?=\s+(?:is|was|will|has|presented|spoke))',
    ]
    for pattern in person_patterns:
        for match in re.finditer(pattern, text):
            name = match.group(1) if match.lastindex else match.group(0)
            entities.append({
                'text': name.strip(),
                'type': 'PER',
                'context': text[max(0, match.start()-50):match.end()+50]
            })

    # Organization names (look for common suffixes/patterns)
    org_patterns = [
        r'\b([A-Z][a-zA-Z\s]*(?:Archive|Archives|Museum|Library|Institute|University|Foundation|Center|Centre|Lab|Laboratory|Association|Society))\b',
        r'\b(The\s+[A-Z][a-zA-Z\s]+(?:Project|Initiative|Program|Programme))\b',
    ]
    for pattern in org_patterns:
        for match in re.finditer(pattern, text):
            entities.append({
                'text': match.group(1).strip(),
                'type': 'ORG',
                'context': text[max(0, match.start()-50):match.end()+50]
            })

    # Location names (cities, countries)
    location_patterns = [
        r'\bin\s+([A-Z][a-z]+(?:,\s+[A-Z][a-z]+)?)\b',
        r'\bfrom\s+([A-Z][a-z]+(?:,\s+[A-Z][a-z]+)?)\b',
    ]
    for pattern in location_patterns:
        for match in re.finditer(pattern, text):
            entities.append({
                'text': match.group(1).strip(),
                'type': 'LOC',
                'context': text[max(0, match.start()-50):match.end()+50]
            })

    # URLs
    url_pattern = r'https?://[^\s<>"\']+(?:\([^\s<>"\']*\)|[^\s<>"\'\.,;:!?\)])'
    for match in re.finditer(url_pattern, text):
        entities.append({'text': match.group(0), 'type': 'URL', 'context': None})

    # Email addresses.
    # BUG FIX: the original TLD class was [A-Z|a-z]{2,}, which includes a
    # literal '|' and accepted junk like "a@b.c|d" as an email address.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    for match in re.finditer(email_pattern, text):
        entities.append({'text': match.group(0), 'type': 'EMAIL', 'context': None})

    # Deduplicate on (text, type) while preserving first-seen order.
    seen = set()
    unique_entities = []
    for e in entities:
        key = (e['text'], e['type'])
        if key not in seen:
            seen.add(key)
            unique_entities.append(e)
    return unique_entities


def extract_claims_from_markdown(markdown: str, source_url: str,
                                 event_name: str = "Resilient Communities Resilient Archives") -> Dict[str, Any]:
    """Extract GLAM claims from markdown content.

    Uses regex-based extraction (simpler, no LLM dependency) and generates
    triples linking entities to ``event_name``.

    Args:
        markdown: Text/markdown content to scan.
        source_url: URL recorded in each claim's provenance.
        event_name: Event the triples anchor to. The original hard-coded this
            value; it is now a parameter with the same default for
            backward compatibility.

    Returns:
        Dict with 'entity_claims', 'triples' (both serialized) and
        'raw_entities' (the raw regex hits).
    """
    entities = extract_entities_with_regex(markdown)

    entity_claims = []
    triples = []
    provenance = SimpleProvenance(
        source_url=source_url,
        extraction_method="Regex + Playwright",
        extraction_date=datetime.now(timezone.utc).isoformat(),
        confidence=0.75
    )

    for i, entity in enumerate(entities):
        # Create entity claim
        claim = SimpleEntityClaim(
            entity_id=f"archiveslab_{i:04d}",
            entity_type=entity['type'],
            name=entity['text'],
            context=entity.get('context', '')[:200] if entity.get('context') else None,
            provenance=provenance,
            metadata={}
        )
        entity_claims.append(claim)

        # Generate triples based on entity type
        if entity['type'] == "PER":
            # Person - likely a speaker or organizer
            triples.append(SimpleTriple(
                subject=entity['text'],
                predicate="REL.EVT.SPEAKS_AT",
                object=event_name,
                provenance=provenance
            ))
        elif entity['type'] == "ORG":
            # Organization: assert its type and link it to the event.
            triples.append(SimpleTriple(
                subject=entity['text'],
                predicate="REL.ONT.ISA",
                object="Organization",
                provenance=provenance
            ))
            triples.append(SimpleTriple(
                subject=entity['text'],
                predicate="REL.EVT.PARTICIPATES",
                object=event_name,
                provenance=provenance
            ))
        elif entity['type'] == "LOC":
            # Location
            triples.append(SimpleTriple(
                subject=event_name,
                predicate="REL.SPA.LOC",
                object=entity['text'],
                provenance=provenance
            ))
        elif entity['type'] == "URL":
            # URL/Website
            triples.append(SimpleTriple(
                subject=event_name,
                predicate="REL.APP.URL",
                object=entity['text'],
                provenance=provenance
            ))
        # EMAIL entities produce a claim but no triple.

    return {
        'entity_claims': [c.to_dict() for c in entity_claims],
        'triples': [t.to_dict() for t in triples],
        'raw_entities': entities
    }


def main():
    """CLI entry point: fetch a URL, archive it, then extract and save claims."""
    parser = argparse.ArgumentParser(description='Fetch URL and extract GLAM claims')
    parser.add_argument('url', help='URL to fetch and extract')
    parser.add_argument('--output', '-o', default='data/extracted/archiveslab',
                        help='Output directory')
    parser.add_argument('--screenshot', action='store_true', help='Take screenshot')
    args = parser.parse_args()

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Fetching: {args.url}")
    print("-" * 60)

    # Step 1: Fetch with Playwright
    result = fetch_with_playwright(args.url, take_screenshot=args.screenshot)
    if result['error']:
        print(f"Error fetching URL: {result['error']}")
        sys.exit(1)

    # Save archived content under a per-domain subdirectory.
    domain = sanitize_dirname(args.url)
    archive_dir = output_dir / domain
    archive_dir.mkdir(parents=True, exist_ok=True)

    # Save HTML
    if result['rendered_html']:
        (archive_dir / 'rendered.html').write_text(result['rendered_html'], encoding='utf-8')
        print(f"Saved: {archive_dir / 'rendered.html'}")
    # Save markdown
    if result['markdown']:
        (archive_dir / 'content.md').write_text(result['markdown'], encoding='utf-8')
        print(f"Saved: {archive_dir / 'content.md'}")
    # Save extractions
    if result['extractions']:
        with open(archive_dir / 'extractions.json', 'w', encoding='utf-8') as f:
            json.dump(result['extractions'], f, indent=2, ensure_ascii=False)
        print(f"Saved: {archive_dir / 'extractions.json'}")
    # Save screenshot
    if result['screenshot']:
        (archive_dir / 'screenshot.png').write_bytes(result['screenshot'])
        print(f"Saved: {archive_dir / 'screenshot.png'}")

    print("-" * 60)
    print(f"Archived {len(result['extractions'])} text extractions with XPaths")

    # Step 2: Extract claims from markdown, falling back to rendered HTML.
    text_content = result['markdown'] or result.get('rendered_html', '')
    if text_content:
        print("\nExtracting GLAM claims...")
        claims = extract_claims_from_markdown(text_content, args.url)

        claims_file = output_dir / 'archiveslab_claims.json'
        with open(claims_file, 'w', encoding='utf-8') as f:
            json.dump({
                'source_url': args.url,
                'fetch_timestamp': result['fetch_timestamp'],
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                'entity_claims': claims['entity_claims'],
                'triples': claims['triples'],
                'raw_entities': claims['raw_entities'],
                'statistics': {
                    'total_entities': len(claims['raw_entities']),
                    'entity_claims': len(claims['entity_claims']),
                    'triples': len(claims['triples'])
                }
            }, f, indent=2, ensure_ascii=False)
        print(f"Saved: {claims_file}")

        print("-" * 60)
        print(f"Extracted {len(claims['entity_claims'])} entity claims")
        print(f"Generated {len(claims['triples'])} triples")

        # Print entity type breakdown, most frequent first.
        type_counts = {}
        for e in claims['raw_entities']:
            t = e['type']
            type_counts[t] = type_counts.get(t, 0) + 1
        print("\nEntity types:")
        for t, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            print(f"  {t}: {count}")

    print("\nDone!")


if __name__ == '__main__':
    main()