# glam/scripts/convert_website_enrichment_to_claims.py
#!/usr/bin/env python3
"""
Convert website_enrichment blocks to web_enrichment with claim-level provenance.
This script transforms the nested website_enrichment structure into the standardized
web_enrichment format with individual claims, each with its own provenance metadata.
Usage:
python scripts/convert_website_enrichment_to_claims.py [--dry-run] [--limit N]
"""
import argparse
import sys
from pathlib import Path
from datetime import datetime, timezone
from typing import Any
import yaml
def flatten_dict(d: dict, parent_key: str = '', sep: str = '.') -> dict:
    """Flatten a nested dictionary into dot-notation keys.

    Nested dicts become ``parent.child`` keys; lists of dicts become
    indexed keys like ``parent[0].child``; lists of strings are kept
    intact as list values; anything else is stored unchanged.

    Args:
        d: The (possibly nested) dictionary to flatten.
        parent_key: Prefix accumulated from enclosing levels.
        sep: Separator placed between nested key segments.

    Returns:
        A single-level dict mapping flattened keys to leaf values.
    """
    flat: dict = {}
    for key, value in d.items():
        compound_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten_dict(value, compound_key, sep))
        elif isinstance(value, list):
            if value and all(isinstance(elem, str) for elem in value):
                # Homogeneous string lists stay whole for later joining.
                flat[compound_key] = value
            elif value and all(isinstance(elem, dict) for elem in value):
                # Recurse into each dict with an index-qualified prefix.
                for idx, elem in enumerate(value):
                    flat.update(flatten_dict(elem, f"{compound_key}[{idx}]", sep))
            else:
                # Empty or mixed-type lists are stored as-is.
                flat[compound_key] = value
        else:
            flat[compound_key] = value
    return flat
def extract_claims_from_website_enrichment(website_enrichment: dict, source_url: str, fetch_timestamp: str) -> list[dict]:
    """
    Extract individual claims from a website_enrichment structure.

    Maps nested fields to claim types with appropriate confidence scores.
    Unmapped fields fall back to a generic claim type derived from the
    flattened key, at default confidence.

    Args:
        website_enrichment: Nested dict scraped from an organization website.
        source_url: URL the enrichment was fetched from; stamped on every claim.
        fetch_timestamp: ISO timestamp of the fetch; stamped on every claim.

    Returns:
        List of claim dicts, each with claim_type, claim_value, source_url,
        extraction_timestamp and confidence keys.
    """
    claims: list[dict] = []
    # Define claim type mappings with confidence scores.
    # Higher confidence for factual data, lower for scraped descriptions.
    claim_mappings = {
        # Organization details
        'organization_details.full_name': ('organization_full_name', 0.95),
        'organization_details.short_name': ('organization_short_name', 0.95),
        'organization_details.legal_form': ('legal_form', 0.90),
        'organization_details.founded': ('founded', 0.90),
        'organization_details.description': ('description', 0.85),
        'organization_details.mission': ('mission', 0.85),
        'organization_details.member_count': ('member_count', 0.85),
        'organization_details.membership_fee': ('membership_fee', 0.90),
        'organization_details.tagline': ('tagline', 0.85),
        'organization_details.parent_organization': ('parent_organization', 0.90),
        # Legal status
        'legal_status.anbi_status': ('anbi_status', 0.95),
        'legal_status.rsin': ('rsin', 0.95),
        'legal_status.kvk_number': ('kvk_number', 0.95),
        # Museum info
        'museum.name': ('museum_name', 0.95),
        'museum.description': ('museum_description', 0.85),
        'museum.website': ('museum_website', 0.95),
        'museum.established': ('museum_established', 0.90),
        # Location
        'location.street_address': ('street_address', 0.95),
        'location.postal_code': ('postal_code', 0.95),
        'location.city': ('city', 0.95),
        'location.municipality': ('municipality', 0.90),
        'location.province': ('province', 0.95),
        'location.country': ('country', 0.99),
        'location.venue_name': ('venue_name', 0.90),
        # Contact
        'contact.email': ('email', 0.95),
        'contact.phone': ('phone', 0.95),
        'contact.website': ('website', 0.99),
        'contact.facebook': ('facebook', 0.95),
        # Publications
        'publications.journal.name': ('journal_name', 0.95),
        'publications.journal.url': ('journal_url', 0.95),
        # Digital resources
        'digital_resources.beeldbank.url': ('beeldbank_url', 0.95),
        'digital_resources.beeldbank.description': ('beeldbank_description', 0.85),
        # Collections
        'collections.permanent_collection.description': ('collection_description', 0.85),
        # Opening hours
        'opening_hours': ('opening_hours', 0.90),
    }
    # Provenance/bookkeeping keys that the caller (convert_file) handles
    # separately; they must not be re-emitted as content claims.
    metadata_keys = {'fetch_timestamp', 'fetch_status', 'source_url', 'source_urls', 'sources'}
    # Flatten the website_enrichment dict.
    flat = flatten_dict(website_enrichment)
    for flat_key, value in flat.items():
        if value is None or value == '':
            continue
        # Skip provenance metadata, including flattened entries such as
        # 'sources[0].url' produced from the sources list format.
        if flat_key in metadata_keys or flat_key.startswith('sources['):
            continue
        # Check for a direct mapping; prefix match lets nested keys like
        # 'opening_hours.monday' inherit the 'opening_hours' claim type.
        claim_type = None
        confidence = 0.80  # Default confidence for unmapped fields
        for pattern, (ctype, conf) in claim_mappings.items():
            if flat_key == pattern or flat_key.startswith(pattern):
                claim_type = ctype
                confidence = conf
                break
        # If no mapping found, create a generic claim type from the key.
        if claim_type is None:
            claim_type = flat_key.replace('.', '_').replace('[', '_').replace(']', '')
            confidence = 0.80
        # Normalize the value into a string claim_value.
        if isinstance(value, list):
            if all(isinstance(x, str) for x in value):
                claim_value = '; '.join(value)
            else:
                claim_value = str(value)
        elif isinstance(value, bool):
            claim_value = str(value).lower()
        elif isinstance(value, dict):
            # Skip complex nested dicts that weren't flattened.
            continue
        else:
            claim_value = str(value)
        claims.append({
            'claim_type': claim_type,
            'claim_value': claim_value,
            'source_url': source_url,
            'extraction_timestamp': fetch_timestamp,
            'confidence': confidence,
        })
    return claims
