#!/usr/bin/env python3
"""
Validate organization names against ORGANISATION entity annotation rules.

Based on: docs/convention/schema/20251112/entity_annotation_rules_ontology_enriched.yaml

Entity type: ORGANISATION (ORG)
- Organizations including companies, institutions, governments, branches,
  associations, legislative bodies, political parties, military forces,
  sports teams, meetings, bands, religious orders, and ships.

Key rules:
- ORG_INC003: Tag the place as part of the organisation when the
  organisation is defined by its location

This script analyzes custodian_name values to:
1. Identify embedded place names (per ORG_INC003)
2. Detect legal form prefixes that should be in custodian_legal_name
3. Validate name patterns against Dutch heritage institution conventions
4. Generate statistics and flag potential issues

Author: GLAM Project
Date: 2025-12-02
"""
|
|
|
|
import os
|
|
import re
|
|
import yaml
|
|
import json
|
|
import argparse
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Legal-form designations used by Dutch organisations. When one of these
# opens a custodian_name it belongs in custodian_legal_name instead.
# Order matters to the prefix matcher: dotted variants precede shorter ones.
LEGAL_FORMS = [
    'Stichting',
    'Vereniging',
    'Coöperatie',
    'Cooperatie',
    'B.V.',
    'BV',
    'B.V',
    'N.V.',
    'NV',
    'N.V',
    'V.O.F.',
    'VOF',
    'Maatschap',
    'Eenmanszaak',
    'Commanditaire Vennootschap',
    'C.V.',
    'CV',
    'Besloten Vennootschap',
    'Naamloze Vennootschap',
]

# Space-delimited Dutch prepositions that typically introduce a place
# (see rule ORG_INC003).
PLACE_INDICATORS = [
    ' te ',
    ' in ',
    ' van ',
    ' voor ',
    ' bij ',
    ' aan de ',
    ' aan het ',
]

# Keywords that identify the kind of Dutch heritage institution.
INSTITUTION_TYPES = [
    'Museum',
    'Musea',
    'Archief',
    'Archieven',
    'Bibliotheek',
    'Bibliotheken',
    'Galerie',
    'Galerij',
    'Gemeentearchief',
    'Rijksarchief',
    'Regionaal Archief',
    'Streekarchief',
    'Stadsarchief',
    'Heemkundekring',
    'Heemkundige Kring',
    'Historische Vereniging',
    'Historisch Genootschap',
    'Oudheidkamer',
    'Oudheidkundige',
    'Documentatiecentrum',
    'Kenniscentrum',
    'Erfgoedcentrum',
    'Herinneringscentrum',
    'Bezoekerscentrum',
    'Informatiecentrum',
]

# Province and region names used for embedded-place detection.
DUTCH_PROVINCES = [
    'Drenthe',
    'Flevoland',
    'Friesland',
    'Fryslân',
    'Gelderland',
    'Groningen',
    'Limburg',
    'Noord-Brabant',
    'Noord-Holland',
    'Overijssel',
    'Utrecht',
    'Zeeland',
    'Zuid-Holland',
]

