glam/mcp_servers/wikidata_auth/server.py
2025-11-19 23:25:22 +01:00

652 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Enhanced Wikidata MCP Server with Hybrid API Authentication
Architecture:
- Action API: used for search (sent without a token, 500 requests/hour) and for
  write operations (create/edit/add_claim), which send the OAuth2 bearer token
  plus a CSRF token
- Wikibase REST API: used for data retrieval - 5,000 requests/hour (with OAuth2 token)
Rate limits:
- Without token: 500 requests/hour (anonymous) on all endpoints; write operations are disabled
- With token: 5,000 requests/hour on REST API, 500 req/hr on search
Based on: https://github.com/zzaebok/mcp-wikidata
Enhanced with authentication support for GLAM data extraction project
"""
import json
import os
import sys
from typing import Dict, List, Optional

import httpx
from mcp.server.fastmcp import FastMCP
server = FastMCP("Wikidata MCP Server (Authenticated)")

# API Endpoints
WIKIDATA_ACTION_API = "https://www.wikidata.org/w/api.php"
WIKIDATA_REST_API = "https://www.wikidata.org/w/rest.php/wikibase/v1"
SPARQL_URL = "https://query.wikidata.org/sparql"

# Load authentication from environment
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN", "")
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "user@example.com")

# User-Agent is REQUIRED by Wikimedia policy
# Format: AppName/Version (contact@email.com)
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL})"

# Headers for Action API search requests (sent without an Authorization header)
ACTION_API_HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}

# Headers for REST API (with OAuth2 token for 5,000 req/hour)
REST_API_HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}

# Startup diagnostics are written to stderr on purpose: when the server runs
# over the stdio transport, stdout carries the JSON-RPC protocol stream and any
# stray text printed there corrupts it for the connected client.
if WIKIDATA_API_TOKEN:
    REST_API_HEADERS["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"
    print("✓ Wikidata MCP Server authenticated with OAuth2 token", file=sys.stderr)
    print(" Read operations (REST API): 5,000 requests/hour", file=sys.stderr)
    print(" Write operations (Action API): OAuth2 authenticated", file=sys.stderr)
    print(" Search (Action API): 500 requests/hour", file=sys.stderr)
else:
    print("⚠ Wikidata MCP Server running WITHOUT OAuth2 token", file=sys.stderr)
    print(" All endpoints limited to 500 requests/hour", file=sys.stderr)
    print(" Write operations DISABLED (requires authentication)", file=sys.stderr)
    print(" Set WIKIDATA_API_TOKEN environment variable to increase rate limits", file=sys.stderr)

# Printed in both modes.
print(f" User-Agent: {USER_AGENT}", file=sys.stderr)
print(" Strategy: Hybrid (Action API for search/writes, REST API for data retrieval)", file=sys.stderr)
print(" Available tools: search, read, create, edit, add_claim", file=sys.stderr)
async def search_wikidata(query: str, is_entity: bool = True) -> str:
    """
    Look up the best-matching Wikidata ID for a free-text query.

    The lookup goes through the Action API full-text search using
    ACTION_API_HEADERS (no Authorization header) and asks for a single hit.

    Args:
        query: Search query string
        is_entity: True to search items (namespace 0), False to search
            properties (namespace 120)

    Returns:
        The ID of the top hit (e.g., Q12345 or P123), or a human-readable
        "No results found" message when the response contains no hit.

    Raises:
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    if is_entity:
        namespace, ranking_profile = 0, "classic_noboostlinks"
    else:
        namespace, ranking_profile = 120, "classic"

    search_params = {
        "action": "query",
        "list": "search",
        "srsearch": query,
        "srnamespace": namespace,
        "srlimit": 1,
        "srqiprofile": ranking_profile,
        "srwhat": "text",
        "format": "json",
    }

    async with httpx.AsyncClient() as http:
        reply = await http.get(
            WIKIDATA_ACTION_API,
            headers=ACTION_API_HEADERS,
            params=search_params,
        )
    reply.raise_for_status()

    try:
        top_title = reply.json()["query"]["search"][0]["title"]
    except (KeyError, IndexError):
        return "No results found. Consider changing the search term."

    # Property hits come back as "Property:P123"; item hits are just "Q123".
    if ":" in top_title:
        return top_title.rsplit(":", 1)[-1]
    return top_title
