#!/usr/bin/env python3
"""
Re-process LinkedIn profiles that have exa_raw_response but are missing pico_structured.

This script:
1. Scans entity directory for profiles with exa_raw_response but no pico_structured
2. Calls GLM-4.6 with the LinkedIn-specific PiCo prompt to structure the data
3. Updates the profile files in-place with pico_structured data

Usage:
    python scripts/reprocess_pico_structured.py [--dry-run] [--limit N] [--verbose]
"""

import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Load environment variables from .env file before third-party clients that
# may read them at import time.
from dotenv import load_dotenv
load_dotenv()

import httpx
import yaml
from tqdm import tqdm

# Path to PiCo integration module with GLM system prompt
PICO_MODULE_PATH = Path("/Users/kempersc/apps/glam/data/entity_annotation/modules/integrations/pico.yaml")
ENTITY_DIR = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

# Cache for system prompt (populated lazily by get_linkedin_system_prompt)
_CACHED_LINKEDIN_PROMPT: Optional[str] = None


def load_linkedin_system_prompt() -> Optional[str]:
    """Load the LinkedIn-specific system prompt from PiCo YAML module.

    Returns:
        The stripped prompt string, or None when the module file is missing,
        the key is absent, or reading/parsing fails (errors are printed).
    """
    try:
        if not PICO_MODULE_PATH.exists():
            print(f"Error: PiCo module not found at {PICO_MODULE_PATH}")
            return None
        with open(PICO_MODULE_PATH, 'r', encoding='utf-8') as f:
            pico_module = yaml.safe_load(f)
        linkedin_prompt = pico_module.get('glm_annotator_config', {}).get('linkedin_system_prompt')
        if linkedin_prompt:
            return linkedin_prompt.strip()
        print("Warning: linkedin_system_prompt not found in PiCo module")
        return None
    except Exception as e:
        print(f"Error loading LinkedIn prompt: {e}")
        return None


def get_linkedin_system_prompt() -> Optional[str]:
    """Get cached LinkedIn-specific system prompt, loading it on first call."""
    global _CACHED_LINKEDIN_PROMPT
    if _CACHED_LINKEDIN_PROMPT is None:
        _CACHED_LINKEDIN_PROMPT = load_linkedin_system_prompt()
    return _CACHED_LINKEDIN_PROMPT


def _call_glm_api(user_message: str, system_prompt: str, zai_api_token: str,
                  session: httpx.Client, timeout: float = 180.0,
                  model: str = "glm-4.6") -> Tuple[Optional[str], Optional[str]]:
    """Make a single GLM API call.

    Returns:
        Tuple of (content_string, error_message); exactly one is non-None.
    """
    try:
        glm_response = session.post(
            "https://api.z.ai/api/coding/paas/v4/chat/completions",
            headers={
                "Authorization": f"Bearer {zai_api_token}",
                "Content-Type": "application/json",
            },
            json={
                "model": model,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                # Low temperature: structured extraction should be near-deterministic.
                "temperature": 0.1
            },
            timeout=timeout
        )
        if glm_response.status_code != 200:
            return None, f"GLM API error: {glm_response.status_code} - {glm_response.text[:200]}"
        result = glm_response.json()
        glm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')
        if not glm_content:
            return None, f"Empty GLM response (model={model})"
        return glm_content, None
    except httpx.TimeoutException:
        return None, f"GLM request timed out ({timeout}s)"
    except httpx.RequestError as e:
        return None, f"HTTP error: {e}"
    except Exception as e:
        return None, f"Exception: {e}"


def _parse_glm_json(glm_content: str) -> Tuple[Optional[Dict], Optional[str]]:
    """Parse JSON from GLM response content.

    Tries, in order: direct parse, fenced ```json``` block, first {...} span.

    Returns:
        Tuple of (parsed_dict, error_message); exactly one is non-None.
    """
    # First try: direct JSON parse
    try:
        structured = json.loads(glm_content)
        return structured, None
    except json.JSONDecodeError:
        pass
    # Second try: extract from markdown code block
    json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', glm_content)
    if json_match:
        try:
            structured = json.loads(json_match.group(1))
            return structured, None
        except json.JSONDecodeError:
            pass
    # Third try: find JSON object in text
    json_match = re.search(r'\{[\s\S]*\}', glm_content)
    if json_match:
        try:
            structured = json.loads(json_match.group())
            return structured, None
        except json.JSONDecodeError:
            pass
    return None, f"Could not parse GLM response as JSON (len={len(glm_content)})"


