#!/usr/bin/env python3
"""
Fix malformed ghcid_history entries in YAML files.

Version 2: More robust parsing and reconstruction.
"""

import re
import sys
from pathlib import Path

# Header line of the section.  Anchored to the start of a line and limited to
# horizontal whitespace so the captured indent can never contain a newline and
# the pattern cannot match inside a longer key such as "old_ghcid_history:".
_HISTORY_HEADER = re.compile(
    r'^([ \t]*)ghcid_history:[ \t\r]*\n', re.MULTILINE
)


def _split_history_section(remaining: str, base_indent: str) -> tuple:
    """Split *remaining* into (history_section, after_section).

    The section ends at the first line that starts a mapping key indented no
    deeper than the ``ghcid_history:`` header itself.  If no such line exists
    the section runs to the end of the text and ``after_section`` is empty.
    """
    # Built from the indent *width* rather than by interpolating the raw
    # indent text into the regex, so no escaping is needed.
    end_pattern = re.compile(
        rf'^[ \t]{{0,{len(base_indent)}}}[a-z_][a-z0-9_]*:', re.MULTILINE
    )
    end_match = end_pattern.search(remaining)
    if end_match is None:
        return remaining, ""
    cut = end_match.start()
    return remaining[:cut], remaining[cut:]


def _parse_history_entries(history_section: str) -> list:
    """Collect key/value pairs from the section into one dict per entry."""
    entries = []
    current_entry = {}

    for line in history_section.split('\n'):
        stripped = line.strip()
        if not stripped:
            continue

        starts_entry = stripped.startswith('- ghcid:') or (
            stripped.startswith('ghcid:') and not current_entry
        )
        if starts_entry:
            # A ghcid line opens a new entry.
            if current_entry:
                entries.append(current_entry)
            value = stripped.replace('- ghcid:', '').replace('ghcid:', '').strip()
            current_entry = {'ghcid': value}
        elif stripped.startswith('- ') and ':' in stripped[2:]:
            # A list item that opens with some other key also starts an entry.
            if current_entry:
                entries.append(current_entry)
            key, val = stripped[2:].split(':', 1)
            current_entry = {key.strip(): val.strip().strip("'\"")}
        elif ':' in stripped:
            # Plain "key: value" line belonging to the current entry.
            key, val = stripped.split(':', 1)
            key = key.strip().replace('- ', '')
            val = val.strip().strip("'\"")
            if key and val:
                current_entry[key] = val

    if current_entry:
        entries.append(current_entry)
    return entries


def _dedupe_entries(entries: list) -> list:
    """Keep the first entry per (ghcid, valid_from); drop entries without ghcid."""
    seen = set()
    unique_entries = []
    for entry in entries:
        key = (entry.get('ghcid', ''), entry.get('valid_from', ''))
        if key not in seen and entry.get('ghcid'):
            seen.add(key)
            unique_entries.append(entry)
    return unique_entries


def _render_history(entries: list, base_indent: str) -> str:
    """Render the ``ghcid_history:`` header followed by the given entries."""
    list_indent = base_indent + "  "
    item_indent = list_indent + "  "

    parts = [f"{base_indent}ghcid_history:\n"]
    for entry in entries:
        parts.append(f"{list_indent}- ghcid: {entry.get('ghcid', '')}\n")
        if 'valid_from' in entry:
            parts.append(f"{item_indent}valid_from: '{entry['valid_from']}'\n")
        if 'ghcid_numeric' in entry:
            parts.append(f"{item_indent}ghcid_numeric: {entry['ghcid_numeric']}\n")
        if 'reason' in entry:
            reason = entry['reason']
            # A colon inside a plain scalar would be read as a nested mapping,
            # so such text is double-quoted.  Backslashes and double quotes
            # are escaped so the quoted scalar stays valid.
            if ':' in reason and not reason.startswith(('"', "'")):
                escaped = reason.replace('\\', '\\\\').replace('"', '\\"')
                reason = f'"{escaped}"'
            parts.append(f"{item_indent}reason: {reason}\n")
    return "".join(parts)


def fix_ghcid_history_section(content: str) -> str:
    """Fix the ghcid_history section of a YAML file.

    The first ``ghcid_history:`` header line is located, the lines that follow
    it (up to the next key indented no deeper than the header) are parsed into
    entries, duplicate entries are removed, and the section is rewritten as a
    list whose items carry ``ghcid``, ``valid_from``, ``ghcid_numeric`` and
    ``reason`` (other keys found inside the section are not written back).

    Args:
        content: Full text of the YAML file.

    Returns:
        The text with the section rewritten, or ``content`` unchanged when no
        ``ghcid_history:`` header line is present.
    """
    history_match = _HISTORY_HEADER.search(content)
    if not history_match:
        return content  # No ghcid_history section

    base_indent = history_match.group(1)
    history_section, after_section = _split_history_section(
        content[history_match.end():], base_indent
    )

    unique_entries = _dedupe_entries(_parse_history_entries(history_section))

    # The rendered text already begins with the header line, so everything
    # before the match is kept verbatim and the old header is replaced rather
    # than emitted a second time.
    before_section = content[:history_match.start()]
    return before_section + _render_history(unique_entries, base_indent) + after_section


def process_file(filepath: Path, dry_run: bool = False) -> bool:
    """Process a single file.

    Args:
        filepath: YAML file to read and, unless ``dry_run``, rewrite in place.
        dry_run: When true, only report that the file would change.

    Returns:
        True if the fixed text differs from the file's current text.
    """
    original = filepath.read_text(encoding='utf-8')
    fixed = fix_ghcid_history_section(original)

    if fixed == original:
        return False

    if dry_run:
        print(f"Would fix: {filepath.name}")
    else:
        filepath.write_text(fixed, encoding='utf-8')
        print(f"Fixed: {filepath.name}")
    return True


def main():
    """Run the fixer over the BE-* and EG-* YAML files in data/custodian.

    Passing ``--dry-run`` on the command line reports files without writing.
    """
    dry_run = '--dry-run' in sys.argv

    custodian_dir = Path('data/custodian')
    fixed_count = 0

    for prefix in ['BE-', 'EG-']:
        for yaml_file in sorted(custodian_dir.glob(f'{prefix}*.yaml')):
            if process_file(yaml_file, dry_run):
                fixed_count += 1

    print(f"\n{'Would fix' if dry_run else 'Fixed'}: {fixed_count} files")


if __name__ == '__main__':
    main()