- Introduced `llm_extract_archiveslab.py` script for entity and relationship extraction using LLMAnnotator with GLAM-NER v1.7.0. - Replaced regex-based extraction with generative LLM inference. - Added functions for loading markdown content, converting annotation sessions to dictionaries, and generating extraction statistics. - Implemented comprehensive logging of extraction results, including counts of entities, relationships, and specific types like heritage institutions and persons. - Results and statistics are saved in JSON format for further analysis.
518 lines
17 KiB
Python
#!/usr/bin/env python3
"""
Fetch a URL using Playwright and extract GLAM claims.

This script:
1. Archives a webpage using Playwright (HTML + markdown)
2. Extracts entity claims using LLMAnnotator
3. Generates triples for relationships
4. Outputs structured claims with provenance

Usage:
    PYTHONPATH=src python scripts/fetch_and_extract_url.py <URL> [--output OUTPUT_DIR]

Example:
    PYTHONPATH=src python scripts/fetch_and_extract_url.py "https://www.archiveslab.org/events/..." --output data/extracted/archiveslab
"""

import argparse
import json
import re
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

# Optional third-party dependencies. Each flag records whether its import
# succeeded, so the rest of the script can degrade gracefully instead of
# crashing at import time.
HAS_PLAYWRIGHT = False
HAS_MARKDOWNIFY = False
HAS_BS4 = False

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
    HAS_PLAYWRIGHT = True
except ImportError:
    print("Warning: Playwright not available. Install with: pip install playwright")
    print("Then run: playwright install chromium")

try:
    from bs4 import BeautifulSoup
    HAS_BS4 = True
except ImportError:
    print("Warning: BeautifulSoup not available. Install with: pip install beautifulsoup4")

try:
    from markdownify import markdownify as md
    HAS_MARKDOWNIFY = True
except ImportError:
    print("Warning: markdownify not available. Install with: pip install markdownify")
|
# =============================================================================
|
|
# Simple dataclasses for claims (avoid complex imports)
|
|
# =============================================================================
|
|
|
|
@dataclass
class SimpleProvenance:
    """Simple provenance tracking.

    Records where a claim came from, how it was extracted, when, and a
    confidence score for the extraction method.
    """
    source_url: str           # page the claim was extracted from
    extraction_method: str    # e.g. "Regex + Playwright"
    extraction_date: str      # ISO-8601 timestamp of the extraction
    confidence: float = 0.85  # default confidence for the method

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view suitable for JSON serialization."""
        return asdict(self)
|
|
|
|
|
@dataclass
class SimpleEntityClaim:
    """Simple entity claim."""
    entity_id: str                                  # stable identifier, e.g. "archiveslab_0001"
    entity_type: str                                # PER / ORG / LOC / URL / EMAIL
    name: str                                       # surface form of the entity
    context: Optional[str] = None                   # surrounding text, if captured
    provenance: Optional[SimpleProvenance] = None   # where/how it was extracted
    metadata: Optional[Dict[str, Any]] = None       # free-form extras

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dict, omitting optional fields that are unset/empty."""
        serialized: Dict[str, Any] = {
            'entity_id': self.entity_id,
            'entity_type': self.entity_type,
            'name': self.name,
        }
        if self.context:
            serialized['context'] = self.context
        if self.provenance:
            serialized['provenance'] = self.provenance.to_dict()
        if self.metadata:
            serialized['metadata'] = self.metadata
        return serialized