@server.tool()
async def search_entity(query: str) -> str:
    """
    Find the Wikidata entity (item) ID that best matches a query.

    Args:
        query (str): The text to search for. It should be unambiguous enough to uniquely identify the entity.

    Returns:
        str: The Wikidata entity ID of the top search hit (e.g., Q12345), or a
        "No results found" message when the search returns nothing.

    Example:
        search_entity("Rijksmuseum") -> "Q190804"
    """
    entity_id = await search_wikidata(query, is_entity=True)
    return entity_id
@server.tool()
async def search_property(query: str) -> str:
    """
    Find the Wikidata property ID that best matches a query.

    Args:
        query (str): The text to search for. It should be unambiguous enough to uniquely identify the property.

    Returns:
        str: The Wikidata property ID of the top search hit (e.g., P123), or a
        "No results found" message when the search returns nothing.

    Example:
        search_property("ISIL code") -> "P791"
    """
    property_id = await search_wikidata(query, is_entity=False)
    return property_id
@server.tool()
async def get_properties(entity_id: str) -> List[str]:
    """
    Get the properties associated with a given Wikidata entity ID.

    Uses Wikibase REST API (works with or without authentication). IDs that
    start with "P" are fetched from the properties collection; everything else
    is fetched from the items collection.

    Args:
        entity_id (str): The entity ID to retrieve properties for. This should be a valid Wikidata entity ID.

    Returns:
        list: A list of property IDs used in the entity's statements. If no statements are found, an empty list is returned.

    Raises:
        httpx.HTTPStatusError: If the REST API answers with an error status.

    Example:
        get_properties("Q190804") -> ["P31", "P17", "P131", ...]
    """
    # Items and properties live under different REST collections.
    collection = "properties" if entity_id[:1].upper() == "P" else "items"
    url = f"{WIKIDATA_REST_API}/entities/{collection}/{entity_id}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=REST_API_HEADERS)

        # Check for OAuth errors and retry without authentication
        if response.status_code == 403:
            try:
                error_key = response.json().get("errorKey")
            except (ValueError, AttributeError):
                # Body is not a JSON object (e.g. an HTML error page); let
                # raise_for_status() below report the real HTTP error.
                error_key = None
            if error_key == "mwoauth-invalid-authorization-invalid-user":
                headers_no_auth = {k: v for k, v in REST_API_HEADERS.items() if k != "Authorization"}
                response = await client.get(url, headers=headers_no_auth)

        response.raise_for_status()
        data = response.json()

    # Statements are keyed by property ID.
    return list(data.get("statements", {}))
@server.tool()
async def execute_sparql(sparql_query: str) -> str:
    """
    Execute a SPARQL query on Wikidata.

    You may assume the following prefixes:
    PREFIX wd: <http://www.wikidata.org/entity/>
    PREFIX wdt: <http://www.wikidata.org/prop/direct/>
    PREFIX p: <http://www.wikidata.org/prop/>
    PREFIX ps: <http://www.wikidata.org/prop/statement/>

    Args:
        sparql_query (str): The SPARQL query to execute.

    Returns:
        str: The JSON-formatted result of the query. For SELECT queries this is
        the list of result bindings (an empty JSON array when there are no
        results). For ASK queries this is an object of the form
        {"boolean": true|false}.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
        KeyError: If the response is neither a bindings nor a boolean result.

    Example:
        execute_sparql('SELECT ?item WHERE { ?item wdt:P791 "NL-AmRMA" }') -> JSON results

    Note:
        SPARQL endpoint does NOT use API token authentication.
        Rate limits: https://www.wikidata.org/wiki/Wikidata:SPARQL_query_service/User_Manual#Query_limits
    """
    # Only the User-Agent is sent; the API token is never forwarded here.
    sparql_headers = {"User-Agent": USER_AGENT}

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            SPARQL_URL,
            headers=sparql_headers,
            params={"query": sparql_query, "format": "json"},
        )
    response.raise_for_status()
    payload = response.json()

    # ASK queries carry a top-level "boolean" member instead of "results".
    if "results" not in payload and "boolean" in payload:
        return json.dumps({"boolean": payload["boolean"]}, indent=2)

    return json.dumps(payload["results"]["bindings"], indent=2)
