- Implemented `generate_mermaid_with_instances.py` to create ER diagrams that include all classes, relationships, enum values, and instance data. - Loaded instance data from YAML files and enriched enum definitions with meaningful annotations. - Configured output paths for generated diagrams in both frontend and schema directories. - Added support for excluding technical classes and limiting the number of displayed enum and instance values for readability.
1292 lines
45 KiB
Python
1292 lines
45 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Social Media MCP Server - Unified Media Content Extraction
|
|
|
|
Provides tools to obtain media content from:
|
|
1. YouTube - Video info, transcripts, channel data, search
|
|
2. LinkedIn - Profile scraping, job search, company info, feed posts
|
|
3. Facebook - Page posts, comments, insights, messaging
|
|
4. Instagram - Business profile, media posts, insights, DMs
|
|
|
|
Architecture:
|
|
- Uses official APIs where available (YouTube Data API, Facebook Graph API, Instagram Graph API)
|
|
- Uses unofficial/scraping methods for LinkedIn (no official API for content reading)
|
|
- Supports authentication via environment variables
|
|
|
|
Environment Variables:
|
|
- YOUTUBE_API_KEY: YouTube Data API v3 key
|
|
- LINKEDIN_EMAIL: LinkedIn login email
|
|
- LINKEDIN_PASSWORD: LinkedIn login password
|
|
- LINKEDIN_COOKIE: LinkedIn session cookie (li_at) - alternative to email/password
|
|
- FACEBOOK_ACCESS_TOKEN: Facebook Page Access Token
|
|
- FACEBOOK_PAGE_ID: Facebook Page ID
|
|
- INSTAGRAM_ACCESS_TOKEN: Instagram Business Account Token
|
|
- INSTAGRAM_BUSINESS_ACCOUNT_ID: Instagram Business Account ID
|
|
|
|
Based on patterns from:
|
|
- https://github.com/anaisbetts/mcp-youtube
|
|
- https://github.com/ZubeidHendricks/youtube-mcp-server
|
|
- https://github.com/adhikasp/mcp-linkedin
|
|
- https://github.com/stickerdaniel/linkedin-mcp-server
|
|
- https://github.com/jlbadano/ig-mcp
|
|
- https://github.com/tiroshanm/facebook-mcp-server
|
|
"""
|
|
|
|
import json
import os
import re
import subprocess
import sys
import tempfile
from typing import List, Dict, Optional, Any

import httpx
from mcp.server.fastmcp import FastMCP
|
|
|
|
# FastMCP server instance; every tool below registers itself via @server.tool().
server = FastMCP("Social Media MCP Server")

# ============================================================================
# Configuration
# ============================================================================
# All credentials are read once at import time from environment variables;
# each tool checks the value it needs and returns an error dict when missing.

# User-Agent for API requests
USER_AGENT = "SocialMediaMCP/1.0"

# YouTube configuration (official Data API v3; yt-dlp fallback when unset)
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY", "")
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"

# LinkedIn configuration (uses unofficial API - scraping based).
# NOTE(review): only LINKEDIN_COOKIE is actually checked by the LinkedIn
# tools visible in this file; EMAIL/PASSWORD appear only in the startup
# banner — confirm whether password login is implemented elsewhere.
LINKEDIN_EMAIL = os.getenv("LINKEDIN_EMAIL", "")
LINKEDIN_PASSWORD = os.getenv("LINKEDIN_PASSWORD", "")
LINKEDIN_COOKIE = os.getenv("LINKEDIN_COOKIE", "")

# Facebook Graph API configuration (version is overridable via env)
FACEBOOK_ACCESS_TOKEN = os.getenv("FACEBOOK_ACCESS_TOKEN", "")
FACEBOOK_PAGE_ID = os.getenv("FACEBOOK_PAGE_ID", "")
FACEBOOK_API_VERSION = os.getenv("FACEBOOK_API_VERSION", "v19.0")
FACEBOOK_API_BASE = f"https://graph.facebook.com/{FACEBOOK_API_VERSION}"

# Instagram Graph API configuration (Instagram Business accounts are served
# through the Facebook Graph host, hence the same base URL)
INSTAGRAM_ACCESS_TOKEN = os.getenv("INSTAGRAM_ACCESS_TOKEN", "")
INSTAGRAM_BUSINESS_ACCOUNT_ID = os.getenv("INSTAGRAM_BUSINESS_ACCOUNT_ID", "")
INSTAGRAM_API_VERSION = os.getenv("INSTAGRAM_API_VERSION", "v19.0")
INSTAGRAM_API_BASE = f"https://graph.facebook.com/{INSTAGRAM_API_VERSION}"
|
|
|
|
# Print configuration status to stderr.
# FIX: these diagnostics previously went to stdout. MCP servers running over
# the stdio transport use stdout exclusively for JSON-RPC protocol framing,
# so any banner text on stdout corrupts the protocol stream; stderr is the
# designated channel for human-readable logging.
print("=" * 60, file=sys.stderr)
print("Social Media MCP Server - Configuration Status", file=sys.stderr)
print("=" * 60, file=sys.stderr)
print(f"YouTube API Key: {'✓ Configured' if YOUTUBE_API_KEY else '✗ Not configured'}", file=sys.stderr)
print(f"LinkedIn Auth: {'✓ Cookie' if LINKEDIN_COOKIE else ('✓ Email/Password' if LINKEDIN_EMAIL else '✗ Not configured')}", file=sys.stderr)
print(f"Facebook Token: {'✓ Configured' if FACEBOOK_ACCESS_TOKEN else '✗ Not configured'}", file=sys.stderr)
print(f"Instagram Token: {'✓ Configured' if INSTAGRAM_ACCESS_TOKEN else '✗ Not configured'}", file=sys.stderr)
print("=" * 60, file=sys.stderr)
|
|
|
|
|
|
# ============================================================================
|
|
# Helper Functions
|
|
# ============================================================================
|
|
|
|
def extract_video_id(url_or_id: str) -> str:
    """Return the 11-character YouTube video ID found in *url_or_id*.

    Accepts watch URLs, youtu.be short links, or a bare video ID. When no
    pattern matches, the input is returned unchanged so callers can pass
    through values they already know to be IDs.
    """
    candidates = (
        r'(?:v=|/)([0-9A-Za-z_-]{11})(?:[&?]|$)',
        r'(?:youtu\.be/)([0-9A-Za-z_-]{11})',
        r'^([0-9A-Za-z_-]{11})$',
    )
    for candidate in candidates:
        if (found := re.search(candidate, url_or_id)) is not None:
            return found.group(1)
    # Nothing matched — assume the caller already supplied a usable ID.
    return url_or_id
|
|
|
|
|
|
def extract_channel_id(url_or_id: str) -> str:
    """Extract a YouTube channel ID from a URL, or return the input as-is.

    Canonical channel IDs are 24 characters: the literal prefix "UC"
    followed by 22 ID characters.

    Args:
        url_or_id: A /channel/ URL, a bare channel ID, or any other string.

    Returns:
        str: The extracted channel ID, or the original input when no
        channel-ID pattern matches (e.g. handles or custom URLs).
    """
    # FIX: the previous patterns used the character class [UC] — which
    # matches a SINGLE character, 'U' or 'C' — instead of the literal
    # two-character prefix "UC". That made the /channel/ pattern capture
    # only 23 of the 24 ID characters, silently truncating the last one.
    patterns = [
        r'(?:channel/)(UC[0-9A-Za-z_-]{22})',
        r'^(UC[0-9A-Za-z_-]{22})$'
    ]
    for pattern in patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)
    # Fall through: the caller may have passed a handle or custom URL slug.
    return url_or_id
|
|
|
|
|
|
def extract_linkedin_profile_id(url_or_id: str) -> str:
    """Return the LinkedIn vanity name from a profile URL or bare ID.

    Unrecognized input is passed through unchanged.
    """
    for pattern in (r'linkedin\.com/in/([^/?]+)', r'^([a-zA-Z0-9-]+)$'):
        if match := re.search(pattern, url_or_id):
            return match.group(1)
    return url_or_id
|
|
|
|
|
|
def extract_linkedin_company_id(url_or_id: str) -> str:
    """Return the company slug from a LinkedIn company URL or bare ID.

    Falls back to returning the input unchanged when neither form matches.
    """
    url_match = re.search(r'linkedin\.com/company/([^/?]+)', url_or_id)
    if url_match:
        return url_match.group(1)
    bare_match = re.search(r'^([a-zA-Z0-9-]+)$', url_or_id)
    if bare_match:
        return bare_match.group(1)
    return url_or_id
|
|
|
|
|
|
# ============================================================================
|
|
# YOUTUBE TOOLS
|
|
# ============================================================================
|
|
|
|
@server.tool()
async def youtube_get_video_info(video_url_or_id: str) -> Dict[str, Any]:
    """
    Get detailed information about a YouTube video.

    Two code paths: when YOUTUBE_API_KEY is set, the official Data API v3
    `videos` endpoint is queried; otherwise the function shells out to
    yt-dlp and parses its JSON dump. The two paths return similar but not
    identical dicts (e.g. the API path includes "thumbnails" and an ISO-8601
    "duration"; the yt-dlp path reports duration in seconds and omits
    thumbnails).

    Args:
        video_url_or_id: YouTube video URL or bare 11-character video ID

    Returns:
        dict: Video metadata (title, description, duration, counts, channel
        info, publish date, ...) on success, or a dict with an "error" key
        on failure. Errors are returned, never raised, except for HTTP
        errors surfaced by raise_for_status() on the API path.

    Example:
        youtube_get_video_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
        youtube_get_video_info("dQw4w9WgXcQ")
    """
    video_id = extract_video_id(video_url_or_id)

    if YOUTUBE_API_KEY:
        # Use YouTube Data API: one request fetching snippet, contentDetails
        # and statistics parts together.
        params = {
            "part": "snippet,contentDetails,statistics",
            "id": video_id,
            "key": YOUTUBE_API_KEY
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{YOUTUBE_API_BASE}/videos",
                params=params,
                headers={"User-Agent": USER_AGENT}
            )
            # NOTE(review): non-2xx here propagates an httpx exception instead
            # of the error-dict convention used elsewhere — confirm intended.
            response.raise_for_status()
            data = response.json()

            # The API returns 200 with an empty items list for unknown IDs.
            if not data.get("items"):
                return {"error": f"Video not found: {video_id}"}

            item = data["items"][0]
            return {
                "video_id": video_id,
                "title": item["snippet"]["title"],
                "description": item["snippet"]["description"],
                "channel_id": item["snippet"]["channelId"],
                "channel_title": item["snippet"]["channelTitle"],
                "published_at": item["snippet"]["publishedAt"],
                # ISO-8601 duration string (e.g. "PT3M33S"), not seconds.
                "duration": item["contentDetails"]["duration"],
                # Statistics may be hidden per-video, hence .get() here.
                "view_count": item["statistics"].get("viewCount"),
                "like_count": item["statistics"].get("likeCount"),
                "comment_count": item["statistics"].get("commentCount"),
                "tags": item["snippet"].get("tags", []),
                "thumbnails": item["snippet"]["thumbnails"]
            }
    else:
        # Fallback: use yt-dlp for metadata extraction (no API key needed).
        # NOTE(review): subprocess.run blocks the event loop inside this
        # async handler for up to 60s — consider asyncio.create_subprocess_exec.
        try:
            result = subprocess.run(
                ["yt-dlp", "--dump-json", "--skip-download", f"https://www.youtube.com/watch?v={video_id}"],
                capture_output=True,
                text=True,
                timeout=60
            )
            if result.returncode == 0:
                data = json.loads(result.stdout)
                return {
                    "video_id": data.get("id"),
                    "title": data.get("title"),
                    "description": data.get("description"),
                    "channel_id": data.get("channel_id"),
                    "channel_title": data.get("uploader"),
                    # yt-dlp upload_date is "YYYYMMDD", not ISO-8601.
                    "published_at": data.get("upload_date"),
                    # Duration in seconds here (API path returns ISO-8601).
                    "duration": data.get("duration"),
                    "view_count": data.get("view_count"),
                    "like_count": data.get("like_count"),
                    "comment_count": data.get("comment_count"),
                    "tags": data.get("tags", []),
                }
            return {"error": f"yt-dlp failed: {result.stderr}"}
        except FileNotFoundError:
            # yt-dlp binary not on PATH.
            return {"error": "yt-dlp not installed. Install with: brew install yt-dlp or pip install yt-dlp"}
        except subprocess.TimeoutExpired:
            return {"error": "Video info extraction timed out"}
|
|
|
|
|
|
@server.tool()
async def youtube_get_transcript(video_url_or_id: str, language: str = "en") -> Dict[str, Any]:
    """
    Get the transcript/subtitles from a YouTube video.

    Uses yt-dlp to download subtitles (manual and auto-generated) as VTT
    into a temporary directory, then flattens the VTT cues into a single
    plain-text string with consecutive duplicate lines removed.

    Args:
        video_url_or_id: YouTube video URL or bare video ID
        language: Subtitle language code passed to yt-dlp (default: "en")

    Returns:
        dict: {"video_id", "language", "transcript", "source"} on success;
        a dict with an "error" key when no subtitles exist, yt-dlp is
        missing, or the extraction times out (120s).

    Example:
        youtube_get_transcript("dQw4w9WgXcQ", "en")
    """
    video_id = extract_video_id(video_url_or_id)
    video_url = f"https://www.youtube.com/watch?v={video_id}"

    try:
        # Temp dir scopes the downloaded .vtt files and is removed on exit.
        with tempfile.TemporaryDirectory() as tmpdir:
            # Try to get subtitles with yt-dlp; --write-auto-subs falls back
            # to auto-generated captions when no manual track exists.
            # NOTE(review): subprocess.run blocks the event loop for up to
            # 120s inside this async handler.
            result = subprocess.run(
                [
                    "yt-dlp",
                    "--write-subs",
                    "--write-auto-subs",
                    "--sub-langs", language,
                    "--sub-format", "vtt",
                    "--skip-download",
                    "--output", f"{tmpdir}/%(id)s",
                    video_url
                ],
                capture_output=True,
                text=True,
                timeout=120
            )

            # Look for the subtitle file yt-dlp wrote (name pattern:
            # <id>.<lang>.vtt under tmpdir).
            import glob
            vtt_files = glob.glob(f"{tmpdir}/*.vtt")

            if vtt_files:
                with open(vtt_files[0], 'r', encoding='utf-8') as f:
                    vtt_content = f.read()

                # Parse VTT to extract cue text only.
                lines = []
                for line in vtt_content.split('\n'):
                    line = line.strip()
                    # Skip headers, timestamp lines ("-->"), bare cue
                    # numbers, and empty lines.
                    if line and not line.startswith('WEBVTT') and not line.startswith('Kind:') \
                            and not line.startswith('Language:') and '-->' not in line \
                            and not re.match(r'^\d+$', line):
                        # Remove inline markup tags (<c>, <00:00:00.000>, ...).
                        clean_line = re.sub(r'<[^>]+>', '', line)
                        if clean_line:
                            lines.append(clean_line)

                # Auto-generated captions repeat each line across cues;
                # collapse consecutive duplicates.
                deduped = []
                for line in lines:
                    if not deduped or line != deduped[-1]:
                        deduped.append(line)

                transcript = ' '.join(deduped)

                return {
                    "video_id": video_id,
                    "language": language,
                    "transcript": transcript,
                    # NOTE(review): yt-dlp names auto-sub files
                    # "<id>.<lang>.vtt" without an ".auto." marker, so this
                    # check may always report "manual" — verify.
                    "source": "auto" if ".auto." in vtt_files[0] else "manual"
                }

            return {
                "video_id": video_id,
                "error": f"No subtitles available in language: {language}",
                "available_info": result.stderr if result.stderr else "Check video for available languages"
            }

    except FileNotFoundError:
        return {"error": "yt-dlp not installed. Install with: brew install yt-dlp or pip install yt-dlp"}
    except subprocess.TimeoutExpired:
        return {"error": "Transcript extraction timed out"}
|
|
|
|
|
|
@server.tool()
async def youtube_search_videos(
    query: str,
    max_results: int = 10,
    order: str = "relevance"
) -> Dict[str, Any]:
    """
    Search for YouTube videos via the Data API v3 `search` endpoint.

    Args:
        query: Search query string
        max_results: Maximum number of results to return (default: 10, max: 50)
        order: Sort order - 'relevance', 'date', 'viewCount', 'rating' (default: relevance)

    Returns:
        dict: {"query", "total_results", "results"} where results is a list
        of video dicts, or a dict with an "error" key when no API key is set.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses (via raise_for_status).

    Example:
        youtube_search_videos("python tutorials", max_results=5)
    """
    if not YOUTUBE_API_KEY:
        return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."}

    params = {
        "part": "snippet",
        "q": query,
        "type": "video",
        # Data API caps search page size at 50.
        "maxResults": min(max_results, 50),
        "order": order,
        "key": YOUTUBE_API_KEY
    }

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            f"{YOUTUBE_API_BASE}/search",
            params=params,
            headers={"User-Agent": USER_AGENT}
        )
        response.raise_for_status()
        data = response.json()

    results = []
    for item in data.get("items", []):
        snippet = item["snippet"]
        results.append({
            "video_id": item["id"]["videoId"],
            "title": snippet["title"],
            "description": snippet["description"],
            "channel_id": snippet["channelId"],
            "channel_title": snippet["channelTitle"],
            "published_at": snippet["publishedAt"],
            # FIX: defensive access — the "high" thumbnail size is not
            # guaranteed for every result; previously this raised KeyError.
            "thumbnail": snippet.get("thumbnails", {}).get("high", {}).get("url")
        })

    return {
        "query": query,
        "total_results": data.get("pageInfo", {}).get("totalResults"),
        "results": results
    }
|
|
|
|
|
|
@server.tool()
async def youtube_get_channel_info(channel_url_or_id: str) -> Dict[str, Any]:
    """
    Get information about a YouTube channel via the Data API v3.

    Args:
        channel_url_or_id: YouTube channel URL or channel ID

    Returns:
        dict: Channel metadata (name, description, subscriber/video/view
        counts, thumbnail, banner, ...), or a dict with an "error" key when
        no API key is set or the channel is unknown.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses (via raise_for_status).

    Example:
        youtube_get_channel_info("UCsXVk37bltHxD1rDPwtNM8Q")
    """
    if not YOUTUBE_API_KEY:
        return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."}

    channel_id = extract_channel_id(channel_url_or_id)

    params = {
        "part": "snippet,statistics,brandingSettings",
        "id": channel_id,
        "key": YOUTUBE_API_KEY
    }

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            f"{YOUTUBE_API_BASE}/channels",
            params=params,
            headers={"User-Agent": USER_AGENT}
        )
        response.raise_for_status()
        data = response.json()

    # The API returns 200 with an empty items list for unknown channel IDs.
    if not data.get("items"):
        return {"error": f"Channel not found: {channel_id}"}

    item = data["items"][0]
    return {
        "channel_id": channel_id,
        "title": item["snippet"]["title"],
        "description": item["snippet"]["description"],
        "custom_url": item["snippet"].get("customUrl"),
        "published_at": item["snippet"]["publishedAt"],
        "country": item["snippet"].get("country"),
        # Counts may be hidden per-channel, hence .get() here.
        "subscriber_count": item["statistics"].get("subscriberCount"),
        "video_count": item["statistics"].get("videoCount"),
        "view_count": item["statistics"].get("viewCount"),
        # FIX: defensive access — the "high" thumbnail size is not
        # guaranteed; previously this raised KeyError when absent.
        "thumbnail": item["snippet"].get("thumbnails", {}).get("high", {}).get("url"),
        "banner": item.get("brandingSettings", {}).get("image", {}).get("bannerExternalUrl")
    }
|
|
|
|
|
|
@server.tool()
async def youtube_get_channel_videos(
    channel_url_or_id: str,
    max_results: int = 20
) -> Dict[str, Any]:
    """
    Get recent videos from a YouTube channel (Data API v3 `search`,
    ordered by date).

    Args:
        channel_url_or_id: YouTube channel URL or channel ID
        max_results: Maximum number of videos to return (default: 20, max: 50)

    Returns:
        dict: {"channel_id", "videos"} with per-video metadata, or a dict
        with an "error" key when no API key is configured.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses (via raise_for_status).

    Example:
        youtube_get_channel_videos("UCsXVk37bltHxD1rDPwtNM8Q", max_results=10)
    """
    if not YOUTUBE_API_KEY:
        return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."}

    channel_id = extract_channel_id(channel_url_or_id)

    params = {
        "part": "snippet",
        "channelId": channel_id,
        "type": "video",
        "order": "date",
        # Data API caps search page size at 50.
        "maxResults": min(max_results, 50),
        "key": YOUTUBE_API_KEY
    }

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            f"{YOUTUBE_API_BASE}/search",
            params=params,
            headers={"User-Agent": USER_AGENT}
        )
        response.raise_for_status()
        data = response.json()

    videos = []
    for item in data.get("items", []):
        snippet = item["snippet"]
        videos.append({
            "video_id": item["id"]["videoId"],
            "title": snippet["title"],
            "description": snippet["description"],
            "published_at": snippet["publishedAt"],
            # FIX: defensive access — the "high" thumbnail size is not
            # guaranteed; previously this raised KeyError when absent.
            "thumbnail": snippet.get("thumbnails", {}).get("high", {}).get("url")
        })

    return {
        "channel_id": channel_id,
        "videos": videos
    }
|
|
|
|
|
|
# ============================================================================
|
|
# LINKEDIN TOOLS
|
|
# ============================================================================
|
|
|
|
# Note: LinkedIn doesn't have an official API for reading content.
|
|
# These tools use web scraping / unofficial methods.
|
|
# Users should be aware this may violate LinkedIn ToS.
|
|
|
|
@server.tool()
async def linkedin_get_profile(profile_url_or_id: str) -> Dict[str, Any]:
    """
    Fetch a LinkedIn profile through the unofficial Voyager API.

    WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
    Requires LINKEDIN_COOKIE environment variable (the li_at session cookie).

    Args:
        profile_url_or_id: LinkedIn profile URL or vanity name

    Returns:
        dict: name, headline, summary, location, industry and a canonical
        profile URL on success; a dict with an "error" key on any failure
        (missing cookie, expired session, unknown profile, HTTP error).

    Example:
        linkedin_get_profile("https://www.linkedin.com/in/satyanadella/")
        linkedin_get_profile("satyanadella")
    """
    if not LINKEDIN_COOKIE:
        return {
            "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
            "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
        }

    profile_id = extract_linkedin_profile_id(profile_url_or_id)

    # Browser-like UA plus the li_at session cookie; the replace() makes the
    # env var accept either the bare value or a "li_at=..." pair.
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
        "Accept": "application/json",
    }

    # LinkedIn Voyager API endpoint (unofficial)
    endpoint = f"https://www.linkedin.com/voyager/api/identity/profiles/{profile_id}"

    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            resp = await client.get(endpoint, headers=request_headers)

            # Map the two expected failure statuses to friendly errors
            # before the generic raise_for_status handling.
            if resp.status_code == 401:
                return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}
            if resp.status_code == 404:
                return {"error": f"Profile not found: {profile_id}"}

            resp.raise_for_status()
            payload = resp.json()

            return {
                "profile_id": profile_id,
                "first_name": payload.get("firstName"),
                "last_name": payload.get("lastName"),
                "headline": payload.get("headline"),
                "summary": payload.get("summary"),
                "location": payload.get("locationName"),
                "industry": payload.get("industryName"),
                "profile_url": f"https://www.linkedin.com/in/{profile_id}/",
            }

    except httpx.HTTPStatusError as e:
        return {"error": f"LinkedIn API error: {e.response.status_code}"}
    except Exception as e:
        return {"error": f"Failed to fetch profile: {str(e)}"}
|
|
|
|
|
|
@server.tool()
async def linkedin_get_company(company_url_or_id: str) -> Dict[str, Any]:
    """
    Get information about a LinkedIn company page.

    WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
    Requires LINKEDIN_COOKIE environment variable (the li_at session cookie).

    Args:
        company_url_or_id: LinkedIn company URL or company name/ID

    Returns:
        dict: Company information (name, description, industry, size,
        headquarters, website, ...) on success; a dict with an "error" key
        on any failure. Errors are returned, never raised.

    Example:
        linkedin_get_company("https://www.linkedin.com/company/microsoft/")
        linkedin_get_company("microsoft")
    """
    if not LINKEDIN_COOKIE:
        return {
            "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
            "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
        }

    company_id = extract_linkedin_company_id(company_url_or_id)

    # Browser-like UA plus the li_at session cookie; replace() tolerates an
    # env value that already includes the "li_at=" prefix.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
        "Accept": "application/json",
    }

    # LinkedIn Voyager API endpoint (unofficial); looks the company up by
    # its universalName (the URL slug) with a fixed decoration ID.
    company_url = f"https://www.linkedin.com/voyager/api/organization/companies?decorationId=com.linkedin.voyager.deco.organization.web.WebFullCompanyMain-12&q=universalName&universalName={company_id}"

    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            response = await client.get(company_url, headers=headers)

            if response.status_code == 401:
                return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}

            response.raise_for_status()
            data = response.json()

            # The query returns a list; an empty list means no such slug.
            elements = data.get("elements", [])
            if not elements:
                return {"error": f"Company not found: {company_id}"}

            company = elements[0]
            # NOTE(review): field paths below reflect the undocumented
            # Voyager payload shape and may drift — the .get() chains keep
            # missing fields as None rather than raising.
            return {
                "company_id": company_id,
                "name": company.get("name"),
                "tagline": company.get("tagline"),
                "description": company.get("description"),
                # First industry's localized name, when any are present.
                "industry": company.get("companyIndustries", [{}])[0].get("localizedName") if company.get("companyIndustries") else None,
                # Lower bound of the staff-count range, not an exact count.
                "company_size": company.get("staffCountRange", {}).get("start"),
                "headquarters": company.get("headquarter", {}).get("city"),
                "website": company.get("companyPageUrl"),
                "founded_year": company.get("foundedOn", {}).get("year"),
                "specialities": company.get("specialities", []),
                "company_url": f"https://www.linkedin.com/company/{company_id}/",
            }

    except httpx.HTTPStatusError as e:
        return {"error": f"LinkedIn API error: {e.response.status_code}"}
    except Exception as e:
        return {"error": f"Failed to fetch company: {str(e)}"}
|
|
|
|
|
|
@server.tool()
async def linkedin_search_jobs(
    keywords: str,
    location: Optional[str] = None,
    limit: int = 10
) -> Dict[str, Any]:
    """
    Search for jobs on LinkedIn via the unofficial Voyager API.

    WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
    Requires LINKEDIN_COOKIE environment variable (the li_at session cookie).

    Args:
        keywords: Job search keywords
        location: Location filter (optional)
        limit: Maximum number of results (default: 10; the request itself
            is capped at 25 per page)

    Returns:
        dict: {"keywords", "location", "jobs"} where each job has title,
        company, location, posted time and URL; a dict with an "error" key
        on any failure. Errors are returned, never raised.

    Example:
        linkedin_search_jobs("software engineer", "San Francisco", limit=5)
    """
    if not LINKEDIN_COOKIE:
        return {
            "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
            "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
        }

    # Browser-like UA plus the li_at session cookie; replace() tolerates an
    # env value that already includes the "li_at=" prefix.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
        "Accept": "application/json",
    }

    # Build search parameters; page size capped at 25.
    params = {
        "keywords": keywords,
        "count": min(limit, 25),
        "start": 0,
    }
    if location:
        params["location"] = location

    # FIX: was an f-string with no placeholders — a plain literal.
    search_url = "https://www.linkedin.com/voyager/api/voyagerJobsDashJobCards"

    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            response = await client.get(search_url, params=params, headers=headers)

            if response.status_code == 401:
                return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}

            response.raise_for_status()
            data = response.json()

            jobs = []
            # NOTE(review): field paths below reflect the undocumented
            # Voyager payload shape and may drift.
            for element in data.get("elements", [])[:limit]:
                job = element.get("jobCardUnion", {}).get("jobPostingCard", {})
                jobs.append({
                    "title": job.get("title"),
                    "company": job.get("primaryDescription", {}).get("text"),
                    "location": job.get("secondaryDescription", {}).get("text"),
                    "posted_time": job.get("tertiaryDescription", {}).get("text"),
                    "job_url": f"https://www.linkedin.com/jobs/view/{job.get('jobPostingId')}" if job.get("jobPostingId") else None
                })

            return {
                "keywords": keywords,
                "location": location,
                "jobs": jobs
            }

    except httpx.HTTPStatusError as e:
        return {"error": f"LinkedIn API error: {e.response.status_code}"}
    except Exception as e:
        return {"error": f"Failed to search jobs: {str(e)}"}
|
|
|
|
|
|
@server.tool()
async def linkedin_get_feed_posts(limit: int = 10) -> Dict[str, Any]:
    """
    Fetch recent posts from the authenticated user's LinkedIn feed.

    WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
    Requires LINKEDIN_COOKIE environment variable (the li_at session cookie).

    Args:
        limit: Maximum number of posts to return (default: 10; the request
            itself asks for at most 25)

    Returns:
        dict: {"posts": [...]} with author, truncated content and engagement
        counts per post; a dict with an "error" key on any failure.

    Example:
        linkedin_get_feed_posts(limit=5)
    """
    if not LINKEDIN_COOKIE:
        return {
            "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
            "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
        }

    # Browser-like UA plus the li_at session cookie; replace() tolerates an
    # env value that already includes the "li_at=" prefix.
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
        "Accept": "application/json",
    }

    feed_url = f"https://www.linkedin.com/voyager/api/feed/updates?count={min(limit, 25)}"

    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            resp = await client.get(feed_url, headers=request_headers)

            if resp.status_code == 401:
                return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}

            resp.raise_for_status()
            payload = resp.json()

            collected = []
            for entry in payload.get("elements", [])[:limit]:
                update = entry.get("value", {}).get("com.linkedin.voyager.feed.render.UpdateV2", {})
                author = update.get("actor", {})
                text = update.get("commentary", {}).get("text", {}).get("text", "")
                counts = update.get("socialDetail", {}).get("totalSocialActivityCounts", {})

                collected.append({
                    "author_name": author.get("name", {}).get("text"),
                    "author_headline": author.get("description", {}).get("text"),
                    # Content is truncated to the first 500 characters.
                    "content": text[:500] if text else None,
                    "num_likes": counts.get("numLikes"),
                    "num_comments": counts.get("numComments"),
                })

            return {"posts": collected}

    except httpx.HTTPStatusError as e:
        return {"error": f"LinkedIn API error: {e.response.status_code}"}
    except Exception as e:
        return {"error": f"Failed to get feed: {str(e)}"}
|
|
|
|
|
|
# ============================================================================
|
|
# FACEBOOK TOOLS
|
|
# ============================================================================
|
|
|
|
@server.tool()
async def facebook_get_page_posts(
    page_id: Optional[str] = None,
    limit: int = 10
) -> Dict[str, Any]:
    """
    Get recent posts from a Facebook Page via the Graph API.

    Requires FACEBOOK_ACCESS_TOKEN and optionally FACEBOOK_PAGE_ID
    environment variables.

    Args:
        page_id: Facebook Page ID (uses FACEBOOK_PAGE_ID env var if not provided)
        limit: Maximum number of posts to return (default: 10, max: 100)

    Returns:
        dict: {"page_id", "posts"} with engagement metrics per post, or a
        dict with an "error" key for missing config / Graph API 400 errors.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses other than 400.

    Example:
        facebook_get_page_posts("12345678", limit=5)
    """
    if not FACEBOOK_ACCESS_TOKEN:
        return {
            "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable.",
            "help": "Get a Page Access Token from Facebook Developer Console."
        }

    target_page_id = page_id or FACEBOOK_PAGE_ID
    if not target_page_id:
        return {"error": "Page ID required. Provide page_id or set FACEBOOK_PAGE_ID environment variable."}

    params = {
        "access_token": FACEBOOK_ACCESS_TOKEN,
        # summary(true) makes like/comment totals available without paging.
        "fields": "id,message,created_time,shares,attachments,likes.summary(true),comments.summary(true)",
        "limit": min(limit, 100)
    }

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            f"{FACEBOOK_API_BASE}/{target_page_id}/posts",
            params=params
        )

        # Graph API reports bad tokens / permissions as HTTP 400 with a
        # structured error body; surface its message instead of raising.
        if response.status_code == 400:
            error_data = response.json().get("error", {})
            return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}

        response.raise_for_status()
        data = response.json()

    posts = []
    for post in data.get("data", []):
        posts.append({
            "post_id": post.get("id"),
            "message": post.get("message"),
            "created_time": post.get("created_time"),
            "likes_count": post.get("likes", {}).get("summary", {}).get("total_count", 0),
            "comments_count": post.get("comments", {}).get("summary", {}).get("total_count", 0),
            "shares_count": post.get("shares", {}).get("count", 0),
            "attachments": post.get("attachments", {}).get("data", [])
        })

    return {
        "page_id": target_page_id,
        "posts": posts
    }
|
|
|
|
|
|
@server.tool()
async def facebook_get_post_comments(
    post_id: str,
    limit: int = 25
) -> Dict[str, Any]:
    """
    Get comments on a Facebook post via the Graph API.

    Requires FACEBOOK_ACCESS_TOKEN environment variable.

    Args:
        post_id: The Facebook post ID
        limit: Maximum number of comments to return (default: 25, max: 100)

    Returns:
        dict: {"post_id", "comments"} with author and engagement per
        comment, or a dict with an "error" key for missing config / Graph
        API 400 errors.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses other than 400.

    Example:
        facebook_get_post_comments("123456789_987654321", limit=10)
    """
    if not FACEBOOK_ACCESS_TOKEN:
        return {
            "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable."
        }

    params = {
        "access_token": FACEBOOK_ACCESS_TOKEN,
        "fields": "id,message,created_time,from,like_count,comment_count",
        "limit": min(limit, 100)
    }

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            f"{FACEBOOK_API_BASE}/{post_id}/comments",
            params=params
        )

        # Graph API reports bad tokens / permissions as HTTP 400 with a
        # structured error body; surface its message instead of raising.
        if response.status_code == 400:
            error_data = response.json().get("error", {})
            return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}

        response.raise_for_status()
        data = response.json()

    comments = []
    for comment in data.get("data", []):
        comments.append({
            "comment_id": comment.get("id"),
            "message": comment.get("message"),
            "created_time": comment.get("created_time"),
            # "from" may be absent when the commenter's info is restricted.
            "from_name": comment.get("from", {}).get("name"),
            "from_id": comment.get("from", {}).get("id"),
            "like_count": comment.get("like_count", 0),
            "reply_count": comment.get("comment_count", 0)
        })

    return {
        "post_id": post_id,
        "comments": comments
    }
|
|
|
|
|
|
@server.tool()
async def facebook_post_to_page(
    message: str,
    page_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Post a message to a Facebook Page via the Graph API.

    Requires FACEBOOK_ACCESS_TOKEN with pages_manage_posts permission.

    Args:
        message: The message content to post
        page_id: Facebook Page ID (uses FACEBOOK_PAGE_ID env var if not provided)

    Returns:
        dict: {"success", "post_id", "page_id"} on success, or a dict with
        an "error" key for missing config / Graph API 400 errors.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses other than 400.

    Example:
        facebook_post_to_page("Hello from MCP!", "12345678")
    """
    if not FACEBOOK_ACCESS_TOKEN:
        return {
            "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable."
        }

    target_page_id = page_id or FACEBOOK_PAGE_ID
    if not target_page_id:
        return {"error": "Page ID required. Provide page_id or set FACEBOOK_PAGE_ID environment variable."}

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            f"{FACEBOOK_API_BASE}/{target_page_id}/feed",
            data={
                "message": message,
                "access_token": FACEBOOK_ACCESS_TOKEN
            }
        )

        # Graph API reports bad tokens / permissions as HTTP 400 with a
        # structured error body; surface its message instead of raising.
        if response.status_code == 400:
            error_data = response.json().get("error", {})
            return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}

        response.raise_for_status()
        data = response.json()

    return {
        "success": True,
        "post_id": data.get("id"),
        "page_id": target_page_id
    }
