glam/scripts/merge_linkedin_to_custodians.py
2025-12-16 20:27:39 +01:00

278 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""
Merge LinkedIn custodian data into existing NL-* custodian files.
This script:
1. Loads all LinkedIn custodian YAML files from data/custodian/linkedin/
2. Matches them to existing NL-* files via:
a. LinkedIn slug (from pre-built index)
b. Name similarity (from _name_matches.json)
3. Adds linkedin_enrichment section to matched files (ADDITIVE ONLY per Rule 5)
4. Reports unmatched LinkedIn records for later processing
Usage:
python scripts/merge_linkedin_to_custodians.py [--dry-run] [--limit N]
python scripts/merge_linkedin_to_custodians.py --use-name-matches [--min-score 80]
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
import yaml
# Project root: assumes this script lives one directory below the repo root (e.g. scripts/).
PROJECT_ROOT = Path(__file__).parent.parent
# Directory containing scraped LinkedIn custodian YAML files.
LINKEDIN_DIR = PROJECT_ROOT / "data" / "custodian" / "linkedin"
# Directory containing the canonical NL-* custodian YAML files.
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
# Pre-built slug -> NL-* file mapping (created by scripts/build_linkedin_index.py).
INDEX_FILE = LINKEDIN_DIR / "_nl_file_index.json"
# Name-similarity match results produced upstream; consumed by load_name_matches().
NAME_MATCHES_FILE = LINKEDIN_DIR / "_name_matches.json"
def load_yaml(filepath: Path) -> dict:
    """Parse a YAML file; return an empty dict for blank/empty documents."""
    with open(filepath, "r", encoding="utf-8") as f:
        parsed = yaml.safe_load(f)
    return parsed if parsed else {}
def save_yaml(filepath: Path, data: dict) -> None:
    """Write *data* as human-readable YAML: block style, unicode preserved, key order kept."""
    dump_options = {
        "default_flow_style": False,
        "allow_unicode": True,
        "sort_keys": False,
        "width": 120,
    }
    with open(filepath, "w", encoding="utf-8") as f:
        yaml.dump(data, f, **dump_options)
