import asyncio import os import re import gradio as gr from pydantic_ai import Agent from pydantic_ai.mcp import MCPServerHTTP from pydantic_ai.models.gemini import GeminiModel from pydantic_ai.providers.google_gla import GoogleGLAProvider # MCP Server SSE URL SSE_URL = "https://oppaai-job-search-mcp-server.hf.space/gradio_api/mcp/sse" server = MCPServerHTTP(url=SSE_URL) # Load Gemini API Key from environment GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") # Define model model = GeminiModel( "gemini-2.0-flash", provider=GoogleGLAProvider(api_key=GEMINI_API_KEY) ) # Define agent agent = Agent( model=model, mcp_servers=[server], instructions=""" Your name is Jobcy. You are an AI assistant designed to help users find remote jobs by searching through job listings from various sources, including the Jobicy API and other platforms. Return job listings in a structured format: job title, company, location, and a Google search link. """ ) # Async chat function with timeout protection async def chat_with_agent(user_input, history): history = history or [] history.append(("You", user_input)) try: result = await asyncio.wait_for(agent.run(user_input), timeout=30) history.append(("Jobcy", result)) except asyncio.TimeoutError: history.append(("Jobcy", "⏳ Sorry, I took too long to respond. Try again in a moment.")) except Exception as e: history.append(("Jobcy", f"⚠️ Error: {type(e).__name__}: {e}")) return history, history # Main app UI with gr.Blocks() as demo: gr.Markdown("## 💼 Jobcy – Your Remote Job Assistant") chatbot = gr.Chatbot() user_input = gr.Textbox( placeholder="Ask Jobcy about remote jobs...", label="Your Message", lines=2 ) state = gr.State([]) user_input.submit(chat_with_agent, inputs=[user_input, state], outputs=[chatbot, state]) gr.Examples( examples=[ "Find remote software engineer jobs in Canada", "Show me part-time data analyst jobs I can do from home", "List any remote marketing jobs with no degree required", ], inputs=user_input ) demo.queue(concurrency_count=2) demo.launch(share=True)