376 lines
14 KiB
Python
Executable file
376 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Link Person Observations to Custodian YAML Files
|
|
|
|
This script reads parsed staff files and links them to custodian YAML files
|
|
by adding person_observations sections with references to person entity files.
|
|
|
|
Web claims are stored in person entity files (single source of truth for person data).
|
|
Custodian files only store affiliation provenance (when/how person was associated).
|
|
|
|
Usage:
|
|
python scripts/link_person_observations.py \
|
|
--staff-file data/custodian/person/affiliated/parsed/nationaal-archief_staff_*.json \
|
|
--custodian-file data/custodian/NL-ZH-DHA-A-NA.yaml \
|
|
--dry-run
|
|
|
|
Rules Applied:
|
|
- Rule 5: Additive only - never delete enriched data
|
|
- Rule 12: Person data reference pattern (file paths, not inline duplication)
|
|
- Rule 20: Person entity profiles stored individually
|
|
- Rule 26: Person Data Provenance - web claims stored in entity files
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import glob
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
def str_representer(dumper, data):
    """Represent str values for PyYAML, using literal block style ('|') for multiline text."""
    # A None style lets PyYAML pick its default scalar presentation.
    block_style = '|' if '\n' in data else None
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style=block_style)
|
|
|
|
# Register globally: every str dumped via yaml.dump in this process goes
# through str_representer, so multiline values render as literal blocks.
yaml.add_representer(str, str_representer)
|
|
|
|
|
|
def find_entity_file(linkedin_slug: str, entity_dir: Path) -> tuple[str | None, Path | None]:
|
|
"""Find the entity file for a LinkedIn slug.
|
|
|
|
Returns tuple of (relative path from project root, absolute path) or (None, None) if not found.
|
|
"""
|
|
pattern = str(entity_dir / f"{linkedin_slug}_*.json")
|
|
matches = glob.glob(pattern)
|
|
|
|
if not matches:
|
|
return None, None
|
|
|
|
# If multiple matches, take the most recent (sorted by filename which includes timestamp)
|
|
matches.sort(reverse=True)
|
|
abs_path = Path(matches[0])
|
|
|
|
# Return path relative to project root
|
|
rel_path = os.path.relpath(matches[0], entity_dir.parent.parent.parent.parent)
|
|
return rel_path, abs_path
|
|
|
|
|
|
def load_staff_file(staff_file: Path) -> dict:
    """Read the parsed staff JSON file and return its decoded contents."""
    return json.loads(staff_file.read_text(encoding='utf-8'))
|
|
|
|
|
|
def load_custodian_file(custodian_file: Path) -> dict:
    """Parse the custodian YAML file and return its contents as a dict."""
    with custodian_file.open('r', encoding='utf-8') as handle:
        return yaml.safe_load(handle)