def load_name_matches(min_score: int = 80) -> dict[str, Path]:
    """Load name-based matches from _name_matches.json.

    Args:
        min_score: Minimum similarity score (0-100) to include a match.

    Returns:
        Dict mapping LinkedIn slug to NL-* file path. Empty when the
        matches file does not exist.
    """
    if not NAME_MATCHES_FILE.exists():
        return {}
    # Explicit encoding for portability; matches the utf-8 handling used
    # elsewhere in this script (load_yaml/save_yaml).
    with open(NAME_MATCHES_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    # Dict comprehension replaces the manual accumulate loop; as before,
    # a later entry for the same slug overwrites an earlier one.
    return {
        match["linkedin_slug"]: Path(match["nl_file"])
        for match in data.get("matches", [])
        if match.get("score", 0) >= min_score
    }
def load_nl_file_index() -> dict[str, Path]:
    """Load pre-built index mapping LinkedIn slugs to NL-* custodian files.

    Run scripts/build_linkedin_index.py first to create this index.

    Returns:
        Dict mapping LinkedIn slug to an absolute NL-* file path.

    Exits:
        With status 1 when the index file is missing.
    """
    print("Loading NL-* file index...")
    if not INDEX_FILE.exists():
        print(f" ERROR: Index file not found: {INDEX_FILE}")
        print(" Run: python scripts/build_linkedin_index.py")
        sys.exit(1)
    # Explicit encoding for portability; matches the utf-8 handling used
    # elsewhere in this script (load_yaml/save_yaml).
    with open(INDEX_FILE, "r", encoding="utf-8") as f:
        raw_index = json.load(f)
    # The index stores project-relative paths; resolve them against the repo root.
    index = {slug: PROJECT_ROOT / path for slug, path in raw_index.items()}
    print(f" Loaded {len(index)} LinkedIn slug mappings")
    return index
def load_linkedin_custodians() -> list[tuple[Path, dict]]:
    """Load all LinkedIn custodian YAML files.

    Files whose names start with "_" (indexes, match lists, etc.) are
    skipped; unreadable files are reported on stderr and ignored.
    """
    print("Loading LinkedIn custodian files...")
    loaded: list[tuple[Path, dict]] = []
    for path in LINKEDIN_DIR.glob("*.yaml"):
        if path.name.startswith("_"):
            # Helper/index files, not custodian records.
            continue
        try:
            loaded.append((path, load_yaml(path)))
        except Exception as exc:
            print(f"Warning: Error loading {path}: {exc}", file=sys.stderr)
    print(f" Loaded {len(loaded)} LinkedIn custodians")
    return loaded
def create_linkedin_enrichment(linkedin_data: dict, source_file: Path) -> dict:
    """Build the linkedin_enrichment mapping for an NL-* custodian file.

    Copies selected LinkedIn fields, stamps the current UTC time, and
    records provenance pointing back at *source_file*. Top-level keys
    whose value is None are dropped from the result.
    """
    src_provenance = linkedin_data.get("provenance", {})
    candidate = {
        "linkedin_url": linkedin_data.get("linkedin_url"),
        "linkedin_slug": linkedin_data.get("linkedin_slug"),
        "industry": linkedin_data.get("industry"),
        "follower_count": linkedin_data.get("follower_count"),
        "staff_count": linkedin_data.get("staff_count"),
        "heritage_staff_count": linkedin_data.get("heritage_staff_count"),
        "enrichment_timestamp": datetime.now(timezone.utc).isoformat(),
        "provenance": {
            "source": "linkedin_company_scrape",
            "original_file": str(source_file.relative_to(PROJECT_ROOT)),
            "schema_version": src_provenance.get("schema_version", "1.0.0"),
            "generated_at": src_provenance.get("generated_at"),
        },
    }
    heritage_staff = linkedin_data.get("heritage_staff")
    if heritage_staff:
        candidate["heritage_staff"] = heritage_staff
    # Strip keys that came back empty from the LinkedIn record (additive schema).
    return {key: value for key, value in candidate.items() if value is not None}
def merge_linkedin_data(
    nl_filepath: Path,
    linkedin_data: dict,
    linkedin_filepath: Path,
    dry_run: bool = False,
) -> bool:
    """Merge LinkedIn data into an existing NL-* file.

    Returns True when an enrichment was (or, in dry-run mode, would be)
    written; False when the file already carries the same slug's
    enrichment or any error occurred.
    """
    try:
        nl_data = load_yaml(nl_filepath)
        incoming_slug = linkedin_data.get("linkedin_slug")
        if "linkedin_enrichment" in nl_data:
            # Same slug already merged -> nothing to do.
            # NOTE(review): a *different* slug falls through and the existing
            # enrichment is overwritten below — confirm that is intended.
            if nl_data["linkedin_enrichment"].get("linkedin_slug") == incoming_slug:
                print(f" Skipping {nl_filepath.name}: already has linkedin_enrichment for {incoming_slug}")
                return False
        # Additive merge (Rule 5): only the linkedin_enrichment key is set.
        nl_data["linkedin_enrichment"] = create_linkedin_enrichment(linkedin_data, linkedin_filepath)
        if dry_run:
            print(f" [DRY-RUN] Would add linkedin_enrichment to {nl_filepath.name}")
            print(f" LinkedIn: {linkedin_data.get('name')} ({linkedin_data.get('linkedin_slug')})")
            staff_count = linkedin_data.get("heritage_staff_count", 0)
            print(f" Staff: {staff_count} heritage-relevant")
        else:
            save_yaml(nl_filepath, nl_data)
            print(f" Merged: {linkedin_filepath.name} -> {nl_filepath.name}")
        return True
    except Exception as exc:
        # Best-effort: report and continue so one bad file doesn't abort the run.
        print(f" Error merging {linkedin_filepath.name}: {exc}", file=sys.stderr)
        return False
def main():
    """Entry point: match LinkedIn custodian records to NL-* files and merge them.

    Returns:
        Process exit code: 0 when at least one merge happened or --dry-run
        was given, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Merge LinkedIn custodian data into NL-* files")
    parser.add_argument("--dry-run", action="store_true", help="Preview changes without writing")
    parser.add_argument("--limit", type=int, help="Limit number of merges for testing")
    parser.add_argument("--use-name-matches", action="store_true",
                        help="Include name-based matches from _name_matches.json")
    parser.add_argument("--min-score", type=int, default=80,
                        help="Minimum name match score (0-100, default: 80)")
    args = parser.parse_args()

    print("=" * 60)
    print("LinkedIn Custodian Merge Script")
    print("=" * 60)
    if args.dry_run:
        print("*** DRY RUN MODE - No files will be modified ***\n")

    nl_index = _build_slug_index(args)
    linkedin_custodians = load_linkedin_custodians()

    print("\nMatching and merging...")
    matched = 0
    merged = 0
    already_enriched = 0
    unmatched = []
    for linkedin_path, linkedin_data in linkedin_custodians:
        if args.limit and merged >= args.limit:
            print(f"\nReached limit of {args.limit} merges")
            break
        # Fall back to the filename stem when the record carries no slug.
        slug = linkedin_data.get("linkedin_slug") or linkedin_path.stem
        if slug not in nl_index:
            unmatched.append((linkedin_path, linkedin_data))
            continue
        matched += 1
        if merge_linkedin_data(nl_index[slug], linkedin_data, linkedin_path, args.dry_run):
            merged += 1
        else:
            # NOTE(review): merge errors also return False, so this count can
            # include failed merges, not only already-enriched files.
            already_enriched += 1

    _report(len(linkedin_custodians), matched, merged, already_enriched, unmatched)
    return 0 if merged > 0 or args.dry_run else 1


def _build_slug_index(args) -> dict[str, Path]:
    """Build the slug -> NL-* file mapping, optionally augmented with name matches."""
    nl_index = load_nl_file_index()
    if args.use_name_matches:
        name_matches = load_name_matches(args.min_score)
        print(f" Adding {len(name_matches)} name-based matches (score >= {args.min_score})")
        # Name matches supplement slug matches (slug matches take priority).
        for slug, path in name_matches.items():
            nl_index.setdefault(slug, path)
        print(f" Total mappings after merge: {len(nl_index)}")
    return nl_index


def _report(total: int, matched: int, merged: int, already_enriched: int,
            unmatched: list[tuple[Path, dict]]) -> None:
    """Print the merge summary and persist unmatched slugs for follow-up."""
    print("\n" + "=" * 60)
    print("MERGE SUMMARY")
    print("=" * 60)
    print(f"Total LinkedIn custodians: {total}")
    print(f"Matched to NL-* files: {matched}")
    print(f"Successfully merged: {merged}")
    print(f"Already enriched (skipped): {already_enriched}")
    print(f"Unmatched (need new files): {len(unmatched)}")
    if not unmatched:
        return
    print("\n--- Top 20 Unmatched LinkedIn Custodians ---")
    for linkedin_path, linkedin_data in unmatched[:20]:
        name = linkedin_data.get("name", "Unknown")
        slug = linkedin_data.get("linkedin_slug", linkedin_path.stem)
        location = linkedin_data.get("location", {})
        city = location.get("city", "?")
        country = location.get("country", "?")
        print(f" {slug}: {name} ({city}, {country})")
    if len(unmatched) > 20:
        print(f" ... and {len(unmatched) - 20} more")
    # Save unmatched slugs for later processing; reuse the LINKEDIN_DIR constant
    # instead of rebuilding the path, and write utf-8 explicitly (slugs may be
    # non-ASCII).
    unmatched_file = LINKEDIN_DIR / "_unmatched.txt"
    with open(unmatched_file, "w", encoding="utf-8") as f:
        f.writelines(f"{linkedin_path.stem}\n" for linkedin_path, _ in unmatched)
    print(f"\nUnmatched slugs saved to: {unmatched_file.relative_to(PROJECT_ROOT)}")
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())