# glam/scripts/add_provenance_to_enriched.py
# (viewer metadata: 317 lines, 11 KiB, Python — not part of the script itself)
#!/usr/bin/env python3
"""
Add claim-level provenance to enriched NDE YAML files.
This script adds proper provenance tracking to all enriched files,
recording where each piece of data came from.
Provenance Schema:
- Each claim (data point) can have its own source reference
- Sources include: original_entry, wikidata_api, google_maps_api, exa_web_search, webfetch
- For web content: URL, fetch_timestamp, text_excerpt (exact quoted text)
- For Exa: highlights with scores, markdown section path
"""
import os
import sys
import yaml
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import re
# Preserve YAML formatting
class PreservingDumper(yaml.SafeDumper):
    """SafeDumper variant used so multiline strings round-trip as block scalars."""
    pass


def str_representer(dumper, data):
    """Represent multiline strings with literal block style ('|'), others plainly."""
    scalar_style = '|' if '\n' in data else None
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style=scalar_style)


PreservingDumper.add_representer(str, str_representer)
def create_source_reference(
    source_type: str,
    url: Optional[str] = None,
    fetch_timestamp: Optional[str] = None,
    text_excerpt: Optional[str] = None,
    markdown_section: Optional[str] = None,
    exa_highlight_score: Optional[float] = None,
    api_endpoint: Optional[str] = None,
    entity_id: Optional[str] = None,
    place_id: Optional[str] = None,
    claims_extracted: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Create a standardized source reference object.

    Only fields with a usable value are included; insertion order follows
    the parameter order so serialized output stays stable. The highlight
    score is kept whenever it is not None (0.0 is a valid score), while
    every other field is dropped when falsy (None or empty).
    """
    ref: Dict[str, Any] = {"source_type": source_type}
    # (key, value, keep_falsy) — keep_falsy marks fields tested against None.
    optional_fields = (
        ("url", url, False),
        ("fetch_timestamp", fetch_timestamp, False),
        ("text_excerpt", text_excerpt, False),
        ("markdown_section", markdown_section, False),
        ("exa_highlight_score", exa_highlight_score, True),
        ("api_endpoint", api_endpoint, False),
        ("entity_id", entity_id, False),
        ("place_id", place_id, False),
        ("claims_extracted", claims_extracted, False),
    )
    for key, value, keep_falsy in optional_fields:
        if (value is not None) if keep_falsy else value:
            ref[key] = value
    return ref
def extract_provenance_from_existing(data: Dict[str, Any]) -> Dict[str, List[Dict]]:
    """Collect provenance records from the enrichment sections of *data*.

    Scans the known enrichment sections (original NDE entry, Wikidata,
    Google Maps, Exa, website) and builds one list of source references per
    section, keyed by a short source name. Claim lists preserve the
    historical ordering so re-serialized YAML stays byte-stable.
    """
    provenance: Dict[str, List[Dict]] = {}

    # 1. Original NDE entry: every top-level field counts as a claim.
    if "original_entry" in data:
        provenance["original_entry"] = [{
            "source_type": "nde_csv_registry",
            "data_tier": "TIER_1_AUTHORITATIVE",
            "claims_extracted": list(data["original_entry"].keys()),
        }]

    # 2. Wikidata enrichment provenance.
    if "wikidata_enrichment" in data:
        wd = data["wikidata_enrichment"]
        api_meta = wd.get("api_metadata", {})
        wd_ref = create_source_reference(
            source_type="wikidata_api",
            api_endpoint=api_meta.get("api_endpoint", "https://www.wikidata.org/w/rest.php/wikibase/v1"),
            fetch_timestamp=api_meta.get("fetch_timestamp"),
            entity_id=wd.get("wikidata_entity_id"),
            claims_extracted=[],
        )
        # (section key, claim label) pairs — order matters for output stability.
        wd_fields = (
            ("wikidata_labels", "labels"),
            ("wikidata_descriptions", "descriptions"),
            ("wikidata_instance_of", "instance_of"),
            ("wikidata_country", "country"),
            ("wikidata_located_in", "located_in"),
            ("wikidata_coordinates", "coordinates"),
            ("wikidata_official_website", "official_website"),
        )
        wikidata_claims = [label for key, label in wd_fields if key in wd]
        # Arbitrary extra Wikidata claims are namespaced with a "claim_" prefix.
        if "wikidata_claims" in wd:
            wikidata_claims.extend(f"claim_{claim_key}" for claim_key in wd["wikidata_claims"])
        wd_ref["claims_extracted"] = wikidata_claims
        provenance["wikidata"] = [wd_ref]

    # 3. Google Maps enrichment provenance.
    if "google_maps_enrichment" in data:
        gm = data["google_maps_enrichment"]
        gm_ref = create_source_reference(
            source_type="google_maps_api",
            api_endpoint="https://maps.googleapis.com/maps/api/place/textsearch",
            fetch_timestamp=gm.get("fetch_timestamp"),
            place_id=gm.get("place_id"),
            claims_extracted=[],
        )
        gm_claims = [key for key in (
            "coordinates", "formatted_address", "address_components",
            "business_status", "website",
        ) if key in gm]
        # Either phone variant yields a single consolidated "phone" claim.
        if "phone_local" in gm or "phone_international" in gm:
            gm_claims.append("phone")
        gm_claims.extend(key for key in ("rating", "reviews", "opening_hours") if key in gm)
        gm_ref["claims_extracted"] = gm_claims
        provenance["google_maps"] = [gm_ref]

    # 4. Exa web-search enrichment provenance (if exists).
    if "exa_enrichment" in data:
        exa = data["exa_enrichment"]
        exa_ref = create_source_reference(
            source_type="exa_web_search",
            url=exa.get("source_url"),
            fetch_timestamp=exa.get("fetch_timestamp"),
            claims_extracted=[],
        )
        # Carry raw highlights through for claim-level attribution.
        if "highlights" in exa:
            exa_ref["highlights"] = exa["highlights"]
        provenance["exa"] = [exa_ref]

    # 5. Direct website fetches: reuse source_references already recorded
    #    inside organization_details, when present.
    if "website_enrichment" in data or "organization_details" in data:
        org = data.get("organization_details", {})
        if "source_references" in org:
            provenance["website"] = org["source_references"]

    return provenance
def add_provenance_section(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *data* with a consolidated ``provenance`` section.

    The section is inserted immediately after the ``enrichment_status`` key
    when that key exists, otherwise appended at the end. (An earlier comment
    claimed insertion happened after ``entry_index``; the code has always
    keyed off ``enrichment_status``.)
    """
    provenance = {
        "schema_version": "1.0.0",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "sources": extract_provenance_from_existing(data),
        "data_tier_summary": {
            "TIER_1_AUTHORITATIVE": ["original_entry (NDE CSV)"],
            "TIER_2_VERIFIED": ["wikidata_api", "google_maps_api"],
            "TIER_3_CROWD_SOURCED": [],
            "TIER_4_INFERRED": ["website_scrape", "exa_web_search"],
        },
        "notes": [
            "Provenance tracking added retroactively",
            "claim_level_provenance available in sources section",
        ],
    }

    # Rebuild the mapping key-by-key so the new section lands right after
    # enrichment_status (dicts preserve insertion order).
    new_data: Dict[str, Any] = {}
    for key, value in data.items():
        new_data[key] = value
        if key == "enrichment_status":
            new_data["provenance"] = provenance
    # If enrichment_status wasn't found, add at end.
    if "provenance" not in new_data:
        new_data["provenance"] = provenance
    return new_data
def has_provenance(data: Dict[str, Any]) -> bool:
    """Check whether *data* already carries a versioned provenance section."""
    if "provenance" not in data:
        return False
    return "schema_version" in data["provenance"]
def process_file(filepath: Path, dry_run: bool = False) -> bool:
    """Process a single YAML file and add provenance if needed.

    Returns True if the file was updated (or would be, under dry_run);
    False when it was skipped or an error occurred.
    """
    # Presence of any of these keys marks the file as "enriched".
    enrichment_keys = (
        "wikidata_enrichment",
        "google_maps_enrichment",
        "organization_details",
        "website_enrichment",
        "exa_enrichment",
    )
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if data is None:
            print(f" Skipping empty file: {filepath.name}")
            return False
        if has_provenance(data):
            print(f" Already has provenance: {filepath.name}")
            return False
        if not any(key in data for key in enrichment_keys):
            print(f" Not enriched, skipping: {filepath.name}")
            return False

        new_data = add_provenance_section(data)

        if dry_run:
            print(f" Would update: {filepath.name}")
            return True

        # Write updated file, preserving key order and unicode.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(new_data, f, Dumper=PreservingDumper,
                      allow_unicode=True, default_flow_style=False, sort_keys=False)
        print(f" Updated: {filepath.name}")
        return True
    except Exception as e:
        print(f" Error processing {filepath.name}: {e}")
        return False