@server.tool()
async def get_metadata(entity_id: str, language: str = "en") -> Dict[str, str]:
    """
    Retrieve the label and description for a given Wikidata entity ID.

    Uses Wikibase REST API (works with or without authentication). IDs that
    start with "P" are fetched from the properties collection; everything else
    is fetched from the items collection.

    Args:
        entity_id (str): The entity ID to retrieve metadata for.
        language (str): The language code for the label and description (default is "en"). Use ISO 639-1 codes.

    Returns:
        dict: A dictionary with the keys "Label" and "Description". A missing
        value is reported as "No label found" / "No description found".

    Raises:
        httpx.HTTPStatusError: If the REST API answers with an error status.

    Example:
        get_metadata("Q190804", "en") -> {"Label": "Rijksmuseum", "Description": "museum in Amsterdam, Netherlands"}
        get_metadata("Q190804", "nl") -> {"Label": "Rijksmuseum", "Description": "Nederlands nationaal museum in Amsterdam"}
    """
    # Items and properties live under different REST collections.
    collection = "properties" if entity_id[:1].upper() == "P" else "items"
    url = f"{WIKIDATA_REST_API}/entities/{collection}/{entity_id}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=REST_API_HEADERS)

        # Check for OAuth errors and retry without authentication
        if response.status_code == 403:
            try:
                error_key = response.json().get("errorKey")
            except (ValueError, AttributeError):
                # Body is not a JSON object (e.g. an HTML error page); let
                # raise_for_status() below report the real HTTP error.
                error_key = None
            if error_key == "mwoauth-invalid-authorization-invalid-user":
                # Retry without OAuth token (unified login not activated)
                headers_no_auth = {k: v for k, v in REST_API_HEADERS.items() if k != "Authorization"}
                response = await client.get(url, headers=headers_no_auth)

        response.raise_for_status()
        data = response.json()

    label = data.get("labels", {}).get(language, "No label found")
    description = data.get("descriptions", {}).get(language, "No description found")
    return {"Label": label, "Description": description}
@server.tool()
async def get_identifiers(entity_id: str) -> Dict[str, Optional[str]]:
    """
    Get external identifiers for a Wikidata entity (ISIL, VIAF, etc.).

    Uses Wikibase REST API (works with or without authentication).

    Args:
        entity_id (str): The Wikidata entity ID (e.g., Q190804)

    Returns:
        dict: Always contains the keys "ISIL", "VIAF", "GND", "LCNAF",
        "GeoNames", "official_website" and "BabelNet". Each value is the
        content of the first statement for that property, or None when the
        entity has no such statement.

    Raises:
        httpx.HTTPStatusError: If the REST API answers with an error status.

    Example:
        get_identifiers("Q190804") -> {
            "ISIL": "NL-AmRMA",
            "VIAF": "148691498",
            "official_website": "https://www.rijksmuseum.nl",
            ...
        }
    """
    # Key heritage-related property IDs
    identifier_props = {
        "P791": "ISIL",  # ISIL code
        "P214": "VIAF",  # VIAF ID
        "P227": "GND",  # GND ID
        "P244": "LCNAF",  # Library of Congress
        "P1566": "GeoNames",  # GeoNames ID
        "P856": "official_website",  # Official website
        "P2581": "BabelNet",  # BabelNet ID
    }
    url = f"{WIKIDATA_REST_API}/entities/items/{entity_id}"

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=REST_API_HEADERS)

        # Check for OAuth errors and retry without authentication
        if response.status_code == 403:
            try:
                error_key = response.json().get("errorKey")
            except (ValueError, AttributeError):
                # Body is not a JSON object (e.g. an HTML error page); let
                # raise_for_status() below report the real HTTP error.
                error_key = None
            if error_key == "mwoauth-invalid-authorization-invalid-user":
                headers_no_auth = {k: v for k, v in REST_API_HEADERS.items() if k != "Authorization"}
                response = await client.get(url, headers=headers_no_auth)

        response.raise_for_status()
        data = response.json()

    statements = data.get("statements", {})
    identifiers: Dict[str, Optional[str]] = {}
    for prop_id, name in identifier_props.items():
        prop_statements = statements.get(prop_id) or []
        # Only the first statement is reported; URL and string values are both
        # read from the same "content" field, so no per-property branching.
        if prop_statements:
            identifiers[name] = prop_statements[0].get("value", {}).get("content")
        else:
            identifiers[name] = None
    return identifiers