def structure_with_pico_annotator(
    exa_content: str,
    url: str,
    session: httpx.Client,
    verbose: bool = False,
    max_retries: int = 3,
    base_delay: float = 5.0
) -> Tuple[Optional[Dict], Optional[str]]:
    """Use GLM with LinkedIn-specific PiCo prompt to structure profile data.

    Implements exponential backoff retry logic to handle GLM API inconsistency:
    - Retry on empty responses
    - Retry on timeouts
    - Exponential backoff: 5s, 10s, 20s delays between retries
    - Falls back to glm-4.5 if glm-4.6 returns empty

    Returns:
        Tuple of (structured_data, error_message)
        - (dict, None) on success
        - (None, error_string) on failure
    """
    zai_api_token = os.environ.get('ZAI_API_TOKEN')
    if not zai_api_token:
        return None, "ZAI_API_TOKEN not set"
    system_prompt = get_linkedin_system_prompt()
    if not system_prompt:
        return None, "Could not load LinkedIn system prompt"
    # Content is truncated to 12k chars to stay within the model context budget.
    user_message = f"""Source Type: modern_digital
Source URL: {url}
Content: {exa_content[:12000]}"""
    if verbose:
        print(f" → Calling GLM with {len(exa_content)} chars of content...")
    last_error = None
    # Model fallback order: try glm-4.6 first, then glm-4.5
    models = ["glm-4.6", "glm-4.5"]
    for model in models:
        for attempt in range(max_retries):
            if attempt > 0:
                delay = base_delay * (2 ** (attempt - 1))  # Exponential backoff: 5s, 10s, 20s
                if verbose:
                    print(f" → Retry {attempt}/{max_retries-1} with {model} after {delay}s delay (last error: {last_error})")
                time.sleep(delay)
            elif model != models[0] and verbose:
                print(f" → Falling back to {model}...")
            # Increase timeout on retries
            timeout = 180.0 + (attempt * 60)  # 180s, 240s, 300s
            glm_content, error = _call_glm_api(user_message, system_prompt, zai_api_token,
                                               session, timeout=timeout, model=model)
            if error:
                last_error = error
                # If empty response, try next model instead of retrying same model
                if "Empty GLM response" in error:
                    break  # Exit retry loop, move to next model
                # Retry on timeouts
                if "timed out" in error:
                    continue
                # Don't retry on non-recoverable errors
                if "GLM API error: 4" in error:  # 4xx errors
                    return None, error
                continue
            if verbose:
                print(f" → {model} returned {len(glm_content or '')} chars")
            if not glm_content:
                last_error = "Empty content after successful API call"
                break  # Try next model
            # Parse the response
            structured, parse_error = _parse_glm_json(glm_content)
            if parse_error:
                last_error = parse_error
                continue  # Retry on parse failures (might be truncated response)
            # Success!
            return structured, None
    # All retries exhausted for all models
    return None, f"Failed after trying all models. Last error: {last_error}"


def find_profiles_needing_pico(entity_dir: Path) -> List[Path]:
    """Find all profile files that have exa_raw_response but no pico_structured."""
    profiles_to_process = []
    for file_path in entity_dir.glob("*.json"):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Check if it has exa_raw_response
            if not data.get('exa_raw_response'):
                continue
            # Check if it already has pico_structured
            if data.get('pico_structured'):
                continue
            # Check if exa_raw_response has content
            results = data['exa_raw_response'].get('results', [])
            if not results or not results[0].get('text'):
                continue
            profiles_to_process.append(file_path)
        except Exception as e:
            print(f"Error reading {file_path}: {e}")
    return profiles_to_process