|
|
|
|
|
|
@server.tool()
async def facebook_reply_to_comment(
    comment_id: str,
    message: str
) -> Dict[str, Any]:
    """
    Reply to a comment on a Facebook post via the Graph API.

    Requires FACEBOOK_ACCESS_TOKEN with pages_manage_comments permission.

    Args:
        comment_id: The Facebook comment ID to reply to
        message: The reply message

    Returns:
        dict: {"success", "reply_id", "parent_comment_id"} on success, or a
        dict with an "error" key for missing config / Graph API 400 errors.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses other than 400.

    Example:
        facebook_reply_to_comment("123456789_987654321", "Thanks for your comment!")
    """
    if not FACEBOOK_ACCESS_TOKEN:
        return {
            "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable."
        }

    # FIX: timeout added (consistent with the LinkedIn tools); without it a
    # stalled connection would hang the tool call indefinitely.
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            f"{FACEBOOK_API_BASE}/{comment_id}/comments",
            data={
                "message": message,
                "access_token": FACEBOOK_ACCESS_TOKEN
            }
        )

        # Graph API reports bad tokens / permissions as HTTP 400 with a
        # structured error body; surface its message instead of raising.
        if response.status_code == 400:
            error_data = response.json().get("error", {})
            return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}

        response.raise_for_status()
        data = response.json()

    return {
        "success": True,
        "reply_id": data.get("id"),
        "parent_comment_id": comment_id
    }
