Spaces:
Runtime error
Runtime error
| import asyncio | |
| import re | |
| import os | |
| import gradio as gr | |
| from pydantic_ai import Agent | |
| from pydantic_ai.mcp import MCPServerHTTP | |
| from pydantic_ai.models.openai import OpenAIModel | |
| from pydantic_ai.providers.openai import OpenAIProvider | |
# SSE endpoint of the job-search MCP server the agent is given as a tool source.
SSE_URL = "https://oppaai-job-search-mcp-server.hf.space/gradio_api/mcp/sse"
server = MCPServerHTTP(url=SSE_URL)

# Hugging Face access token read from the environment (None when unset).
HF_TOKEN = os.getenv("HF_TOKEN")  # Make sure your HF_TOKEN is set in environment variables

# Use HuggingFace hosted Qwen3-30B-A3B model through the OpenAI-compatible client.
qwen3_model = OpenAIModel(
    model_name="Qwen/Qwen3-30B-A3B",
    provider=OpenAIProvider(
        # NOTE(review): confirm this URL serves an OpenAI-compatible API; the
        # client appends its own request path (e.g. /chat/completions) to it.
        base_url="https://api-inference.huggingface.co/models/Qwen/Qwen3-30B-A3B",
        # OpenAIProvider takes the credential as `api_key`; it has no `token`
        # parameter, so the previous keyword raised TypeError at import time.
        api_key=HF_TOKEN,
    ),
)

# Create Agent with MCP Server
agent = Agent(
    model=qwen3_model,
    mcp_servers=[server],
    instructions="""
Your name is Jobcy. You are an AI assistant designed to help users to find remote jobs by searching through job listings from various sources, including the Jobicy API and other platforms.
You will list the job listings in a structured format, including the job title, company, location, and the google search link.
""",
)
# Matches the model's <think>...</think> reasoning sections, including ones
# that span several lines.
_THINK_BLOCK = re.compile(r"<think>.*?</think>", flags=re.DOTALL)


async def chat_with_agent(user_input, history):
    """Send one user message to the agent and record the exchange.

    Args:
        user_input: Text submitted from the message box.
        history: Chat log carried in ``gr.State``: a list of
            ``(user_message, bot_message)`` pairs, or a falsy value when
            nothing has been said yet.

    Returns:
        The updated history twice: once for the ``gr.Chatbot`` display and
        once for the ``gr.State`` that feeds the next call.
    """
    history = history or []
    if not user_input or not user_input.strip():
        # Nothing to ask: skip the model round-trip and leave the log as is.
        return history, history

    # Open the MCP connection inside the handler so it lives in the event loop
    # that is actually serving this request. Opening it once around
    # demo.launch() ties it to the asyncio.run() loop, which launch() blocks.
    async with agent.run_mcp_servers():
        result = await agent.run(user_input)

    cleaned_output = _THINK_BLOCK.sub("", result.output).strip()
    # Chatbot's tuple format is (user message, bot reply). Appending
    # ("User", text) and ("Jobcy", reply) rendered the user's own text as a
    # bot message, so store one pair per exchange instead.
    history.append((user_input, cleaned_output))
    return history, history


async def main():
    """Build the Gradio chat UI and serve it.

    Kept as a coroutine so existing ``asyncio.run(main())`` callers keep
    working. ``demo.launch()`` blocks while the server is running, so no
    loop-bound resources are held open around it.
    """
    with gr.Blocks() as demo:
        chatbot = gr.Chatbot()
        user_input = gr.Textbox(
            placeholder="Ask Jobcy about remote jobs or anything else...",
            label="Your Message",
        )
        state = gr.State([])
        user_input.submit(
            chat_with_agent,
            inputs=[user_input, state],
            outputs=[chatbot, state],
        )
    demo.title = "Jobcy Remote Job Search Assistant"
    demo.launch()
# Script entry point: run the async main() to completion when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    asyncio.run(main())