glam/scripts/cleanup_entities.py
2025-12-14 17:09:55 +01:00

497 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Clean up low-quality entities from custodian validated_entity_claims sections.
This script removes:
1. Language codes (nl-NL, en-US, etc.) - HTML lang attributes, not real entities
2. Generic navigation labels (Home, Menu, Contact, etc.)
3. Numeric-only entities (image dimensions, years without context)
4. Single/double character entities
5. Common stopwords extracted as entities
Removed entities are archived for audit purposes.
Usage:
python scripts/cleanup_entities.py --dry-run # Preview changes
python scripts/cleanup_entities.py # Apply changes
python scripts/cleanup_entities.py --verbose # Show details
"""
import os
import re
import glob
import json
import argparse
from datetime import datetime, timezone
from collections import Counter, defaultdict
from typing import Optional
# ============================================================================
# CLEANUP RULES - Add new patterns here
# ============================================================================
# Language code patterns (HTML lang attributes).
# NOTE: is_language_code() applies these with re.IGNORECASE, so the case
# distinctions below document the typical form rather than a hard constraint.
LANGUAGE_CODE_PATTERNS = [
    r'^[a-z]{2}[-_][A-Z]{2}$',  # nl-NL, en-US, de-DE
    r'^[a-z]{2}_[a-z]{2}$',  # nl_nl, en_us
    r'^[a-z]{2}$',  # nl, en, de, fr (when alone)
]
# Two-letter codes that are NOT language codes (keep these).
# Compared case-insensitively (lowercased before lookup).
KEEP_TWO_LETTER = {
    'eu',  # European Union
}
# Generic navigation/UI labels, compared against normalize_entity() output
# (lowercase, surrounding quotes stripped, whitespace collapsed).
GENERIC_LABELS = {
    # Navigation
    'home', 'menu', 'contact', 'over', 'about', 'search', 'zoeken',
    'terug', 'back', 'next', 'vorige', 'volgende', 'more', 'meer',
    # Common sections
    'nieuws', 'news', 'agenda', 'events', 'evenementen', 'blog',
    'login', 'logout', 'inloggen', 'uitloggen', 'registreren',
    'cookie', 'cookies', 'privacy', 'disclaimer', 'terms',
    # Generic content labels
    'lees meer', 'read more', 'bekijk', 'view', 'download',
    'share', 'delen', 'print', 'email', 'e-mail',
    # Social media generic
    'twitter', 'facebook', 'instagram', 'linkedin', 'youtube',
    'social media', 'sociale media', 'volg ons', 'follow us',
    # Site elements
    'header', 'footer', 'sidebar', 'main', 'content',
    'skip to content', 'ga naar inhoud',
}
# Numeric-only patterns (image dimensions, isolated numbers); applied after
# stripping surrounding quotes in is_numeric_only().
NUMERIC_PATTERNS = [
    r"^'?\d+'?$",  # '2025', '1200', 800
    r'^\d+x\d+$',  # 1920x1080
    r'^\d+px$',  # 100px
    r'^\d+%$',  # 50%
]
# Entity types that should be filtered more aggressively
LOW_VALUE_TYPES = {
    'QTY.MSR',  # Measurements (often image dimensions)
    'QTY.CNT',  # Counts without context
}
# Minimum entity length (after normalization); shorter entities are dropped.
MIN_ENTITY_LENGTH = 3
# Maximum occurrences to consider "too generic" (appears in >X files)
# Entities like "nl-NL" appear 500+ times - clearly metadata, not content
# NOTE(review): not referenced anywhere in this script - confirm whether a
# frequency-based filter was planned or this constant is dead.
MAX_GENERIC_OCCURRENCES = 100
def normalize_entity(name: str) -> str:
    """Canonicalize an entity name: lowercase, trim, de-quote, collapse spaces."""
    cleaned = name.lower().strip().strip("'\"")
    # split()/join collapses any run of whitespace to a single space.
    return ' '.join(cleaned.split())
def is_language_code(entity: str) -> bool:
    """Return True when *entity* looks like an HTML lang attribute value."""
    # Whitelisted two-letter codes (e.g. 'eu') are real entities, not languages.
    if entity.lower() in KEEP_TWO_LETTER:
        return False
    return any(
        re.match(pattern, entity, re.IGNORECASE)
        for pattern in LANGUAGE_CODE_PATTERNS
    )
def is_generic_label(entity: str) -> bool:
    """Return True when the normalized entity is a known navigation/UI label."""
    normalized = normalize_entity(entity)
    return normalized in GENERIC_LABELS
def is_numeric_only(entity: str) -> bool:
    """Return True when the entity is purely numeric (dimension, px, %, year)."""
    # Strip surrounding quotes once; the patterns expect the bare value.
    bare = entity.strip("'\"")
    return any(re.match(pattern, bare) for pattern in NUMERIC_PATTERNS)
def is_too_short(entity: str) -> bool:
    """Return True when the normalized name is shorter than MIN_ENTITY_LENGTH."""
    return len(normalize_entity(entity)) < MIN_ENTITY_LENGTH
def should_filter_entity(entity_name: str, entity_type: str) -> tuple[bool, str]:
    """
    Determine if an entity should be filtered out.

    Rules are applied in order: language code, generic UI label, low-value
    quantity type, numeric-only value, too-short name.

    Args:
        entity_name: Raw entity string as parsed from the claims section.
        entity_type: Entity type code (e.g. 'QTY.MSR'), may be ''.

    Returns:
        (should_filter, reason) - reason is '' when the entity is kept.
    """
    # Language codes
    if is_language_code(entity_name):
        return True, "language_code"
    # Generic labels
    if is_generic_label(entity_name):
        return True, "generic_label"
    # Low-value types with generic content. Checked BEFORE the generic
    # numeric rule so the archive records the more specific reason; the old
    # ordering made this branch unreachable because is_numeric_only()
    # matched the same bare-number pattern first. Measurements with context
    # (e.g. "150 cm") do not match and are kept.
    if entity_type in LOW_VALUE_TYPES:
        if re.match(r"^'?\d+'?$", entity_name.strip("'\"")):
            return True, f"low_value_type_{entity_type}"
    # Numeric only
    if is_numeric_only(entity_name):
        return True, "numeric_only"
    # Too short
    if is_too_short(entity_name):
        return True, "too_short"
    return False, ""
def extract_validated_claims_section(content: str) -> Optional[str]:
    """Return the raw validated_entity_claims block from YAML text, or None.

    The block spans from the section key up to (but not including) the next
    top-level key (a line starting with a lowercase/underscore identifier)
    or the end of the string.
    """
    section_re = re.compile(
        r'(validated_entity_claims:.*?)(?=\n[a-z_]+:|\Z)',
        re.DOTALL,
    )
    hit = section_re.search(content)
    if hit is None:
        return None
    return hit.group(1)
def parse_claims_from_section(section: str) -> list[dict]:
    """Parse individual claim dicts out of a validated_entity_claims block.

    Each returned dict carries 'entity' plus whichever of entity_type,
    xpath, layout_match and the four confidence scores (parsed as floats)
    are present. Blocks without an 'entity' field are dropped.
    """
    results: list[dict] = []
    # Locate the claims list; everything after "claims:" is claim text.
    body = re.search(r'claims:\s*\n(.*)', section, re.DOTALL)
    if body is None:
        return results
    # Split on the list-item marker; pieces after the first lose it.
    pieces = re.split(r'\n - entity:', body.group(1))
    string_fields = (
        ('entity', r'entity: (.+)'),
        ('entity_type', r'entity_type: (\S+)'),
        ('xpath', r'xpath: (.+)'),
        ('layout_match', r'layout_match: (.+)'),
    )
    float_fields = ('base_confidence', 'layout_score',
                    'pattern_score', 'final_confidence')
    for idx, piece in enumerate(pieces):
        if not piece.strip():
            continue
        # Restore the marker removed by the split (first piece keeps its own).
        text = piece if idx == 0 else "- entity:" + piece
        claim: dict = {}
        for key, pattern in string_fields:
            hit = re.search(pattern, text)
            if hit:
                claim[key] = hit.group(1).strip()
        for field in float_fields:
            hit = re.search(rf'{field}: ([\d.]+)', text)
            if hit:
                claim[field] = float(hit.group(1))
        if 'entity' in claim:
            results.append(claim)
    return results
def rebuild_claims_yaml(claims: list[dict], metadata: dict) -> str:
    """Serialize kept claims back into a validated_entity_claims YAML block.

    Preserves the original extraction metadata, recomputes entities_count,
    and stamps the block with a UTC cleanup_applied timestamp.
    """
    ts = metadata.get('extraction_timestamp', '')
    method = metadata.get('extraction_method', 'hybrid_llm_pattern_layout_v1')
    threshold = metadata.get('confidence_threshold', 0.6)
    out = [
        'validated_entity_claims:',
        f" extraction_timestamp: '{ts}'",
        f" extraction_method: {method}",
        f" confidence_threshold: {threshold}",
        f" entities_count: {len(claims)}",
        f" cleanup_applied: '{datetime.now(timezone.utc).isoformat()}'",
        ' claims:',
    ]
    # Optional per-claim fields, emitted in a fixed order when present.
    optional_fields = ('xpath', 'base_confidence', 'layout_score',
                       'pattern_score', 'final_confidence', 'layout_match')
    for claim in claims:
        out.append(f" - entity: {claim['entity']}")
        out.append(f" entity_type: {claim['entity_type']}")
        for field in optional_fields:
            if field in claim:
                out.append(f" {field}: {claim[field]}")
    return '\n'.join(out)
def extract_metadata_from_section(section: str) -> dict:
    """Pull extraction metadata (timestamp, method, threshold) from the block.

    Only fields actually present in the section appear in the result;
    confidence_threshold is converted to float.
    """
    patterns = {
        'extraction_timestamp': r"extraction_timestamp: '([^']+)'",
        'extraction_method': r'extraction_method: (\S+)',
        'confidence_threshold': r'confidence_threshold: ([\d.]+)',
    }
    meta: dict = {}
    for key, pattern in patterns.items():
        hit = re.search(pattern, section)
        if hit:
            value = hit.group(1)
            meta[key] = float(value) if key == 'confidence_threshold' else value
    return meta
def process_file(filepath: str, dry_run: bool = True, verbose: bool = False) -> dict:
    """
    Process a single custodian file and clean up entities.

    Args:
        filepath: Path to the custodian YAML file.
        dry_run: When True, analyze only and never write the file back.
        verbose: When True, print a per-file removal summary.

    Returns:
        dict with processing results: original/kept/removed counts, the list
        of removed entities (with reasons), a 'modified' flag, and 'error'
        (exception text) when the file could not be processed.
    """
    result = {
        'file': os.path.basename(filepath),
        'original_count': 0,
        'filtered_count': 0,
        'removed': [],
        'kept': 0,
        'modified': False,
        'error': None
    }
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        # Fast path: skip files without a claims section.
        if 'validated_entity_claims:' not in content:
            return result
        section = extract_validated_claims_section(content)
        if not section:
            return result
        claims = parse_claims_from_section(section)
        result['original_count'] = len(claims)
        if not claims:
            return result
        metadata = extract_metadata_from_section(section)
        # Partition claims into kept vs removed.
        kept_claims = []
        removed_claims = []
        for claim in claims:
            entity_name = claim.get('entity', '')
            entity_type = claim.get('entity_type', '')
            should_filter, reason = should_filter_entity(entity_name, entity_type)
            if should_filter:
                removed_claims.append({
                    'entity': entity_name,
                    'type': entity_type,
                    'reason': reason
                })
            else:
                kept_claims.append(claim)
        result['filtered_count'] = len(removed_claims)
        result['removed'] = removed_claims
        result['kept'] = len(kept_claims)
        # Nothing to remove: report counts, leave the file untouched.
        if not removed_claims:
            return result
        result['modified'] = True
        if verbose:
            print(f"\n {result['file']}:")
            print(f" Original: {result['original_count']}, Kept: {result['kept']}, Removed: {result['filtered_count']}")
            for r in removed_claims[:5]:
                print(f" - {r['entity'][:40]} ({r['type']}) -> {r['reason']}")
            if len(removed_claims) > 5:
                print(f" ... and {len(removed_claims) - 5} more")
        if not dry_run:
            new_section = rebuild_claims_yaml(kept_claims, metadata)
            # Bug fix: use a callable replacement. Passing new_section as a
            # plain replacement string makes re.sub() interpret backslashes
            # and \g group references inside entity names/xpaths, which can
            # raise re.error or silently corrupt the output.
            new_content = re.sub(
                r'validated_entity_claims:.*?(?=\n[a-z_]+:|\Z)',
                lambda _m: new_section + '\n',
                content,
                flags=re.DOTALL
            )
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(new_content)
    except Exception as e:
        # Record and continue: one bad file must not abort the batch run.
        result['error'] = str(e)
    return result
def main():
    """CLI entry point: parse args, clean all matching files, print a summary.

    Side effects: rewrites custodian files in place (unless --dry-run) and
    always writes a JSON archive of removed entities under reports/.
    """
    parser = argparse.ArgumentParser(
        description='Clean up low-quality entities from custodian files'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview changes without modifying files'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show detailed output for each file'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Process only N files (for testing)'
    )
    parser.add_argument(
        '--pattern',
        type=str,
        default='data/custodian/NL-*.yaml',
        help='Glob pattern for files to process'
    )
    args = parser.parse_args()
    print("=" * 70)
    print("Entity Cleanup Script")
    print("=" * 70)
    if args.dry_run:
        print("\n[DRY RUN] - No files will be modified\n")
    # Find files; sorted for a deterministic processing order across runs.
    files = sorted(glob.glob(args.pattern))
    if args.limit:
        files = files[:args.limit]
    print(f"Found {len(files)} files to process\n")
    # Aggregate statistics across all processed files.
    stats = {
        'total_files': len(files),
        'files_with_claims': 0,
        'files_modified': 0,
        'total_original': 0,
        'total_removed': 0,
        'total_kept': 0,
        'removal_reasons': Counter(),
        'errors': [],
    }
    # Audit archive of every removed entity (written even on dry runs).
    archive = {
        'cleanup_date': datetime.now(timezone.utc).isoformat(),
        'dry_run': args.dry_run,
        'files_processed': len(files),
        'removed_entities': []
    }
    # Process files
    for i, filepath in enumerate(files):
        # Lightweight progress indicator when not printing per-file details.
        if i % 200 == 0 and not args.verbose:
            print(f" Processing {i}/{len(files)}...")
        result = process_file(filepath, dry_run=args.dry_run, verbose=args.verbose)
        if result['error']:
            stats['errors'].append((result['file'], result['error']))
            continue
        if result['original_count'] > 0:
            stats['files_with_claims'] += 1
            stats['total_original'] += result['original_count']
            stats['total_kept'] += result['kept']
            stats['total_removed'] += result['filtered_count']
        if result['modified']:
            stats['files_modified'] += 1
        for removed in result['removed']:
            stats['removal_reasons'][removed['reason']] += 1
            archive['removed_entities'].append({
                'file': result['file'],
                'entity': removed['entity'],
                'type': removed['type'],
                'reason': removed['reason']
            })
    # Print summary
    print("\n" + "=" * 70)
    print("CLEANUP SUMMARY")
    print("=" * 70)
    print(f"\nFiles:")
    print(f" Total processed: {stats['total_files']:,}")
    print(f" With claims: {stats['files_with_claims']:,}")
    print(f" Modified: {stats['files_modified']:,}")
    print(f"\nEntities:")
    print(f" Original total: {stats['total_original']:,}")
    # max(1, ...) guards against division by zero when no claims were found.
    print(f" Removed: {stats['total_removed']:,} ({100*stats['total_removed']/max(1,stats['total_original']):.1f}%)")
    print(f" Kept: {stats['total_kept']:,} ({100*stats['total_kept']/max(1,stats['total_original']):.1f}%)")
    print(f"\nRemoval reasons:")
    for reason, count in stats['removal_reasons'].most_common():
        print(f" {reason:20s}: {count:,}")
    if stats['errors']:
        print(f"\nErrors: {len(stats['errors'])}")
        for file, error in stats['errors'][:5]:
            print(f" {file}: {error}")
    # Save archive
    # NOTE(review): filename uses naive local time (datetime.now()) while the
    # archive payload uses UTC - confirm whether that mismatch is intended.
    archive_path = f"reports/entity_cleanup_archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    os.makedirs('reports', exist_ok=True)
    with open(archive_path, 'w', encoding='utf-8') as f:
        json.dump(archive, f, indent=2, ensure_ascii=False)
    print(f"\nArchive saved: {archive_path}")
    if args.dry_run:
        print("\n[DRY RUN] No files were modified. Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()