glam/scripts/validate_organization_names.py
2025-12-03 17:38:46 +01:00

395 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Validate organization names against ORGANISATION entity annotation rules.
Based on: docs/convention/schema/20251112/entity_annotation_rules_ontology_enriched.yaml
Entity type: ORGANISATION (ORG)
- Organizations including companies, institutions, governments, branches,
associations, legislative bodies, political parties, military forces,
sports teams, meetings, bands, religious orders, and ships.
Key rules:
- ORG_INC003: Tag the place as part of the organisation when the
organisation is defined by its location
This script analyzes custodian_name values to:
1. Identify embedded place names (per ORG_INC003)
2. Detect legal form prefixes that should be in custodian_legal_name
3. Validate name patterns against Dutch heritage institution conventions
4. Generate statistics and flag potential issues
Author: GLAM Project
Date: 2025-12-02
"""
import os
import re
import yaml
import json
import argparse
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional
# Legal-form designations of Dutch organisations. A name that starts with one
# of these belongs in custodian_legal_name rather than custodian_name.
LEGAL_FORMS = [
    "Stichting",
    "Vereniging",
    "Coöperatie",
    "Cooperatie",
    "B.V.",
    "BV",
    "B.V",
    "N.V.",
    "NV",
    "N.V",
    "V.O.F.",
    "VOF",
    "Maatschap",
    "Eenmanszaak",
    "Commanditaire Vennootschap",
    "C.V.",
    "CV",
    "Besloten Vennootschap",
    "Naamloze Vennootschap",
]

# Dutch prepositions that commonly introduce a place (per ORG_INC003).
# Each one is space-delimited so that it only matches as a whole word.
PLACE_INDICATORS = [
    " te ",
    " in ",
    " van ",
    " voor ",
    " bij ",
    " aan de ",
    " aan het ",
]

# Keywords that identify the type of a Dutch heritage institution.
INSTITUTION_TYPES = [
    "Museum",
    "Musea",
    "Archief",
    "Archieven",
    "Bibliotheek",
    "Bibliotheken",
    "Galerie",
    "Galerij",
    "Gemeentearchief",
    "Rijksarchief",
    "Regionaal Archief",
    "Streekarchief",
    "Stadsarchief",
    "Heemkundekring",
    "Heemkundige Kring",
    "Historische Vereniging",
    "Historisch Genootschap",
    "Oudheidkamer",
    "Oudheidkundige",
    "Documentatiecentrum",
    "Kenniscentrum",
    "Erfgoedcentrum",
    "Herinneringscentrum",
    "Bezoekerscentrum",
    "Informatiecentrum",
]

# Province / region names used for place detection.
DUTCH_PROVINCES = [
    "Drenthe",
    "Flevoland",
    "Friesland",
    "Fryslân",
    "Gelderland",
    "Groningen",
    "Limburg",
    "Noord-Brabant",
    "Noord-Holland",
    "Overijssel",
    "Utrecht",
    "Zeeland",
    "Zuid-Holland",
]

# A sample of city names used for place detection (not exhaustive).
DUTCH_CITIES = [
    "Amsterdam",
    "Rotterdam",
    "Den Haag",
    "'s-Gravenhage",
    "Utrecht",
    "Eindhoven",
    "Tilburg",
    "Groningen",
    "Almere",
    "Breda",
    "Nijmegen",
    "Enschede",
    "Haarlem",
    "Arnhem",
    "Zaanstad",
    "Amersfoort",
    "Apeldoorn",
    "Hoofddorp",
    "Maastricht",
    "Leiden",
    "Dordrecht",
    "Zoetermeer",
    "Zwolle",
    "Deventer",
    "Delft",
    "Alkmaar",
    "Heerlen",
    "Venlo",
    "Leeuwarden",
    "Assen",
    "Emmen",
    "Hoogeveen",
    "Meppel",
    "Coevorden",
    "Borger",
    "Odoorn",
]
class OrganizationNameValidator:
    """Validate organization names against ORGANISATION entity rules.

    An instance accumulates statistics for a directory of entry YAML files in
    ``self.results``. The detection helpers rely on the module-level lists
    ``LEGAL_FORMS``, ``PLACE_INDICATORS``, ``INSTITUTION_TYPES``,
    ``DUTCH_PROVINCES`` and ``DUTCH_CITIES``.

    ``validate_all`` finalises ``self.results`` (the per-type filename lists
    are replaced by count/sample summaries), so calling it a second time on
    the same instance is not supported.
    """

    def __init__(self, entries_dir: str):
        """Create a validator for the YAML entries found in *entries_dir*."""
        self.entries_dir = Path(entries_dir)
        self.results = {
            'total_entries': 0,
            'entries_with_custodian_name': 0,
            'entries_missing_custodian_name': 0,
            'legal_form_in_name': [],
            'place_in_name': [],
            'institution_type_detected': defaultdict(list),
            # 'min' starts at a high sentinel; validate_all resets it to 0
            # when no name was seen at all.
            'name_length_stats': {'min': 999, 'max': 0, 'total': 0},
            'issues': [],
            'validated_at': datetime.now().isoformat(),
        }

    @staticmethod
    def _claimed_name(entry: dict) -> tuple:
        """Return ``(claim, name)`` for the entry's ``custodian_name`` block.

        ``claim`` is the ``custodian_name`` mapping (``{}`` when it is absent,
        null or not a mapping) and ``name`` is its ``claim_value`` when that
        is a string, otherwise ``''``.
        """
        claim = entry.get('custodian_name')
        if not isinstance(claim, dict):
            claim = {}
        name = claim.get('claim_value')
        if not isinstance(name, str):
            name = ''
        return claim, name

    def detect_legal_form(self, name: str) -> Optional[str]:
        """Return the legal form that *name* starts with, or ``None``.

        The comparison is case-insensitive. The legal form must be a complete
        leading token: it has to be the whole name or be followed by a
        non-alphanumeric character, so "BV Foo" matches "BV" while
        "Bvlgari" does not.
        """
        name_lower = name.lower()
        for form in LEGAL_FORMS:
            form_lower = form.lower()
            if not name_lower.startswith(form_lower):
                continue
            remainder = name_lower[len(form_lower):]
            # Require a word boundary so abbreviations such as "BV"/"CV"/"NV"
            # do not fire on ordinary words that merely begin with them.
            if not remainder or not remainder[0].isalnum():
                return form
        return None

    def detect_place_indicator(self, name: str) -> Optional[tuple]:
        """Detect place indicators (te, in, van, etc.) in *name*.

        Returns ``(indicator, following_text)`` for the first entry of
        ``PLACE_INDICATORS`` (in list order) found in the name, where
        ``following_text`` is at most the first 30 characters after the
        indicator; returns ``None`` when no indicator is present.
        """
        # Pad with spaces so indicators at the very start or end of the name
        # still match their space-delimited form.
        padded = ' ' + name + ' '
        padded_lower = padded.lower()
        for indicator in PLACE_INDICATORS:
            idx = padded_lower.find(indicator)
            if idx != -1:
                after = padded[idx + len(indicator):].strip()
                return (indicator.strip(), after[:30])
        return None

    def detect_embedded_place(self, name: str) -> list:
        """Return the known provinces/cities that occur in *name*.

        Matching is a case-sensitive substring test, so adjectival forms such
        as "Utrechts" still match "Utrecht". Each place is reported once, in
        the order provinces-then-cities, even when it is listed both as a
        province and as a city.
        """
        # dict.fromkeys de-duplicates while preserving first-seen order.
        candidates = dict.fromkeys(DUTCH_PROVINCES + DUTCH_CITIES)
        return [place for place in candidates if place in name]

    def detect_institution_type(self, name: str) -> list:
        """Return every ``INSTITUTION_TYPES`` keyword contained in *name*.

        The test is a case-insensitive substring match; results keep the
        order of ``INSTITUTION_TYPES``.
        """
        name_lower = name.lower()
        return [t for t in INSTITUTION_TYPES if t.lower() in name_lower]

    def validate_entry(self, entry: dict, filename: str) -> dict:
        """Validate a single entry's organization name.

        Args:
            entry: Parsed entry mapping; the name is read from
                ``entry['custodian_name']['claim_value']``.
            filename: Name of the file the entry came from (used in reports).

        Returns:
            A dict with ``filename``, a list of ``issues`` and an ``info``
            mapping. Aggregated statistics in ``self.results`` are updated as
            a side effect when the entry has a name.
        """
        result = {
            'filename': filename,
            'issues': [],
            'info': {},
        }

        cn, name = self._claimed_name(entry)
        if not name:
            result['issues'].append('MISSING_CUSTODIAN_NAME')
            # Fall back to the original organisation label, if there is one.
            original = entry.get('original_entry')
            org = original.get('organisatie', '') if isinstance(original, dict) else ''
            if org:
                result['info']['fallback_original'] = org
            return result

        result['info']['custodian_name'] = name
        result['info']['source'] = cn.get('source', 'unknown')
        result['info']['confidence'] = cn.get('confidence', 0)

        # Update length stats
        name_len = len(name)
        stats = self.results['name_length_stats']
        stats['total'] += name_len
        stats['min'] = min(stats['min'], name_len)
        stats['max'] = max(stats['max'], name_len)

        # Legal form prefixes belong in custodian_legal_name.
        legal_form = self.detect_legal_form(name)
        if legal_form:
            result['issues'].append(f'LEGAL_FORM_IN_NAME: {legal_form}')
            self.results['legal_form_in_name'].append({
                'filename': filename,
                'name': name,
                'legal_form': legal_form,
            })

        # Place indicators (per ORG_INC003)
        place_indicator = self.detect_place_indicator(name)
        if place_indicator:
            result['info']['place_indicator'] = place_indicator

        # Embedded places are informational, not an issue (per ORG_INC003).
        places = self.detect_embedded_place(name)
        if places:
            result['info']['embedded_places'] = places
            self.results['place_in_name'].append({
                'filename': filename,
                'name': name,
                'places': places,
            })

        # Institution type keywords
        inst_types = self.detect_institution_type(name)
        if inst_types:
            result['info']['institution_types'] = inst_types
            for t in inst_types:
                self.results['institution_type_detected'][t].append(filename)

        # Unusual patterns
        if name_len < 3:
            result['issues'].append('NAME_TOO_SHORT')
        if name_len > 100:
            result['issues'].append('NAME_TOO_LONG')
        if name != name.strip():
            result['issues'].append('WHITESPACE_PADDING')
        # Two consecutive spaces; a single space is normal in multi-word names.
        if '  ' in name:
            result['issues'].append('DOUBLE_SPACES')
        return result

    def validate_all(self, limit: Optional[int] = None) -> dict:
        """Validate all ``*.yaml`` entries in the directory.

        Args:
            limit: Maximum number of files (in sorted order) to process;
                ``None`` processes every file and ``0`` processes none.

        Returns:
            ``self.results`` with the average name length added and
            ``institution_type_detected`` reduced to ``{'count', 'sample'}``
            summaries per type.
        """
        paths = sorted(self.entries_dir.glob('*.yaml'))
        if limit is not None:
            paths = paths[:max(0, limit)]

        for filepath in paths:
            self.results['total_entries'] += 1
            with open(filepath, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)
            # An empty document loads as None and a scalar/list document is
            # not an entry either; both are treated as an entry without data.
            if not isinstance(entry, dict):
                entry = {}

            if self._claimed_name(entry)[1]:
                self.results['entries_with_custodian_name'] += 1
            else:
                self.results['entries_missing_custodian_name'] += 1

            result = self.validate_entry(entry, filepath.name)
            if result['issues']:
                self.results['issues'].append(result)

        stats = self.results['name_length_stats']
        named = self.results['entries_with_custodian_name']
        if named > 0:
            stats['avg'] = round(stats['total'] / named, 1)
        else:
            # No name was measured, so drop the initial sentinel value.
            stats['min'] = 0

        # Replace the defaultdict of filename lists by a plain, JSON-friendly
        # dict holding a count and the first five filenames per type.
        self.results['institution_type_detected'] = {
            inst_type: {'count': len(filenames), 'sample': filenames[:5]}
            for inst_type, filenames in self.results['institution_type_detected'].items()
        }
        return self.results

    def print_summary(self):
        """Print a human-readable validation summary to standard output.

        Expects ``self.results`` in the finalised form produced by
        ``validate_all``.
        """
        r = self.results
        print("\n" + "=" * 70)
        print("ORGANIZATION NAME VALIDATION REPORT")
        print("=" * 70)
        print(f"Validated at: {r['validated_at']}")
        print(f"Entries directory: {self.entries_dir}")
        print()
        print("COVERAGE:")
        print(f" Total entries: {r['total_entries']}")
        # max(1, ...) avoids a division by zero for an empty directory.
        print(f" With custodian_name: {r['entries_with_custodian_name']} ({100*r['entries_with_custodian_name']/max(1,r['total_entries']):.1f}%)")
        print(f" Missing custodian_name: {r['entries_missing_custodian_name']}")
        print()
        print("NAME LENGTH STATISTICS:")
        print(f" Min: {r['name_length_stats']['min']} chars")
        print(f" Max: {r['name_length_stats']['max']} chars")
        print(f" Avg: {r['name_length_stats'].get('avg', 'N/A')} chars")
        print()
        print("INSTITUTION TYPES DETECTED:")
        # Most frequent institution types first.
        for inst_type, data in sorted(r['institution_type_detected'].items(),
                                      key=lambda x: -x[1]['count']):
            print(f" {inst_type}: {data['count']}")
        print()
        print("ISSUES FOUND:")
        if r['legal_form_in_name']:
            print(f" Legal form in name: {len(r['legal_form_in_name'])}")
            for item in r['legal_form_in_name'][:5]:
                print(f" - {item['filename']}: '{item['name']}' ({item['legal_form']})")
            if len(r['legal_form_in_name']) > 5:
                print(f" ... and {len(r['legal_form_in_name']) - 5} more")
        print()
        print("PLACE NAMES IN ORGANIZATION NAMES (per ORG_INC003):")
        print(f" Total with embedded places: {len(r['place_in_name'])}")
        if r['place_in_name']:
            print(" Sample:")
            for item in r['place_in_name'][:10]:
                print(f" - {item['name'][:50]} -> places: {item['places']}")
        print()
        # Legal-form findings are reported above; list every remaining issue,
        # including those of entries that also have a legal-form finding.
        other_issues = []
        for item in r['issues']:
            remaining = [issue for issue in item['issues'] if 'LEGAL_FORM' not in issue]
            if remaining:
                other_issues.append((item['filename'], remaining))
        if other_issues:
            print("OTHER ISSUES:")
            for filename, remaining in other_issues[:10]:
                print(f" - {filename}: {remaining}")
            print()
        print("=" * 70)
def main():
    """Run the validator from the command line.

    Parses the CLI options, validates the entries directory, prints the
    summary and writes the results as a JSON report. Relative paths are
    resolved against the parent of the directory containing this script.
    The ``--verbose`` flag is accepted but not used by this function.

    Returns:
        ``1`` when the entries directory does not exist, otherwise ``0``.
    """
    cli = argparse.ArgumentParser(
        description='Validate organization names against ORGANISATION entity rules'
    )
    cli.add_argument(
        '--entries-dir',
        default='data/nde/enriched/entries',
        help='Directory containing entry YAML files',
    )
    cli.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of entries to process (for testing)',
    )
    cli.add_argument(
        '--output',
        default='reports/organization_name_validation.json',
        help='Output file for JSON report',
    )
    cli.add_argument(
        '--verbose',
        '-v',
        action='store_true',
        help='Verbose output',
    )
    options = cli.parse_args()

    # Anchor both paths at the project root (one level above the scripts dir).
    project_root = Path(__file__).parent.parent
    entries_dir = project_root / options.entries_dir
    report_file = project_root / options.output

    # Guard clause: nothing to validate without the entries directory.
    if not entries_dir.exists():
        print(f"ERROR: Entries directory not found: {entries_dir}")
        return 1

    print(f"Validating organization names in: {entries_dir}")
    validator = OrganizationNameValidator(entries_dir)
    report = validator.validate_all(limit=options.limit)

    # Human-readable summary first, then the machine-readable report.
    validator.print_summary()

    report_file.parent.mkdir(parents=True, exist_ok=True)
    with report_file.open('w', encoding='utf-8') as handle:
        json.dump(report, handle, indent=2, ensure_ascii=False)
    print(f"JSON report saved to: {report_file}")
    return 0
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status. The
    # built-in ``exit`` is added by the ``site`` module for interactive use
    # and is missing when Python runs with -S; raising SystemExit always works.
    raise SystemExit(main())