glam/src/glam_extractor/annotators/html_parser.py
2025-12-05 15:30:23 +01:00

394 lines
12 KiB
Python

"""
HTML Parser for web archive documents.
Parses HTML files from web archives and creates structured document
representations suitable for annotation.
Supports:
- WARC archives
- HTML mirror directories
- Rendered HTML from Playwright
"""
import hashlib
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from lxml import html, etree
from lxml.html import HtmlElement
@dataclass
class HTMLElement:
    """
    A single HTML element captured with its XPath and textual content.
    """
    tag: str
    xpath: str
    text_content: str
    tail_text: str = ""
    attributes: Dict[str, str] = field(default_factory=dict)
    # Character offsets of this element's own text within the document
    start_offset: Optional[int] = None
    end_offset: Optional[int] = None
    # Tree relationships, expressed as XPaths
    parent_xpath: Optional[str] = None
    children_xpaths: List[str] = field(default_factory=list)
    # Presentation hints
    is_block: bool = True
    is_heading: bool = False
    heading_level: Optional[int] = None

    @property
    def full_text(self) -> str:
        """The element's text immediately followed by its tail text."""
        return f"{self.text_content}{self.tail_text}"
@dataclass
class HTMLDocument:
    """
    A parsed HTML document, ready to be annotated.
    """
    # Where the document came from
    source_url: Optional[str] = None
    source_file: Optional[str] = None
    retrieved_at: Optional[str] = None
    # Basic metadata
    title: str = ""
    language: Optional[str] = None
    encoding: str = "utf-8"
    # SHA-256 of the raw HTML, used for deduplication
    content_hash: str = ""
    # Original markup as received
    raw_html: str = ""
    # Flattened element structure
    elements: List[HTMLElement] = field(default_factory=list)
    # Concatenated text, used for offset-based lookups
    full_text: str = ""
    # Index from XPath to its element
    xpath_map: Dict[str, HTMLElement] = field(default_factory=dict)

    def get_element_by_xpath(self, xpath: str) -> Optional[HTMLElement]:
        """Look up an element by its XPath; None when absent."""
        return self.xpath_map.get(xpath)

    def get_text_at_offset(self, start: int, end: int) -> str:
        """Return the document text between two character offsets."""
        return self.full_text[start:end]