async def _get_csrf_token() -> str:
    """
    Fetch a CSRF token from the Action API for a subsequent write request.

    The request carries the OAuth2 bearer token from WIKIDATA_API_TOKEN.

    Returns:
        str: CSRF token to send along with a write call

    Raises:
        ValueError: If no API token is configured, if the API reports an
            error, or if the anonymous placeholder token is returned
        httpx.HTTPError: If the HTTP request itself fails
    """
    if not WIKIDATA_API_TOKEN:
        raise ValueError("CSRF token requires authentication. Set WIKIDATA_API_TOKEN environment variable.")

    # Action API requires OAuth token in Authorization header
    bearer_headers = {
        "Authorization": f"Bearer {WIKIDATA_API_TOKEN}",
        "User-Agent": USER_AGENT,
    }
    token_query = {
        "action": "query",
        "meta": "tokens",
        "type": "csrf",
        "format": "json",
    }

    async with httpx.AsyncClient() as http:
        reply = await http.get(
            WIKIDATA_ACTION_API,
            headers=bearer_headers,
            params=token_query,
        )
    reply.raise_for_status()
    body = reply.json()

    # The Action API can report failures inside the response body.
    if "error" in body:
        failure = body["error"]
        raise ValueError(f"CSRF token error: {failure.get('code')} - {failure.get('info')}")

    csrf = body["query"]["tokens"]["csrftoken"]
    # "+\" is treated as the anonymous token, i.e. the bearer token was not honoured.
    if csrf == "+\\":
        raise ValueError("Received anonymous CSRF token. Check OAuth2 authentication.")
    return csrf
