# Take remaining path from request.state. (commit e05fb8f by darabos)
import enum
from lynxkite_core import ops
from lynxkite_graph_analytics import core
import fastapi
from lynxscribe.components.chat.api import ChatAPI
from lynxscribe.components.rag.rag_chatbot import Mode, RAGChatbot, Scenario, ScenarioSelector
from lynxscribe.core.llm.base import get_llm_engine
from lynxscribe.core.models.prompts import ChatCompletionPrompt
# Decorator factories that register operations in the LynxKite editor under the
# given workspace category ("LynxKite Graph Analytics") and subcategory.
# LynxScribe ops render bottom-to-top in blue; Data Science ops use defaults.
lsop = ops.op_registration(
    "LynxKite Graph Analytics", "LynxScribe", dir="bottom-to-top", color="blue"
)
dsop = ops.op_registration("LynxKite Graph Analytics", "Data Science")
# Stand-in type for values passed between these demo operations: every tool,
# agent, and database below is represented by a descriptive string.
Placeholder = str
@lsop("Chat frontend", color="gray", outputs=[], view="service")
def chat_frontend(agent: Placeholder):
    """Wrap the given agent description in an OpenAI-compatible chat service."""
    service = ChatAPIService(agent)
    return service
@lsop("Agent")
def agent(
    tools: list[Placeholder],
    *,
    purpose: str = "",
    description: ops.LongStr = "This agent helps with various tasks.",
    system_prompt: ops.LongStr = "You are a helpful assistant.",
):
    """Summarize an agent as text: its purpose plus a bullet list of its tools."""
    # Indent the continuation lines of each (possibly multi-line) tool
    # description by one space so it reads as a single bullet item.
    bullets = "".join(
        "- {}\n".format(tool.replace("\n", "\n ")) for tool in tools
    )
    return f"Agent for {purpose} with {len(tools)} tools:\n{bullets}"
@lsop("MCP: Query database with SQL", color="green")
def sql_tool(db: Placeholder):
    """Placeholder MCP tool: describes SQL querying over the given database."""
    return "SQL over {}".format(db)
@ops.output_position(output="top")
@dsop("Database")
def db(data_pipeline: list[core.Bundle]):
    """Placeholder: describes a database built from the incoming bundles."""
    bundle_count = len(data_pipeline)
    return f"DB with {bundle_count} bundles"
class MCPSearchEngine(str, enum.Enum):
    """Web-search backends selectable in the "MCP: Search web" operation.

    A ``str`` subclass so members serialize as their plain string values
    in the operation's parameter UI.
    """

    Google = "Google"
    Bing = "Bing"
    DuckDuckGo = "DuckDuckGo"
@lsop("MCP: Search web", color="green")
def web_search(*, engine: MCPSearchEngine = MCPSearchEngine.Google):
    """Placeholder MCP tool: describes a web search with the chosen engine."""
    backend = engine.name
    return "Web search ({})".format(backend)
@lsop("MCP: Run ComfyUI workflow", color="green")
def run_comfyui_workflow(*, workflow_name: str):
    """Placeholder MCP tool: describes running the named ComfyUI workflow."""
    return "Run comfyui workflow ({})".format(workflow_name)
@lsop("MCP: Calculator", color="green")
def calculator():
    """Placeholder MCP tool: a calculator that needs no configuration."""
    label = "Calculator"
    return label
class ChatAPIService:
    """Serves an OpenAI-compatible chat API backed by a LynxScribe RAG chatbot.

    Requests are dispatched on the remaining request path, which the caller is
    expected to have stored in ``request.state.remaining_path``:
    GET ``models`` lists the single exposed model, POST ``chat/completions``
    answers (optionally streaming) chat completion requests.
    """

    def __init__(self, agent: str):
        # Textual description of the agent; appended to streamed content
        # chunks so the client can see which agent produced the answer.
        self.agent = agent
        self.chat_api = ChatAPI(
            chatbot=RAGChatbot(
                None,
                ScenarioSelector([Scenario(name="single", mode=Mode.LLM_ONLY)]),
                llm=get_llm_engine(name="openai"),
            ),
            model="gpt-4o-mini",
        )

    async def get(self, request: fastapi.Request) -> dict:
        """Handle GET requests; only the ``models`` listing is supported."""
        if request.state.remaining_path == "models":
            return {
                "object": "list",
                "data": [
                    {
                        "id": "LynxScribe",
                        "object": "model",
                        "created": 0,
                        "owned_by": "lynxkite",
                        "meta": {"profile_image_url": "https://lynxkite.com/favicon.png"},
                    }
                ],
            }
        return {"error": "Not found"}

    async def post(self, request: fastapi.Request) -> dict:
        """Handle POST requests; only ``chat/completions`` is supported."""
        if request.state.remaining_path == "chat/completions":
            payload = await request.json()
            # "stream" is optional in the OpenAI API (defaults to false);
            # indexing with payload["stream"] would raise KeyError for
            # clients that omit it.
            if payload.get("stream"):
                from sse_starlette.sse import EventSourceResponse

                return EventSourceResponse(self.stream_chat_api_response(payload))
            return await self.get_chat_api_response(payload)
        return {"error": "Not found"}

    async def stream_chat_api_response(self, request):
        """Yield SSE events (JSON chunks) for a streaming chat completion."""
        prompt = ChatCompletionPrompt(**request)
        async for chunk in await self.chat_api.answer(prompt, stream=True):
            delta = chunk.choices[0].delta
            # Role and terminal chunks carry content=None; appending to those
            # would raise TypeError. Tag only real content chunks.
            if delta.content is not None:
                delta.content += f"({self.agent})"
            yield chunk.model_dump_json()

    async def get_chat_api_response(self, request):
        """Return a complete (non-streaming) chat completion as a dict."""
        prompt = ChatCompletionPrompt(**request)
        response = await self.chat_api.answer(prompt, stream=False)
        return response.model_dump()