|
|
|
|
|
@dataclass
class SimpleTriple:
    """Simple triple (subject-predicate-object)."""
    subject: str
    predicate: str
    object: str
    provenance: Optional[SimpleProvenance] = None  # where/how it was derived

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dict; provenance is included only when set."""
        serialized: Dict[str, Any] = {
            'subject': self.subject,
            'predicate': self.predicate,
            'object': self.object,
        }
        if self.provenance:
            serialized['provenance'] = self.provenance.to_dict()
        return serialized
|
|
|
|
|
def sanitize_dirname(url: str) -> str:
    """Create a safe directory name from a URL.

    Uses the URL's hostname with a leading "www." prefix removed and any
    character outside word chars, '-' and '.' replaced by an underscore.
    """
    host = urlparse(url).netloc
    # Strip only a *leading* "www." — the original str.replace also mangled
    # hosts that merely contain "www." (e.g. "foowww.bar.com" -> "foobar.com").
    if host.startswith('www.'):
        host = host[4:]
    return re.sub(r'[^\w\-.]', '_', host)
|
|
|
|
|
def clean_html_for_markdown(html: str) -> str:
    """Clean HTML before markdown conversion.

    Removes non-content elements (scripts, navigation, forms, ...) and any
    element whose class list suggests cookie/consent/tracking chrome.
    Returns the input unchanged when BeautifulSoup is unavailable.
    """
    if not HAS_BS4:
        return html

    soup = BeautifulSoup(html, 'html.parser')

    # Strip structural / non-content elements entirely.
    noise_tags = ['script', 'style', 'nav', 'footer',
                  'aside', 'form', 'iframe', 'noscript', 'svg',
                  'button', 'input', 'select', 'textarea', 'meta',
                  'link']
    for node in soup.find_all(noise_tags):
        node.decompose()

    # Drop cookie-banner / tracking / popup elements, identified by class name.
    blocked_terms = ('cookie', 'gdpr', 'consent', 'tracking',
                     'advertisement', 'popup', 'modal')
    for node in soup.find_all(attrs={'class': True}):
        raw_classes = node.get('class', [])
        if isinstance(raw_classes, list):
            normalized = ' '.join(raw_classes).lower()
        else:
            normalized = str(raw_classes).lower()
        if any(term in normalized for term in blocked_terms):
            node.decompose()

    return str(soup)
|
|
|
|
|
def extract_text_with_xpaths(soup) -> List[Dict[str, Any]]:
    """Extract text content with XPath locations for provenance.

    Collects headings (h1-h6), paragraphs (> 20 chars, truncated to 500) and
    list items (> 10 chars, truncated to 300) from a BeautifulSoup document.
    Each result is ``{'text': ..., 'xpath': ..., 'tag': ...}``.
    """

    def build_xpath(node) -> str:
        """Walk up to the document root, recording a 1-based position per level."""
        segments = []
        while node and hasattr(node, 'name') and node.name:
            if hasattr(node, 'find_previous_siblings'):
                # 1-based index among same-named preceding siblings.
                position = len(node.find_previous_siblings(node.name)) + 1
            else:
                position = 1
            segments.insert(0, f"{node.name}[{position}]")
            node = node.parent
        return '/' + '/'.join(segments) if segments else '/'

    results: List[Dict[str, Any]] = []

    # Headings: keep anything longer than 2 characters, untruncated.
    for tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6'):
        for node in soup.find_all(tag):
            text = node.get_text(strip=True)
            if text and len(text) > 2:
                results.append({
                    'text': text,
                    'xpath': build_xpath(node),
                    'tag': tag,
                })

    # Paragraphs: require some substance (> 20 chars), cap at 500.
    for node in soup.find_all('p'):
        text = node.get_text(strip=True)
        if text and len(text) > 20:
            results.append({
                'text': text[:500],
                'xpath': build_xpath(node),
                'tag': 'p',
            })

    # List items: require > 10 chars, cap at 300.
    for node in soup.find_all('li'):
        text = node.get_text(strip=True)
        if text and len(text) > 10:
            results.append({
                'text': text[:300],
                'xpath': build_xpath(node),
                'tag': 'li',
            })

    return results
|
|
|
|
|
def fetch_with_playwright(url: str, take_screenshot: bool = False, timeout: int = 30000) -> Dict[str, Any]:
    """Fetch URL using Playwright.

    Renders the page in headless Chromium and returns a dict with the raw and
    rendered HTML, a markdown conversion (when markdownify is available),
    per-element text extractions with XPaths, an optional full-page
    screenshot, and an 'error' key that is non-None on any failure.

    Args:
        url: Page to fetch.
        take_screenshot: When True, store a full-page PNG in result['screenshot'].
        timeout: Navigation timeout in milliseconds.
    """
    result: Dict[str, Any] = {
        'url': url,
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'raw_html': None,
        'rendered_html': None,
        'markdown': None,
        'extractions': [],
        'screenshot': None,
        'error': None
    }

    if not HAS_PLAYWRIGHT:
        result['error'] = "Playwright not installed"
        return result

    if not HAS_BS4:
        result['error'] = "BeautifulSoup not installed"
        return result

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                context = browser.new_context(
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    viewport={'width': 1920, 'height': 1080}
                )
                page = context.new_page()

                response = page.goto(url, wait_until='networkidle', timeout=timeout)

                if not response or response.status >= 400:
                    result['error'] = f"HTTP {response.status if response else 'No response'}"
                    return result

                result['raw_html'] = page.content()
                # Give late-running scripts a moment to mutate the DOM before
                # capturing the final rendered state.
                page.wait_for_timeout(2000)
                result['rendered_html'] = page.content()

                if take_screenshot:
                    result['screenshot'] = page.screenshot(full_page=True)

                soup = BeautifulSoup(result['rendered_html'], 'html.parser')
                result['extractions'] = extract_text_with_xpaths(soup)

                if HAS_MARKDOWNIFY:
                    cleaned = clean_html_for_markdown(result['rendered_html'])
                    markdown = md(cleaned, heading_style='atx', bullets='-')
                    # Collapse runs of 3+ newlines left over from removed elements.
                    result['markdown'] = re.sub(r'\n{3,}', '\n\n', markdown).strip()
            finally:
                # Always release the browser — the original leaked it when an
                # exception fired after launch.
                browser.close()

    except PlaywrightTimeout:
        # Catch the imported exception class directly instead of the original
        # fragile string match on type(e).__name__.
        result['error'] = f"Timeout loading {url}"
    except Exception as e:
        result['error'] = f"Error: {str(e)}"

    return result
|
|
|
|
|
def extract_entities_with_regex(text: str) -> List[Dict[str, Any]]:
    """
    Extract entities using regex patterns.

    Fallback when LLMAnnotator is not available.

    Returns a list of dicts with keys 'text', 'type' (PER/ORG/LOC/URL/EMAIL)
    and 'context' (surrounding text, or None for URLs/emails), deduplicated
    on (text, type) with first-occurrence order preserved.
    """
    entities: List[Dict[str, Any]] = []

    def context_around(match) -> str:
        # 50 characters of context on either side of the match.
        return text[max(0, match.start() - 50):match.end() + 50]

    # Person names (simple heuristic: Title Case words).
    # Patterns like "Dr. John Smith" or "Jane Doe spoke ...".
    person_patterns = [
        r'\b(?:Dr\.|Prof\.|Mr\.|Ms\.|Mrs\.)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',
        r'\b([A-Z][a-z]+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\b(?=\s+(?:is|was|will|has|presented|spoke))',
    ]
    for pattern in person_patterns:
        for match in re.finditer(pattern, text):
            name = match.group(1) if match.lastindex else match.group(0)
            entities.append({
                'text': name.strip(),
                'type': 'PER',
                'context': context_around(match)
            })

    # Organization names (look for common suffixes/patterns).
    org_patterns = [
        r'\b([A-Z][a-zA-Z\s]*(?:Archive|Archives|Museum|Library|Institute|University|Foundation|Center|Centre|Lab|Laboratory|Association|Society))\b',
        r'\b(The\s+[A-Z][a-zA-Z\s]+(?:Project|Initiative|Program|Programme))\b',
    ]
    for pattern in org_patterns:
        for match in re.finditer(pattern, text):
            entities.append({
                'text': match.group(1).strip(),
                'type': 'ORG',
                'context': context_around(match)
            })

    # Location names (cities, countries) introduced by "in" / "from".
    location_patterns = [
        r'\bin\s+([A-Z][a-z]+(?:,\s+[A-Z][a-z]+)?)\b',
        r'\bfrom\s+([A-Z][a-z]+(?:,\s+[A-Z][a-z]+)?)\b',
    ]
    for pattern in location_patterns:
        for match in re.finditer(pattern, text):
            entities.append({
                'text': match.group(1).strip(),
                'type': 'LOC',
                'context': context_around(match)
            })

    # URLs
    url_pattern = r'https?://[^\s<>"\']+(?:\([^\s<>"\']*\)|[^\s<>"\'\.,;:!?\)])'
    for match in re.finditer(url_pattern, text):
        entities.append({'text': match.group(0), 'type': 'URL', 'context': None})

    # Email addresses.
    # BUG FIX: the TLD class was [A-Z|a-z]{2,}, which also matched a literal
    # '|' (so "x@yy.a|bc" was accepted). Corrected to [A-Za-z]{2,}.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    for match in re.finditer(email_pattern, text):
        entities.append({'text': match.group(0), 'type': 'EMAIL', 'context': None})

    # Deduplicate on (text, type), keeping the first occurrence.
    seen = set()
    unique_entities = []
    for entity in entities:
        key = (entity['text'], entity['type'])
        if key not in seen:
            seen.add(key)
            unique_entities.append(entity)

    return unique_entities
|
|
|
|
|
def extract_claims_from_markdown(markdown: str, source_url: str,
                                 event_name: str = "Resilient Communities Resilient Archives") -> Dict[str, Any]:
    """Extract GLAM claims from markdown content.

    Runs the regex entity extractor over *markdown*, wraps each hit in a
    SimpleEntityClaim, and derives relationship triples linking persons,
    organizations, locations and URLs to *event_name*.

    Args:
        markdown: Markdown (or plain text) to scan for entities.
        source_url: URL of the source page, recorded in provenance.
        event_name: Event the entities are linked to in generated triples.
            Previously hard-coded in the body; kept as the default for
            backward compatibility.

    Returns:
        Dict with 'entity_claims' and 'triples' (lists of plain dicts) and
        'raw_entities' (the extractor's raw output).
    """
    # Use regex-based extraction (simpler, no LLM dependency).
    entities = extract_entities_with_regex(markdown)

    # One shared provenance record for everything extracted from this page.
    provenance = SimpleProvenance(
        source_url=source_url,
        extraction_method="Regex + Playwright",
        extraction_date=datetime.now(timezone.utc).isoformat(),
        confidence=0.75
    )

    entity_claims: List[SimpleEntityClaim] = []
    triples: List[SimpleTriple] = []

    for i, entity in enumerate(entities):
        # Create the entity claim (context trimmed to 200 chars).
        entity_claims.append(SimpleEntityClaim(
            entity_id=f"archiveslab_{i:04d}",
            entity_type=entity['type'],
            name=entity['text'],
            context=entity.get('context', '')[:200] if entity.get('context') else None,
            provenance=provenance,
            metadata={}
        ))

        # Generate triples based on entity type.
        etype = entity['type']
        if etype == "PER":
            # Person - likely a speaker or organizer.
            triples.append(SimpleTriple(
                subject=entity['text'],
                predicate="REL.EVT.SPEAKS_AT",
                object=event_name,
                provenance=provenance
            ))
        elif etype == "ORG":
            # Organization: classify it and link it to the event.
            triples.append(SimpleTriple(
                subject=entity['text'],
                predicate="REL.ONT.ISA",
                object="Organization",
                provenance=provenance
            ))
            triples.append(SimpleTriple(
                subject=entity['text'],
                predicate="REL.EVT.PARTICIPATES",
                object=event_name,
                provenance=provenance
            ))
        elif etype == "LOC":
            # Location where the event takes place.
            triples.append(SimpleTriple(
                subject=event_name,
                predicate="REL.SPA.LOC",
                object=entity['text'],
                provenance=provenance
            ))
        elif etype == "URL":
            # Website associated with the event.
            triples.append(SimpleTriple(
                subject=event_name,
                predicate="REL.APP.URL",
                object=entity['text'],
                provenance=provenance
            ))

    return {
        'entity_claims': [c.to_dict() for c in entity_claims],
        'triples': [t.to_dict() for t in triples],
        'raw_entities': entities
    }
|
|
|
|
|
def main():
    """CLI entry point: fetch one URL, archive its content, extract claims."""
    parser = argparse.ArgumentParser(description='Fetch URL and extract GLAM claims')
    parser.add_argument('url', help='URL to fetch and extract')
    parser.add_argument('--output', '-o', default='data/extracted/archiveslab',
                        help='Output directory')
    parser.add_argument('--screenshot', action='store_true', help='Take screenshot')
    args = parser.parse_args()

    out_root = Path(args.output)
    out_root.mkdir(parents=True, exist_ok=True)

    print(f"Fetching: {args.url}")
    print("-" * 60)

    # Step 1: Fetch with Playwright.
    fetched = fetch_with_playwright(args.url, take_screenshot=args.screenshot)

    if fetched['error']:
        print(f"Error fetching URL: {fetched['error']}")
        sys.exit(1)

    # Archive directory is named after the sanitized domain.
    archive_dir = out_root / sanitize_dirname(args.url)
    archive_dir.mkdir(parents=True, exist_ok=True)

    # Persist every artifact that was actually captured.
    if fetched['rendered_html']:
        (archive_dir / 'rendered.html').write_text(fetched['rendered_html'], encoding='utf-8')
        print(f"Saved: {archive_dir / 'rendered.html'}")

    if fetched['markdown']:
        (archive_dir / 'content.md').write_text(fetched['markdown'], encoding='utf-8')
        print(f"Saved: {archive_dir / 'content.md'}")

    if fetched['extractions']:
        with open(archive_dir / 'extractions.json', 'w', encoding='utf-8') as f:
            json.dump(fetched['extractions'], f, indent=2, ensure_ascii=False)
        print(f"Saved: {archive_dir / 'extractions.json'}")

    if fetched['screenshot']:
        (archive_dir / 'screenshot.png').write_bytes(fetched['screenshot'])
        print(f"Saved: {archive_dir / 'screenshot.png'}")

    print("-" * 60)
    print(f"Archived {len(fetched['extractions'])} text extractions with XPaths")

    # Step 2: Extract claims from markdown, falling back to rendered HTML.
    text_content = fetched['markdown'] or fetched.get('rendered_html', '')

    if text_content:
        print("\nExtracting GLAM claims...")
        claims = extract_claims_from_markdown(text_content, args.url)

        # Save claims plus summary statistics in one JSON document.
        payload = {
            'source_url': args.url,
            'fetch_timestamp': fetched['fetch_timestamp'],
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'entity_claims': claims['entity_claims'],
            'triples': claims['triples'],
            'raw_entities': claims['raw_entities'],
            'statistics': {
                'total_entities': len(claims['raw_entities']),
                'entity_claims': len(claims['entity_claims']),
                'triples': len(claims['triples'])
            }
        }
        claims_file = out_root / 'archiveslab_claims.json'
        with open(claims_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)

        print(f"Saved: {claims_file}")
        print("-" * 60)
        print(f"Extracted {len(claims['entity_claims'])} entity claims")
        print(f"Generated {len(claims['triples'])} triples")

        # Entity type breakdown, most frequent first.
        type_counts: Dict[str, int] = {}
        for entity in claims['raw_entities']:
            etype = entity['type']
            type_counts[etype] = type_counts.get(etype, 0) + 1

        print("\nEntity types:")
        for etype, count in sorted(type_counts.items(), key=lambda kv: -kv[1]):
            print(f"  {etype}: {count}")

    print("\nDone!")


if __name__ == '__main__':
    main()
|