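"""LynxScribe agent-building ops for LynxKite Graph Analytics.

Defines placeholder agent and tool boxes, plus a minimal OpenAI-compatible
chat service backed by a LynxScribe RAGChatbot.
"""
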
import enum
from lynxkite_core import ops
from lynxkite_graph_analytics import core
import fastapi
from lynxscribe.components.chat.api import ChatAPI
from lynxscribe.components.rag.rag_chatbot import Mode, RAGChatbot, Scenario, ScenarioSelector
from lynxscribe.core.llm.base import get_llm_engine
from lynxscribe.core.models.prompts import ChatCompletionPrompt

# LynxScribe boxes render in blue with bottom-to-top data flow; the Data
# Science boxes below use the default layout.
lsop = ops.op_registration(
    "LynxKite Graph Analytics", "LynxScribe", dir="bottom-to-top", color="blue"
)

dsop = ops.op_registration("LynxKite Graph Analytics", "Data Science")

# These demo ops pass human-readable strings between boxes instead of real
# agent and tool objects, so the wire type is just str.
Placeholder = str


@lsop("Chat frontend", color="gray", outputs=[], view="service")
def chat_frontend(agent: Placeholder):
    return ChatAPIService(agent)


@lsop("Agent")
def agent(
    tools: list[Placeholder],
    *,
    purpose: str = "",
    description: ops.LongStr = "This agent helps with various tasks.",
    system_prompt: ops.LongStr = "You are a helpful assistant.",
):
    ts = []
    for t in tools:
        t = "\n  ".join(t.split("\n"))
        ts.append(f"- {t}\n")
    return f"Agent for {purpose} with {len(tools)} tools:\n{''.join(ts)}"


@lsop("MCP: Query database with SQL", color="green")
def sql_tool(db: Placeholder):
    return f"SQL over {db}"


@ops.output_position(output="top")
@dsop("Database")
def db(data_pipeline: list[core.Bundle]):
    return f"DB with {len(data_pipeline)} bundles"


class MCPSearchEngine(str, enum.Enum):
    Google = "Google"
    Bing = "Bing"
    DuckDuckGo = "DuckDuckGo"


@lsop("MCP: Search web", color="green")
def web_search(*, engine: MCPSearchEngine = MCPSearchEngine.Google):
    return f"Web search ({engine.name})"


@lsop("MCP: Run ComfyUI workflow", color="green")
def run_comfyui_workflow(*, workflow_name: str):
    return f"Run comfyui workflow ({workflow_name})"


@lsop("MCP: Calculator", color="green")
def calculator():
    return "Calculator"


class ChatAPIService:
    """Minimal OpenAI-compatible chat service backed by a LynxScribe RAGChatbot."""

    def __init__(self, agent: str):
        self.agent = agent
        # A single LLM-only scenario and no knowledge base: answers come
        # straight from the model, with no retrieval step.
        self.chat_api = ChatAPI(
            chatbot=RAGChatbot(
                None,
                ScenarioSelector([Scenario(name="single", mode=Mode.LLM_ONLY)]),
                llm=get_llm_engine(name="openai"),
            ),
            model="gpt-4o-mini",
        )

    async def get(self, request: fastapi.Request) -> dict:
        # GET .../models: advertise a single model in the OpenAI list format
        # so compatible clients can discover and select it.
        if request.state.remaining_path == "models":
            return {
                "object": "list",
                "data": [
                    {
                        "id": "LynxScribe",
                        "object": "model",
                        "created": 0,
                        "owned_by": "lynxkite",
                        "meta": {"profile_image_url": "https://lynxkite.com/favicon.png"},
                    }
                ],
            }
        return {"error": "Not found"}

    async def post(self, request: fastapi.Request) -> dict:
        if request.state.remaining_path == "chat/completions":
            body = await request.json()
            # "stream" is optional in chat/completions requests, so don't
            # assume the key is present.
            if body.get("stream"):
                from sse_starlette.sse import EventSourceResponse

                return EventSourceResponse(self.stream_chat_api_response(body))
            return await self.get_chat_api_response(body)
        return {"error": "Not found"}

    async def stream_chat_api_response(self, request):
        prompt = ChatCompletionPrompt(**request)
        async for chunk in await self.chat_api.answer(prompt, stream=True):
            # Tag each chunk with the agent description. OpenAI-style deltas
            # can carry content=None (e.g. the role-only first chunk), so
            # guard before appending.
            if chunk.choices[0].delta.content is not None:
                chunk.choices[0].delta.content += f"({self.agent})"
            yield chunk.model_dump_json()

    async def get_chat_api_response(self, request):
        prompt = ChatCompletionPrompt(**request)
        response = await self.chat_api.answer(prompt, stream=False)
        return response.model_dump()
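

# A minimal client sketch (not part of the service): once LynxKite mounts the
# service, any OpenAI-compatible client can talk to it. BASE_URL is an
# assumption; use whatever path the service is actually mounted under.
#
#   from openai import AsyncOpenAI
#
#   client = AsyncOpenAI(base_url=BASE_URL, api_key="unused")  # no auth here
#   response = await client.chat.completions.create(
#       model="LynxScribe",  # the id advertised by get() above
#       messages=[{"role": "user", "content": "Hello!"}],
#   )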