Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import json | |
| import random | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any | |
| import asyncio | |
| import logging | |
| from utils.getNews import fetch_trending_news | |
| import re | |
| import requests | |
| import tempfile | |
| import os | |
| import subprocess | |
| import platform | |
| import dotenv | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
# Read configuration overrides from the ".env" file that sits next to this module.
dotenv.load_dotenv(Path(__file__).with_name(".env"))

# Endpoint of the text-to-speech service; None when the variable is not set.
TEXT_TO_SPEECH_API_URL = os.getenv("TEXT_TO_SPEECH_BASE_URL")

# MCP imports - adjusted for actual library structure.
# The MCP package is optional: record whether it could be imported so the rest
# of the module can fall back to simulation mode.
try:
    from mcp.server import Server
except ImportError:
    MCP_AVAILABLE = False
    print("MCP library not available - running in simulation mode only")
else:
    MCP_AVAILABLE = True

# Configure logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NewsAggregatorMCP:
    """Tool implementations for the "news-aggregator" MCP server.

    Provides news search, text-to-speech conversion and local audio playback.
    Every tool is an ``async`` method that takes a single ``args`` dict and
    returns a plain dict; failures are reported through an ``"error"`` key
    instead of being raised.
    """

    # Upper bound on text_to_audio input; the tool schema mirrors this value.
    MAX_TTS_CHARS = 1000
    # Defaults used when the caller omits "limit" or the article age.
    DEFAULT_LIMIT = 5
    DEFAULT_MAX_AGE_DAYS = 2
    # File extensions play_audio accepts (tuple so str.endswith can take it).
    SUPPORTED_AUDIO_EXTENSIONS = (".mp3", ".wav")
    # Splits after sentence-ending punctuation that is followed by whitespace.
    _SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+')
    # Number of leading sentences of an article body kept in search results.
    _MAX_BODY_SENTENCES = 10

    def __init__(self):
        """Create the MCP server when the library is available."""
        # Always define these so attribute access is safe in simulation mode.
        self.tools = []
        self._playback = None  # Popen handle of the player started by play_audio
        if MCP_AVAILABLE:
            self.server = Server("news-aggregator")
            self.setup_mcp_server()
        else:
            self.server = None

    def setup_mcp_server(self):
        """Populate ``self.tools`` with the JSON-schema tool descriptions.

        Does nothing when the MCP library is not installed.
        """
        if not MCP_AVAILABLE:
            return
        self.tools = [
            {
                "name": "search_news",
                "description": "Search for news articles based on keywords",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "keywords": {
                            "type": "array",
                            "items": {
                                "type": "string"
                            },
                            "description": "Array of keywords to search for in news articles"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of articles to return",
                            "default": self.DEFAULT_LIMIT
                        },
                        "date_from": {
                            "type": "integer",
                            "description": "How many days old the news can be (optional)",
                            "minimum": 1
                        }
                    },
                    "required": ["keywords"]
                }
            },
            {
                "name": "text_to_audio",
                "description": "Convert a SINGLE text snippet to speech. Only use this when you want to convert a specific piece of text to audio, NOT for every news article.",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "Text to convert to audio",
                            # Must match the length check in text_to_audio.
                            "maxLength": self.MAX_TTS_CHARS
                        },
                        "use_music": {
                            "type": "boolean",
                            "description": "Whether to include background music",
                            # Must match the default applied in text_to_audio.
                            "default": False
                        }
                    },
                    "required": ["text"]
                },
                "outputSchema": {
                    "type": "object",
                    "properties": {
                        "audio_file": {
                            "type": "string",
                            "format": "binary",
                            "description": "Generated audio file in MP3 format"
                        },
                        "success": {
                            "type": "boolean",
                            "description": "Whether the audio generation was successful"
                        },
                        "message": {
                            "type": "string",
                            "description": "Status message about the audio generation"
                        }
                    }
                }
            }
        ]

    @staticmethod
    def _normalize_keywords(raw):
        """Return a clean keyword list from a string or an iterable of strings.

        A string is split on commas and whitespace (so both "a b" and "a, b"
        work); items of a list are kept intact and only stripped. Empty
        entries are dropped, and ``None`` yields an empty list.
        """
        if raw is None:
            return []
        if isinstance(raw, str):
            parts = re.split(r"[,\s]+", raw)
        else:
            parts = [str(item).strip() for item in raw]
        return [part for part in parts if part]

    @classmethod
    def _format_article(cls, article):
        """Flatten one raw article record into the dict returned to callers."""
        fields = article.get("fields") or {}
        sentences = cls._SENTENCE_BOUNDARY.split(fields.get("body") or "")
        return {
            "title": fields.get("headline", article.get("webTitle", "No title")),
            "section": article.get("sectionName", "The Guardian"),
            "url": article.get("webUrl", ""),
            "summary": fields.get("trailText", fields.get("standfirst", "No summary")),
            "published": article.get("webPublicationDate", ""),
            # Only the leading sentences are kept to bound the response size.
            "body": " ".join(sentences[:cls._MAX_BODY_SENTENCES]),
        }

    async def search_news(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Search for news articles matching the given keywords.

        Args:
            args: Tool arguments with the keys
                keywords (list[str] | str): Keywords to search for. A string
                    may be comma- and/or whitespace-separated.
                limit (int, optional): Maximum number of articles. Defaults to 5.
                date_from_input / date_from (int, optional): How many days old
                    the news can be. Defaults to 2. ``date_from_input`` wins
                    when both are present.

        Returns:
            dict: ``{"query", "total_results", "articles"}`` on success, or
            ``{"error": ...}`` when the arguments are invalid.
        """
        raw_keywords = args.get("keywords")
        keyword_list = self._normalize_keywords(raw_keywords)
        if not keyword_list:
            return {"error": "Keywords are required"}

        try:
            limit = int(args.get("limit") or self.DEFAULT_LIMIT)
            # The Gradio wrapper sends "date_from_input"; the MCP schema names
            # the same value "date_from". Accept either.
            raw_age = args.get("date_from_input")
            if raw_age is None:
                raw_age = args.get("date_from")
            max_age_days = self.DEFAULT_MAX_AGE_DAYS if raw_age is None else float(raw_age)
            date_from = (datetime.now() - timedelta(days=max_age_days)).strftime("%Y-%m-%d")
        except (TypeError, ValueError, OverflowError):
            return {"error": "limit and date_from must be valid numbers"}

        # Fetch articles from Guardian API
        articles = fetch_trending_news(keyword_list, date_from, page_size=limit, max_pages=1) or []

        return {
            "query": raw_keywords,
            "total_results": len(articles),
            "articles": [self._format_article(article) for article in articles],
        }

    async def text_to_audio(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Convert text to an MP3 file through the text-to-speech HTTP API.

        IMPORTANT:
        - This tool should be used ONLY ONCE for summarized news content.
        - DO NOT call this function separately for each news article.
        - Instead, first summarize the key points from all news articles.
        - Then call this function ONCE with the compiled summary.
        - Maximum allowed text length: 1000 characters. (about 125 words)

        Args:
            args: Tool arguments with the keys
                text (str): Text to convert to audio.
                use_music (bool, optional): Whether to include background
                    music. Defaults to False.

        Returns:
            dict: On success, ``success=True`` plus the path of a temporary
            ``.mp3`` file (``audio_file``) and metadata. On failure,
            ``success=False`` and an ``error`` message. The temporary file is
            not deleted by this method.
        """
        text = args.get("text", "")
        use_music = args.get("use_music", False)

        if not text:
            return {"success": False, "error": "Text is required for audio conversion"}
        if len(text) > self.MAX_TTS_CHARS:
            return {
                "success": False,
                "error": f"Text too long. Maximum {self.MAX_TTS_CHARS} characters allowed.",
            }
        if not TEXT_TO_SPEECH_API_URL:
            # Fail with an actionable message instead of an opaque requests error.
            error_msg = "Text-to-speech service URL is not configured (TEXT_TO_SPEECH_BASE_URL)."
            logger.error(error_msg)
            return {"success": False, "error": error_msg}

        payload = {
            "prompt": text,
            # Sent as the lowercase string "true"/"false", not a JSON boolean.
            "use_music": str(use_music).lower()
        }
        try:
            logger.info(f"Converting text to audio: {text[:50]}...")
            response = requests.post(TEXT_TO_SPEECH_API_URL, json=payload, timeout=60)

            if response.status_code != 200:
                error_msg = f"API request failed with status code: {response.status_code}"
                logger.error(error_msg)
                return {
                    "success": False,
                    "error": error_msg,
                    "status_code": response.status_code
                }

            # delete=False: the file must outlive this call so it can be played.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
                temp_file.write(response.content)
                temp_file_path = temp_file.name

            file_size = len(response.content)
            logger.info(f"Audio generated successfully. File size: {file_size} bytes")
            return {
                "success": True,
                "text": text,
                "use_music": use_music,
                "audio_file": temp_file_path,
                "file_size_bytes": file_size,
                "content_type": "audio/mpeg",
                "generated_at": datetime.now().isoformat(),
                "message": "Audio generated successfully"
            }
        except requests.exceptions.Timeout:
            error_msg = "Request timed out. The text-to-speech service may be busy."
            logger.error(error_msg)
            return {"success": False, "error": error_msg}
        except requests.exceptions.RequestException as e:
            error_msg = f"Request failed: {str(e)}"
            logger.error(error_msg)
            return {"success": False, "error": error_msg}
        except Exception as e:
            # Tool boundary: report any other failure to the caller as data.
            error_msg = f"Unexpected error: {str(e)}"
            logger.exception(error_msg)
            return {"success": False, "error": error_msg}

    @staticmethod
    def _player_command(abs_filepath):
        """Return the argv list for an installed player, or None if none exists.

        ``aplay`` only handles WAV/raw PCM, so it is offered for ``.wav`` files
        only; MP3 files need a decoder-capable player.
        """
        import shutil  # local import: only needed when playback is requested

        if platform.system() == 'Darwin':  # macOS
            candidates = [['afplay']]
        else:  # Linux and others
            first_choice = ['aplay'] if abs_filepath.lower().endswith('.wav') else ['mpg123', '-q']
            candidates = [
                first_choice,
                ['ffplay', '-nodisp', '-autoexit', '-loglevel', 'quiet'],
                ['mpv', '--no-video', '--really-quiet'],
            ]
        for command in candidates:
            if shutil.which(command[0]):
                return command + [abs_filepath]
        return None

    async def play_audio(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Start playing an audio file with a local player, without waiting.

        Args:
            args: Tool arguments with the key
                filepath (str): Path to an existing ``.mp3`` or ``.wav`` file.

        Returns:
            dict: ``success`` flag plus either ``message``/``filepath`` or
            ``error``.
        """
        filepath = args.get("filepath")
        if not filepath:
            return {
                "success": False,
                "error": "No filepath provided"
            }
        path_text = str(filepath)
        if not os.path.isfile(path_text):
            return {
                "success": False,
                "error": f"File not found: {filepath}"
            }
        if not path_text.lower().endswith(self.SUPPORTED_AUDIO_EXTENSIONS):
            return {
                "success": False,
                "error": "File must be a supported audio format (MP3, WAV)"
            }

        abs_filepath = os.path.abspath(path_text)
        try:
            if platform.system() == 'Windows':
                # Open with the associated application without routing the
                # (caller-supplied) path through cmd.exe.
                os.startfile(abs_filepath)
            else:
                command = self._player_command(abs_filepath)
                if command is None:
                    return {
                        "success": False,
                        "error": "Failed to play audio: no suitable audio player is installed"
                    }
                # Keep the handle so stop_audio can end exactly this playback.
                self._playback = subprocess.Popen(
                    command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
                )
        except (OSError, subprocess.SubprocessError) as e:
            logger.exception(f"Error playing audio: {str(e)}")
            return {
                "success": False,
                "error": f"Failed to play audio: {str(e)}"
            }
        return {
            "success": True,
            "message": f"Playing audio file: {os.path.basename(path_text)}",
            "filepath": filepath
        }

    async def stop_audio(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Stop audio playback.

        Terminates the player process started by ``play_audio`` when it is
        still running. Otherwise falls back to killing the platform's default
        player by name (best effort).

        Args:
            args: Unused; present for a uniform tool signature.

        Returns:
            dict: ``success`` flag plus ``message`` or ``error``.
        """
        tracked = getattr(self, "_playback", None)
        try:
            if tracked is not None and tracked.poll() is None:
                tracked.terminate()
                try:
                    tracked.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    tracked.kill()
            elif platform.system() == 'Windows':
                # Kill media player processes on Windows
                subprocess.run(['taskkill', '/F', '/IM', 'wmplayer.exe'],
                               stderr=subprocess.DEVNULL, check=False)
            elif platform.system() == 'Darwin':  # macOS
                subprocess.run(['killall', 'afplay'], stderr=subprocess.DEVNULL, check=False)
            else:  # Linux and others
                subprocess.run(['killall', 'aplay'], stderr=subprocess.DEVNULL, check=False)
            self._playback = None
            return {
                "success": True,
                "message": "Stopped audio playback"
            }
        except (OSError, subprocess.SubprocessError) as e:
            logger.exception(f"Error stopping audio: {str(e)}")
            return {
                "success": False,
                "error": f"Failed to stop audio: {str(e)}"
            }
# Initialize MCP server
# Module-level instance used by the Gradio callback functions in this file.
news_mcp = NewsAggregatorMCP()
def search_news_interface(keywords, limit, date_from_input):
    """
    Search for recent news articles based on provided keywords.

    Args:
        keywords (str): Comma-separated keywords to search for in news articles.
        limit (int): Maximum number of articles to return. Values are clamped to the range 1-15.
        date_from_input (int): Number of days in the past to consider for articles (e.g., 7 for last week).

    Returns:
        str: A JSON-formatted string containing the total number of results and a list of matched articles,
        or a JSON object with an "error" key when the input is invalid.
    """
    # The result feeds a JSON output component, so even validation errors must
    # be valid JSON rather than a bare sentence.
    if not keywords or not str(keywords).strip():
        return json.dumps({"error": "Please enter keywords to search for news."}, indent=2)

    try:
        max_results = 5 if limit is None else int(limit)
    except (TypeError, ValueError):
        return json.dumps({"error": "Limit must be a whole number."}, indent=2)
    max_results = max(1, min(max_results, 15))

    args = {"keywords": keywords, "limit": max_results}
    # Leave the key out when no age was given so the tool applies its default.
    if date_from_input is not None:
        args["date_from_input"] = date_from_input

    # asyncio.run creates a fresh event loop and closes it afterwards, so no
    # loop is leaked per request.
    result = asyncio.run(news_mcp.search_news(args))
    return json.dumps(result, indent=2)
def text_to_audio_interface(text, use_music, auto_play):
    """
    Convert a short piece of text to speech using an AI text-to-audio model. Provide all the news in one call instead of making multiple separate calls. This helps improve the quality and flow of the speech output.

    Args:
        text (str): The input text to convert into speech. Max length: 1000 characters.
        use_music (bool): Whether to add soft background music to the audio output.
        auto_play (bool): Whether to automatically play the generated audio after conversion. Must be True if called from Claude or other MCP clients which don't support playing audio output in their UI.

    Returns:
        tuple:
            - str: A JSON-formatted string with metadata about the generated audio. When auto_play is set, it also carries a "playback" entry with the playback status.
            - str | None: Filepath to the generated audio file (MP3), or None if generation failed.
    """
    # The first output feeds a JSON component, so errors must be valid JSON too.
    if not text or not str(text).strip():
        error = {"success": False, "error": "Please enter text to convert to audio."}
        return json.dumps(error, indent=2), None

    async def _convert_and_maybe_play():
        """Run the conversion and, on success, the optional playback."""
        outcome = await news_mcp.text_to_audio({"text": text, "use_music": bool(use_music)})
        if outcome.get("success") and auto_play:
            # Surface the playback status instead of silently discarding it.
            outcome["playback"] = await news_mcp.play_audio(
                {"filepath": outcome.get("audio_file")}
            )
        return outcome

    # asyncio.run creates a fresh event loop and closes it afterwards.
    result = asyncio.run(_convert_and_maybe_play())
    audio_path = result.get("audio_file") if result.get("success") else None
    return json.dumps(result, indent=2), audio_path
# Create Gradio interface: one tab per tool, each wired to its wrapper function.
with gr.Blocks(title="News Aggregator with Text to audio - MCP Server", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 📰 News Aggregator with Text-to-Audio
    ## MCP Server for Gradio Hackathon
    This application serves as both a Gradio interface and an MCP (Model Context Protocol) server for news aggregation and text-to-audio conversion.
    ### Available MCP Tools:
    - `search_news`: Search for news articles based on keywords
    - `text_to_audio`: Convert text to audio with optional background music.

    **Note:** Due to the application's continuous inactivity, the text-to-speech feature may not work on the first run. If it doesn't respond initially, please try running it again.
    """)

    with gr.Tabs():
        with gr.TabItem("🔍 Search News"):
            with gr.Row():
                with gr.Column():
                    keywords_input = gr.Textbox(
                        label="Keywords (comma-separated)",
                        placeholder="Enter keywords separated by commas...",
                        value="technology, innovation"
                    )
                    limit_input = gr.Slider(
                        label="Max Results",
                        minimum=1,
                        # Matches the cap applied in search_news_interface.
                        maximum=15,
                        value=5,
                        step=1
                    )
                    date_from_input = gr.Slider(
                        label="News Age (days)",
                        minimum=1,
                        maximum=30,
                        value=7,
                        step=1,
                        info="How many days old the news can be"
                    )
                    search_btn = gr.Button("Search News", variant="primary")
                with gr.Column():
                    search_output = gr.JSON(label="Search Results")
            search_btn.click(
                search_news_interface,
                inputs=[keywords_input, limit_input, date_from_input],
                outputs=[search_output]
            )

        with gr.TabItem("🎵 Text to Audio"):
            with gr.Row():
                with gr.Column():
                    text_input = gr.Textbox(
                        label="Text to Convert",
                        placeholder="Enter text to convert to audio...",
                        lines=4,
                        value="Hello! This is a test of the text-to-audio conversion feature.",
                        max_lines=10
                    )
                    music_checkbox = gr.Checkbox(
                        label="Add Background Music",
                        value=False,
                        info="Include background music in the generated audio"
                    )
                    auto_play = gr.Checkbox(
                        label="Auto play",
                        value=False,
                        info="Auto play the generated audio using local audio player after conversion."
                    )
                    convert_btn = gr.Button("Convert to Audio", variant="primary")
                with gr.Column():
                    audio_output_json = gr.JSON(label="Conversion Results")
                    audio_player = gr.Audio(
                        label="Generated Audio",
                        type="filepath",
                        interactive=False
                    )
            convert_btn.click(
                text_to_audio_interface,
                inputs=[text_input, music_checkbox, auto_play],
                outputs=[audio_output_json, audio_player]
            )

    gr.Markdown("""
    ---
    ### MCP Server Information
    This Gradio app also functions as an MCP server that can be connected to MCP clients like Claude Desktop or Cursor.

    **Server Name**: `news-aggregator`

    **Available Tools**:
    1. `search_news(keywords, limit?, date_from_input?)` - Search news articles; keywords are a comma-separated string
    2. `text_to_audio(text, use_music?, auto_play?)` - Convert text to audio with optional background music and local playback

    Connect your MCP client to this server to use these tools programmatically!
    """)

if __name__ == "__main__":
    # mcp_server=True exposes the wired functions as MCP tools; share=True
    # requests a public share link.
    demo.launch(mcp_server=True, share=True)