|
|
|
|
|
|
# ============================================================================
|
|
# INSTAGRAM TOOLS
|
|
# ============================================================================
|
|
|
|
@server.tool()
async def instagram_get_profile_info() -> Dict[str, Any]:
    """
    Fetch profile details for the configured Instagram Business account.

    Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables.

    Returns:
        dict: Profile information including username, bio, follower count, etc.

    Example:
        instagram_get_profile_info()
    """
    if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID:
        return {
            "error": "Instagram credentials not configured.",
            "help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables."
        }

    # Output key -> Graph API field name (a couple of them differ).
    field_map = (
        ("account_id", "id"),
        ("username", "username"),
        ("name", "name"),
        ("biography", "biography"),
        ("followers_count", "followers_count"),
        ("following_count", "follows_count"),
        ("media_count", "media_count"),
        ("profile_picture_url", "profile_picture_url"),
        ("website", "website"),
    )
    query = {
        "access_token": INSTAGRAM_ACCESS_TOKEN,
        "fields": "id,username,name,biography,followers_count,follows_count,media_count,profile_picture_url,website"
    }

    async with httpx.AsyncClient() as http:
        resp = await http.get(
            f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}",
            params=query
        )

        # 400 responses carry a structured error payload; report its message.
        if resp.status_code == 400:
            err = resp.json().get("error", {})
            return {"error": f"Instagram API error: {err.get('message', 'Unknown error')}"}

        resp.raise_for_status()
        profile = resp.json()

    return {out_key: profile.get(api_field) for out_key, api_field in field_map}
