glam/scripts/validate_csv_to_yaml_conversion.py
2025-11-19 23:25:22 +01:00

279 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""
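Validate the CSV to YAML conversion.
This script validates that:
1. Every CSV column header maps to a field in the YAML output
2. The YAML output holds the same number of records as the CSV
3. All non-empty data from CSV is present in YAML
4. Field mappings are correct (values match once header names are cleaned)
Note: the checks compare the CSV and YAML files directly; no LinkML schema
file is loaded by this script, even though its console banner describes the
validation as "LinkML-based".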
"""
import csv
import yaml
import sys
from pathlib import Path
from typing import Dict, List, Set
from collections import defaultdict
# Character table used by clean_field_name_for_yaml: line breaks, spaces and
# slashes become underscores; parentheses, commas and double quotes are dropped.
_FIELD_NAME_TRANSLATION = str.maketrans('\r\n /', '____', '(),"')


def clean_field_name_for_yaml(name: str) -> str:
    """
    Turn a raw CSV header into the YAML field name used for comparison.

    Per the original docstring this is meant to apply the same logic as the
    conversion script, so the result must stay identical to that script's
    output; only the implementation here is condensed.

    Rules, in order:
    1. Surrounding whitespace is stripped.
    2. CR, LF, space and '/' each become '_'; '(', ')', ',' and '"' are removed.
    3. Runs of underscores collapse to a single '_'.
    4. Leading/trailing underscores are stripped and the name is lower-cased.
    5. A name that ends up empty becomes 'unnamed_field'.

    Args:
        name: The header text exactly as read from the CSV file.

    Returns:
        The cleaned, lower-case field name (never empty).
    """
    # One translate() pass replaces the chain of single-character replace()
    # calls. A CRLF pair yields two underscores here, which the collapse loop
    # below reduces to one, so the result matches a direct '\r\n' -> '_' swap.
    name = name.strip().translate(_FIELD_NAME_TRANSLATION)
    while '__' in name:
        name = name.replace('__', '_')
    name = name.strip('_').lower()
    return name or 'unnamed_field'
def load_csv_data(csv_path: Path):
    """
    Read a CSV file and return its header row and data rows.

    The file is opened as 'utf-8-sig', so a leading byte-order mark is not
    treated as part of the first header, and with newline='' as the csv
    module requires for correct handling of line breaks inside quoted cells.

    Args:
        csv_path: Location of the CSV file to read.

    Returns:
        A ``(headers, records)`` tuple: ``headers`` is the list of column
        names (empty when the file has no header row) and ``records`` is a
        list with one dict per data row, keyed by column name.
    """
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as handle:
        dict_reader = csv.DictReader(handle)
        column_names = dict_reader.fieldnames
        if column_names:
            headers = list(column_names)
        else:
            headers = []
        records = [row for row in dict_reader]
    return headers, records
def load_yaml_data(yaml_path: Path) -> List[Dict[str, str]]:
    """
    Load the converted records from a YAML file.

    Args:
        yaml_path: Location of the YAML file to read (decoded as UTF-8).

    Returns:
        The list of records found at the top level of the document. An empty
        document yields an empty list.

    Raises:
        ValueError: If the document's top level is not a list, since every
            caller iterates the result as a sequence of records.
    """
    with open(yaml_path, 'r', encoding='utf-8') as f:
        records = yaml.safe_load(f)
    # safe_load() returns None for an empty document; returning that would
    # break the declared return type and crash callers that iterate it.
    if records is None:
        return []
    if not isinstance(records, list):
        raise ValueError(
            f"Expected a top-level list of records in {yaml_path}, "
            f"got {type(records).__name__}"
        )
    return records
def validate_field_mapping(csv_headers: List[str], yaml_fields: Set[str]) -> Dict[str, List[str]]:
    """
    Validate that CSV headers map correctly to YAML fields.

    Each CSV header is cleaned with clean_field_name_for_yaml() and looked up
    in the set of field names found in the YAML output.

    Args:
        csv_headers: Header names as read from the CSV file.
        yaml_fields: All field names that occur in the YAML records.

    Returns:
        Dict with validation results:
        - 'valid_mappings': "<csv header> → <yaml field>" for each header
          whose cleaned name exists in the YAML
        - 'missing_mappings': "<csv header> → <yaml field> (MISSING)" for
          each header whose cleaned name is absent from the YAML
        - 'unexpected_yaml': YAML fields that no CSV header maps to, sorted
          so the report is the same on every run
    """
    results: Dict[str, List[str]] = {
        'valid_mappings': [],
        'missing_mappings': [],
        'unexpected_yaml': [],
    }

    # Walk the headers in file order so the report follows the CSV layout.
    expected_yaml_fields = set()
    for csv_field in csv_headers:
        yaml_field = clean_field_name_for_yaml(csv_field)
        expected_yaml_fields.add(yaml_field)
        if yaml_field in yaml_fields:
            results['valid_mappings'].append(f"{csv_field} → {yaml_field}")
        else:
            results['missing_mappings'].append(f"{csv_field} → {yaml_field} (MISSING)")

    # Anything in the YAML that no header produced. key=str keeps the sort
    # safe if a YAML key was parsed as a non-string scalar.
    results['unexpected_yaml'] = sorted(set(yaml_fields) - expected_yaml_fields, key=str)
    return results
def validate_data_preservation(csv_records: List[Dict[str, str]],
                               yaml_records: List[Dict[str, str]]):
    """
    Validate that all non-empty CSV data is preserved in YAML.

    Records are compared pairwise by position. When the two lists differ in
    length only the common prefix is compared cell by cell; the length
    difference itself is reported through 'records_match'.

    Args:
        csv_records: Rows as produced by csv.DictReader (one dict per row).
        yaml_records: Records loaded from the YAML file.

    Returns:
        Dict with validation results:
        - 'total_records': number of CSV records
        - 'yaml_records': number of YAML records
        - 'records_match': whether the two counts are equal
        - 'missing_data': one entry per non-empty CSV cell whose field is
          absent from the matching YAML record (keys 'record', 'csv_field',
          'yaml_field', 'value')
        - 'value_mismatches': one entry per cell whose stripped CSV text
          differs from str() of the YAML value (keys 'record', 'field',
          'csv_value', 'yaml_value')
        - 'csv_non_empty_cells': count of non-blank CSV cells
        - 'yaml_total_fields': total number of keys over all YAML records
        Reported values are truncated to 50 characters.
    """
    results = {
        'total_records': len(csv_records),
        'yaml_records': len(yaml_records),
        'records_match': len(csv_records) == len(yaml_records),
        'missing_data': [],
        'value_mismatches': [],
        'csv_non_empty_cells': 0,
        'yaml_total_fields': 0
    }

    # Count cells. csv.DictReader stores cells beyond the last header column
    # as a list under the key None, so a value may be a list of strings
    # rather than a single string.
    for record in csv_records:
        for value in record.values():
            cells = value if isinstance(value, list) else [value]
            results['csv_non_empty_cells'] += sum(
                1 for cell in cells if cell and cell.strip()
            )
    results['yaml_total_fields'] = sum(len(record) for record in yaml_records)

    # Detailed record-by-record validation. Numbering starts at 2: one for
    # the header row plus one for 1-based counting. (This equals the file
    # line number only when no cell contains an embedded line break.)
    for row_number, (csv_rec, yaml_rec) in enumerate(zip(csv_records, yaml_records), start=2):
        for csv_field, csv_value in csv_rec.items():
            if csv_field is None:
                # Overflow cells have no header and therefore no YAML field
                # they could map to; any content in them is unaccounted for.
                for cell in csv_value:
                    if cell and cell.strip():
                        results['missing_data'].append({
                            'record': row_number,
                            'csv_field': None,
                            'yaml_field': None,
                            'value': cell.strip()[:50]
                        })
                continue

            # Short rows yield None for the missing cells; treat as empty.
            csv_value_clean = csv_value.strip() if csv_value else ""
            if not csv_value_clean:
                continue

            yaml_field = clean_field_name_for_yaml(csv_field)
            if yaml_field not in yaml_rec:
                results['missing_data'].append({
                    'record': row_number,
                    'csv_field': csv_field,
                    'yaml_field': yaml_field,
                    'value': csv_value_clean[:50]
                })
                continue

            # Compare as text: CSV cells are always strings, YAML values may
            # have been parsed into other scalar types.
            yaml_value = str(yaml_rec[yaml_field])
            if csv_value_clean != yaml_value:
                results['value_mismatches'].append({
                    'record': row_number,
                    'field': csv_field,
                    'csv_value': csv_value_clean[:50],
                    'yaml_value': yaml_value[:50]
                })
    return results
def _print_banner(title, leading_blank=False):
    """Print *title* framed by two 80-character rules, optionally after a blank line."""
    rule = "=" * 80
    print("\n" + rule if leading_blank else rule)
    print(title)
    print(rule)


def main():
    """
    Main validation routine.

    Loads the CSV source and the YAML output from their fixed locations
    (relative to the current working directory), runs the field-mapping and
    data-preservation checks, and prints a report to stdout.

    Returns:
        0 when every check passes; 1 when any check fails or an input file
        is missing. The value is intended to be used as process exit status.
    """
    # Paths
    csv_path = Path("data/nde/voorbeeld_lijst_organisaties_en_diensten-totaallijst_nederland.csv")
    yaml_path = Path("data/nde/voorbeeld_lijst_organisaties_en_diensten-totaallijst_nederland.yaml")

    _print_banner("LINKML-BASED CSV TO YAML CONVERSION VALIDATION")
    print()

    # Fail with a readable message instead of a traceback when the script is
    # started from the wrong directory or an input has not been generated.
    missing_inputs = [path for path in (csv_path, yaml_path) if not path.is_file()]
    if missing_inputs:
        for path in missing_inputs:
            print(f"ERROR: input file not found: {path}", file=sys.stderr)
        return 1

    # Load data
    print("Loading data...")
    csv_headers, csv_records = load_csv_data(csv_path)
    # An empty YAML document loads as None; treat it as "no records" so the
    # checks below report a mismatch rather than crashing.
    yaml_records = load_yaml_data(yaml_path) or []

    # Get all YAML fields
    yaml_fields = set()
    for record in yaml_records:
        yaml_fields.update(record.keys())
    print(f" CSV: {len(csv_records)} records, {len(csv_headers)} columns")
    print(f" YAML: {len(yaml_records)} records, {len(yaml_fields)} unique fields")
    print()

    # Validate field mapping
    _print_banner("1. FIELD MAPPING VALIDATION")
    mapping_results = validate_field_mapping(csv_headers, yaml_fields)
    valid_mappings = mapping_results['valid_mappings']
    missing_mappings = mapping_results['missing_mappings']
    unexpected_yaml = mapping_results['unexpected_yaml']

    print(f"\n✓ Valid mappings: {len(valid_mappings)}")
    if len(valid_mappings) > 10:
        # Long lists are abbreviated to keep the report readable.
        print(f" (Showing first 5 of {len(valid_mappings)})")
        shown_mappings = valid_mappings[:5]
    else:
        shown_mappings = valid_mappings
    for mapping in shown_mappings:
        print(f" {mapping}")

    if missing_mappings:
        print(f"\n✗ Missing mappings: {len(missing_mappings)}")
        for mapping in missing_mappings[:10]:
            print(f" {mapping}")
    else:
        print("\n✓ No missing mappings")

    # Unexpected YAML fields are reported as a warning only; they do not
    # influence the final verdict.
    if unexpected_yaml:
        print(f"\n⚠ Unexpected YAML fields: {len(unexpected_yaml)}")
        for field in unexpected_yaml[:10]:
            print(f" {field}")
    else:
        print("\n✓ No unexpected YAML fields")

    # Validate data preservation
    _print_banner("2. DATA PRESERVATION VALIDATION", leading_blank=True)
    data_results = validate_data_preservation(csv_records, yaml_records)
    missing_data = data_results['missing_data']
    value_mismatches = data_results['value_mismatches']
    cells_match = data_results['csv_non_empty_cells'] == data_results['yaml_total_fields']

    print("\nRecord count:")
    print(f" CSV records: {data_results['total_records']}")
    print(f" YAML records: {data_results['yaml_records']}")
    print(f" Match: {'✓ YES' if data_results['records_match'] else '✗ NO'}")
    print("\nCell/field count:")
    print(f" CSV non-empty cells: {data_results['csv_non_empty_cells']}")
    print(f" YAML total fields: {data_results['yaml_total_fields']}")
    print(f" Match: {'✓ YES' if cells_match else '✗ NO'}")

    if missing_data:
        print(f"\n✗ Missing data: {len(missing_data)} instances")
        for item in missing_data[:5]:
            print(f" Record {item['record']}: {item['csv_field']} → {item['yaml_field']}")
    else:
        print("\n✓ No missing data")

    if value_mismatches:
        print(f"\n✗ Value mismatches: {len(value_mismatches)} instances")
        for item in value_mismatches[:5]:
            print(f" Record {item['record']}, Field '{item['field']}'")
            print(f" CSV: {item['csv_value']}")
            print(f" YAML: {item['yaml_value']}")
    else:
        print("\n✓ No value mismatches")

    # Final verdict
    _print_banner("FINAL VALIDATION VERDICT", leading_blank=True)
    print()
    all_valid = (
        data_results['records_match']
        and cells_match
        and not missing_data
        and not value_mismatches
        and not missing_mappings
    )
    if all_valid:
        print(" ✓✓✓ VALIDATION PASSED ✓✓✓")
        print()
        print(" All validation checks passed:")
        print(f" ✓ {len(valid_mappings)} field mappings correct")
        print(f" ✓ {data_results['total_records']} records preserved")
        print(f" ✓ {data_results['csv_non_empty_cells']} data cells preserved")
        print(" ✓ No missing data")
        print(" ✓ No value mismatches")
        print()
        print(" The CSV to YAML conversion is VERIFIED as complete and correct.")
        return 0

    print(" ✗✗✗ VALIDATION FAILED ✗✗✗")
    print()
    print(" Issues found:")
    if not data_results['records_match']:
        print(" ✗ Record count mismatch")
    if not cells_match:
        print(" ✗ Cell count mismatch")
    if missing_data:
        print(f" ✗ {len(missing_data)} missing data instances")
    if value_mismatches:
        print(f" ✗ {len(value_mismatches)} value mismatches")
    if missing_mappings:
        print(f" ✗ {len(missing_mappings)} missing field mappings")
    return 1
# Script entry point: run the validation only when executed directly (not on
# import) and hand main()'s return value to the shell as the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)