def main():
    """CLI entry point: add provenance to matching enriched YAML files.

    Fixes relative to the previous revision: the dead ``errors`` counter
    (never incremented or reported) is removed; ``--limit 0`` is now honored
    instead of being silently ignored by a truthiness test; the previously
    hard-coded entries directory can be overridden with ``--entries-dir``
    (the default keeps the old path, so existing invocations are unchanged).
    """
    import argparse
    parser = argparse.ArgumentParser(description="Add provenance to enriched NDE YAML files")
    parser.add_argument("--dry-run", action="store_true", help="Don't actually modify files")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of files to process")
    parser.add_argument("--pattern", type=str, default="*_Q*.yaml", help="File pattern to match")
    parser.add_argument(
        "--entries-dir",
        type=Path,
        default=Path("/Users/kempersc/apps/glam/data/nde/enriched/entries"),
        help="Directory containing enriched entry YAML files",
    )
    args = parser.parse_args()

    entries_dir = args.entries_dir
    if not entries_dir.exists():
        print(f"Error: Directory not found: {entries_dir}")
        sys.exit(1)

    # Find all enriched files (sorted for a deterministic processing order).
    files = sorted(entries_dir.glob(args.pattern))
    if args.limit is not None:
        files = files[:args.limit]
    print(f"Found {len(files)} files matching pattern '{args.pattern}'")
    print(f"Dry run: {args.dry_run}")
    print()

    updated = 0
    skipped = 0
    for filepath in files:
        if process_file(filepath, dry_run=args.dry_run):
            updated += 1
        else:
            skipped += 1

    print()
    print(f"Summary:")
    print(f" Updated: {updated}")
    print(f" Skipped: {skipped}")
    print(f" Total: {len(files)}")


if __name__ == "__main__":
    main()