|
|
|
|
|
|
@server.tool()
async def instagram_get_media_posts(limit: int = 10) -> Dict[str, Any]:
    """
    Get recent Instagram media posts from the Business account.

    Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables.

    Args:
        limit: Maximum number of posts to return (default: 10, clamped to 1..50)

    Returns:
        dict: List of recent media posts with engagement metrics

    Example:
        instagram_get_media_posts(limit=5)
    """
    if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID:
        return {
            "error": "Instagram credentials not configured.",
            "help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables."
        }

    params = {
        "access_token": INSTAGRAM_ACCESS_TOKEN,
        "fields": "id,caption,media_type,media_url,permalink,timestamp,like_count,comments_count,thumbnail_url",
        # Clamp to a sane range: min(limit, 50) alone would forward a zero or
        # negative caller value straight to the Graph API request.
        "limit": max(1, min(limit, 50))
    }

    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media",
            params=params
        )

        # 400 responses carry a structured error payload; report its message.
        if response.status_code == 400:
            error_data = response.json().get("error", {})
            return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}

        response.raise_for_status()
        data = response.json()

        posts = [
            {
                "media_id": post.get("id"),
                "caption": post.get("caption"),
                "media_type": post.get("media_type"),
                "media_url": post.get("media_url"),
                "permalink": post.get("permalink"),
                "timestamp": post.get("timestamp"),
                "like_count": post.get("like_count"),
                "comments_count": post.get("comments_count"),
                "thumbnail_url": post.get("thumbnail_url")
            }
            for post in data.get("data", [])
        ]

        return {"posts": posts}