# A sample of city and town names used for embedded-place detection.
# 'Utrecht' and 'Groningen' also appear in DUTCH_PROVINCES.
DUTCH_CITIES = [
    'Amsterdam',
    'Rotterdam',
    'Den Haag',
    "'s-Gravenhage",
    'Utrecht',
    'Eindhoven',
    'Tilburg',
    'Groningen',
    'Almere',
    'Breda',
    'Nijmegen',
    'Enschede',
    'Haarlem',
    'Arnhem',
    'Zaanstad',
    'Amersfoort',
    'Apeldoorn',
    'Hoofddorp',
    'Maastricht',
    'Leiden',
    'Dordrecht',
    'Zoetermeer',
    'Zwolle',
    'Deventer',
    'Delft',
    'Alkmaar',
    'Heerlen',
    'Venlo',
    'Leeuwarden',
    'Assen',
    'Emmen',
    'Hoogeveen',
    'Meppel',
    'Coevorden',
    'Borger',
    'Odoorn',
]
|
|
|
|
|
|
class OrganizationNameValidator:
    """Validate organization names against ORGANISATION entity rules.

    The validator reads every ``*.yaml`` file in ``entries_dir``, inspects the
    ``custodian_name.claim_value`` of each entry and accumulates aggregate
    findings in ``self.results``.

    ``validate_all`` is meant to be called once per instance: at the end of a
    run it replaces the per-type filename lists in
    ``results['institution_type_detected']`` with ``{'count', 'sample'}``
    summaries, so a second run on the same instance cannot append to them.
    """

    def __init__(self, entries_dir: str):
        """Create a validator for the entry files found in *entries_dir*.

        Args:
            entries_dir: Directory (string or path-like) holding the entry
                YAML files. It is only read when ``validate_all`` runs.
        """
        self.entries_dir = Path(entries_dir)
        self.results = {
            'total_entries': 0,
            'entries_with_custodian_name': 0,
            'entries_missing_custodian_name': 0,
            'legal_form_in_name': [],
            'place_in_name': [],
            'institution_type_detected': defaultdict(list),
            # 'min' starts at a sentinel; validate_all resets it to 0 when
            # no name was measured.
            'name_length_stats': {'min': 999, 'max': 0, 'total': 0},
            'issues': [],
            'validated_at': datetime.now().isoformat(),
        }

    @staticmethod
    def _as_mapping(value) -> dict:
        """Return *value* if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _load_entry(filepath) -> tuple:
        """Load one entry file.

        Returns:
            ``(entry, None)`` when the file parses to a mapping, otherwise
            ``(None, issue_code)`` describing why it cannot be validated.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)
        except yaml.YAMLError:
            return None, 'INVALID_YAML'
        # safe_load yields None for an empty document and may yield a list
        # or scalar; only a mapping can carry a custodian_name.
        if not isinstance(entry, dict):
            return None, 'ENTRY_NOT_A_MAPPING'
        return entry, None

    def detect_legal_form(self, name: str) -> Optional[str]:
        """Detect a legal form prefix at the start of *name*.

        The comparison is case-insensitive and the prefix must be followed by
        the end of the name or a non-alphanumeric character, so abbreviations
        such as ``NV`` do not match ordinary words that happen to start with
        the same letters.

        Returns:
            The matching entry from ``LEGAL_FORMS`` (first match in list
            order), or ``None`` when the name does not start with one.
        """
        name_lower = name.lower()
        for form in LEGAL_FORMS:
            form_lower = form.lower()
            if not name_lower.startswith(form_lower):
                continue
            remainder = name_lower[len(form_lower):]
            if not remainder or not remainder[0].isalnum():
                return form
        return None

    def detect_place_indicator(self, name: str) -> Optional[tuple]:
        """Detect place indicators (te, in, van, etc.) in *name*.

        Indicators are tried in ``PLACE_INDICATORS`` order; the first one
        present wins, even if another occurs earlier in the name.

        Returns:
            ``(indicator, following_text)`` where ``following_text`` is at
            most the first 30 characters after the indicator, or ``None``.
        """
        # Pad with spaces so an indicator at either end of the name can
        # still match its space-delimited pattern.
        padded = ' ' + name + ' '
        padded_lower = padded.lower()
        for indicator in PLACE_INDICATORS:
            idx = padded_lower.find(indicator)
            if idx == -1:
                continue
            after = padded[idx + len(indicator):].strip()
            return (indicator.strip(), after[:30])
        return None

    def detect_embedded_place(self, name: str) -> list:
        """Detect embedded place names (provinces and cities) in *name*.

        Matching is a case-sensitive substring test. Places listed both as a
        province and as a city are reported once.

        Returns:
            Matching place names, provinces first, without duplicates.
        """
        found = []
        for place in DUTCH_PROVINCES + DUTCH_CITIES:
            if place in name and place not in found:
                found.append(place)
        return found

    def detect_institution_type(self, name: str) -> list:
        """Detect institution type keywords in *name*.

        Matching is a case-insensitive substring test, so a compound such as
        'Gemeentearchief' is also reported under 'Archief'.

        Returns:
            Every entry of ``INSTITUTION_TYPES`` found, in list order.
        """
        name_lower = name.lower()
        return [t for t in INSTITUTION_TYPES if t.lower() in name_lower]

    def validate_entry(self, entry: dict, filename: str) -> dict:
        """Validate a single entry's organization name.

        Reads ``entry['custodian_name']`` (keys ``claim_value``, ``source``,
        ``confidence``) and, as a fallback when no name is present,
        ``entry['original_entry']['organisatie']``. Aggregate statistics in
        ``self.results`` are updated as a side effect.

        Args:
            entry: Parsed entry mapping.
            filename: Name used to identify the entry in reports.

        Returns:
            A dict with ``filename``, a list of ``issues`` codes and an
            ``info`` dict of detected details.
        """
        result = {
            'filename': filename,
            'issues': [],
            'info': {},
        }

        # A YAML key with no value parses to None, so guard both lookups.
        cn = self._as_mapping(entry.get('custodian_name'))
        name = cn.get('claim_value', '')

        if not name:
            result['issues'].append('MISSING_CUSTODIAN_NAME')
            # Record the original organisation name as a hint, if present.
            original = self._as_mapping(entry.get('original_entry'))
            org = original.get('organisatie', '')
            if org:
                result['info']['fallback_original'] = org
            return result

        # YAML may parse an unquoted numeric name as int or float.
        if not isinstance(name, str):
            name = str(name)

        result['info']['custodian_name'] = name
        result['info']['source'] = cn.get('source', 'unknown')
        result['info']['confidence'] = cn.get('confidence', 0)

        # Update length stats
        stats = self.results['name_length_stats']
        name_len = len(name)
        stats['total'] += name_len
        stats['min'] = min(stats['min'], name_len)
        stats['max'] = max(stats['max'], name_len)

        # Check for legal form prefix
        legal_form = self.detect_legal_form(name)
        if legal_form:
            result['issues'].append(f'LEGAL_FORM_IN_NAME: {legal_form}')
            self.results['legal_form_in_name'].append({
                'filename': filename,
                'name': name,
                'legal_form': legal_form,
            })

        # Check for place indicators (per ORG_INC003)
        place_indicator = self.detect_place_indicator(name)
        if place_indicator:
            result['info']['place_indicator'] = place_indicator

        # Embedded places are informational, not an issue (per ORG_INC003)
        places = self.detect_embedded_place(name)
        if places:
            result['info']['embedded_places'] = places
            self.results['place_in_name'].append({
                'filename': filename,
                'name': name,
                'places': places,
            })

        # Detect institution type
        inst_types = self.detect_institution_type(name)
        if inst_types:
            result['info']['institution_types'] = inst_types
            for t in inst_types:
                self.results['institution_type_detected'][t].append(filename)

        # Check for unusual patterns
        if name_len < 3:
            result['issues'].append('NAME_TOO_SHORT')
        if name_len > 100:
            result['issues'].append('NAME_TOO_LONG')
        if name != name.strip():
            result['issues'].append('WHITESPACE_PADDING')
        # Two consecutive spaces; a single space is normal in any
        # multi-word name.
        if '  ' in name:
            result['issues'].append('DOUBLE_SPACES')

        return result

    def validate_all(self, limit: Optional[int] = None) -> dict:
        """Validate all ``*.yaml`` entries in the directory.

        Files are processed in sorted order. A file that cannot be parsed,
        or that does not contain a mapping, is recorded as an issue and
        counted as missing a custodian name instead of aborting the run.

        Args:
            limit: When truthy, only the first *limit* files are processed.

        Returns:
            ``self.results``, with averages and per-type summaries filled in.
        """
        files = sorted(self.entries_dir.glob('*.yaml'))
        if limit:
            files = files[:limit]

        for filepath in files:
            self.results['total_entries'] += 1

            entry, load_issue = self._load_entry(filepath)
            if load_issue is not None:
                self.results['entries_missing_custodian_name'] += 1
                self.results['issues'].append({
                    'filename': filepath.name,
                    'issues': [load_issue],
                    'info': {},
                })
                continue

            if self._as_mapping(entry.get('custodian_name')).get('claim_value'):
                self.results['entries_with_custodian_name'] += 1
            else:
                self.results['entries_missing_custodian_name'] += 1

            result = self.validate_entry(entry, filepath.name)
            if result['issues']:
                self.results['issues'].append(result)

        # Calculate averages
        stats = self.results['name_length_stats']
        named = self.results['entries_with_custodian_name']
        if named > 0:
            stats['avg'] = round(stats['total'] / named, 1)
        else:
            # Nothing was measured: do not report the 999 sentinel.
            stats['min'] = 0

        # Replace the defaultdict of filename lists with plain, JSON
        # serializable summaries (count plus the first 5 examples).
        self.results['institution_type_detected'] = {
            inst_type: {'count': len(names), 'sample': names[:5]}
            for inst_type, names in self.results['institution_type_detected'].items()
        }

        return self.results

    def print_summary(self):
        """Print a human-readable validation summary to stdout."""
        r = self.results

        print("\n" + "=" * 70)
        print("ORGANIZATION NAME VALIDATION REPORT")
        print("=" * 70)
        print(f"Validated at: {r['validated_at']}")
        print(f"Entries directory: {self.entries_dir}")
        print()

        print("COVERAGE:")
        print(f" Total entries: {r['total_entries']}")
        print(f" With custodian_name: {r['entries_with_custodian_name']} ({100*r['entries_with_custodian_name']/max(1,r['total_entries']):.1f}%)")
        print(f" Missing custodian_name: {r['entries_missing_custodian_name']}")
        print()

        print("NAME LENGTH STATISTICS:")
        print(f" Min: {r['name_length_stats']['min']} chars")
        print(f" Max: {r['name_length_stats']['max']} chars")
        print(f" Avg: {r['name_length_stats'].get('avg', 'N/A')} chars")
        print()

        print("INSTITUTION TYPES DETECTED:")
        # Most frequent type first.
        for inst_type, data in sorted(r['institution_type_detected'].items(),
                                      key=lambda x: -x[1]['count']):
            print(f" {inst_type}: {data['count']}")
        print()

        print("ISSUES FOUND:")
        if r['legal_form_in_name']:
            print(f" Legal form in name: {len(r['legal_form_in_name'])}")
            for item in r['legal_form_in_name'][:5]:
                print(f" - {item['filename']}: '{item['name']}' ({item['legal_form']})")
            if len(r['legal_form_in_name']) > 5:
                print(f" ... and {len(r['legal_form_in_name']) - 5} more")
        print()

        print("PLACE NAMES IN ORGANIZATION NAMES (per ORG_INC003):")
        print(f" Total with embedded places: {len(r['place_in_name'])}")
        if r['place_in_name']:
            print(" Sample:")
            for item in r['place_in_name'][:10]:
                print(f" - {item['name'][:50]} -> places: {item['places']}")
        print()

        # Legal-form findings are listed above; filter per issue rather than
        # per entry so an entry's remaining issues are still shown here.
        other_issues = []
        for item in r['issues']:
            remaining = [
                issue for issue in item['issues'] if 'LEGAL_FORM' not in issue
            ]
            if remaining:
                other_issues.append((item['filename'], remaining))
        if other_issues:
            print("OTHER ISSUES:")
            for filename, remaining in other_issues[:10]:
                print(f" - {filename}: {remaining}")
            print()

        print("=" * 70)
