#!/usr/bin/env python3 """ Social Media MCP Server - Unified Media Content Extraction Provides tools to obtain media content from: 1. YouTube - Video info, transcripts, channel data, search 2. LinkedIn - Profile scraping, job search, company info, feed posts 3. Facebook - Page posts, comments, insights, messaging 4. Instagram - Business profile, media posts, insights, DMs Architecture: - Uses official APIs where available (YouTube Data API, Facebook Graph API, Instagram Graph API) - Uses unofficial/scraping methods for LinkedIn (no official API for content reading) - Supports authentication via environment variables Environment Variables: - YOUTUBE_API_KEY: YouTube Data API v3 key - LINKEDIN_EMAIL: LinkedIn login email - LINKEDIN_PASSWORD: LinkedIn login password - LINKEDIN_COOKIE: LinkedIn session cookie (li_at) - alternative to email/password - FACEBOOK_ACCESS_TOKEN: Facebook Page Access Token - FACEBOOK_PAGE_ID: Facebook Page ID - INSTAGRAM_ACCESS_TOKEN: Instagram Business Account Token - INSTAGRAM_BUSINESS_ACCOUNT_ID: Instagram Business Account ID Based on patterns from: - https://github.com/anaisbetts/mcp-youtube - https://github.com/ZubeidHendricks/youtube-mcp-server - https://github.com/adhikasp/mcp-linkedin - https://github.com/stickerdaniel/linkedin-mcp-server - https://github.com/jlbadano/ig-mcp - https://github.com/tiroshanm/facebook-mcp-server """ import httpx import json import os import re import subprocess import tempfile from typing import List, Dict, Optional, Any from mcp.server.fastmcp import FastMCP server = FastMCP("Social Media MCP Server") # ============================================================================ # Configuration # ============================================================================ # User-Agent for API requests USER_AGENT = "SocialMediaMCP/1.0" # YouTube configuration YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY", "") YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3" # LinkedIn configuration (uses unofficial API - scraping based) LINKEDIN_EMAIL = os.getenv("LINKEDIN_EMAIL", "") LINKEDIN_PASSWORD = os.getenv("LINKEDIN_PASSWORD", "") LINKEDIN_COOKIE = os.getenv("LINKEDIN_COOKIE", "") # Facebook Graph API configuration FACEBOOK_ACCESS_TOKEN = os.getenv("FACEBOOK_ACCESS_TOKEN", "") FACEBOOK_PAGE_ID = os.getenv("FACEBOOK_PAGE_ID", "") FACEBOOK_API_VERSION = os.getenv("FACEBOOK_API_VERSION", "v19.0") FACEBOOK_API_BASE = f"https://graph.facebook.com/{FACEBOOK_API_VERSION}" # Instagram Graph API configuration (uses Facebook Graph API) INSTAGRAM_ACCESS_TOKEN = os.getenv("INSTAGRAM_ACCESS_TOKEN", "") INSTAGRAM_BUSINESS_ACCOUNT_ID = os.getenv("INSTAGRAM_BUSINESS_ACCOUNT_ID", "") INSTAGRAM_API_VERSION = os.getenv("INSTAGRAM_API_VERSION", "v19.0") INSTAGRAM_API_BASE = f"https://graph.facebook.com/{INSTAGRAM_API_VERSION}" # Print configuration status print("=" * 60) print("Social Media MCP Server - Configuration Status") print("=" * 60) print(f"YouTube API Key: {'✓ Configured' if YOUTUBE_API_KEY else '✗ Not configured'}") print(f"LinkedIn Auth: {'✓ Cookie' if LINKEDIN_COOKIE else ('✓ Email/Password' if LINKEDIN_EMAIL else '✗ Not configured')}") print(f"Facebook Token: {'✓ Configured' if FACEBOOK_ACCESS_TOKEN else '✗ Not configured'}") print(f"Instagram Token: {'✓ Configured' if INSTAGRAM_ACCESS_TOKEN else '✗ Not configured'}") print("=" * 60) # ============================================================================ # Helper Functions # ============================================================================ def extract_video_id(url_or_id: str) -> str: """Extract YouTube video ID from URL or return as-is if already an ID.""" patterns = [ r'(?:v=|/)([0-9A-Za-z_-]{11})(?:[&?]|$)', r'(?:youtu\.be/)([0-9A-Za-z_-]{11})', r'^([0-9A-Za-z_-]{11})$' ] for pattern in patterns: match = re.search(pattern, url_or_id) if match: return match.group(1) return url_or_id def extract_channel_id(url_or_id: str) -> str: """Extract YouTube channel ID from URL or return as-is.""" patterns = [ r'(?:channel/)([UC][0-9A-Za-z_-]{22})', r'^([UC][0-9A-Za-z_-]{22})$' ] for pattern in patterns: match = re.search(pattern, url_or_id) if match: return match.group(1) return url_or_id def extract_linkedin_profile_id(url_or_id: str) -> str: """Extract LinkedIn profile ID from URL or return as-is.""" patterns = [ r'linkedin\.com/in/([^/?]+)', r'^([a-zA-Z0-9-]+)$' ] for pattern in patterns: match = re.search(pattern, url_or_id) if match: return match.group(1) return url_or_id def extract_linkedin_company_id(url_or_id: str) -> str: """Extract LinkedIn company ID from URL or return as-is.""" patterns = [ r'linkedin\.com/company/([^/?]+)', r'^([a-zA-Z0-9-]+)$' ] for pattern in patterns: match = re.search(pattern, url_or_id) if match: return match.group(1) return url_or_id # ============================================================================ # YOUTUBE TOOLS # ============================================================================ @server.tool() async def youtube_get_video_info(video_url_or_id: str) -> Dict[str, Any]: """ Get detailed information about a YouTube video. Args: video_url_or_id: YouTube video URL or video ID Returns: dict: Video metadata including title, description, duration, view count, like count, channel info, publish date, and more. Example: youtube_get_video_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ") youtube_get_video_info("dQw4w9WgXcQ") """ video_id = extract_video_id(video_url_or_id) if YOUTUBE_API_KEY: # Use YouTube Data API params = { "part": "snippet,contentDetails,statistics", "id": video_id, "key": YOUTUBE_API_KEY } async with httpx.AsyncClient() as client: response = await client.get( f"{YOUTUBE_API_BASE}/videos", params=params, headers={"User-Agent": USER_AGENT} ) response.raise_for_status() data = response.json() if not data.get("items"): return {"error": f"Video not found: {video_id}"} item = data["items"][0] return { "video_id": video_id, "title": item["snippet"]["title"], "description": item["snippet"]["description"], "channel_id": item["snippet"]["channelId"], "channel_title": item["snippet"]["channelTitle"], "published_at": item["snippet"]["publishedAt"], "duration": item["contentDetails"]["duration"], "view_count": item["statistics"].get("viewCount"), "like_count": item["statistics"].get("likeCount"), "comment_count": item["statistics"].get("commentCount"), "tags": item["snippet"].get("tags", []), "thumbnails": item["snippet"]["thumbnails"] } else: # Fallback: Use yt-dlp for metadata extraction try: result = subprocess.run( ["yt-dlp", "--dump-json", "--skip-download", f"https://www.youtube.com/watch?v={video_id}"], capture_output=True, text=True, timeout=60 ) if result.returncode == 0: data = json.loads(result.stdout) return { "video_id": data.get("id"), "title": data.get("title"), "description": data.get("description"), "channel_id": data.get("channel_id"), "channel_title": data.get("uploader"), "published_at": data.get("upload_date"), "duration": data.get("duration"), "view_count": data.get("view_count"), "like_count": data.get("like_count"), "comment_count": data.get("comment_count"), "tags": data.get("tags", []), } return {"error": f"yt-dlp failed: {result.stderr}"} except FileNotFoundError: return {"error": "yt-dlp not installed. Install with: brew install yt-dlp or pip install yt-dlp"} except subprocess.TimeoutExpired: return {"error": "Video info extraction timed out"} @server.tool() async def youtube_get_transcript(video_url_or_id: str, language: str = "en") -> Dict[str, Any]: """ Get the transcript/subtitles from a YouTube video. Uses yt-dlp to extract subtitles in the specified language. Falls back to auto-generated captions if manual ones aren't available. Args: video_url_or_id: YouTube video URL or video ID language: Language code for subtitles (default: "en") Returns: dict: Contains transcript text and metadata Example: youtube_get_transcript("dQw4w9WgXcQ", "en") """ video_id = extract_video_id(video_url_or_id) video_url = f"https://www.youtube.com/watch?v={video_id}" try: with tempfile.TemporaryDirectory() as tmpdir: # Try to get subtitles with yt-dlp result = subprocess.run( [ "yt-dlp", "--write-subs", "--write-auto-subs", "--sub-langs", language, "--sub-format", "vtt", "--skip-download", "--output", f"{tmpdir}/%(id)s", video_url ], capture_output=True, text=True, timeout=120 ) # Look for the subtitle file import glob vtt_files = glob.glob(f"{tmpdir}/*.vtt") if vtt_files: with open(vtt_files[0], 'r', encoding='utf-8') as f: vtt_content = f.read() # Parse VTT to extract text lines = [] for line in vtt_content.split('\n'): line = line.strip() # Skip headers, timestamps, and empty lines if line and not line.startswith('WEBVTT') and not line.startswith('Kind:') \ and not line.startswith('Language:') and '-->' not in line \ and not re.match(r'^\d+$', line): # Remove HTML tags clean_line = re.sub(r'<[^>]+>', '', line) if clean_line: lines.append(clean_line) # Remove duplicate consecutive lines deduped = [] for line in lines: if not deduped or line != deduped[-1]: deduped.append(line) transcript = ' '.join(deduped) return { "video_id": video_id, "language": language, "transcript": transcript, "source": "auto" if ".auto." in vtt_files[0] else "manual" } return { "video_id": video_id, "error": f"No subtitles available in language: {language}", "available_info": result.stderr if result.stderr else "Check video for available languages" } except FileNotFoundError: return {"error": "yt-dlp not installed. Install with: brew install yt-dlp or pip install yt-dlp"} except subprocess.TimeoutExpired: return {"error": "Transcript extraction timed out"} @server.tool() async def youtube_search_videos( query: str, max_results: int = 10, order: str = "relevance" ) -> Dict[str, Any]: """ Search for YouTube videos. Args: query: Search query string max_results: Maximum number of results to return (default: 10, max: 50) order: Sort order - 'relevance', 'date', 'viewCount', 'rating' (default: relevance) Returns: dict: List of video results with basic metadata Example: youtube_search_videos("python tutorials", max_results=5) """ if not YOUTUBE_API_KEY: return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."} params = { "part": "snippet", "q": query, "type": "video", "maxResults": min(max_results, 50), "order": order, "key": YOUTUBE_API_KEY } async with httpx.AsyncClient() as client: response = await client.get( f"{YOUTUBE_API_BASE}/search", params=params, headers={"User-Agent": USER_AGENT} ) response.raise_for_status() data = response.json() results = [] for item in data.get("items", []): results.append({ "video_id": item["id"]["videoId"], "title": item["snippet"]["title"], "description": item["snippet"]["description"], "channel_id": item["snippet"]["channelId"], "channel_title": item["snippet"]["channelTitle"], "published_at": item["snippet"]["publishedAt"], "thumbnail": item["snippet"]["thumbnails"]["high"]["url"] }) return { "query": query, "total_results": data.get("pageInfo", {}).get("totalResults"), "results": results } @server.tool() async def youtube_get_channel_info(channel_url_or_id: str) -> Dict[str, Any]: """ Get information about a YouTube channel. Args: channel_url_or_id: YouTube channel URL or channel ID Returns: dict: Channel metadata including name, description, subscriber count, etc. Example: youtube_get_channel_info("UCsXVk37bltHxD1rDPwtNM8Q") """ if not YOUTUBE_API_KEY: return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."} channel_id = extract_channel_id(channel_url_or_id) params = { "part": "snippet,statistics,brandingSettings", "id": channel_id, "key": YOUTUBE_API_KEY } async with httpx.AsyncClient() as client: response = await client.get( f"{YOUTUBE_API_BASE}/channels", params=params, headers={"User-Agent": USER_AGENT} ) response.raise_for_status() data = response.json() if not data.get("items"): return {"error": f"Channel not found: {channel_id}"} item = data["items"][0] return { "channel_id": channel_id, "title": item["snippet"]["title"], "description": item["snippet"]["description"], "custom_url": item["snippet"].get("customUrl"), "published_at": item["snippet"]["publishedAt"], "country": item["snippet"].get("country"), "subscriber_count": item["statistics"].get("subscriberCount"), "video_count": item["statistics"].get("videoCount"), "view_count": item["statistics"].get("viewCount"), "thumbnail": item["snippet"]["thumbnails"]["high"]["url"], "banner": item.get("brandingSettings", {}).get("image", {}).get("bannerExternalUrl") } @server.tool() async def youtube_get_channel_videos( channel_url_or_id: str, max_results: int = 20 ) -> Dict[str, Any]: """ Get recent videos from a YouTube channel. Args: channel_url_or_id: YouTube channel URL or channel ID max_results: Maximum number of videos to return (default: 20, max: 50) Returns: dict: List of recent videos from the channel Example: youtube_get_channel_videos("UCsXVk37bltHxD1rDPwtNM8Q", max_results=10) """ if not YOUTUBE_API_KEY: return {"error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."} channel_id = extract_channel_id(channel_url_or_id) params = { "part": "snippet", "channelId": channel_id, "type": "video", "order": "date", "maxResults": min(max_results, 50), "key": YOUTUBE_API_KEY } async with httpx.AsyncClient() as client: response = await client.get( f"{YOUTUBE_API_BASE}/search", params=params, headers={"User-Agent": USER_AGENT} ) response.raise_for_status() data = response.json() videos = [] for item in data.get("items", []): videos.append({ "video_id": item["id"]["videoId"], "title": item["snippet"]["title"], "description": item["snippet"]["description"], "published_at": item["snippet"]["publishedAt"], "thumbnail": item["snippet"]["thumbnails"]["high"]["url"] }) return { "channel_id": channel_id, "videos": videos } # ============================================================================ # LINKEDIN TOOLS # ============================================================================ # Note: LinkedIn doesn't have an official API for reading content. # These tools use web scraping / unofficial methods. # Users should be aware this may violate LinkedIn ToS. @server.tool() async def linkedin_get_profile(profile_url_or_id: str) -> Dict[str, Any]: """ Get information from a LinkedIn profile. WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service. Requires LINKEDIN_COOKIE environment variable. Args: profile_url_or_id: LinkedIn profile URL or vanity name Returns: dict: Profile information including name, headline, experience, education Example: linkedin_get_profile("https://www.linkedin.com/in/satyanadella/") linkedin_get_profile("satyanadella") """ if not LINKEDIN_COOKIE: return { "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.", "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools." } profile_id = extract_linkedin_profile_id(profile_url_or_id) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}", "Accept": "application/json", } # LinkedIn Voyager API endpoint (unofficial) profile_url = f"https://www.linkedin.com/voyager/api/identity/profiles/{profile_id}" try: async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client: response = await client.get(profile_url, headers=headers) if response.status_code == 401: return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."} elif response.status_code == 404: return {"error": f"Profile not found: {profile_id}"} response.raise_for_status() data = response.json() return { "profile_id": profile_id, "first_name": data.get("firstName"), "last_name": data.get("lastName"), "headline": data.get("headline"), "summary": data.get("summary"), "location": data.get("locationName"), "industry": data.get("industryName"), "profile_url": f"https://www.linkedin.com/in/{profile_id}/", } except httpx.HTTPStatusError as e: return {"error": f"LinkedIn API error: {e.response.status_code}"} except Exception as e: return {"error": f"Failed to fetch profile: {str(e)}"} @server.tool() async def linkedin_get_company(company_url_or_id: str) -> Dict[str, Any]: """ Get information about a LinkedIn company page. WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service. Requires LINKEDIN_COOKIE environment variable. Args: company_url_or_id: LinkedIn company URL or company name/ID Returns: dict: Company information including name, description, industry, size Example: linkedin_get_company("https://www.linkedin.com/company/microsoft/") linkedin_get_company("microsoft") """ if not LINKEDIN_COOKIE: return { "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.", "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools." } company_id = extract_linkedin_company_id(company_url_or_id) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}", "Accept": "application/json", } # LinkedIn Voyager API endpoint (unofficial) company_url = f"https://www.linkedin.com/voyager/api/organization/companies?decorationId=com.linkedin.voyager.deco.organization.web.WebFullCompanyMain-12&q=universalName&universalName={company_id}" try: async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client: response = await client.get(company_url, headers=headers) if response.status_code == 401: return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."} response.raise_for_status() data = response.json() elements = data.get("elements", []) if not elements: return {"error": f"Company not found: {company_id}"} company = elements[0] return { "company_id": company_id, "name": company.get("name"), "tagline": company.get("tagline"), "description": company.get("description"), "industry": company.get("companyIndustries", [{}])[0].get("localizedName") if company.get("companyIndustries") else None, "company_size": company.get("staffCountRange", {}).get("start"), "headquarters": company.get("headquarter", {}).get("city"), "website": company.get("companyPageUrl"), "founded_year": company.get("foundedOn", {}).get("year"), "specialities": company.get("specialities", []), "company_url": f"https://www.linkedin.com/company/{company_id}/", } except httpx.HTTPStatusError as e: return {"error": f"LinkedIn API error: {e.response.status_code}"} except Exception as e: return {"error": f"Failed to fetch company: {str(e)}"} @server.tool() async def linkedin_search_jobs( keywords: str, location: Optional[str] = None, limit: int = 10 ) -> Dict[str, Any]: """ Search for jobs on LinkedIn. WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service. Requires LINKEDIN_COOKIE environment variable. Args: keywords: Job search keywords location: Location filter (optional) limit: Maximum number of results (default: 10) Returns: dict: List of job postings matching the search Example: linkedin_search_jobs("software engineer", "San Francisco", limit=5) """ if not LINKEDIN_COOKIE: return { "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.", "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools." } headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}", "Accept": "application/json", } # Build search URL params = { "keywords": keywords, "count": min(limit, 25), "start": 0, } if location: params["location"] = location search_url = f"https://www.linkedin.com/voyager/api/voyagerJobsDashJobCards" try: async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client: response = await client.get(search_url, params=params, headers=headers) if response.status_code == 401: return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."} response.raise_for_status() data = response.json() jobs = [] for element in data.get("elements", [])[:limit]: job = element.get("jobCardUnion", {}).get("jobPostingCard", {}) jobs.append({ "title": job.get("title"), "company": job.get("primaryDescription", {}).get("text"), "location": job.get("secondaryDescription", {}).get("text"), "posted_time": job.get("tertiaryDescription", {}).get("text"), "job_url": f"https://www.linkedin.com/jobs/view/{job.get('jobPostingId')}" if job.get("jobPostingId") else None }) return { "keywords": keywords, "location": location, "jobs": jobs } except httpx.HTTPStatusError as e: return {"error": f"LinkedIn API error: {e.response.status_code}"} except Exception as e: return {"error": f"Failed to search jobs: {str(e)}"} @server.tool() async def linkedin_get_feed_posts(limit: int = 10) -> Dict[str, Any]: """ Get recent posts from your LinkedIn feed. WARNING: Uses unofficial methods. May violate LinkedIn Terms of Service. Requires LINKEDIN_COOKIE environment variable. Args: limit: Maximum number of posts to return (default: 10) Returns: dict: List of recent feed posts Example: linkedin_get_feed_posts(limit=5) """ if not LINKEDIN_COOKIE: return { "error": "LinkedIn cookie not configured. Set LINKEDIN_COOKIE environment variable.", "help": "Get your li_at cookie from LinkedIn after logging in via browser DevTools." } headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Cookie": f"li_at={LINKEDIN_COOKIE.replace('li_at=', '')}", "Accept": "application/json", } feed_url = f"https://www.linkedin.com/voyager/api/feed/updates?count={min(limit, 25)}" try: async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client: response = await client.get(feed_url, headers=headers) if response.status_code == 401: return {"error": "LinkedIn session expired. Please update LINKEDIN_COOKIE."} response.raise_for_status() data = response.json() posts = [] for element in data.get("elements", [])[:limit]: update = element.get("value", {}).get("com.linkedin.voyager.feed.render.UpdateV2", {}) actor = update.get("actor", {}) commentary = update.get("commentary", {}).get("text", {}).get("text", "") posts.append({ "author_name": actor.get("name", {}).get("text"), "author_headline": actor.get("description", {}).get("text"), "content": commentary[:500] if commentary else None, "num_likes": update.get("socialDetail", {}).get("totalSocialActivityCounts", {}).get("numLikes"), "num_comments": update.get("socialDetail", {}).get("totalSocialActivityCounts", {}).get("numComments"), }) return {"posts": posts} except httpx.HTTPStatusError as e: return {"error": f"LinkedIn API error: {e.response.status_code}"} except Exception as e: return {"error": f"Failed to get feed: {str(e)}"} # ============================================================================ # FACEBOOK TOOLS # ============================================================================ @server.tool() async def facebook_get_page_posts( page_id: Optional[str] = None, limit: int = 10 ) -> Dict[str, Any]: """ Get recent posts from a Facebook Page. Requires FACEBOOK_ACCESS_TOKEN and optionally FACEBOOK_PAGE_ID environment variables. Args: page_id: Facebook Page ID (uses FACEBOOK_PAGE_ID env var if not provided) limit: Maximum number of posts to return (default: 10) Returns: dict: List of recent posts with engagement metrics Example: facebook_get_page_posts("12345678", limit=5) """ if not FACEBOOK_ACCESS_TOKEN: return { "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable.", "help": "Get a Page Access Token from Facebook Developer Console." } target_page_id = page_id or FACEBOOK_PAGE_ID if not target_page_id: return {"error": "Page ID required. Provide page_id or set FACEBOOK_PAGE_ID environment variable."} params = { "access_token": FACEBOOK_ACCESS_TOKEN, "fields": "id,message,created_time,shares,attachments,likes.summary(true),comments.summary(true)", "limit": min(limit, 100) } async with httpx.AsyncClient() as client: response = await client.get( f"{FACEBOOK_API_BASE}/{target_page_id}/posts", params=params ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() posts = [] for post in data.get("data", []): posts.append({ "post_id": post.get("id"), "message": post.get("message"), "created_time": post.get("created_time"), "likes_count": post.get("likes", {}).get("summary", {}).get("total_count", 0), "comments_count": post.get("comments", {}).get("summary", {}).get("total_count", 0), "shares_count": post.get("shares", {}).get("count", 0), "attachments": post.get("attachments", {}).get("data", []) }) return { "page_id": target_page_id, "posts": posts } @server.tool() async def facebook_get_post_comments( post_id: str, limit: int = 25 ) -> Dict[str, Any]: """ Get comments on a Facebook post. Requires FACEBOOK_ACCESS_TOKEN environment variable. Args: post_id: The Facebook post ID limit: Maximum number of comments to return (default: 25) Returns: dict: List of comments with user info and engagement Example: facebook_get_post_comments("123456789_987654321", limit=10) """ if not FACEBOOK_ACCESS_TOKEN: return { "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable." } params = { "access_token": FACEBOOK_ACCESS_TOKEN, "fields": "id,message,created_time,from,like_count,comment_count", "limit": min(limit, 100) } async with httpx.AsyncClient() as client: response = await client.get( f"{FACEBOOK_API_BASE}/{post_id}/comments", params=params ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() comments = [] for comment in data.get("data", []): comments.append({ "comment_id": comment.get("id"), "message": comment.get("message"), "created_time": comment.get("created_time"), "from_name": comment.get("from", {}).get("name"), "from_id": comment.get("from", {}).get("id"), "like_count": comment.get("like_count", 0), "reply_count": comment.get("comment_count", 0) }) return { "post_id": post_id, "comments": comments } @server.tool() async def facebook_post_to_page( message: str, page_id: Optional[str] = None ) -> Dict[str, Any]: """ Post a message to a Facebook Page. Requires FACEBOOK_ACCESS_TOKEN with pages_manage_posts permission. Args: message: The message content to post page_id: Facebook Page ID (uses FACEBOOK_PAGE_ID env var if not provided) Returns: dict: Post ID of the created post Example: facebook_post_to_page("Hello from MCP!", "12345678") """ if not FACEBOOK_ACCESS_TOKEN: return { "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable." } target_page_id = page_id or FACEBOOK_PAGE_ID if not target_page_id: return {"error": "Page ID required. Provide page_id or set FACEBOOK_PAGE_ID environment variable."} async with httpx.AsyncClient() as client: response = await client.post( f"{FACEBOOK_API_BASE}/{target_page_id}/feed", data={ "message": message, "access_token": FACEBOOK_ACCESS_TOKEN } ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() return { "success": True, "post_id": data.get("id"), "page_id": target_page_id } @server.tool() async def facebook_reply_to_comment( comment_id: str, message: str ) -> Dict[str, Any]: """ Reply to a comment on a Facebook post. Requires FACEBOOK_ACCESS_TOKEN with pages_manage_comments permission. Args: comment_id: The Facebook comment ID to reply to message: The reply message Returns: dict: Reply comment ID Example: facebook_reply_to_comment("123456789_987654321", "Thanks for your comment!") """ if not FACEBOOK_ACCESS_TOKEN: return { "error": "Facebook access token not configured. Set FACEBOOK_ACCESS_TOKEN environment variable." } async with httpx.AsyncClient() as client: response = await client.post( f"{FACEBOOK_API_BASE}/{comment_id}/comments", data={ "message": message, "access_token": FACEBOOK_ACCESS_TOKEN } ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Facebook API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() return { "success": True, "reply_id": data.get("id"), "parent_comment_id": comment_id } # ============================================================================ # INSTAGRAM TOOLS # ============================================================================ @server.tool() async def instagram_get_profile_info() -> Dict[str, Any]: """ Get Instagram Business profile information. Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables. Returns: dict: Profile information including username, bio, follower count, etc. Example: instagram_get_profile_info() """ if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID: return { "error": "Instagram credentials not configured.", "help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables." } params = { "access_token": INSTAGRAM_ACCESS_TOKEN, "fields": "id,username,name,biography,followers_count,follows_count,media_count,profile_picture_url,website" } async with httpx.AsyncClient() as client: response = await client.get( f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}", params=params ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() return { "account_id": data.get("id"), "username": data.get("username"), "name": data.get("name"), "biography": data.get("biography"), "followers_count": data.get("followers_count"), "following_count": data.get("follows_count"), "media_count": data.get("media_count"), "profile_picture_url": data.get("profile_picture_url"), "website": data.get("website") } @server.tool() async def instagram_get_media_posts(limit: int = 10) -> Dict[str, Any]: """ Get recent Instagram media posts from the Business account. Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables. Args: limit: Maximum number of posts to return (default: 10) Returns: dict: List of recent media posts with engagement metrics Example: instagram_get_media_posts(limit=5) """ if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID: return { "error": "Instagram credentials not configured.", "help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables." } params = { "access_token": INSTAGRAM_ACCESS_TOKEN, "fields": "id,caption,media_type,media_url,permalink,timestamp,like_count,comments_count,thumbnail_url", "limit": min(limit, 50) } async with httpx.AsyncClient() as client: response = await client.get( f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media", params=params ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() posts = [] for post in data.get("data", []): posts.append({ "media_id": post.get("id"), "caption": post.get("caption"), "media_type": post.get("media_type"), "media_url": post.get("media_url"), "permalink": post.get("permalink"), "timestamp": post.get("timestamp"), "like_count": post.get("like_count"), "comments_count": post.get("comments_count"), "thumbnail_url": post.get("thumbnail_url") }) return {"posts": posts} @server.tool() async def instagram_get_media_insights(media_id: str) -> Dict[str, Any]: """ Get insights/analytics for a specific Instagram media post. Requires INSTAGRAM_ACCESS_TOKEN environment variable. Args: media_id: The Instagram media ID Returns: dict: Post insights including reach, impressions, engagement Example: instagram_get_media_insights("17890012345678901") """ if not INSTAGRAM_ACCESS_TOKEN: return { "error": "Instagram access token not configured. Set INSTAGRAM_ACCESS_TOKEN environment variable." } # Metrics available depend on media type (IMAGE, VIDEO, CAROUSEL_ALBUM) params = { "access_token": INSTAGRAM_ACCESS_TOKEN, "metric": "engagement,impressions,reach,saved" } async with httpx.AsyncClient() as client: response = await client.get( f"{INSTAGRAM_API_BASE}/{media_id}/insights", params=params ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() insights = {} for metric in data.get("data", []): insights[metric.get("name")] = metric.get("values", [{}])[0].get("value") return { "media_id": media_id, "insights": insights } @server.tool() async def instagram_publish_media( image_url: str, caption: str ) -> Dict[str, Any]: """ Publish an image to Instagram Business account. Requires INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables. The image_url must be a publicly accessible URL. Args: image_url: Public URL of the image to publish caption: Caption for the post Returns: dict: Created media ID Example: instagram_publish_media("https://example.com/image.jpg", "Check out this photo!") """ if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_BUSINESS_ACCOUNT_ID: return { "error": "Instagram credentials not configured.", "help": "Set INSTAGRAM_ACCESS_TOKEN and INSTAGRAM_BUSINESS_ACCOUNT_ID environment variables." } async with httpx.AsyncClient() as client: # Step 1: Create media container container_response = await client.post( f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media", data={ "image_url": image_url, "caption": caption, "access_token": INSTAGRAM_ACCESS_TOKEN } ) if container_response.status_code == 400: error_data = container_response.json().get("error", {}) return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"} container_response.raise_for_status() container_id = container_response.json().get("id") # Step 2: Publish the container publish_response = await client.post( f"{INSTAGRAM_API_BASE}/{INSTAGRAM_BUSINESS_ACCOUNT_ID}/media_publish", data={ "creation_id": container_id, "access_token": INSTAGRAM_ACCESS_TOKEN } ) if publish_response.status_code == 400: error_data = publish_response.json().get("error", {}) return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"} publish_response.raise_for_status() media_id = publish_response.json().get("id") return { "success": True, "media_id": media_id, "container_id": container_id } @server.tool() async def instagram_get_comments(media_id: str, limit: int = 25) -> Dict[str, Any]: """ Get comments on an Instagram media post. Requires INSTAGRAM_ACCESS_TOKEN environment variable. Args: media_id: The Instagram media ID limit: Maximum number of comments to return (default: 25) Returns: dict: List of comments Example: instagram_get_comments("17890012345678901", limit=10) """ if not INSTAGRAM_ACCESS_TOKEN: return { "error": "Instagram access token not configured. Set INSTAGRAM_ACCESS_TOKEN environment variable." } params = { "access_token": INSTAGRAM_ACCESS_TOKEN, "fields": "id,text,timestamp,username,like_count", "limit": min(limit, 50) } async with httpx.AsyncClient() as client: response = await client.get( f"{INSTAGRAM_API_BASE}/{media_id}/comments", params=params ) if response.status_code == 400: error_data = response.json().get("error", {}) return {"error": f"Instagram API error: {error_data.get('message', 'Unknown error')}"} response.raise_for_status() data = response.json() comments = [] for comment in data.get("data", []): comments.append({ "comment_id": comment.get("id"), "text": comment.get("text"), "timestamp": comment.get("timestamp"), "username": comment.get("username"), "like_count": comment.get("like_count") }) return { "media_id": media_id, "comments": comments } if __name__ == "__main__": server.run()