|
|
|
|
|
|
@server.tool()
async def instagram_get_media_insights(media_id: str) -> Dict[str, Any]:
    """
    Fetch insights/analytics for a single Instagram media post.

    Requires INSTAGRAM_ACCESS_TOKEN environment variable.

    Args:
        media_id: The Instagram media ID

    Returns:
        dict: Post insights including reach, impressions, engagement

    Example:
        instagram_get_media_insights("17890012345678901")
    """
    if not INSTAGRAM_ACCESS_TOKEN:
        return {
            "error": "Instagram access token not configured. Set INSTAGRAM_ACCESS_TOKEN environment variable."
        }

    # Which metrics are valid depends on media type (IMAGE, VIDEO, CAROUSEL_ALBUM).
    # NOTE(review): newer Graph API versions may reject some of these metric
    # names — confirm against the API version this server targets.
    query = {
        "access_token": INSTAGRAM_ACCESS_TOKEN,
        "metric": "engagement,impressions,reach,saved"
    }

    async with httpx.AsyncClient() as http:
        resp = await http.get(
            f"{INSTAGRAM_API_BASE}/{media_id}/insights",
            params=query
        )

        # 400 responses carry a structured error payload; report its message.
        if resp.status_code == 400:
            err = resp.json().get("error", {})
            return {"error": f"Instagram API error: {err.get('message', 'Unknown error')}"}

        resp.raise_for_status()
        payload = resp.json()

    # Each metric arrives as {"name": ..., "values": [{"value": ...}, ...]};
    # keep only the first reported value per metric.
    insights = {
        entry.get("name"): entry.get("values", [{}])[0].get("value")
        for entry in payload.get("data", [])
    }

    return {
        "media_id": media_id,
        "insights": insights
    }
