Spaces:
Running
Running
| import enum | |
| from lynxkite_core import ops | |
| from lynxkite_graph_analytics import core | |
| import fastapi | |
| from lynxscribe.components.chat.api import ChatAPI | |
| from lynxscribe.components.rag.rag_chatbot import Mode, RAGChatbot, Scenario, ScenarioSelector | |
| from lynxscribe.core.llm.base import get_llm_engine | |
| from lynxscribe.core.models.prompts import ChatCompletionPrompt | |
| lsop = ops.op_registration( | |
| "LynxKite Graph Analytics", "LynxScribe", dir="bottom-to-top", color="blue" | |
| ) | |
| dsop = ops.op_registration("LynxKite Graph Analytics", "Data Science") | |
| Placeholder = str | |
| def chat_frontend(agent: Placeholder): | |
| return ChatAPIService(agent) | |
| def agent( | |
| tools: list[Placeholder], | |
| *, | |
| purpose: str = "", | |
| description: ops.LongStr = "This agent helps with various tasks.", | |
| system_prompt: ops.LongStr = "You are a helpful assistant.", | |
| ): | |
| ts = [] | |
| for t in tools: | |
| t = "\n ".join(t.split("\n")) | |
| ts.append(f"- {t}\n") | |
| return f"Agent for {purpose} with {len(tools)} tools:\n{''.join(ts)}" | |
| def sql_tool(db: Placeholder): | |
| return f"SQL over {db}" | |
| def db(data_pipeline: list[core.Bundle]): | |
| return f"DB with {len(data_pipeline)} bundles" | |
| class MCPSearchEngine(str, enum.Enum): | |
| Google = "Google" | |
| Bing = "Bing" | |
| DuckDuckGo = "DuckDuckGo" | |
| def web_search(*, engine: MCPSearchEngine = MCPSearchEngine.Google): | |
| return f"Web search ({engine.name})" | |
| def run_comfyui_workflow(*, workflow_name: str): | |
| return f"Run comfyui workflow ({workflow_name})" | |
| def calculator(): | |
| return "Calculator" | |
| class ChatAPIService: | |
| def __init__(self, agent: str): | |
| self.agent = agent | |
| self.chat_api = ChatAPI( | |
| chatbot=RAGChatbot( | |
| None, | |
| ScenarioSelector([Scenario(name="single", mode=Mode.LLM_ONLY)]), | |
| llm=get_llm_engine(name="openai"), | |
| ), | |
| model="gpt-4o-mini", | |
| ) | |
| async def get(self, request: fastapi.Request) -> dict: | |
| if request.state.remaining_path == "models": | |
| return { | |
| "object": "list", | |
| "data": [ | |
| { | |
| "id": "LynxScribe", | |
| "object": "model", | |
| "created": 0, | |
| "owned_by": "lynxkite", | |
| "meta": {"profile_image_url": "https://lynxkite.com/favicon.png"}, | |
| } | |
| ], | |
| } | |
| return {"error": "Not found"} | |
| async def post(self, request: fastapi.Request) -> dict: | |
| if request.state.remaining_path == "chat/completions": | |
| request = await request.json() | |
| if request["stream"]: | |
| from sse_starlette.sse import EventSourceResponse | |
| return EventSourceResponse(self.stream_chat_api_response(request)) | |
| else: | |
| return await self.get_chat_api_response(request) | |
| return {"error": "Not found"} | |
| async def stream_chat_api_response(self, request): | |
| request = ChatCompletionPrompt(**request) | |
| async for chunk in await self.chat_api.answer(request, stream=True): | |
| chunk.choices[0].delta.content += f"({self.agent})" | |
| yield chunk.model_dump_json() | |
| async def get_chat_api_response(self, request): | |
| request = ChatCompletionPrompt(**request) | |
| response = await self.chat_api.answer(request, stream=False) | |
| return response.model_dump() | |