#!/usr/bin/env python3
"""
Test YAML integrity for hyponyms_curated.yaml

This script tests:
1. Q-number extraction completeness (all Q-numbers are captured)
2. Nested field preservation (rico, time fields are NOT corrupted)

Usage:
    python scripts/test_yaml_integrity.py
"""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Set
|
|
import yaml
|
|
|
|
|
|
# Directory one level above the folder that contains this script
# (the usage line in the module docstring runs it from scripts/).
PROJECT_ROOT = Path(__file__).parent.parent
# Curated hyponyms YAML file that every check in this script reads.
HYPONYMS_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/hyponyms_curated.yaml"
|
|
|
|
|
|
def extract_q_numbers_regex(yaml_path: Path) -> Set[str]:
|
|
"""
|
|
Extract Q-numbers using REGEX (current method in generate_gallery_query_with_exclusions.py).
|
|
|
|
Extracts from:
|
|
1. 'label:' fields - primary Q-number
|
|
2. 'duplicate:' fields - alternative Q-numbers
|
|
"""
|
|
with open(yaml_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
q_numbers = set()
|
|
|
|
# Pattern 1: Extract from "label: Q<digits>" lines
|
|
label_pattern = r'^\s*-?\s*label:\s+(Q\d+)'
|
|
|
|
for match in re.finditer(label_pattern, content, re.MULTILINE):
|
|
q_number = match.group(1).strip()
|
|
q_numbers.add(q_number)
|
|
|
|
# Pattern 2: Extract from "duplicate:" lists
|
|
duplicate_pattern = r'^\s+duplicate:\s*\n((?:\s+-\s+Q\d+\s*\n?)+)'
|
|
|
|
for match in re.finditer(duplicate_pattern, content, re.MULTILINE):
|
|
duplicate_block = match.group(1)
|
|
q_in_block = re.findall(r'Q\d+', duplicate_block)
|
|
q_numbers.update(q_in_block)
|
|
|
|
return q_numbers
|
|
|
|
|
|
def extract_q_numbers_comprehensive(yaml_path: Path) -> Set[str]:
    """
    Collect every Q-number that appears anywhere in the file's raw text.

    The file is not parsed as YAML; any standalone token of the form
    Q<digits> (delimited by word boundaries) is returned, regardless of
    whether it sits in a label, duplicate, rico, time or any other field.

    Args:
        yaml_path: Location of the YAML file, read as UTF-8 text.

    Returns:
        Set of all distinct Q<digits> tokens in the file.
    """
    text = Path(yaml_path).read_text(encoding='utf-8')
    # Word boundaries on both sides keep out tokens such as "XQ12" or "Q12a"
    return {hit.group(0) for hit in re.finditer(r'\bQ\d+\b', text)}
|
|
|
|
|
|
def extract_q_numbers_yaml_parse(yaml_path: Path) -> Set[str]:
    """
    Extract Q-numbers by YAML parsing (most accurate but slower).

    The file is loaded with yaml.safe_load and the known top-level sections
    are walked recursively. A Q-number is recorded for every string value
    (at any nesting level, including 'label' values) that starts with
    Q followed by digits; only the leading Q<digits> prefix is kept.
    Mapping keys themselves are not inspected.

    Args:
        yaml_path: Location of the YAML file, opened as UTF-8 text.

    Returns:
        Set of Q-numbers found in the walked sections. An empty set is
        returned when the document is empty or its top level is not a
        mapping, since there are no sections to walk in that case.
    """
    with open(yaml_path, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    # safe_load yields None for an empty document; a list or scalar top
    # level has no named sections either.
    if not isinstance(data, dict):
        return set()

    q_numbers: Set[str] = set()

    def extract_from_structure(obj) -> None:
        """Recursively extract Q-numbers from nested structures."""
        if isinstance(obj, dict):
            # 'label' values need no special case: they are strings and
            # are handled by the str branch on recursion.
            for value in obj.values():
                extract_from_structure(value)
        elif isinstance(obj, list):
            for item in obj:
                extract_from_structure(item)
        elif isinstance(obj, str):
            # re.match anchors at the start of the string
            q_match = re.match(r'^(Q\d+)', obj)
            if q_match:
                q_numbers.add(q_match.group(1))

    # Extract from all sections
    for section_name in ('hypernym', 'entity', 'entity_list', 'standard', 'collection', 'exclude'):
        if section_name in data:
            extract_from_structure(data[section_name])

    return q_numbers
|
|
|
|
|
|
def check_nested_field_corruption(yaml_path: Path) -> dict:
    """
    Check if nested fields (rico, time) are properly structured.

    A 'rico' or 'time' value is considered well formed when it is a
    non-empty list whose first element is a mapping containing a 'label'
    key, e.g. [{'label': 'recordSetTypes'}] or
    [{'label': '1960s', 'type': [{'label': 'start'}]}]. Anything else is
    counted as corrupted. Only the first list element is inspected.

    Args:
        yaml_path: Location of the YAML file, opened as UTF-8 text.

    Returns:
        Dict with corruption status and details:
        '<field>_fields' (count of occurrences), '<field>_corrupted'
        (count of malformed occurrences) and '<field>_examples' (list of
        {'label': ..., 'structure': ...} dicts) for field in rico, time.
        Each example's 'label' is the label of the mapping that owns the
        field, falling back to the nearest labelled ancestor. All counts
        are zero when the document is empty or not a mapping.
    """
    with open(yaml_path, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    results = {
        'rico_fields': 0,
        'rico_corrupted': 0,
        'rico_examples': [],
        'time_fields': 0,
        'time_corrupted': 0,
        'time_examples': [],
    }

    # safe_load yields None for an empty document; without a top-level
    # mapping there are no sections to inspect.
    if not isinstance(data, dict):
        return results

    def is_well_formed(value) -> bool:
        """Return True for a non-empty list whose first item is a dict with 'label'."""
        return (
            isinstance(value, list)
            and len(value) > 0
            and isinstance(value[0], dict)
            and 'label' in value[0]
        )

    def check_structure(obj, parent_label=None) -> None:
        """Recursively check for rico/time field corruption."""
        if isinstance(obj, dict):
            # Resolve the label first so a corrupted field is reported under
            # the entry that owns it rather than under that entry's parent.
            current_label = obj.get('label', parent_label)

            for field in ('rico', 'time'):
                if field not in obj:
                    continue
                results[f'{field}_fields'] += 1
                value = obj[field]
                if not is_well_formed(value):
                    results[f'{field}_corrupted'] += 1
                    results[f'{field}_examples'].append({
                        'label': current_label,
                        'structure': value,
                    })

            # Recurse into every value, including the rico/time values
            for value in obj.values():
                check_structure(value, current_label)

        elif isinstance(obj, list):
            for item in obj:
                check_structure(item, parent_label)

    # Check all sections
    # NOTE(review): 'exclude' is walked by extract_q_numbers_yaml_parse but
    # not here - confirm whether that section can carry rico/time fields.
    for section_name in ('hypernym', 'entity', 'entity_list', 'standard', 'collection'):
        if section_name in data:
            check_structure(data[section_name])

    return results
|
|
|
|
|
|
def _print_banner(title: str, leading_blank: bool = False) -> None:
    """Print *title* framed by two 70-character '=' rules."""
    rule = "=" * 70
    print("\n" + rule if leading_blank else rule)
    print(title)
    print(rule)


def main():
    """Main test function.

    Runs both checks against HYPONYMS_FILE and prints a report:
    1. compares the Q-numbers found by the three extraction methods,
       using the comprehensive regex result as the reference set;
    2. counts malformed rico/time fields.
    Returns early (after printing a message) when the file does not exist.
    """
    _print_banner("YAML INTEGRITY TEST - hyponyms_curated.yaml")

    if not HYPONYMS_FILE.exists():
        print(f"❌ File not found: {HYPONYMS_FILE}")
        return

    print(f"\n📂 Testing: {HYPONYMS_FILE.name}\n")

    # Test 1: Q-number extraction completeness
    _print_banner("TEST 1: Q-NUMBER EXTRACTION COMPLETENESS")

    print("\n⏳ Extracting Q-numbers using different methods...\n")

    regex_q_numbers = extract_q_numbers_regex(HYPONYMS_FILE)
    comprehensive_q_numbers = extract_q_numbers_comprehensive(HYPONYMS_FILE)
    yaml_parse_q_numbers = extract_q_numbers_yaml_parse(HYPONYMS_FILE)

    print(f"Method 1 (REGEX - current): {len(regex_q_numbers)} Q-numbers")
    print(f"Method 2 (COMPREHENSIVE): {len(comprehensive_q_numbers)} Q-numbers")
    print(f"Method 3 (YAML PARSE): {len(yaml_parse_q_numbers)} Q-numbers")

    # Find missing Q-numbers (relative to the comprehensive scan)
    missing_in_regex = comprehensive_q_numbers - regex_q_numbers
    missing_in_yaml_parse = comprehensive_q_numbers - yaml_parse_q_numbers

    print("\n📊 Analysis:")
    print(f" Missing in REGEX method: {len(missing_in_regex)} Q-numbers")
    print(f" Missing in YAML PARSE method: {len(missing_in_yaml_parse)} Q-numbers")

    if missing_in_regex:
        sorted_missing = sorted(missing_in_regex)
        print("\n⚠️ WARNING: REGEX method is INCOMPLETE!")
        print(f" Missing Q-numbers (first 20): {sorted_missing[:20]}")

        # Investigate where missing Q-numbers appear; split the file into
        # lines once instead of once per inspected Q-number.
        with open(HYPONYMS_FILE, 'r', encoding='utf-8') as f:
            file_lines = f.read().split('\n')

        print("\n🔍 Where do missing Q-numbers appear?")
        for q in sorted_missing[:5]:
            # Plain substring test, so e.g. Q12 also matches lines with Q123
            lines = [line for line in file_lines if q in line]
            if lines:
                print(f"\n {q}:")
                for line in lines[:2]:
                    print(f" {line.strip()}")

    if missing_in_yaml_parse:
        print("\n⚠️ WARNING: YAML PARSE method is INCOMPLETE!")
        print(f" Missing Q-numbers (first 20): {sorted(missing_in_yaml_parse)[:20]}")

    if not missing_in_regex and not missing_in_yaml_parse:
        print("\n✅ All methods extracted the same Q-numbers")

    # Test 2: Nested field corruption check
    _print_banner("TEST 2: NESTED FIELD CORRUPTION (rico, time)", leading_blank=True)

    print("\n⏳ Checking nested field structures...\n")

    corruption_results = check_nested_field_corruption(HYPONYMS_FILE)
    rico_corrupted = corruption_results['rico_corrupted']
    time_corrupted = corruption_results['time_corrupted']

    # The time section is separated from the rico section by a blank line.
    for field, lead in (('rico', ''), ('time', '\n')):
        title = field.capitalize()
        found = corruption_results[field + '_fields']
        corrupted = corruption_results[field + '_corrupted']

        print(f"{lead}{title} fields found: {found}")
        print(f"{title} fields corrupted: {corrupted}")

        if corrupted > 0:
            print(f"\n❌ CORRUPTION DETECTED in {field} fields!")
            print(" Examples (first 3):")
            for ex in corruption_results[field + '_examples'][:3]:
                print(f" Label: {ex['label']}")
                print(f" Structure: {ex['structure']}")
        else:
            print(f"✅ All {field} fields are correctly structured")

    # Summary
    _print_banner("TEST SUMMARY", leading_blank=True)

    # Only the REGEX gap and field corruption count as issues; a gap in the
    # YAML PARSE method is reported above as a warning only.
    issues_found = []

    if missing_in_regex:
        issues_found.append(f"Q-number extraction incomplete ({len(missing_in_regex)} missing)")

    if rico_corrupted > 0:
        issues_found.append(f"Rico field corruption ({rico_corrupted} fields)")

    if time_corrupted > 0:
        issues_found.append(f"Time field corruption ({time_corrupted} fields)")

    if issues_found:
        print("\n❌ ISSUES FOUND:")
        for i, issue in enumerate(issues_found, 1):
            print(f" {i}. {issue}")

        print("\n⚠️ Recommendations:")
        if missing_in_regex:
            print(" - Update extract_q_numbers_from_yaml() to use comprehensive regex")
            print(" - Or switch to YAML parsing method for complete extraction")
        if rico_corrupted > 0 or time_corrupted > 0:
            print(" - Review enrich_hyponyms_with_wikidata.py for field preservation")
            print(" - Ensure nested structures are preserved during enrichment")
    else:
        print("\n✅ NO ISSUES FOUND - All tests passed!")

    print("\n" + "=" * 70)
|
|
|
|
|
|
# Run the integrity checks only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|