|
|
|
|
|
|
@server.tool()
async def instagram_publish_media(
    image_url: str,
    caption: str
) -> Dict[str, Any]:
    """
    Publish an image to the Instagram Business account.

    Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables.
    The image_url must be a publicly accessible URL.

    Publishing is a two-step Graph API flow: create a media container first,
    then publish that container.

    Args:
        image_url: Public URL of the image to publish
        caption: Caption for the post

    Returns:
        dict: Created media ID

    Example:
        instagram_publish_media("https://example.com/image.jpg", "Check out this photo!")
    """
    if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID:
        return {
            "error": "Instagram credentials not configured.",
            "help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables."
        }

    account_base = f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}"

    async with httpx.AsyncClient() as http:
        # Step 1: create the media container that references the remote image.
        create_resp = await http.post(
            f"{account_base}/media",
            data={
                "image_url": image_url,
                "caption": caption,
                "access_token": INSTAGRAM_ACCESS_TOKEN
            }
        )

        if create_resp.status_code == 400:
            err = create_resp.json().get("error", {})
            return {"error": f"Instagram API error: {err.get('message', 'Unknown error')}"}

        create_resp.raise_for_status()
        container_id = create_resp.json().get("id")

        # Step 2: publish the container so the post goes live.
        publish_resp = await http.post(
            f"{account_base}/media_publish",
            data={
                "creation_id": container_id,
                "access_token": INSTAGRAM_ACCESS_TOKEN
            }
        )

        if publish_resp.status_code == 400:
            err = publish_resp.json().get("error", {})
            return {"error": f"Instagram API error: {err.get('message', 'Unknown error')}"}

        publish_resp.raise_for_status()
        published_id = publish_resp.json().get("id")

    return {
        "success": True,
        "media_id": published_id,
        "container_id": container_id
    }