def convert_file(filepath: Path, dry_run: bool = False) -> tuple[bool, str]:
    """
    Convert one file's website_enrichment block to the web_enrichment format.

    Args:
        filepath: Path to the YAML entry file to convert.
        dry_run: When True, report what would change without writing the file.

    Returns:
        Tuple of (success, message).
    """
    with open(filepath, 'r', encoding='utf-8') as fh:
        data = yaml.safe_load(fh)
    if not data:
        return False, "Empty file"
    # Guard clauses: nothing to convert, or already converted.
    if 'website_enrichment' not in data:
        return False, "No website_enrichment block"
    if 'web_enrichment' in data:
        return False, "web_enrichment already exists (skipping)"

    enrichment = data['website_enrichment']

    # Resolve the primary source URL; several historical formats exist:
    # a single source_url, a source_urls list, or a sources list of dicts.
    source_url = enrichment.get('source_url', '')
    source_urls = enrichment.get('source_urls', [])
    sources_list = enrichment.get('sources', [])
    if not (source_url or source_urls) and isinstance(sources_list, list) and sources_list:
        first_source = sources_list[0]
        if isinstance(first_source, dict) and 'url' in first_source:
            source_url = first_source['url']
        source_urls = [s.get('url') for s in sources_list if isinstance(s, dict) and s.get('url')]
    if not source_url and source_urls:
        source_url = source_urls[0] if isinstance(source_urls, list) else source_urls
    if not source_url and 'original_entry' in data:
        # Last resort: the original registry entry's website field.
        source_url = data['original_entry'].get('webadres_organisatie', '')

    fetch_timestamp = enrichment.get('fetch_timestamp', datetime.now(timezone.utc).isoformat())
    fetch_status = enrichment.get('fetch_status', 'SUCCESS')

    if not source_url:
        return False, "No source_url found (checked website_enrichment and original_entry)"

    claims = extract_claims_from_website_enrichment(enrichment, source_url, fetch_timestamp)
    if not claims:
        return False, "No claims extracted"

    # One raw_sources record per known URL (all URLs if multiple were found).
    raw_sources = [
        {
            'url': url,
            'fetch_timestamp': fetch_timestamp,
            'source_type': 'official_website',
            'fetch_status': fetch_status,
        }
        for url in (source_urls or [source_url])
    ]

    # Attach the standardized web_enrichment block.
    data['web_enrichment'] = {
        'enrichment_timestamp': fetch_timestamp,
        'enrichment_method': 'website_scrape_with_claim_provenance',
        'source_url': source_url,
        'claims': claims,
        'raw_sources': raw_sources,
        'web_enrichment_status': fetch_status,
    }

    if dry_run:
        return True, f"Would add web_enrichment with {len(claims)} claims"
    with open(filepath, 'w', encoding='utf-8') as fh:
        yaml.dump(data, fh, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return True, f"Added web_enrichment with {len(claims)} claims"
def main() -> int:
    """CLI entry point: convert website_enrichment blocks across entry files.

    Returns:
        0 if no per-file errors occurred, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description='Convert website_enrichment to web_enrichment with claims')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without making changes')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of files to process')
    parser.add_argument('--file', type=str, default=None, help='Process a single file')
    # Previously hard-coded; now configurable with the old path as default
    # so existing invocations keep working.
    parser.add_argument('--entries-dir', type=str,
                        default='/Users/kempersc/apps/glam/data/nde/enriched/entries',
                        help='Directory containing the YAML entry files')
    args = parser.parse_args()
    entries_dir = Path(args.entries_dir)
    if args.file:
        files = [Path(args.file)]
    else:
        # Find all candidate files; per-file content checks happen below.
        files = sorted(entries_dir.glob('*.yaml'))
    if args.limit:
        files = files[:args.limit]
    converted = 0
    skipped = 0
    errors = 0
    for filepath in files:
        try:
            # Cheap text scan before full YAML parsing: skip files that
            # clearly lack website_enrichment or already have web_enrichment.
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
            if 'website_enrichment:' not in content:
                continue
            if 'web_enrichment:' in content:
                print(f"SKIP: {filepath.name} - web_enrichment already exists")
                skipped += 1
                continue
            success, message = convert_file(filepath, dry_run=args.dry_run)
            if success:
                print(f"{'WOULD ' if args.dry_run else ''}OK: {filepath.name} - {message}")
                converted += 1
            else:
                print(f"SKIP: {filepath.name} - {message}")
                skipped += 1
        except Exception as e:
            # Keep processing remaining files; failures surface in the exit code.
            print(f"ERROR: {filepath.name} - {e}")
            errors += 1
    print(f"\n{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f" Converted: {converted}")
    print(f" Skipped: {skipped}")
    print(f" Errors: {errors}")
    return 0 if errors == 0 else 1


if __name__ == '__main__':
    sys.exit(main())