| from agent import AgentContext, UserMessage |
| from python.helpers.api import ApiHandler, Request, Response |
| from python.helpers import files, dotenv |
| from initialize import initialize_agent |
| import os |
| import json |
| import base64 |
| import queue |
| import traceback |
|
|
| class Stream(ApiHandler): |
| @classmethod |
| def requires_auth(cls) -> bool: |
| return False |
|
|
| @classmethod |
| def requires_csrf(cls) -> bool: |
| return False |
|
|
| @classmethod |
| def requires_api_key(cls) -> bool: |
| return True |
|
|
| async def process(self, input: dict, request: Request) -> Response: |
| try: |
| text = input.get("message") or input.get("text") or "" |
| ctxid = input.get("context") |
| subagent = input.get("subagent") or input.get("profile") |
| file_data = input.get("file") |
| file_name = input.get("file_name", "uploaded_file") |
| |
| dotenv.load_dotenv() |
| |
| |
| blablador_key = os.getenv("BLABLADOR_API_KEY") |
| if blablador_key: |
| os.environ.setdefault("OTHER_API_KEY", blablador_key) |
| os.environ.setdefault("API_KEY_OTHER", blablador_key) |
|
|
| context = self.get_context(ctxid) |
| config = initialize_agent() |
| |
| if config.chat_model.provider == "Other OpenAI compatible": |
| config.chat_model.provider = "other" |
| if config.utility_model.provider == "Other OpenAI compatible": |
| config.utility_model.provider = "other" |
|
|
| if subagent: |
| config.profile = subagent |
| if subagent not in config.knowledge_subdirs: |
| config.knowledge_subdirs.append(subagent) |
| |
| context.config = config |
| curr_agent = context.agent0 |
| while curr_agent: |
| curr_agent.config = config |
| curr_agent = curr_agent.data.get(curr_agent.DATA_NAME_SUBORDINATE) |
|
|
| attachment_paths = [] |
| if file_data: |
| |
| file_name = os.path.basename(file_name) |
| knowledge_dir = files.get_abs_path("knowledge/custom") |
| os.makedirs(knowledge_dir, exist_ok=True) |
| save_path = os.path.join(knowledge_dir, file_name) |
| try: |
| if isinstance(file_data, str) and "," in file_data: |
| header, encoded = file_data.split(",", 1) |
| file_data = encoded |
| decoded_data = base64.b64decode(file_data) |
| with open(save_path, "wb") as f: |
| f.write(decoded_data) |
| except Exception: |
| with open(save_path, "w") as f: |
| f.write(str(file_data)) |
| attachment_paths.append(save_path) |
|
|
| sync_queue = queue.Queue() |
| context.stream_queue = sync_queue |
| |
| msg = UserMessage(text, attachment_paths) |
| task = context.communicate(msg) |
|
|
| def generate(): |
| try: |
| while task.is_alive() or not sync_queue.empty(): |
| try: |
| chunk = sync_queue.get(timeout=0.1) |
| yield f"data: {json.dumps(chunk)}\n\n" |
| except queue.Empty: |
| if not task.is_alive(): |
| break |
| continue |
| except Exception as e: |
| yield f"data: {json.dumps({'type': 'error', 'text': str(e)})}\n\n" |
| break |
| |
| try: |
| result = task.result_sync(timeout=300) |
| yield f"data: {json.dumps({'type': 'final', 'text': result})}\n\n" |
| except Exception as e: |
| yield f"data: {json.dumps({'type': 'error', 'text': f'Result error: {str(e)}'})}\n\n" |
| except Exception as e: |
| yield f"data: {json.dumps({'type': 'error', 'text': f'Generator error: {str(e)}'})}\n\n" |
| finally: |
| if hasattr(context, 'stream_queue'): |
| delattr(context, 'stream_queue') |
|
|
| return Response(generate(), mimetype='text/event-stream') |
| except Exception as e: |
| return Response(f"Error: {str(e)}\n{traceback.format_exc()}", status=500) |
|
|