glam/mcp_servers/social_media/server.py
kempersc 48a2b26f59 feat: Add script to generate Mermaid ER diagrams with instance data from LinkML schemas
- Implemented `generate_mermaid_with_instances.py` to create ER diagrams that include all classes, relationships, enum values, and instance data.
- Loaded instance data from YAML files and enriched enum definitions with meaningful annotations.
- Configured output paths for generated diagrams in both frontend and schema directories.
- Added support for excluding technical classes and limiting the number of displayed enum and instance values for readability.
2025-12-01 16:58:03 +01:00

1292 lines
45 KiB
Python

#!/usr/bin/env python3
"""
Social Media MCP Server - Unified Media Content Extraction
Provides tools to obtain media content from:
1. YouTube - Video info, transcripts, channel data, search
2. LinkedIn - Profile scraping, job search, company info, feed posts
3. Facebook - Page posts, comments, insights, messaging
4. Instagram - Business profile, media posts, insights, DMs
Architecture:
- Uses official APIs where available (YouTube Data API, Facebook Graph API, Instagram Graph API)
- Uses unofficial/scraping methods for LinkedIn (no official API for content reading)
- Supports authentication via environment variables
Environment Variables:
- YOUTUBE_API_KEY: YouTube Data API v3 key
- LINKEDIN_EMAIL: LinkedIn login email
- LINKEDIN_PASSWORD: LinkedIn login password
- LINKEDIN_COOKIE: LinkedIn session cookie (li_at) - alternative to email/password
- FACEBOOK_ACCESS_TOKEN: Facebook Page Access Token
- FACEBOOK_PAGE_ID: Facebook Page ID
- INSTAGRAM_ACCESS_TOKEN: Instagram Business Account Token
- INSTAGRAM_BUSINESS_ACCOUNT_ID: Instagram Business Account ID
Based on patterns from:
- https://github.com/anaisbetts/mcp-youtube
- https://github.com/ZubeidHendricks/youtube-mcp-server
- https://github.com/adhikasp/mcp-linkedin
- https://github.com/stickerdaniel/linkedin-mcp-server
- https://github.com/jlbadano/ig-mcp
- https://github.com/tiroshanm/facebook-mcp-server
"""
import httpx
import json
import os
import re
import subprocess
import tempfile
from typing import List, Dict, Optional, Any
from mcp.server.fastmcp import FastMCP
server = FastMCP("Social Media MCP Server")
# ============================================================================
# Configuration
# ============================================================================
# User-Agent for API requests
USER_AGENT = "SocialMediaMCP/1.0"
# YouTube configuration
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY", "")
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"
# LinkedIn configuration (uses unofficial API - scraping based)
LINKEDIN_EMAIL = os.getenv("LINKEDIN_EMAIL", "")
LINKEDIN_PASSWORD = os.getenv("LINKEDIN_PASSWORD", "")
LINKEDIN_COOKIE = os.getenv("LINKEDIN_COOKIE", "")
# Facebook Graph API configuration
FACEBOOK_ACCESS_TOKEN = os.getenv("FACEBOOK_ACCESS_TOKEN", "")
FACEBOOK_PAGE_ID = os.getenv("FACEBOOK_PAGE_ID", "")
FACEBOOK_API_VERSION = os.getenv("FACEBOOK_API_VERSION", "v19.0")
FACEBOOK_API_BASE = f"https://graph.facebook.com/{FACEBOOK_API_VERSION}"
# Instagram Graph API configuration (uses Facebook Graph API)
INSTAGRAM_ACCESS_TOKEN = os.getenv("INSTAGRAM_ACCESS_TOKEN", "")
INSTAGRAM_BUSINESS_ACCOUNT_ID = os.getenv("INSTAGRAM_BUSINESS_ACCOUNT_ID", "")
INSTAGRAM_API_VERSION = os.getenv("INSTAGRAM_API_VERSION", "v19.0")
INSTAGRAM_API_BASE = f"https://graph.facebook.com/{INSTAGRAM_API_VERSION}"
# Print configuration status
print("=" * 60)
print("Social Media MCP Server - Configuration Status")
print("=" * 60)
print(f"YouTube API Key: {'✓ Configured' if YOUTUBE_API_KEY else '✗ Not configured'}")
print(f"LinkedIn Auth: {'✓ Cookie' if LINKEDIN_COOKIE else ('✓ Email/Password' if LINKEDIN_EMAIL else '✗ Not configured')}")
print(f"Facebook Token: {'✓ Configured' if FACEBOOK_ACCESS_TOKEN else '✗ Not configured'}")
print(f"Instagram Token: {'✓ Configured' if INSTAGRAM_ACCESS_TOKEN else '✗ Not configured'}")
print("=" * 60)
# ============================================================================
# Helper Functions
# ============================================================================
def extract_video_id(url_or_id: str) -> str:
"""Extract YouTube video ID from URL or return as-is if already an ID."""
patterns = [
r'(?:v=|/)([0-9A-Za-z_-]{11})(?:[&?]|$)',
r'(?:youtu\.be/)([0-9A-Za-z_-]{11})',
r'^([0-9A-Za-z_-]{11})$'
]
for pattern in patterns:
match = re.search(pattern, url_or_id)
if match:
return match.group(1)
return url_or_id
def extract_channel_id(url_or_id: str) -> str:
"""Extract YouTube channel ID from URL or return as-is."""
patterns = [
r'(?:channel/)([UC][0-9A-Za-z_-]{22})',
r'^([UC][0-9A-Za-z_-]{22})$'
]
for pattern in patterns:
match = re.search(pattern, url_or_id)
if match:
return match.group(1)
return url_or_id
def extract_linkedin_profile_id(url_or_id: str) -> str:
"""Extract LinkedIn profile ID from URL or return as-is."""
patterns = [
r'linkedin\.com/in/([^/?]+)',
r'^([a-zA-Z0-9-]+)$'
]
for pattern in patterns:
match = re.search(pattern, url_or_id)
if match:
return match.group(1)
return url_or_id
def extract_linkedin_company_id(url_or_id: str) -> str:
"""Extract LinkedIn company ID from URL or return as-is."""
patterns = [
r'linkedin\.com/company/([^/?]+)',
r'^([a-zA-Z0-9-]+)$'
]
for pattern in patterns:
match = re.search(pattern, url_or_id)
if match:
return match.group(1)
return url_or_id
# ============================================================================
# YOUTUBE TOOLS
# ============================================================================
@server.tool()
async def youtube_get_video_info(video_url_or_id: str) -> Dict[str, Any]:
"""
Get detailed information about a YouTube video.
Args:
video_url_or_id: YouTube video URL or video ID
Returns:
dict: Video metadata including title, description, duration, view count,
like count, channel info, publish date, and more.
Example:
youtube_get_video_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
youtube_get_video_info("dQw4w9WgXcQ")
"""
video_id = extract_video_id(video_url_or_id)
if YOUTUBE_API_KEY:
# Use YouTube Data API
params = {
"part": "snippet,contentDetails,statistics",
"id": video_id,
"key": YOUTUBE_API_KEY
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{YOUTUBE_API_BASE}/videos",
params=params,
headers={"User-Agent": USER_AGENT}
)
response.raise_for_status()
data = response.json()
if not data.get("items"):
return {"error": f"Video not found: {video_id}"}
item = data["items"][0]
return {
"video_id": video_id,
"title": item["snippet"]["title"],
"description": item["snippet"]["description"],
"channel_id": item["snippet"]["channelId"],
"channel_title": item["snippet"]["channelTitle"],
"published_at": item["snippet"]["publishedAt"],
"duration": item["contentDetails"]["duration"],
"view_count": item["statistics"].get("viewCount"),
"like_count": item["statistics"].get("likeCount"),
"comment_count": item["statistics"].get("commentCount"),
"tags": item["snippet"].get("tags", []),
"thumbnails": item["snippet"]["thumbnails"]
}
else:
# Fallback: Use yt-dlp for metadata extraction
try:
result = subprocess.run(
["yt-dlp", "--dump-json", "--skip-download", f"https://www.youtube.com/watch?v={video_id}"],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
data = json.loads(result.stdout)
return {
"video_id": data.get("id"),
"title": data.get("title"),
"description": data.get("description"),
"channel_id": data.get("channel_id"),
"channel_title": data.get("uploader"),
"published_at": data.get("upload_date"),
"duration": data.get("duration"),
"view_count": data.get("view_count"),
"like_count": data.get("like_count"),
"comment_count": data.get("comment_count"),
"tags": data.get("tags", []),
}
return {"error": f"yt-dlp failed: {result.stderr}"}
except FileNotFoundError:
return {"error": "yt-dlp not installed. Install with: brew install yt-dlp or pip install yt-dlp"}
except subprocess.TimeoutExpired:
return {"error": "Video info extraction timed out"}
@server.tool()
async def youtube_get_transcript(video_url_or_id: str, language: str = "en") -> Dict[str, Any]:
"""
Get the transcript/subtitles from a YouTube video.
Uses yt-dlp to extract subtitles in the specified language.
Falls back to auto-generated captions if manual ones aren't available.
Args:
video_url_or_id: YouTube video URL or video ID
language: Language code for subtitles (default: "en")
Returns:
dict: Contains transcript text and metadata
Example:
youtube_get_transcript("dQw4w9WgXcQ", "en")
"""
video_id = extract_video_id(video_url_or_id)
video_url = f"https://www.youtube.com/watch?v={video_id}"
try:
with tempfile.TemporaryDirectory() as tmpdir:
# Try to get subtitles with yt-dlp
result = subprocess.run(
[
"yt-dlp",
"--write-subs",
"--write-auto-subs",
"--sub-langs", language,
"--sub-format", "vtt",
"--skip-download",
"--output", f"{tmpdir}/%(id)s",
video_url
],
capture_output=True,
text=True,
timeout=120
)
# Look for the subtitle file
import glob
vtt_files = glob.glob(f"{tmpdir}/*.vtt")
if vtt_files:
with open(vtt_files[0], 'r', encoding='utf-8') as f:
vtt_content = f.read()
# Parse VTT to extract text
lines = []
for line in vtt_content.split('\n'):
line = line.strip()
# Skip headers, timestamps, and empty lines
if line and not line.startswith('WEBVTT') and not line.startswith('Kind:') \
and not line.startswith('Language:') and '-->' not in line \
and not re.match(r'^\d+$', line):
# Remove HTML tags
clean_line = re.sub(r'<[^>]+>', '', line)
if clean_line:
lines.append(clean_line)
# Remove duplicate consecutive lines
deduped = []
for line in lines:
if not deduped or line != deduped[-1]:
deduped.append(line)
transcript = ' '.join(deduped)
return {
"video_id": video_id,
"language": language,
"transcript": transcript,
"source": "auto" if ".auto." in vtt_files[0] else "manual"
}
return {
"video_id": video_id,
"error": f"No subtitles available in language: {language}",
"available_info": result.stderr if result.stderr else "Check video for available languages"
}
except FileNotFoundError:
return {"error": "yt-dlp not installed. Install with: brew install yt-dlp or pip install yt-dlp"}
except subprocess.TimeoutExpired:
return {"error": "Transcript extraction timed out"}
@server.tool()
async def youtube_search_videos(
query: str,
max_results: int = 10,
order: str = "relevance"
) -> Dict[str, Any]:
"""
Search for YouTube videos.
Args:
query: Search query string
max_results: Maximum number of results to return (default: 10, max: 50)
order: Sort order - 'relevance', 'date', 'viewCount', 'rating' (default: relevance)
Returns:
dict: List of video results with basic metadata
Example:
youtube_search_videos("python tutorials", max_results=5)
"""
if not YOUTUBE_API_KEY:
return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."}
params = {
"part": "snippet",
"q": query,
"type": "video",
"maxResults": min(max_results, 50),
"order": order,
"key": YOUTUBE_API_KEY
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{YOUTUBE_API_BASE}/search",
params=params,
headers={"User-Agent": USER_AGENT}
)
response.raise_for_status()
data = response.json()
results = []
for item in data.get("items", []):
results.append({
"video_id": item["id"]["videoId"],
"title": item["snippet"]["title"],
"description": item["snippet"]["description"],
"channel_id": item["snippet"]["channelId"],
"channel_title": item["snippet"]["channelTitle"],
"published_at": item["snippet"]["publishedAt"],
"thumbnail": item["snippet"]["thumbnails"]["high"]["url"]
})
return {
"query": query,
"total_results": data.get("pageInfo", {}).get("totalResults"),
"results": results
}
@server.tool()
async def youtube_get_channel_info(channel_url_or_id: str) -> Dict[str, Any]:
"""
Get information about a YouTube channel.
Args:
channel_url_or_id: YouTube channel URL or channel ID
Returns:
dict: Channel metadata including name, description, subscriber count, etc.
Example:
youtube_get_channel_info("UCsXVk37bltHxD1rDPwtNM8Q")
"""
if not YOUTUBE_API_KEY:
return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."}
channel_id = extract_channel_id(channel_url_or_id)
params = {
"part": "snippet,statistics,brandingSettings",
"id": channel_id,
"key": YOUTUBE_API_KEY
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{YOUTUBE_API_BASE}/channels",
params=params,
headers={"User-Agent": USER_AGENT}
)
response.raise_for_status()
data = response.json()
if not data.get("items"):
return {"error": f"Channel not found: {channel_id}"}
item = data["items"][0]
return {
"channel_id": channel_id,
"title": item["snippet"]["title"],
"description": item["snippet"]["description"],
"custom_url": item["snippet"].get("customUrl"),
"published_at": item["snippet"]["publishedAt"],
"country": item["snippet"].get("country"),
"subscriber_count": item["statistics"].get("subscriberCount"),
"video_count": item["statistics"].get("videoCount"),
"view_count": item["statistics"].get("viewCount"),
"thumbnail": item["snippet"]["thumbnails"]["high"]["url"],
"banner": item.get("brandingSettings", {}).get("image", {}).get("bannerExternalUrl")
}
@server.tool()
async def youtube_get_channel_videos(
channel_url_or_id: str,
max_results: int = 20
) -> Dict[str, Any]:
"""
Get recent videos from a YouTube channel.
Args:
channel_url_or_id: YouTube channel URL or channel ID
max_results: Maximum number of videos to return (default: 20, max: 50)
Returns:
dict: List of recent videos from the channel
Example:
youtube_get_channel_videos("UCsXVk37bltHxD1rDPwtNM8Q", max_results=10)
"""
if not YOUTUBE_API_KEY:
return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."}
channel_id = extract_channel_id(channel_url_or_id)
params = {
"part": "snippet",
"channelId": channel_id,
"type": "video",
"order": "date",
"maxResults": min(max_results, 50),
"key": YOUTUBE_API_KEY
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{YOUTUBE_API_BASE}/search",
params=params,
headers={"User-Agent": USER_AGENT}
)
response.raise_for_status()
data = response.json()
videos = []
for item in data.get("items", []):
videos.append({
"video_id": item["id"]["videoId"],
"title": item["snippet"]["title"],
"description": item["snippet"]["description"],
"published_at": item["snippet"]["publishedAt"],
"thumbnail": item["snippet"]["thumbnails"]["high"]["url"]
})
return {
"channel_id": channel_id,
"videos": videos
}
# ============================================================================
# LINKEDIN TOOLS
# ============================================================================
# Note: LinkedIn doesn't have an official API for reading content.
# These tools use web scraping / unofficial methods.
# Users should be aware this may violate LinkedIn ToS.
@server.tool()
async def linkedin_get_profile(profile_url_or_id: str) -> Dict[str, Any]:
"""
Get information from a LinkedIn profile.
WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
Requires LINKEDIN_COOKIE environment variable.
Args:
profile_url_or_id: LinkedIn profile URL or vanity name
Returns:
dict: Profile information including name, headline, experience, education
Example:
linkedin_get_profile("https://www.linkedin.com/in/satyanadella/")
linkedin_get_profile("satyanadella")
"""
if not LINKEDIN_COOKIE:
return {
"error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
"help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
}
profile_id = extract_linkedin_profile_id(profile_url_or_id)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
"Accept": "application/json",
}
# LinkedIn Voyager API endpoint (unofficial)
profile_url = f"https://www.linkedin.com/voyager/api/identity/profiles/{profile_id}"
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
response = await client.get(profile_url, headers=headers)
if response.status_code == 401:
return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}
elif response.status_code == 404:
return {"error": f"Profile not found: {profile_id}"}
response.raise_for_status()
data = response.json()
return {
"profile_id": profile_id,
"first_name": data.get("firstName"),
"last_name": data.get("lastName"),
"headline": data.get("headline"),
"summary": data.get("summary"),
"location": data.get("locationName"),
"industry": data.get("industryName"),
"profile_url": f"https://www.linkedin.com/in/{profile_id}/",
}
except httpx.HTTPStatusError as e:
return {"error": f"LinkedIn API error: {e.response.status_code}"}
except Exception as e:
return {"error": f"Failed to fetch profile: {str(e)}"}
@server.tool()
async def linkedin_get_company(company_url_or_id: str) -> Dict[str, Any]:
"""
Get information about a LinkedIn company page.
WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
Requires LINKEDIN_COOKIE environment variable.
Args:
company_url_or_id: LinkedIn company URL or company name/ID
Returns:
dict: Company information including name, description, industry, size
Example:
linkedin_get_company("https://www.linkedin.com/company/microsoft/")
linkedin_get_company("microsoft")
"""
if not LINKEDIN_COOKIE:
return {
"error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
"help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
}
company_id = extract_linkedin_company_id(company_url_or_id)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
"Accept": "application/json",
}
# LinkedIn Voyager API endpoint (unofficial)
company_url = f"https://www.linkedin.com/voyager/api/organization/companies?decorationId=com.linkedin.voyager.deco.organization.web.WebFullCompanyMain-12&q=universalName&universalName={company_id}"
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
response = await client.get(company_url, headers=headers)
if response.status_code == 401:
return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}
response.raise_for_status()
data = response.json()
elements = data.get("elements", [])
if not elements:
return {"error": f"Company not found: {company_id}"}
company = elements[0]
return {
"company_id": company_id,
"name": company.get("name"),
"tagline": company.get("tagline"),
"description": company.get("description"),
"industry": company.get("companyIndustries", [{}])[0].get("localizedName") if company.get("companyIndustries") else None,
"company_size": company.get("staffCountRange", {}).get("start"),
"headquarters": company.get("headquarter", {}).get("city"),
"website": company.get("companyPageUrl"),
"founded_year": company.get("foundedOn", {}).get("year"),
"specialities": company.get("specialities", []),
"company_url": f"https://www.linkedin.com/company/{company_id}/",
}
except httpx.HTTPStatusError as e:
return {"error": f"LinkedIn API error: {e.response.status_code}"}
except Exception as e:
return {"error": f"Failed to fetch company: {str(e)}"}
@server.tool()
async def linkedin_search_jobs(
keywords: str,
location: Optional[str] = None,
limit: int = 10
) -> Dict[str, Any]:
"""
Search for jobs on LinkedIn.
WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
Requires LINKEDIN_COOKIE environment variable.
Args:
keywords: Job search keywords
location: Location filter (optional)
limit: Maximum number of results (default: 10)
Returns:
dict: List of job postings matching the search
Example:
linkedin_search_jobs("software engineer", "San Francisco", limit=5)
"""
if not LINKEDIN_COOKIE:
return {
"error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
"help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
"Accept": "application/json",
}
# Build search URL
params = {
"keywords": keywords,
"count": min(limit, 25),
"start": 0,
}
if location:
params["location"] = location
search_url = f"https://www.linkedin.com/voyager/api/voyagerJobsDashJobCards"
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
response = await client.get(search_url, params=params, headers=headers)
if response.status_code == 401:
return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}
response.raise_for_status()
data = response.json()
jobs = []
for element in data.get("elements", [])[:limit]:
job = element.get("jobCardUnion", {}).get("jobPostingCard", {})
jobs.append({
"title": job.get("title"),
"company": job.get("primaryDescription", {}).get("text"),
"location": job.get("secondaryDescription", {}).get("text"),
"posted_time": job.get("tertiaryDescription", {}).get("text"),
"job_url": f"https://www.linkedin.com/jobs/view/{job.get('jobPostingId')}" if job.get("jobPostingId") else None
})
return {
"keywords": keywords,
"location": location,
"jobs": jobs
}
except httpx.HTTPStatusError as e:
return {"error": f"LinkedIn API error: {e.response.status_code}"}
except Exception as e:
return {"error": f"Failed to search jobs: {str(e)}"}
@server.tool()
async def linkedin_get_feed_posts(limit: int = 10) -> Dict[str, Any]:
"""
Get recent posts from your LinkedIn feed.
WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service.
Requires LINKEDIN_COOKIE environment variable.
Args:
limit: Maximum number of posts to return (default: 10)
Returns:
dict: List of recent feed posts
Example:
linkedin_get_feed_posts(limit=5)
"""
if not LINKEDIN_COOKIE:
return {
"error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.",
"help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools."
}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}",
"Accept": "application/json",
}
feed_url = f"https://www.linkedin.com/voyager/api/feed/updates?count={min(limit, 25)}"
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
response = await client.get(feed_url, headers=headers)
if response.status_code == 401:
return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."}
response.raise_for_status()
data = response.json()
posts = []
for element in data.get("elements", [])[:limit]:
update = element.get("value", {}).get("com.linkedin.voyager.feed.render.UpdateV2", {})
actor = update.get("actor", {})
commentary = update.get("commentary", {}).get("text", {}).get("text", "")
posts.append({
"author_name": actor.get("name", {}).get("text"),
"author_headline": actor.get("description", {}).get("text"),
"content": commentary[:500] if commentary else None,
"num_likes": update.get("socialDetail", {}).get("totalSocialActivityCounts", {}).get("numLikes"),
"num_comments": update.get("socialDetail", {}).get("totalSocialActivityCounts", {}).get("numComments"),
})
return {"posts": posts}
except httpx.HTTPStatusError as e:
return {"error": f"LinkedIn API error: {e.response.status_code}"}
except Exception as e:
return {"error": f"Failed to get feed: {str(e)}"}
# ============================================================================
# FACEBOOK TOOLS
# ============================================================================
@server.tool()
async def facebook_get_page_posts(
page_id: Optional[str] = None,
limit: int = 10
) -> Dict[str, Any]:
"""
Get recent posts from a Facebook Page.
Requires FACEBOOK_ACCESS_TOKEN and optionally FACEBOOK_PAGE_ID environment variables.
Args:
page_id: Facebook Page ID (uses FACEBOOK_PAGE_ID env var if not provided)
limit: Maximum number of posts to return (default: 10)
Returns:
dict: List of recent posts with engagement metrics
Example:
facebook_get_page_posts("12345678", limit=5)
"""
if not FACEBOOK_ACCESS_TOKEN:
return {
"error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable.",
"help": "Get a Page Access Token from Facebook Developer Console."
}
target_page_id = page_id or FACEBOOK_PAGE_ID
if not target_page_id:
return {"error": "Page ID required. Provide page_id or set FACEBOOK_PAGE_ID environment variable."}
params = {
"access_token": FACEBOOK_ACCESS_TOKEN,
"fields": "id,message,created_time,shares,attachments,likes.summary(true),comments.summary(true)",
"limit": min(limit, 100)
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{FACEBOOK_API_BASE}/{target_page_id}/posts",
params=params
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
posts = []
for post in data.get("data", []):
posts.append({
"post_id": post.get("id"),
"message": post.get("message"),
"created_time": post.get("created_time"),
"likes_count": post.get("likes", {}).get("summary", {}).get("total_count", 0),
"comments_count": post.get("comments", {}).get("summary", {}).get("total_count", 0),
"shares_count": post.get("shares", {}).get("count", 0),
"attachments": post.get("attachments", {}).get("data", [])
})
return {
"page_id": target_page_id,
"posts": posts
}
@server.tool()
async def facebook_get_post_comments(
post_id: str,
limit: int = 25
) -> Dict[str, Any]:
"""
Get comments on a Facebook post.
Requires FACEBOOK_ACCESS_TOKEN environment variable.
Args:
post_id: The Facebook post ID
limit: Maximum number of comments to return (default: 25)
Returns:
dict: List of comments with user info and engagement
Example:
facebook_get_post_comments("123456789_987654321", limit=10)
"""
if not FACEBOOK_ACCESS_TOKEN:
return {
"error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable."
}
params = {
"access_token": FACEBOOK_ACCESS_TOKEN,
"fields": "id,message,created_time,from,like_count,comment_count",
"limit": min(limit, 100)
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{FACEBOOK_API_BASE}/{post_id}/comments",
params=params
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
comments = []
for comment in data.get("data", []):
comments.append({
"comment_id": comment.get("id"),
"message": comment.get("message"),
"created_time": comment.get("created_time"),
"from_name": comment.get("from", {}).get("name"),
"from_id": comment.get("from", {}).get("id"),
"like_count": comment.get("like_count", 0),
"reply_count": comment.get("comment_count", 0)
})
return {
"post_id": post_id,
"comments": comments
}
@server.tool()
async def facebook_post_to_page(
message: str,
page_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Post a message to a Facebook Page.
Requires FACEBOOK_ACCESS_TOKEN with pages_manage_posts permission.
Args:
message: The message content to post
page_id: Facebook Page ID (uses FACEBOOK_PAGE_ID env var if not provided)
Returns:
dict: Post ID of the created post
Example:
facebook_post_to_page("Hello from MCP!", "12345678")
"""
if not FACEBOOK_ACCESS_TOKEN:
return {
"error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable."
}
target_page_id = page_id or FACEBOOK_PAGE_ID
if not target_page_id:
return {"error": "Page ID required. Provide page_id or set FACEBOOK_PAGE_ID environment variable."}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{FACEBOOK_API_BASE}/{target_page_id}/feed",
data={
"message": message,
"access_token": FACEBOOK_ACCESS_TOKEN
}
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
return {
"success": True,
"post_id": data.get("id"),
"page_id": target_page_id
}
@server.tool()
async def facebook_reply_to_comment(
comment_id: str,
message: str
) -> Dict[str, Any]:
"""
Reply to a comment on a Facebook post.
Requires FACEBOOK_ACCESS_TOKEN with pages_manage_comments permission.
Args:
comment_id: The Facebook comment ID to reply to
message: The reply message
Returns:
dict: Reply comment ID
Example:
facebook_reply_to_comment("123456789_987654321", "Thanks for your comment!")
"""
if not FACEBOOK_ACCESS_TOKEN:
return {
"error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable."
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{FACEBOOK_API_BASE}/{comment_id}/comments",
data={
"message": message,
"access_token": FACEBOOK_ACCESS_TOKEN
}
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
return {
"success": True,
"reply_id": data.get("id"),
"parent_comment_id": comment_id
}
# ============================================================================
# INSTAGRAM TOOLS
# ============================================================================
@server.tool()
async def instagram_get_profile_info() -> Dict[str, Any]:
"""
Get Instagram Business profile information.
Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables.
Returns:
dict: Profile information including username, bio, follower count, etc.
Example:
instagram_get_profile_info()
"""
if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID:
return {
"error": "Instagram credentials not configured.",
"help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables."
}
params = {
"access_token": INSTAGRAM_ACCESS_TOKEN,
"fields": "id,username,name,biography,followers_count,follows_count,media_count,profile_picture_url,website"
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}",
params=params
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
return {
"account_id": data.get("id"),
"username": data.get("username"),
"name": data.get("name"),
"biography": data.get("biography"),
"followers_count": data.get("followers_count"),
"following_count": data.get("follows_count"),
"media_count": data.get("media_count"),
"profile_picture_url": data.get("profile_picture_url"),
"website": data.get("website")
}
@server.tool()
async def instagram_get_media_posts(limit: int = 10) -> Dict[str, Any]:
"""
Get recent Instagram media posts from the Business account.
Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables.
Args:
limit: Maximum number of posts to return (default: 10)
Returns:
dict: List of recent media posts with engagement metrics
Example:
instagram_get_media_posts(limit=5)
"""
if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID:
return {
"error": "Instagram credentials not configured.",
"help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables."
}
params = {
"access_token": INSTAGRAM_ACCESS_TOKEN,
"fields": "id,caption,media_type,media_url,permalink,timestamp,like_count,comments_count,thumbnail_url",
"limit": min(limit, 50)
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media",
params=params
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
posts = []
for post in data.get("data", []):
posts.append({
"media_id": post.get("id"),
"caption": post.get("caption"),
"media_type": post.get("media_type"),
"media_url": post.get("media_url"),
"permalink": post.get("permalink"),
"timestamp": post.get("timestamp"),
"like_count": post.get("like_count"),
"comments_count": post.get("comments_count"),
"thumbnail_url": post.get("thumbnail_url")
})
return {"posts": posts}
@server.tool()
async def instagram_get_media_insights(media_id: str) -> Dict[str, Any]:
"""
Get insights/analytics for a specific Instagram media post.
Requires INSTAGRAM_ACCESS_TOKEN environment variable.
Args:
media_id: The Instagram media ID
Returns:
dict: Post insights including reach, impressions, engagement
Example:
instagram_get_media_insights("17890012345678901")
"""
if not INSTAGRAM_ACCESS_TOKEN:
return {
"error": "Instagram access token not configured. Set INSTAGRAM_ACCESS_TOKEN environment variable."
}
# Metrics available depend on media type (IMAGE, VIDEO, CAROUSEL_ALBUM)
params = {
"access_token": INSTAGRAM_ACCESS_TOKEN,
"metric": "engagement,impressions,reach,saved"
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{INSTAGRAM_API_BASE}/{media_id}/insights",
params=params
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
insights = {}
for metric in data.get("data", []):
insights[metric.get("name")] = metric.get("values", [{}])[0].get("value")
return {
"media_id": media_id,
"insights": insights
}
@server.tool()
async def instagram_publish_media(
image_url: str,
caption: str
) -> Dict[str, Any]:
"""
Publish an image to Instagram Business account.
Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables.
The image_url must be a publicly accessible URL.
Args:
image_url: Public URL of the image to publish
caption: Caption for the post
Returns:
dict: Created media ID
Example:
instagram_publish_media("https://example.com/image.jpg", "Check out this photo!")
"""
if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID:
return {
"error": "Instagram credentials not configured.",
"help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables."
}
async with httpx.AsyncClient() as client:
# Step 1: Create media container
container_response = await client.post(
f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media",
data={
"image_url": image_url,
"caption": caption,
"access_token": INSTAGRAM_ACCESS_TOKEN
}
)
if container_response.status_code == 400:
error_data = container_response.json().get("error", {})
return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}
container_response.raise_for_status()
container_id = container_response.json().get("id")
# Step 2: Publish the container
publish_response = await client.post(
f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media_publish",
data={
"creation_id": container_id,
"access_token": INSTAGRAM_ACCESS_TOKEN
}
)
if publish_response.status_code == 400:
error_data = publish_response.json().get("error", {})
return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}
publish_response.raise_for_status()
media_id = publish_response.json().get("id")
return {
"success": True,
"media_id": media_id,
"container_id": container_id
}
@server.tool()
async def instagram_get_comments(media_id: str, limit: int = 25) -> Dict[str, Any]:
"""
Get comments on an Instagram media post.
Requires INSTAGRAM_ACCESS_TOKEN environment variable.
Args:
media_id: The Instagram media ID
limit: Maximum number of comments to return (default: 25)
Returns:
dict: List of comments
Example:
instagram_get_comments("17890012345678901", limit=10)
"""
if not INSTAGRAM_ACCESS_TOKEN:
return {
"error": "Instagram access token not configured. Set INSTAGRAM_ACCESS_TOKEN environment variable."
}
params = {
"access_token": INSTAGRAM_ACCESS_TOKEN,
"fields": "id,text,timestamp,username,like_count",
"limit": min(limit, 50)
}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{INSTAGRAM_API_BASE}/{media_id}/comments",
params=params
)
if response.status_code == 400:
error_data = response.json().get("error", {})
return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"}
response.raise_for_status()
data = response.json()
comments = []
for comment in data.get("data", []):
comments.append({
"comment_id": comment.get("id"),
"text": comment.get("text"),
"timestamp": comment.get("timestamp"),
"username": comment.get("username"),
"like_count": comment.get("like_count")
})
return {
"media_id": media_id,
"comments": comments
}
if __name__ == "__main__":
server.run()