@server.tool()
async def create_entity(
    labels: Dict[str, str],
    descriptions: Optional[Dict[str, str]] = None,
    aliases: Optional[Dict[str, List[str]]] = None,
) -> str:
    """
    Create a new Wikidata item with labels, descriptions, and aliases.

    Requires OAuth2 authentication.

    Args:
        labels (dict): Labels in different languages, e.g., {"en": "Example Item", "nl": "Voorbeeld item"}
        descriptions (dict): Optional descriptions, e.g., {"en": "An example item", "nl": "Een voorbeeld item"}
        aliases (dict): Optional aliases, e.g., {"en": ["Alias 1", "Alias 2"]}

    Returns:
        str: A success message containing the new entity ID,
        e.g., "Successfully created entity Q12345"

    Raises:
        ValueError: If no labels, descriptions or aliases are given, if no
            API token is configured, or if the API reports an error
        httpx.HTTPError: If an HTTP request fails

    Example:
        create_entity(
            labels={"en": "Amsterdam Museum", "nl": "Amsterdam Museum"},
            descriptions={"en": "municipal museum in Amsterdam", "nl": "gemeentelijk museum in Amsterdam"}
        ) -> "Successfully created entity Q98765432"

    Note:
        - Requires WIKIDATA_API_TOKEN environment variable
        - Uses Action API wbeditentity endpoint
        - Rate limited by OAuth2 quota (5,000 req/hr for authenticated users)
    """
    # Refuse to create a completely empty item; checked before any network call.
    if not (labels or descriptions or aliases):
        raise ValueError(
            "Cannot create an empty entity. Provide at least one of: labels, descriptions, or aliases."
        )

    # Build entity data structure
    entity_data: Dict[str, dict] = {
        "labels": {
            lang: {"language": lang, "value": label}
            for lang, label in (labels or {}).items()
        }
    }
    if descriptions:
        entity_data["descriptions"] = {
            lang: {"language": lang, "value": desc}
            for lang, desc in descriptions.items()
        }
    if aliases:
        entity_data["aliases"] = {
            lang: [{"language": lang, "value": alias} for alias in alias_list]
            for lang, alias_list in aliases.items()
        }

    # Get CSRF token (raises ValueError when no API token is configured)
    csrf_token = await _get_csrf_token()

    # Prepare API request
    auth_headers = {
        "Authorization": f"Bearer {WIKIDATA_API_TOKEN}",
        "User-Agent": USER_AGENT,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    payload = {
        "action": "wbeditentity",
        "new": "item",
        "data": json.dumps(entity_data),
        "token": csrf_token,
        "format": "json",
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            WIKIDATA_ACTION_API,
            headers=auth_headers,
            data=payload,
        )
    response.raise_for_status()
    result = response.json()

    # The Action API can report failures inside the response body.
    if "error" in result:
        error_info = result["error"]
        raise ValueError(f"Create entity error: {error_info.get('code')} - {error_info.get('info')}")

    entity_id = result["entity"]["id"]
    return f"Successfully created entity {entity_id}"
@server.tool()
async def edit_entity(
    entity_id: str,
    labels: Optional[Dict[str, str]] = None,
    descriptions: Optional[Dict[str, str]] = None,
    aliases: Optional[Dict[str, List[str]]] = None,
) -> str:
    """
    Edit an existing Wikidata entity's labels, descriptions, or aliases.

    Requires OAuth2 authentication.

    Args:
        entity_id (str): The entity ID to edit (e.g., "Q12345")
        labels (dict): Optional labels to add/update, e.g., {"en": "New Label"}
        descriptions (dict): Optional descriptions to add/update, e.g., {"en": "New description"}
        aliases (dict): Optional aliases per language, e.g., {"en": ["Alias 1", "Alias 2"]}

    Returns:
        str: Success message with entity ID, or a "No changes requested"
        message when none of labels, descriptions or aliases is given (in that
        case no request is sent at all).

    Raises:
        ValueError: If no API token is configured or the API reports an error
        httpx.HTTPError: If an HTTP request fails

    Example:
        edit_entity(
            entity_id="Q98765",
            labels={"en": "Updated Label"},
            descriptions={"en": "Updated description"}
        ) -> "Successfully edited entity Q98765"

    Note:
        - Requires WIKIDATA_API_TOKEN environment variable
        - Uses Action API wbeditentity endpoint
        - Existing values are updated; new languages are added
        - NOTE(review): aliases are sent without an "add" marker, so the API
          presumably replaces the existing alias list of each given language
          rather than appending to it - confirm against the wbeditentity docs
    """
    # Build entity data structure (only include provided fields)
    entity_data: Dict[str, dict] = {}
    if labels:
        entity_data["labels"] = {
            lang: {"language": lang, "value": label}
            for lang, label in labels.items()
        }
    if descriptions:
        entity_data["descriptions"] = {
            lang: {"language": lang, "value": desc}
            for lang, desc in descriptions.items()
        }
    if aliases:
        entity_data["aliases"] = {
            lang: [{"language": lang, "value": alias} for alias in alias_list]
            for lang, alias_list in aliases.items()
        }

    # Nothing to send: return before spending a round trip on a CSRF token.
    if not entity_data:
        return "No changes requested. Provide at least one of: labels, descriptions, or aliases."

    # Get CSRF token (raises ValueError when no API token is configured)
    csrf_token = await _get_csrf_token()

    # Prepare API request
    auth_headers = {
        "Authorization": f"Bearer {WIKIDATA_API_TOKEN}",
        "User-Agent": USER_AGENT,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    payload = {
        "action": "wbeditentity",
        "id": entity_id,
        "data": json.dumps(entity_data),
        "token": csrf_token,
        "format": "json",
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            WIKIDATA_ACTION_API,
            headers=auth_headers,
            data=payload,
        )
    response.raise_for_status()
    result = response.json()

    # The Action API can report failures inside the response body.
    if "error" in result:
        error_info = result["error"]
        raise ValueError(f"Edit entity error: {error_info.get('code')} - {error_info.get('info')}")

    return f"Successfully edited entity {entity_id}"
@server.tool()
async def add_claim(
    entity_id: str,
    property_id: str,
    value: str,
    value_type: str = "string"
) -> str:
    """
    Add a claim (statement) to a Wikidata entity.

    Requires OAuth2 authentication.

    Args:
        entity_id (str): The entity ID to add the claim to (e.g., "Q12345")
        property_id (str): The property ID for the claim (e.g., "P31" for "instance of")
        value (str): The value for the claim:
            - For "item": another entity ID (e.g., "Q33506")
            - For "string": a text string
            - For "url": a URL string
            - For "time": ISO 8601 format (e.g., "+2023-01-15T00:00:00Z")
        value_type (str): Type of value - "item", "string", "url", or "time" (default: "string")

    Returns:
        str: Success message with claim ID

    Raises:
        ValueError: If value_type is unsupported, if an "item" value is not a
            valid item ID, if no API token is configured, or if the API
            reports an error. Input problems are reported before any request
            is sent.
        httpx.HTTPError: If an HTTP request fails

    Example:
        # Add "instance of: museum" (P31: Q33506)
        add_claim(entity_id="Q98765", property_id="P31", value="Q33506", value_type="item")
        # Add ISIL code
        add_claim(entity_id="Q98765", property_id="P791", value="NL-ABC", value_type="string")
        # Add official website
        add_claim(entity_id="Q98765", property_id="P856", value="https://example.org", value_type="url")

    Note:
        - Requires WIKIDATA_API_TOKEN environment variable
        - Uses Action API wbcreateclaim endpoint
        - Does not check for duplicate claims
        - "time" values are always sent with day precision
    """
    # Validate and serialize the value first so bad input never costs a request.
    claim_value = _build_claim_value(value, value_type)

    # Get CSRF token (raises ValueError when no API token is configured)
    csrf_token = await _get_csrf_token()

    # Prepare API request
    auth_headers = {
        "Authorization": f"Bearer {WIKIDATA_API_TOKEN}",
        "User-Agent": USER_AGENT,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    payload = {
        "action": "wbcreateclaim",
        "entity": entity_id,
        "property": property_id,
        "snaktype": "value",
        "value": claim_value,
        "token": csrf_token,
        "format": "json",
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            WIKIDATA_ACTION_API,
            headers=auth_headers,
            data=payload,
        )
    response.raise_for_status()
    result = response.json()

    # The Action API can report failures inside the response body.
    if "error" in result:
        error_info = result["error"]
        raise ValueError(f"Add claim error: {error_info.get('code')} - {error_info.get('info')}")

    claim_id = result["claim"]["id"]
    return f"Successfully added claim {claim_id} to entity {entity_id}"


def _build_claim_value(value: str, value_type: str) -> str:
    """Serialize *value* into the JSON string sent as the wbcreateclaim "value" field."""
    if value_type == "item":
        # Accept "Q123" (or a bare "123"); reject anything else with a clear
        # message instead of stripping every "Q" and hoping int() succeeds.
        numeric_part = value.strip()
        if numeric_part[:1] in ("Q", "q"):
            numeric_part = numeric_part[1:]
        if not (numeric_part.isascii() and numeric_part.isdigit()):
            raise ValueError(
                f"Invalid item ID for value_type 'item': {value!r}. Expected an ID such as 'Q33506'."
            )
        return json.dumps({
            "entity-type": "item",
            "numeric-id": int(numeric_part),
        })

    if value_type in ("string", "url"):
        return json.dumps(value)

    if value_type == "time":
        # Time values require special structure
        return json.dumps({
            "time": value,
            "timezone": 0,
            "before": 0,
            "after": 0,
            "precision": 11,  # Day precision
            "calendarmodel": "http://www.wikidata.org/entity/Q1985727",
        })

    raise ValueError(f"Unsupported value_type: {value_type}. Use 'item', 'string', 'url', or 'time'.")
# Start the MCP server only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    server.run()