|
|
|
|
|
|
def update_entity_file_with_claims(
    entity_path: Path,
    staff_member: dict,
    custodian_name: str,
    custodian_slug: str,
    timestamp: str,
    dry_run: bool = True
) -> bool:
    """Add web_claims and an affiliation record to a person entity JSON file.

    Claims are deduplicated by source URL, affiliations by
    (custodian_slug, role_title), so re-running the linker is idempotent.
    The file is rewritten only when something new was actually added — the
    previous version rewrote the file unconditionally, churning mtimes and
    diffs on every run even when no data changed (Rule 5: additive only).

    Args:
        entity_path: Absolute path of the entity JSON file.
        staff_member: Parsed staff entry; expects 'name', and optionally
            'headline', 'linkedin_profile_url', 'heritage_relevant',
            'heritage_type'.
        custodian_name: Human-readable custodian name.
        custodian_slug: LinkedIn company slug of the custodian.
        timestamp: UTC timestamp string used for provenance fields.
        dry_run: When True, compute but never write.

    Returns:
        True if the entity file is (or would be) up to date; False when the
        file is missing or could not be read/written.
    """
    if not entity_path or not entity_path.exists():
        return False

    try:
        with open(entity_path, 'r', encoding='utf-8') as f:
            entity_data = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        print(f" WARNING: Could not read entity file {entity_path}: {e}")
        return False

    changed = False  # set only when a claim or affiliation is appended

    # --- Web claims, deduplicated per source URL ------------------------
    if 'web_claims' not in entity_data:
        entity_data['web_claims'] = []

    source_url = staff_member.get('linkedin_profile_url', '')
    existing_sources = {c.get('source_url') for c in entity_data['web_claims']}

    if source_url and source_url not in existing_sources:
        # Name claim.
        entity_data['web_claims'].append({
            'claim_type': 'full_name',
            'claim_value': staff_member['name'],
            'source_url': source_url,
            'retrieved_on': timestamp,
            'retrieval_agent': 'linkedin_html_parser',
        })
        changed = True

        # Role/headline claim, when present.
        if staff_member.get('headline'):
            entity_data['web_claims'].append({
                'claim_type': 'role_title',
                'claim_value': staff_member['headline'],
                'source_url': source_url,
                'retrieved_on': timestamp,
                'retrieval_agent': 'linkedin_html_parser',
            })

    # --- Affiliation, deduplicated by (slug, role_title) ----------------
    if 'affiliations' not in entity_data:
        entity_data['affiliations'] = []

    existing_affiliations = {
        (a.get('custodian_slug'), a.get('role_title'))
        for a in entity_data['affiliations']
    }

    if (custodian_slug, staff_member.get('headline', '')) not in existing_affiliations:
        entity_data['affiliations'].append({
            'custodian_name': custodian_name,
            'custodian_slug': custodian_slug,
            'role_title': staff_member.get('headline', ''),
            'heritage_relevant': staff_member.get('heritage_relevant', False),
            'heritage_type': staff_member.get('heritage_type'),
            'current': True,
            'observed_on': timestamp,
            'source_url': f"https://www.linkedin.com/company/{custodian_slug}/people/",
        })
        changed = True

    if dry_run:
        return True

    # Fix: skip the disk write when nothing was added, so idempotent re-runs
    # leave the file untouched instead of rewriting identical content.
    if not changed:
        return True

    try:
        with open(entity_path, 'w', encoding='utf-8') as f:
            json.dump(entity_data, f, indent=2, ensure_ascii=False)
        return True
    except IOError as e:
        print(f" WARNING: Could not write entity file {entity_path}: {e}")
        return False
|
|
|
|
|
|
def generate_person_observations(
    staff_data: dict,
    entity_dir: Path,
    custodian_slug: str,
    custodian_name: str,
    dry_run: bool = True
) -> dict:
    """Build the person_observations section from parsed staff data.

    Web claims are pushed into the person entity files (single source of
    truth); the returned structure holds only affiliation provenance and
    file-path references for the custodian YAML.

    Returns:
        Dict with 'observation_metadata' and a 'staff' list.
    """
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    metadata = staff_data['custodian_metadata']
    staff_list = staff_data['staff']
    people_url = f"https://www.linkedin.com/company/{custodian_slug}/people/"

    observations = {
        'observation_metadata': {
            'retrieval_agent': 'linkedin_html_parser',
            'retrieval_timestamp': timestamp,
            'source_url': f"https://www.linkedin.com/company/{metadata.get('custodian_slug', custodian_slug)}/people/",
            'html_file': None,  # raw HTML was not archived for this extraction
            'staff_count_total': metadata.get('associated_members', len(staff_list)),
            'staff_count_extracted': len(staff_list),
            'staff_count_with_linkedin': sum(1 for s in staff_list if s.get('linkedin_slug')),
            'staff_count_with_entity_file': 0,  # filled in after the loop
        },
        'staff': [],
    }

    linked_count = 0
    updated_count = 0

    for member in staff_list:
        # The first parsed entry is often the company itself — skip it.
        if member['name'] == metadata.get('custodian_name'):
            continue
        # Skip anonymous or nameless entries.
        if member.get('name_type') == 'anonymous' or not member.get('name'):
            continue

        # Affiliation provenance only; person facts live in entity files.
        entry = {
            'person_id': member['staff_id'],
            'person_name': member['name'],
            'role_title': member.get('headline', ''),
            'heritage_relevant': member.get('heritage_relevant', False),
            'heritage_type': member.get('heritage_type'),
            'current': True,  # taken from the live LinkedIn people page
            'affiliation_provenance': {
                'source_url': people_url,
                'retrieved_on': timestamp,
                'retrieval_agent': 'linkedin_html_parser',
            },
        }

        if member.get('linkedin_profile_url'):
            entry['linkedin_profile_url'] = member['linkedin_profile_url']

        # Link to the entity file (by path, not inline copy) when one exists.
        if member.get('linkedin_slug'):
            rel_path, abs_path = find_entity_file(member['linkedin_slug'], entity_dir)
            if rel_path and abs_path:
                entry['linkedin_profile_path'] = rel_path
                linked_count += 1

                # Push web claims + affiliation into the entity file itself.
                if update_entity_file_with_claims(
                    abs_path, member, custodian_name, custodian_slug, timestamp, dry_run
                ):
                    updated_count += 1

        observations['staff'].append(entry)

    observations['observation_metadata']['staff_count_with_entity_file'] = linked_count
    observations['observation_metadata']['entity_files_updated'] = updated_count

    return observations