class HTMLParser:
    """
    Parser for HTML documents from web archives.

    Creates HTMLDocument objects suitable for annotation with
    XPath-based provenance tracking.
    """

    # Tags treated as block-level (sets HTMLElement.is_block)
    BLOCK_ELEMENTS = {
        'div', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
        'ul', 'ol', 'li', 'table', 'tr', 'td', 'th',
        'blockquote', 'pre', 'article', 'section', 'aside',
        'header', 'footer', 'nav', 'main', 'figure', 'figcaption',
        'address', 'form', 'fieldset', 'legend', 'dl', 'dt', 'dd',
    }

    # Heading tag -> heading level
    HEADING_ELEMENTS = {'h1': 1, 'h2': 2, 'h3': 3, 'h4': 4, 'h5': 5, 'h6': 6}

    # Elements with no semantic text content; their subtrees are skipped
    SKIP_ELEMENTS = {'script', 'style', 'noscript', 'svg', 'path', 'meta', 'link'}

    # Strips all whitespace from inline style values so that
    # "display: none" matches the same as "display:none".
    _STYLE_WS = re.compile(r'\s+')

    def __init__(self, include_invisible: bool = False):
        """
        Initialize HTML parser.

        Args:
            include_invisible: Include hidden/invisible elements
                (those styled with display:none / visibility:hidden).
        """
        self.include_invisible = include_invisible

    def parse_file(self, file_path: str | Path) -> HTMLDocument:
        """
        Parse an HTML file.

        Args:
            file_path: Path to HTML file

        Returns:
            Parsed HTMLDocument with source_file set.
        """
        file_path = Path(file_path)
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()
        doc = self.parse_string(html_content)
        doc.source_file = str(file_path)
        return doc

    def parse_string(self, html_content: str) -> HTMLDocument:
        """
        Parse HTML from string.

        Args:
            html_content: HTML content as string

        Returns:
            Parsed HTMLDocument. Blank input yields an empty (but valid)
            document rather than an lxml parse error.
        """
        doc = HTMLDocument()
        doc.raw_html = html_content
        doc.content_hash = hashlib.sha256(html_content.encode('utf-8')).hexdigest()

        # lxml raises ParserError on empty/whitespace-only input; return
        # an empty document instead of propagating.
        if not html_content.strip():
            return doc

        # Parse with lxml
        try:
            tree = html.fromstring(html_content)
        except Exception:
            # Fallback for malformed HTML
            tree = html.document_fromstring(html_content)

        # Extract metadata
        doc.title = self._extract_title(tree)
        doc.language = self._extract_language(tree)

        # Build element list with XPaths
        doc.elements, doc.full_text = self._extract_elements(tree)

        # Build XPath map
        doc.xpath_map = {elem.xpath: elem for elem in doc.elements}
        return doc

    def _extract_title(self, tree: HtmlElement) -> str:
        """Extract document title from <title>, falling back to the first <h1>."""
        title_elems = tree.xpath('//title/text()')
        if title_elems:
            return str(title_elems[0]).strip()
        # Fallback to h1
        h1_elems = tree.xpath('//h1/text()')
        if h1_elems:
            return str(h1_elems[0]).strip()
        return ""

    def _extract_language(self, tree: HtmlElement) -> Optional[str]:
        """Extract document language from <html lang> or a meta header."""
        # Check html lang attribute
        lang = tree.get('lang')
        if lang:
            return lang
        # Check meta content-language.
        # NOTE(review): this XPath attribute match is case-sensitive, so
        # http-equiv="Content-Language" is not found — confirm whether
        # archived pages use that capitalization before relying on it.
        meta_lang = tree.xpath('//meta[@http-equiv="content-language"]/@content')
        if meta_lang:
            return str(meta_lang[0])
        return None

    def _extract_elements(
        self,
        tree: HtmlElement,
    ) -> Tuple[List[HTMLElement], str]:
        """
        Extract elements with XPaths and text content.

        Traverses the <body> (or the whole tree when there is no body)
        depth-first, accumulating text in document order so that element
        offsets index directly into the returned full text.

        Returns:
            Tuple of (element list, full text)
        """
        elements: List[HTMLElement] = []
        text_parts: List[str] = []
        current_offset = 0
        # getpath() needs the root tree; compute it once, not per node
        root_tree = tree.getroottree()

        def append_text(chunk: str) -> None:
            # Record a chunk of document text and advance the offset.
            nonlocal current_offset
            if chunk:
                text_parts.append(chunk)
                current_offset += len(chunk)

        def process_element(elem: HtmlElement, parent_xpath: Optional[str] = None):
            # Comments and processing instructions have a non-string .tag.
            # (Previously these were coerced to tag "unknown" and indexed.)
            # Their tail is real document text, so keep it in full_text.
            if not isinstance(elem.tag, str):
                append_text(elem.tail or "")
                return
            tag = elem.tag
            tail = elem.tail or ""

            # Skip non-content elements, but keep the tail text, which
            # sits outside the skipped element in document order.
            if tag in self.SKIP_ELEMENTS:
                append_text(tail)
                return

            # Skip invisible elements unless configured otherwise.
            # Whitespace is stripped and the value lowercased so that
            # "display: none" / "DISPLAY:NONE" are also caught.
            if not self.include_invisible:
                style = self._STYLE_WS.sub('', elem.get('style', '') or '').lower()
                if 'display:none' in style or 'visibility:hidden' in style:
                    append_text(tail)
                    return

            # Generate XPath and create the element
            xpath = root_tree.getpath(elem)
            text = elem.text or ""
            html_elem = HTMLElement(
                tag=tag,
                xpath=xpath,
                text_content=text,
                tail_text=tail,
                attributes=dict(elem.attrib),
                parent_xpath=parent_xpath,
                is_block=tag.lower() in self.BLOCK_ELEMENTS,
                is_heading=tag.lower() in self.HEADING_ELEMENTS,
                heading_level=self.HEADING_ELEMENTS.get(tag.lower()),
            )

            # Offsets cover only the element's own leading text, not
            # descendants' text or the tail.
            if text:
                html_elem.start_offset = current_offset
                append_text(text)
                html_elem.end_offset = current_offset
            elements.append(html_elem)

            # Recurse into children. Comments are visited too so their
            # tail text is not lost from full_text.
            children_xpaths = []
            for child in elem:
                if isinstance(child.tag, str):
                    children_xpaths.append(root_tree.getpath(child))
                process_element(child, xpath)
            html_elem.children_xpaths = children_xpaths

            # Tail text follows the element (after its children) in
            # document order.
            append_text(tail)

        # Start processing from body or root
        body = tree.xpath('//body')
        if body:
            process_element(body[0])
        else:
            process_element(tree)

        return elements, ''.join(text_parts)

    def parse_warc(self, warc_path: str | Path) -> List[HTMLDocument]:
        """
        Parse HTML documents from a WARC archive.

        Args:
            warc_path: Path to WARC file

        Returns:
            List of parsed HTMLDocument objects

        Raises:
            ImportError: If warcio is not installed.
        """
        # Import warcio only when needed
        try:
            from warcio.archiveiterator import ArchiveIterator
        except ImportError:
            raise ImportError("warcio required for WARC parsing: pip install warcio")

        warc_path = Path(warc_path)
        documents = []
        with open(warc_path, 'rb') as f:
            for record in ArchiveIterator(f):
                if record.rec_type != 'response':
                    continue
                # Non-HTTP response records may have no HTTP header block.
                if record.http_headers is None:
                    continue
                content_type = record.http_headers.get_header('Content-Type', '')
                if 'text/html' not in content_type:
                    continue
                url = record.rec_headers.get_header('WARC-Target-URI')
                content = record.content_stream().read().decode('utf-8', errors='replace')
                doc = self.parse_string(content)
                doc.source_url = url
                doc.source_file = str(warc_path)
                # Preserve the capture timestamp from the WARC header
                warc_date = record.rec_headers.get_header('WARC-Date')
                if warc_date:
                    doc.retrieved_at = warc_date
                documents.append(doc)
        return documents

    def parse_mirror_directory(
        self,
        mirror_path: str | Path,
    ) -> List[HTMLDocument]:
        """
        Parse all HTML files (*.html / *.htm, any case) in a mirror directory.

        Args:
            mirror_path: Path to mirror directory

        Returns:
            List of parsed HTMLDocument objects. Files that fail to
            parse are reported to stdout and skipped.
        """
        mirror_path = Path(mirror_path)
        documents = []
        # sorted() makes the result order deterministic across platforms
        for html_file in sorted(mirror_path.rglob('*')):
            if not html_file.is_file():
                continue
            if html_file.suffix.lower() not in ('.html', '.htm'):
                continue
            try:
                doc = self.parse_file(html_file)
                # Infer URL from directory structure; as_posix() keeps
                # forward slashes in the URL on Windows as well.
                relative_path = html_file.relative_to(mirror_path)
                doc.source_url = f"https://{relative_path.as_posix()}"
                documents.append(doc)
            except Exception as e:
                print(f"Error parsing {html_file}: {e}")
                continue
        return documents
def get_xpath_for_text(
    document: HTMLDocument,
    search_text: str,
    fuzzy: bool = False,
) -> List[Tuple[str, int, int]]:
    """
    Find XPath(s) containing specific text.

    Args:
        document: Parsed HTML document
        search_text: Text to search for (empty string matches nothing)
        fuzzy: Reserved for fuzzy matching — currently ignored
            (not yet implemented)

    Returns:
        List of (xpath, start_offset, end_offset) tuples. An element
        containing the text more than once yields one tuple per
        occurrence (previously only the first occurrence was returned).
    """
    results: List[Tuple[str, int, int]] = []
    # Guard: "" would otherwise produce a zero-width match per element.
    if not search_text:
        return results
    for elem in document.elements:
        # Document-level base offset; elements with no recorded offset
        # fall back to element-local offsets (base 0).
        base = elem.start_offset if elem.start_offset is not None else 0
        start = elem.text_content.find(search_text)
        while start != -1:
            end = start + len(search_text)
            results.append((elem.xpath, base + start, base + end))
            # Advance by one to also catch overlapping occurrences
            start = elem.text_content.find(search_text, start + 1)
    return results