|
|
|
|
|
|
def main():
    """Command-line entry point: validate entries, print and save a report.

    ``--entries-dir`` and ``--output`` are resolved relative to the parent of
    the directory containing this script.

    Returns:
        Process exit status: 0 on success, 1 when the entries directory does
        not exist.
    """
    # Local import: only needed to route the fatal error to stderr.
    import sys

    parser = argparse.ArgumentParser(
        description='Validate organization names against ORGANISATION entity rules'
    )
    parser.add_argument(
        '--entries-dir',
        default='data/nde/enriched/entries',
        help='Directory containing entry YAML files'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of entries to process (for testing)'
    )
    parser.add_argument(
        '--output',
        default='reports/organization_name_validation.json',
        help='Output file for JSON report'
    )
    parser.add_argument(
        '--verbose',
        '-v',
        action='store_true',
        help='Verbose output'
    )

    args = parser.parse_args()

    # Resolve paths
    base_dir = Path(__file__).parent.parent
    entries_dir = base_dir / args.entries_dir
    output_path = base_dir / args.output

    if not entries_dir.exists():
        # Diagnostics go to stderr so they do not mix with report output.
        print(f"ERROR: Entries directory not found: {entries_dir}", file=sys.stderr)
        return 1

    print(f"Validating organization names in: {entries_dir}")

    validator = OrganizationNameValidator(entries_dir)
    results = validator.validate_all(limit=args.limit)

    # Print summary
    validator.print_summary()

    # The summary only shows samples; with --verbose list every flagged entry.
    if args.verbose:
        print("ALL FLAGGED ENTRIES:")
        for item in results['issues']:
            print(f" - {item['filename']}: {item['issues']}")
        print()

    # Save JSON report
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"JSON report saved to: {output_path}")

    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # The bare exit() builtin is injected by the site module and is absent
    # under `python -S`; SystemExit is always available and needs no import.
    raise SystemExit(main())
|