|
|
|
|
|
|
def update_custodian_file(
    custodian_file: Path,
    person_observations: dict,
    dry_run: bool = True
) -> bool:
    """Write the person_observations section into the custodian YAML file.

    If the section already exists, reports both counts and asks for
    interactive confirmation before overwriting (skipped in dry-run mode).

    Returns:
        True if the file was updated (or would be, in dry-run); False when
        the user declined to overwrite.
    """
    with open(custodian_file, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f.read())

    new_staff_total = len(person_observations['staff'])
    meta = person_observations['observation_metadata']

    if 'person_observations' in data:
        print(f" WARNING: person_observations already exists in {custodian_file}")
        print(f" Existing staff count: {len(data['person_observations'].get('staff', []))}")
        print(f" New staff count: {new_staff_total}")
        # Only prompt when we would actually write.
        if not dry_run and input(" Overwrite? [y/N]: ").strip().lower() != 'y':
            print(" Skipping.")
            return False

    data['person_observations'] = person_observations

    if dry_run:
        print(f"\n DRY RUN - Would update {custodian_file}")
        print(f" Staff entries: {new_staff_total}")
        print(f" With entity files: {meta['staff_count_with_entity_file']}")
        print(f" Entity files would be updated: {meta.get('entity_files_updated', 0)}")
        return True

    with open(custodian_file, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False, width=120)

    print(f" Updated {custodian_file}")
    print(f" Staff entries: {new_staff_total}")
    print(f" With entity files: {meta['staff_count_with_entity_file']}")
    print(f" Entity files updated: {meta.get('entity_files_updated', 0)}")

    return True
|
|
|
|
|
|
def main():
    """CLI entry point: resolve inputs, build observations, update the custodian file."""
    parser = argparse.ArgumentParser(
        description='Link person observations to custodian YAML files'
    )
    parser.add_argument(
        '--staff-file',
        required=True,
        help='Path to parsed staff JSON file (supports glob patterns)',
    )
    parser.add_argument(
        '--custodian-file',
        required=True,
        help='Path to custodian YAML file to update',
    )
    parser.add_argument(
        '--entity-dir',
        default='data/custodian/person/entity',
        help='Directory containing person entity files',
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes',
    )
    args = parser.parse_args()

    # Script lives in scripts/, so the project root is one level up.
    project_root = Path(__file__).parent.parent

    # --staff-file may be a glob pattern; use the first match.
    staff_matches = glob.glob(args.staff_file)
    if not staff_matches:
        print(f"ERROR: No staff file found matching: {args.staff_file}")
        sys.exit(1)

    staff_file = Path(staff_matches[0])
    if len(staff_matches) > 1:
        print(f" Note: Multiple staff files found, using: {staff_file}")

    custodian_file = Path(args.custodian_file)
    entity_dir = project_root / args.entity_dir

    # Fail fast on any missing input path.
    if not staff_file.exists():
        print(f"ERROR: Staff file not found: {staff_file}")
        sys.exit(1)
    if not custodian_file.exists():
        print(f"ERROR: Custodian file not found: {custodian_file}")
        sys.exit(1)
    if not entity_dir.exists():
        print(f"ERROR: Entity directory not found: {entity_dir}")
        sys.exit(1)

    print(f"Processing: {staff_file.name}")
    print(f"Target: {custodian_file.name}")
    print(f"Entity dir: {entity_dir}")

    staff_data = load_staff_file(staff_file)
    metadata = staff_data['custodian_metadata']
    custodian_slug = metadata.get('custodian_slug', '')
    custodian_name = metadata.get('custodian_name', '')

    print(f"\nStaff file stats:")
    print(f" Total staff: {len(staff_data['staff'])}")
    print(f" With LinkedIn: {sum(1 for s in staff_data['staff'] if s.get('linkedin_slug'))}")

    # Build observations; this also pushes web claims into entity files.
    observations = generate_person_observations(
        staff_data, entity_dir, custodian_slug, custodian_name, dry_run=args.dry_run
    )

    update_custodian_file(custodian_file, observations, dry_run=args.dry_run)
|
|
|
|
|
|
# Standard script entry guard: run only when executed directly, not on import.
if __name__ == '__main__':
    main()
|