|
|
|
|
|
|
@server.tool()
async def instagram_get_comments(media_id: str, limit: int = 25) -> Dict[str, Any]:
    """
    Get comments on an Instagram media post.

    Requires INSTAGRAM_ACCESS_TOKEN environment variable.

    Args:
        media_id: The Instagram media ID
        limit: Maximum number of comments to return (default: 25, clamped to 1..50)

    Returns:
        dict: List of comments

    Example:
        instagram_get_comments("17890012345678901", limit=10)
    """
    if not INSTAGRAM_ACCESS_TOKEN:
        return {
            "error": "Instagram access token not configured. Set INSTAGRAM_ACCESS_TOKEN environment variable."
        }

    params = {
        "access_token": INSTAGRAM_ACCESS_TOKEN,
        "fields": "id,text,timestamp,username,like_count",
        # Clamp to a sane range: min(limit, 50) alone would forward a zero or
        # negative caller value straight to the Graph API request.
        "limit": max(1, min(limit, 50))
    }

    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"{INSTAGRAM_API_BASE}/{media_id}/comments",
            params=params
        )

        # 400 responses carry a structured error payload; report its message.
        if response.status_code == 400:
            error_data = response.json().get("error", {})
            return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}

        response.raise_for_status()
        data = response.json()

        comments = [
            {
                "comment_id": comment.get("id"),
                "text": comment.get("text"),
                "timestamp": comment.get("timestamp"),
                "username": comment.get("username"),
                "like_count": comment.get("like_count")
            }
            for comment in data.get("data", [])
        ]

        return {
            "media_id": media_id,
            "comments": comments
        }
|
|
|
|
|
|
# Start the MCP server when this module is executed directly as a script.
if __name__ == "__main__":
    server.run()
|