johnbridges committed
Commit 6195aba · Parent: 3692feb

remove unused files

Files changed (7)
  1. cloud_event.py +0 -0
  2. factory.py +0 -8
  3. function_tracker.py +0 -44
  4. runners/base.py +0 -17
  5. runners/echo.py +0 -46
  6. service.py +0 -254
  7. streaming.py +0 -22
cloud_event.py DELETED
File without changes
factory.py DELETED
@@ -1,8 +0,0 @@
- # factories.py
- from runners.echo import EchoRunner
- from runners.base import ILLMRunner
- from typing import Dict, Any
-
- async def default_runner_factory(context: Dict[str, Any]) -> ILLMRunner:
-     # choose runner by context["LLMRunnerType"] if you need variants
-     return EchoRunner(publisher=context["_publisher"], settings=context["_settings"])
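For reference, a minimal sketch of how this removed factory was invoked. The StubPublisher is a hypothetical stand-in for the real RabbitRepo publisher, and it assumes the sibling modules (runners, models) are still importable:

import asyncio
from factory import default_runner_factory  # the module deleted in this commit

class StubPublisher:
    # Stand-in for RabbitRepo: just prints instead of publishing to RabbitMQ.
    async def publish(self, queue: str, obj) -> None:
        print(queue, obj)

async def main() -> None:
    runner = await default_runner_factory({
        "LLMRunnerType": "TurboLLM",   # variant hint; the default factory ignores it
        "_publisher": StubPublisher(),
        "_settings": None,             # real code passed the config settings object
    })
    print(type(runner).__name__)       # EchoRunner

asyncio.run(main())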
function_tracker.py DELETED
@@ -1,44 +0,0 @@
- # function_tracker.py
- from __future__ import annotations
- from dataclasses import dataclass
- from typing import Dict, List
- import random
- import logging
- logger = logging.getLogger(__name__)
-
- @dataclass
- class TrackedCall:
-     FunctionCallId: str
-     FunctionName: str
-     IsProcessed: bool = False
-     Payload: str = ""
-
- class FunctionCallTracker:
-     def __init__(self) -> None:
-         self._by_msg: Dict[str, Dict[str, TrackedCall]] = {}
-
-     @staticmethod
-     def gen_id() -> str:
-         return f"call_{random.randint(10_000_000, 99_999_999)}"
-
-     def add(self, message_id: str, fn_name: str, payload: str) -> str:
-         call_id = self.gen_id()
-         self._by_msg.setdefault(message_id, {})[call_id] = TrackedCall(call_id, fn_name, False, payload)
-         return call_id
-
-     def mark_processed(self, message_id: str, call_id: str, payload: str = "") -> None:
-         m = self._by_msg.get(message_id, {})
-         if call_id in m:
-             m[call_id].IsProcessed = True
-             if payload:
-                 m[call_id].Payload = payload
-
-     def all_processed(self, message_id: str) -> bool:
-         m = self._by_msg.get(message_id, {})
-         return bool(m) and all(x.IsProcessed for x in m.values())
-
-     def processed_list(self, message_id: str) -> List[TrackedCall]:
-         return list(self._by_msg.get(message_id, {}).values())
-
-     def clear(self, message_id: str) -> None:
-         self._by_msg.pop(message_id, None)
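Context for reviewers: the lifecycle this tracker supported, reconstructed directly from the API above (the function name and payloads are illustrative):

from function_tracker import FunctionCallTracker

tracker = FunctionCallTracker()
call_id = tracker.add("msg-1", "get_weather", payload='{"city": "London"}')
print(tracker.all_processed("msg-1"))   # False: the call is still pending
tracker.mark_processed("msg-1", call_id, payload='{"temp": 11}')
print(tracker.all_processed("msg-1"))   # True: every tracked call is done
for call in tracker.processed_list("msg-1"):
    print(call.FunctionName, call.IsProcessed)
tracker.clear("msg-1")                  # drop per-message state once dispatched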
runners/base.py DELETED
@@ -1,17 +0,0 @@
- from abc import ABC, abstractmethod
- from typing import Any
-
- class ILLMRunner(ABC):
-     Type: str = "BaseLLM"
-     IsEnabled: bool = True
-     IsStateStarting: bool = False
-     IsStateFailed: bool = False
-
-     @abstractmethod
-     async def StartProcess(self, llmServiceObj: dict) -> None: ...
-     @abstractmethod
-     async def RemoveProcess(self, sessionId: str) -> None: ...
-     @abstractmethod
-     async def StopRequest(self, sessionId: str) -> None: ...
-     @abstractmethod
-     async def SendInputAndGetResponse(self, llmServiceObj: dict) -> None: ...
runners/echo.py DELETED
@@ -1,46 +0,0 @@
- # runners/echo.py
- from __future__ import annotations
- from typing import Any, Dict, Optional
- from .base import ILLMRunner
- from models import LLMServiceObj
- from function_tracker import FunctionCallTracker
- import logging
- logger = logging.getLogger(__name__)
-
- class EchoRunner(ILLMRunner):
-     Type = "TurboLLM"
-     IsEnabled = True
-     IsStateStarting = False
-     IsStateFailed = False
-
-     def __init__(self, publisher, settings):
-         self._pub = publisher
-         self._settings = settings
-         self._tracker = FunctionCallTracker()
-
-     async def StartProcess(self, llmServiceObj: dict) -> None:
-         logger.info(f"StartProcess called with: {llmServiceObj}")
-         # pretend to "warm up"
-         pass
-
-     async def RemoveProcess(self, sessionId: str) -> None:
-         logger.info(f"RemoveProcess called for session: {sessionId}")
-         # nothing to clean here
-         pass
-
-     async def StopRequest(self, sessionId: str) -> None:
-         logger.info(f"StopRequest called for session: {sessionId}")
-         # no streaming loop to stop in echo
-         pass
-
-     async def SendInputAndGetResponse(self, llmServiceObj: dict) -> None:
-         logger.info(f"SendInputAndGetResponse called with: {llmServiceObj}")
-         llm = LLMServiceObj(**llmServiceObj)
-         if llm.UserInput.startswith("<|START_AUDIO|>") or llm.UserInput.startswith("<|STOP_AUDIO|>"):
-             logger.debug("Audio input detected, ignoring in echo.")
-             return
-
-         # Echo behavior (match UI format)
-         await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<User:> {llm.UserInput}\n\n"))
-         await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Assistant:> You said: {llm.UserInput}\n"))
-         await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="<end-of-line>"))
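A sketch of one EchoRunner round trip with a stub publisher, derived from the three publish calls above. It assumes models.LLMServiceObj accepts a bare UserInput keyword with defaults for its other fields:

import asyncio
from runners.echo import EchoRunner

class StubPublisher:
    # Prints the LlmMessage instead of publishing to RabbitMQ.
    async def publish(self, queue: str, obj) -> None:
        print(queue, getattr(obj, "LlmMessage", obj))

async def main() -> None:
    runner = EchoRunner(publisher=StubPublisher(), settings=None)
    await runner.SendInputAndGetResponse({"UserInput": "hi"})
    # expected sequence: "<User:> hi", "<Assistant:> You said: hi", "<end-of-line>"

asyncio.run(main())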
service.py DELETED
@@ -1,254 +0,0 @@
- # service.py
- import asyncio
- from dataclasses import dataclass
- from typing import Any, Dict, Optional, Callable, Awaitable
-
- from config import settings
- from models import LLMServiceObj, ResultObj
- from rabbit_repo import RabbitRepo
- from runners.base import ILLMRunner
- from message_helper import success as _ok, error as _err
- import logging
- logger = logging.getLogger(__name__)
-
- @dataclass
- class _Session:
-     Runner: Optional[ILLMRunner]
-     FullSessionId: str
-
-
- class LLMService:
-     """
-     Python/Gradio equivalent of your .NET LLMService.
-     Keeps identical field names and queue semantics when talking to RabbitMQ.
-     """
-     def __init__(
-         self,
-         publisher: RabbitRepo,
-         runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
-     ):
-         self._pub: RabbitRepo = publisher
-         self._runner_factory = runner_factory  # async factory: dict -> ILLMRunner
-         self._sessions: Dict[str, _Session] = {}
-         self._ready = asyncio.Event()
-         self._ready.set()  # call clear()/set() if you preload history
-         self._service_id_lc = settings.SERVICE_ID.lower()
-
-     async def init(self) -> None:
-         """Hook to preload history/sessions; call self._ready.set() when finished."""
-         pass
-
-     # ---------------------------- helpers ----------------------------
-
-     def _to_model(self, data: Any) -> LLMServiceObj:
-         # Defensive: ensure required nested objects are dicts, not None
-         if data.get("FunctionCallData") is None:
-             data["FunctionCallData"] = {}
-         if data.get("UserInfo") is None:
-             data["UserInfo"] = {}
-         return LLMServiceObj(**data)
-
-     async def _emit_result(
-         self,
-         obj: LLMServiceObj | Dict[str, Any],
-         message: str,
-         success: bool,
-         queue: str,
-         *,
-         check_system: bool = False,
-         include_llm_message: bool = True,
-     ) -> None:
-         """
-         Build a ResultObj-style message on the wire, mirroring your .NET usage.
-         check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
-         """
-         llm = obj if isinstance(obj, LLMServiceObj) else LLMServiceObj(**obj)
-
-         llm.ResultMessage = message
-         llm.ResultSuccess = success
-         if include_llm_message:
-             llm.LlmMessage = _ok(message) if success else _err(message)
-
-         if check_system and llm.IsSystemLlm:
-             return
-
-         # You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET
-         await self._pub.publish(queue, llm)
-
-     def _session_for(self, session_id: str) -> Optional[_Session]:
-         return self._sessions.get(session_id)
-
-     # ---------------------------- API methods ----------------------------
-
-     async def StartProcess(self, payload: Any) -> None:
-         llm = self._to_model(payload)
-
-         # Validate critical fields
-         if not llm.RequestSessionId:
-             await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
-             return
-         if not llm.LLMRunnerType:
-             await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
-             return
-
-         # Construct session id like C#: RequestSessionId + "_" + LLMRunnerType
-         session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
-         llm.SessionId = session_id
-
-         # Wait ready (max 120s) exactly like the C# logic
-         try:
-             await asyncio.wait_for(self._ready.wait(), timeout=120)
-         except asyncio.TimeoutError:
-             await self._emit_result(
-                 llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
-             )
-             return
-
-         sess = self._session_for(session_id)
-         runner = sess.Runner if sess else None
-         create_new = (runner is None) or getattr(runner, "IsStateFailed", False)
-
-         if create_new:
-             # Remove previous runner if exists
-             if runner:
-                 try:
-                     await runner.RemoveProcess(session_id)
-                 except Exception:
-                     pass
-
-             # Create runner from factory (pass a plain dict for decoupling)
-             runner = await self._runner_factory({
-                 **llm.model_dump(by_alias=True),
-                 "_publisher": self._pub,
-                 "_settings": settings,
-             })
-             if not runner.IsEnabled:
-                 await self._emit_result(
-                     llm,
-                     f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
-                     True,
-                     "llmServiceMessage",
-                 )
-                 return
-
-             await self._emit_result(
-                 llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
-             )
-
-             await runner.StartProcess(llm.model_dump(by_alias=True))
-
-             self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)
-
-             # Friendly greeting for your renamed service
-             if self._service_id_lc in {"monitor", "gradllm"}:
-                 await self._emit_result(
-                     llm,
-                     f"Hi i'm {runner.Type} your {settings.SERVICE_ID} Assistant. How can I help you.",
-                     True,
-                     "llmServiceMessage",
-                     check_system=True,
-                 )
-
-         # Notify "started" (full LLMServiceObj)
-         await self._pub.publish("llmServiceStarted", llm)
-
-     async def RemoveSession(self, payload: Any) -> None:
-         llm = self._to_model(payload)
-         base = (llm.SessionId or "").split("_")[0]
-         if not base:
-             await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
-             return
-
-         targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
-         msgs: list[str] = []
-         ok = True
-
-         for sid in targets:
-             s = self._sessions.get(sid)
-             if not s or not s.Runner:
-                 continue
-             try:
-                 await s.Runner.RemoveProcess(sid)
-                 s.Runner = None
-                 self._sessions.pop(sid, None)  # ← free the entry
-                 msgs.append(sid)
-             except Exception as e:
-                 ok = False
-                 msgs.append(f"Error {sid}: {e}")
-
-         if ok:
-             await self._emit_result(
-                 llm,
-                 f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
-                 True,
-                 "llmSessionMessage",
-                 check_system=True,
-             )
-         else:
-             await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")
-
-     async def StopRequest(self, payload: Any) -> None:
-         llm = self._to_model(payload)
-         sid = llm.SessionId or ""
-         s = self._session_for(sid)
-         if not s or not s.Runner:
-             await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
-             return
-
-         await s.Runner.StopRequest(sid)
-         await self._emit_result(
-             llm,
-             f"Success {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
-             True,
-             "llmServiceMessage",
-             check_system=True,
-         )
-
-     async def UserInput(self, payload: Any) -> None:
-         llm = self._to_model(payload)
-         sid = llm.SessionId or ""
-         s = self._session_for(sid)
-         if not s or not s.Runner:
-             await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
-             return
-
-         r: ILLMRunner = s.Runner
-         if getattr(r, "IsStateStarting", False):
-             await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
-             return
-         if getattr(r, "IsStateFailed", False):
-             await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
-             return
-
-         # Let runner push partials itself if desired; we still return a small ack
-         await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))
-
-     async def QueryIndexResult(self, payload: Any) -> None:
-         try:
-             data = payload if isinstance(payload, dict) else {}
-             outputs = data.get("QueryResults") or []
-             rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])
-
-             # NEW: show RAG to the chat like tool output
-             await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Function Response:> {rag_data}\n\n"))
-             await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="</functioncall-complete>"))
-
-             # keep your existing summary object (nice for observers/metrics)
-             await self._pub.publish(
-                 "llmServiceMessage",
-                 ResultObj(Message=data.get("Message", ""), Success=bool(data.get("Success", False)), Data=rag_data),
-             )
-         except Exception as e:
-             await self._pub.publish("llmServiceMessage", ResultObj(Message=str(e), Success=False))
-
-     async def GetFunctionRegistry(self, filtered: bool = False) -> None:
-         """
-         Wire up to your real registry when ready.
-         For now, mimic your success message payload.
-         """
-         catalog = "{}"  # replace with real JSON
-         msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
-         await self._pub.publish(
-             "llmServiceMessage",
-             ResultObj(Message=msg, Success=True),
-         )
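How the removed service, factory, and runner fit together, as a hedged sketch: the payload keys shown are only the fields StartProcess/UserInput read above, and the full LLMServiceObj schema lives in models.py, which this commit does not touch:

import asyncio
from service import LLMService
from factory import default_runner_factory

class StubPublisher:
    # Stand-in for RabbitRepo; the real object publishes to RabbitMQ queues.
    async def publish(self, queue: str, obj) -> None:
        print(queue, obj)

async def main() -> None:
    service = LLMService(publisher=StubPublisher(), runner_factory=default_runner_factory)
    await service.init()
    await service.StartProcess({
        "RequestSessionId": "abc123",
        "LLMRunnerType": "TurboLLM",
        "FunctionCallData": None,   # _to_model() replaces None with {}
        "UserInfo": None,
    })
    # StartProcess keyed the session as RequestSessionId + "_" + LLMRunnerType
    await service.UserInput({"SessionId": "abc123_TurboLLM", "UserInput": "ping"})

asyncio.run(main())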
streaming.py DELETED
@@ -1,22 +0,0 @@
- # streaming.py
- import asyncio
- import logging
- logger = logging.getLogger(__name__)
-
- async def stream_in_chunks(publish, exchange: str, llm_obj_builder, text: str,
-                            batch_size: int = 3, max_chars: int = 100,
-                            base_delay_ms: int = 30, per_char_ms: int = 2) -> None:
-     seps = set(" ,!?{}.:;\n")
-     buf, parts, count = [], [], 0
-     for ch in text:
-         parts.append(ch)
-         if ch in seps:
-             buf.append("".join(parts)); parts.clear(); count += 1
-             if count >= batch_size or sum(len(x) for x in buf) >= max_chars:
-                 o = llm_obj_builder("".join(buf))
-                 await publish(exchange, o)
-                 await asyncio.sleep((base_delay_ms + per_char_ms * sum(len(x) for x in buf)) / 1000)
-                 buf.clear(); count = 0
-     if parts: buf.append("".join(parts))
-     if buf:
-         await publish(exchange, llm_obj_builder("".join(buf)))
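A usage sketch of the removed helper with a print-based publisher, showing the chunking and pacing behavior; the real caller built an LLMServiceObj in llm_obj_builder, where this sketch just passes the text through:

import asyncio
from streaming import stream_in_chunks

async def fake_publish(exchange: str, obj) -> None:
    # Each call represents one batched chunk going out on the wire.
    print(f"{exchange}: {obj!r}")

asyncio.run(stream_in_chunks(
    fake_publish,
    "llmServiceMessage",
    llm_obj_builder=lambda chunk: chunk,   # real code wrapped chunk in an LLMServiceObj
    text="Streaming replies, split on punctuation and spaces, in small batches.",
    batch_size=3,      # flush after this many separator-terminated pieces
    max_chars=100,     # or once the buffered text reaches this length
))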