OppaAI's picture
Update app.py
6ae40f8 verified
import asyncio
import os
import re
import gradio as gr
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerHTTP
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.google_gla import GoogleGLAProvider
# Gemini API key comes from the environment (None when the variable is unset)
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# MCP server, addressed through its SSE endpoint
SSE_URL = "https://oppaai-job-search-mcp-server.hf.space/gradio_api/mcp/sse"
server = MCPServerHTTP(url=SSE_URL)

# Gemini model used by the agent, authenticated with the key loaded above
model = GeminiModel(
    "gemini-2.0-flash",
    provider=GoogleGLAProvider(api_key=GEMINI_API_KEY),
)
# The agent ties together the Gemini model, the MCP server and the
# instructions that define the "Jobcy" persona and its answer format.
agent = Agent(
    instructions="""
Your name is Jobcy. You are an AI assistant designed to help users find remote jobs by searching through job listings from various sources, including the Jobicy API and other platforms.
Return job listings in a structured format: job title, company, location, and a Google search link.
""",
    mcp_servers=[server],
    model=model,
)
# Async chat function with timeout protection
async def chat_with_agent(user_input, history):
    """Send one user message to the agent and extend the chat transcript.

    Args:
        user_input: Text submitted from the message box.
        history: Transcript so far, as a list of ``(user_message, bot_message)``
            pairs (the tuple format ``gr.Chatbot`` renders), or ``None`` / empty
            on the first turn.

    Returns:
        The updated transcript twice: once for the chatbot component and once
        for the session state. Failures are reported as a bot message instead
        of being raised, so the UI never shows a bare traceback.
    """
    # Work on a copy so the list handed in by the caller is never mutated.
    history = list(history or [])

    message = (user_input or "").strip()
    if not message:
        # Nothing to ask; do not spend a model call on blank input.
        return history, history

    async def _ask_agent(prompt):
        # The MCP server connection must be open while the agent runs so its
        # tools can be listed and called. Opening it inside this coroutine
        # puts connection setup under the same timeout as the run itself.
        async with agent.run_mcp_servers():
            return await agent.run(prompt)

    try:
        result = await asyncio.wait_for(_ask_agent(message), timeout=30)
    except asyncio.TimeoutError:
        reply = "⏳ Sorry, I took too long to respond. Try again in a moment."
    except Exception as e:  # UI boundary: surface the failure in the chat
        reply = f"⚠️ Error: {type(e).__name__}: {e}"
    else:
        # The run result is a wrapper object; the model's answer lives on
        # ``output`` (``data`` on older pydantic_ai releases).
        reply = getattr(result, "output", None)
        if reply is None:
            reply = getattr(result, "data", result)
        reply = str(reply)

    # One (user, bot) pair per turn, not separate speaker-labelled tuples.
    history.append((user_input, reply))
    return history, history
# Main app UI: a heading, the chat transcript, a message box wired to
# chat_with_agent, and a few clickable example prompts.
with gr.Blocks() as demo:
    gr.Markdown("## 💼 Jobcy – Your Remote Job Assistant")

    chatbot = gr.Chatbot()
    user_input = gr.Textbox(
        label="Your Message",
        lines=2,
        placeholder="Ask Jobcy about remote jobs...",
    )

    # Per-session transcript, passed into and returned from the handler
    state = gr.State([])

    # Submitting the textbox runs the handler, which updates both the
    # chatbot display and the stored transcript.
    user_input.submit(
        fn=chat_with_agent,
        inputs=[user_input, state],
        outputs=[chatbot, state],
    )

    # Example prompts that populate the message box
    gr.Examples(
        inputs=user_input,
        examples=[
            "Find remote software engineer jobs in Canada",
            "Show me part-time data analyst jobs I can do from home",
            "List any remote marketing jobs with no degree required",
        ],
    )
# Limit each event handler to two concurrent executions. The former
# ``concurrency_count`` argument was removed from ``Blocks.queue`` in
# Gradio 4; ``default_concurrency_limit`` is its replacement.
demo.queue(default_concurrency_limit=2)
demo.launch(share=True)