def process_profile(
    file_path: Path,
    session: httpx.Client,
    dry_run: bool = False,
    verbose: bool = False,
    max_retries: int = 3
) -> Tuple[bool, str]:
    """Process a single profile file and add pico_structured.

    Returns:
        Tuple of (success, message)
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Extract content from Exa response
        results = data.get('exa_raw_response', {}).get('results', [])
        if not results:
            return False, "No results in exa_raw_response"
        exa_content = results[0].get('text', '')
        if not exa_content or len(exa_content) < 50:
            return False, f"Insufficient content (len={len(exa_content)})"
        # Get URL: prefer the recorded LinkedIn URL, fall back to Exa's result URL
        url = data.get('extraction_metadata', {}).get('linkedin_url', '')
        if not url:
            url = results[0].get('url', '')
        if dry_run:
            return True, f"Would process {len(exa_content)} chars from {url}"
        # Call GLM to structure with retry logic
        structured, error = structure_with_pico_annotator(
            exa_content, url, session, verbose=verbose, max_retries=max_retries
        )
        if error:
            return False, error
        if not structured:
            return False, "Empty structured result"
        # Update the file
        data['pico_structured'] = structured
        # Update extraction method
        if 'extraction_metadata' in data:
            old_method = data['extraction_metadata'].get('extraction_method', '')
            if 'pico' not in old_method.lower():
                data['extraction_metadata']['extraction_method'] = old_method + '+pico_glm_reprocessed'
            data['extraction_metadata']['pico_reprocess_date'] = datetime.now(timezone.utc).isoformat()
        # Write back
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        # Calculate heritage experience count for the status message
        persons = structured.get('persons', [])
        heritage_exp_count = 0
        if persons:
            heritage_exp_count = len(persons[0].get('heritage_relevant_experience', []))
        return True, f"Added pico_structured with {heritage_exp_count} heritage experiences"
    except Exception as e:
        return False, f"Exception: {e}"


def main():
    parser = argparse.ArgumentParser(
        description="Re-process profiles that have exa_raw_response but no pico_structured"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would be processed without making changes"
    )
    parser.add_argument(
        "--limit", type=int, default=None,
        help="Maximum number of profiles to process"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Show detailed progress"
    )
    parser.add_argument(
        "--file", type=str, default=None,
        help="Process a specific file (full path or just the filename)"
    )
    parser.add_argument(
        "--max-retries", type=int, default=3,
        help="Maximum number of retries per profile on GLM failures (default: 3)"
    )
    parser.add_argument(
        "--delay", type=float, default=3.0,
        help="Delay between processing profiles in seconds (default: 3.0)"
    )
    args = parser.parse_args()

    # Check for required API key (not needed for dry runs)
    if not args.dry_run and not os.environ.get('ZAI_API_TOKEN'):
        print("Error: ZAI_API_TOKEN environment variable not set")
        sys.exit(1)

    # Check for LinkedIn prompt
    prompt = get_linkedin_system_prompt()
    if not prompt:
        print("Error: Could not load LinkedIn system prompt from pico.yaml")
        sys.exit(1)
    print(f"✓ Loaded LinkedIn system prompt ({len(prompt)} chars)")
    print(f"✓ Using max {args.max_retries} retries per profile with exponential backoff")

    # Find or specify files to process
    if args.file:
        # Process specific file; bare filenames resolve inside ENTITY_DIR
        file_path = Path(args.file)
        if not file_path.is_absolute():
            file_path = ENTITY_DIR / args.file
        if not file_path.exists():
            print(f"Error: File not found: {file_path}")
            sys.exit(1)
        profiles_to_process = [file_path]
    else:
        # Find all profiles needing processing
        print(f"\nScanning {ENTITY_DIR} for profiles needing PiCo annotation...")
        profiles_to_process = find_profiles_needing_pico(ENTITY_DIR)
        print(f"Found {len(profiles_to_process)} profiles to process")
    if not profiles_to_process:
        print("No profiles need processing!")
        return

    # Apply limit (explicit None check so --limit 0 means "process nothing",
    # not "no limit")
    if args.limit is not None:
        profiles_to_process = profiles_to_process[:args.limit]
        print(f"Processing first {len(profiles_to_process)} profiles (--limit {args.limit})")

    if args.dry_run:
        print("\n[DRY RUN - no changes will be made]\n")

    # Process profiles
    success_count = 0
    failed_count = 0
    results = []
    with httpx.Client(timeout=300.0) as session:
        for i, file_path in enumerate(tqdm(profiles_to_process, desc="Processing profiles")):
            success, message = process_profile(
                file_path, session,
                dry_run=args.dry_run,
                verbose=args.verbose,
                max_retries=args.max_retries
            )
            # Fix: include the actual filename in per-profile results
            # (previously a "(unknown)" placeholder was printed and `filename`
            # was unused), so failures can be traced back to their files.
            filename = file_path.name
            if success:
                success_count += 1
                results.append(f"✓ {filename}: {message}")
                if args.verbose:
                    print(f" ✓ {filename}: {message}")
            else:
                failed_count += 1
                results.append(f"✗ {filename}: {message}")
                if args.verbose:
                    print(f" ✗ {filename}: {message}")
            # Rate limiting between profiles (not on dry run or last profile)
            if not args.dry_run and i < len(profiles_to_process) - 1:
                time.sleep(args.delay)

    # Print summary
    print("\n" + "="*60)
    print("RESULTS")
    print("="*60)
    print(f"Successful: {success_count}")
    print(f"Failed: {failed_count}")
    # Show failed profiles
    if failed_count > 0:
        print(f"\nFailed profiles:")
        for result in results:
            if result.startswith("✗"):
                print(f" {result}")
    # Show sample of successful profiles
    if success_count > 0 and not args.dry_run:
        print(f"\nSuccessfully processed profiles (showing first 5):")
        for result in [r for r in results if r.startswith("✓")][:5]:
            print(f" {result}")


if __name__ == "__main__":
    main()