- batch_crawl4ai_recrawl.py: Retry failed URL crawls - batch_firecrawl_recrawl.py: FireCrawl batch processing - batch_httpx_scrape.py: HTTPX-based scraping - detect_name_mismatch.py: Find name mismatches in data - enrich_dutch_custodians_crawl4ai.py: Dutch custodian enrichment - fix_collision_victims.py: GHCID collision resolution - fix_generic_platform_names*.py: Platform name cleanup - fix_ghcid_type.py: GHCID type corrections - fix_simon_kemper_contamination.py: Data cleanup - scan_dutch_data_quality.py: Data quality scanning - transform_crawl4ai_to_digital_platform.py: Data transformation
199 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Fast data quality scan - optimized for speed."""
|
|
|
|
import os
|
|
import re
|
|
import yaml
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
# Use C loader for speed
|
|
try:
|
|
from yaml import CSafeLoader as SafeLoader
|
|
except ImportError:
|
|
from yaml import SafeLoader
|
|
|
|
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
|
|
|
|
def extract_ghcid_type(filename):
    """Return the single-letter GHCID type code embedded in *filename*.

    Filenames follow the pattern ``NL-XX-YYY-T-...`` where ``T`` is the
    type letter (e.g. A=archive, L=library, M=museum, G=gallery).
    Returns None when the filename does not match that pattern.
    """
    found = re.match(r'NL-[A-Z]{2}-[A-Z]{3}-([A-Z])-', filename)
    if found is None:
        return None
    return found.group(1)
|
def scan_file_fast(filepath):
    """Scan one custodian YAML file and return a list of issue tags.

    Cheap substring checks run first on the raw text; the YAML is parsed
    only afterwards, for the structural checks (GHCID type mismatch,
    Google Maps name mismatch, coordinate sanity).

    Args:
        filepath: Path to a ``NL-*.yaml`` custodian file.

    Returns:
        list[str]: issue identifiers; empty when the file looks clean.
    """
    filename = filepath.name
    issues = []

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        # Unreadable file: nothing else can be checked.
        # (Was a blanket "except Exception as e" with e unused.)
        return ['parse_error']

    # --- Fast string-based checks (no YAML parse needed) ---

    # Hard-coded absolute paths must not leak into the data files.
    if '/Volumes/KINGSTON/' in content or '/Users/kempersc/' in content:
        issues.append('absolute_paths')

    # Missing website URL (top-level "url:" key).
    if '\nurl:' not in content and 'url: ' not in content[:500]:
        issues.append('no_url')

    # Insecure scheme.
    if 'url: http://' in content:
        issues.append('http_not_https')

    # digital_platforms missing or empty.
    # NOTE(review): the "empty" heuristic assumes list items start at
    # column 0 ("digital_platforms:\n-"), i.e. yaml.dump's default block
    # style — indented list items would be misflagged; confirm the files
    # are always machine-dumped.
    if 'digital_platforms:' not in content:
        issues.append('no_digital_platforms')
    elif 'digital_platforms: []\n' in content or 'digital_platforms:\n-' not in content:
        issues.append('empty_digital_platforms')

    # No verified_claims section at all.
    if 'verified_claims:' not in content:
        issues.append('no_verified_claims')

    # Wikidata enrichment missing, or present but explicitly unresolved.
    if "status: NOT_FOUND" in content:
        issues.append('wikidata_not_found')
    elif 'wikidata_enrichment:' not in content:
        issues.append('no_wikidata_enrichment')

    # 'U' in the filename marks an organisation of unknown type.
    ghcid_type = extract_ghcid_type(filename)
    if ghcid_type == 'U':
        issues.append('unknown_type_U')

    # --- Structural checks require a real YAML parse ---
    try:
        data = yaml.load(content, Loader=SafeLoader)
    except yaml.YAMLError:
        # Narrowed from a bare "except:", which also swallowed
        # KeyboardInterrupt/SystemExit.
        issues.append('yaml_parse_error')
        return issues

    if not data:
        issues.append('empty_file')
        return issues

    # GHCID type letter in the filename vs. the organisation type on record.
    if 'original_entry' in data:
        oe = data['original_entry']
        expected = None
        if 'type' in oe and oe['type'] and isinstance(oe['type'], list):
            expected = oe['type'][0]
        elif 'type_organisatie' in oe and oe['type_organisatie']:
            type_map = {'archive': 'A', 'archief': 'A', 'library': 'L',
                        'bibliotheek': 'L', 'museum': 'M', 'gallery': 'G'}
            expected = type_map.get(oe['type_organisatie'].lower())

        if expected and ghcid_type and ghcid_type != expected:
            issues.append(f'wrong_type:{ghcid_type}→{expected}')

    # Google Maps name vs. source organisation name: flag when the word
    # overlap (ignoring common Dutch stopwords) is below 25%.
    if 'google_maps_enrichment' in data and 'original_entry' in data:
        # "or ''" also guards against explicit nulls in the YAML, which the
        # previous .get(..., '') default did not cover (None.lower() crash).
        gm_name = (data['google_maps_enrichment'].get('name') or '').lower()
        org_name = (data['original_entry'].get('organisatie') or '').lower()

        if gm_name and org_name:
            stopwords = {'de', 'het', 'van', 'en', 'stichting'}
            gm_words = set(gm_name.split()) - stopwords
            org_words = set(org_name.split()) - stopwords

            if gm_words and org_words:
                overlap = len(gm_words & org_words)
                similarity = overlap / max(len(gm_words), len(org_words))
                if similarity < 0.25:
                    issues.append('google_maps_mismatch')

    # Coordinates must exist and fall inside a Netherlands bounding box.
    if 'location' in data:
        # An explicit "location: null" previously crashed on loc.get();
        # treat it the same as missing coordinates.
        loc = data['location'] or {}
        lat = loc.get('latitude')
        lon = loc.get('longitude')
        if lat is None or lon is None:
            issues.append('missing_coordinates')
        elif not (50.5 < lat < 53.7 and 3.3 < lon < 7.3):
            issues.append('coords_outside_NL')
    else:
        issues.append('no_location')

    return issues
|
def main():
    """Scan all Dutch custodian files, print a summary, and save a YAML report.

    Reads every ``NL-*.yaml`` file under CUSTODIAN_DIR, aggregates the
    issues reported by scan_file_fast(), prints a breakdown with a detail
    section for critical issues, and writes the full report to
    ``<data>/reports/dutch_data_quality_fast.yaml``.
    """
    print(f"Fast scan started: {datetime.now().isoformat()}")

    files = sorted(CUSTODIAN_DIR.glob("NL-*.yaml"))
    total = len(files)

    print(f"Scanning {total} Dutch custodian files...")

    if total == 0:
        # Guard: the percentage math below would divide by zero.
        print("No files found - nothing to scan.")
        return

    issue_counts = defaultdict(int)       # issue tag -> number of files
    files_with_issues = defaultdict(list)  # issue tag -> list of filenames

    for fp in files:  # (previous enumerate index was unused)
        for issue in scan_file_fast(fp):
            issue_counts[issue] += 1
            files_with_issues[issue].append(fp.name)

    print(f"\nScan complete: {datetime.now().isoformat()}")
    print("\n" + "=" * 80)
    print("DATA QUALITY SUMMARY REPORT")
    print("=" * 80)
    print(f"\nTotal files: {total}")

    # A file may have several issues; count each file once.
    all_issue_files = set()
    for files_list in files_with_issues.values():
        all_issue_files.update(files_list)

    print(f"Files with issues: {len(all_issue_files)} ({100*len(all_issue_files)/total:.1f}%)")
    print(f"Clean files: {total - len(all_issue_files)}")

    print("\n" + "-" * 80)
    print("ISSUE BREAKDOWN")
    print("-" * 80)

    # Most frequent issues first, with a simple text histogram.
    for issue, count in sorted(issue_counts.items(), key=lambda x: -x[1]):
        pct = 100 * count / total
        bar = "█" * int(pct / 2)
        print(f"{issue:35} {count:5} ({pct:5.1f}%) {bar}")

    # Detail listing for issues that need a human to resolve.
    print("\n" + "=" * 80)
    print("CRITICAL ISSUES (require manual fix)")
    print("=" * 80)

    # 'wrong_type:' is a prefix match (keys look like 'wrong_type:U→M').
    critical_issues = ['wrong_type:', 'google_maps_mismatch', 'absolute_paths', 'unknown_type_U']

    for critical in critical_issues:
        matching = [(k, v) for k, v in files_with_issues.items() if critical in k or k == critical]
        if matching:
            for issue_key, file_list in matching:
                print(f"\n{issue_key} ({len(file_list)} files):")
                for f in file_list[:15]:
                    print(f"  - {f}")
                if len(file_list) > 15:
                    print(f"  ... and {len(file_list) - 15} more")

    # Persist the full report next to the data directory.
    report_path = CUSTODIAN_DIR.parent / 'reports' / 'dutch_data_quality_fast.yaml'
    # parents=True: also create missing ancestors (exist_ok alone would
    # raise FileNotFoundError if the parent chain is absent).
    report_path.parent.mkdir(parents=True, exist_ok=True)

    report = {
        'scan_timestamp': datetime.now().isoformat(),
        'total_files': total,
        'files_with_issues': len(all_issue_files),
        'issue_counts': dict(sorted(issue_counts.items(), key=lambda x: -x[1])),
        'files_by_issue': dict(files_with_issues),
    }

    # Explicit UTF-8: the report is dumped with allow_unicode=True, which
    # would break on a non-UTF-8 default locale encoding.
    with open(report_path, 'w', encoding='utf-8') as f:
        yaml.dump(report, f, default_flow_style=False, allow_unicode=True)

    print(f"\n\nFull report saved: {report_path}")
|
# Run the scan only when executed as a script (not on import).
if __name__ == '__main__':
    main()
|