| import base64 |
| import os |
| import uuid |
| from datetime import datetime, timedelta |
| from agent import AgentContext, UserMessage, AgentContextType |
| from helpers.api import ApiHandler, Request, Response |
| from helpers import files, projects |
| from helpers.print_style import PrintStyle |
| from helpers.projects import activate_project |
| from helpers.security import safe_filename |
| from initialize import initialize_agent |
| import threading |
|
|
|
|
| class ApiMessage(ApiHandler): |
| |
| _chat_lifetimes = {} |
| _cleanup_lock = threading.Lock() |
|
|
| @classmethod |
| def requires_auth(cls) -> bool: |
| return False |
|
|
| @classmethod |
| def requires_csrf(cls) -> bool: |
| return False |
|
|
| @classmethod |
| def requires_api_key(cls) -> bool: |
| return True |
|
|
| async def process(self, input: dict, request: Request) -> dict | Response: |
| |
| context_id = input.get("context_id", "") |
| message = input.get("message", "") |
| attachments = input.get("attachments", []) |
| lifetime_hours = input.get("lifetime_hours", 24) |
| project_name = input.get("project_name", None) |
| agent_profile = input.get("agent_profile", None) |
| |
| |
| override_settings = {} |
| if agent_profile: |
| override_settings["agent_profile"] = agent_profile |
|
|
| if not message: |
| return Response('{"error": "Message is required"}', status=400, mimetype="application/json") |
|
|
| |
| attachment_paths = [] |
| if attachments: |
| upload_folder_int = "/a0/usr/uploads" |
| upload_folder_ext = files.get_abs_path("usr/uploads") |
| os.makedirs(upload_folder_ext, exist_ok=True) |
|
|
| for attachment in attachments: |
| if not isinstance(attachment, dict) or "filename" not in attachment or "base64" not in attachment: |
| continue |
|
|
| try: |
| filename = safe_filename(attachment["filename"]) |
| if not filename: |
| raise ValueError("Invalid filename") |
|
|
| |
| file_content = base64.b64decode(attachment["base64"]) |
|
|
| |
| save_path = os.path.join(upload_folder_ext, filename) |
| with open(save_path, "wb") as f: |
| f.write(file_content) |
|
|
| attachment_paths.append(os.path.join(upload_folder_int, filename)) |
| except Exception as e: |
| PrintStyle.error(f"Failed to process attachment {attachment.get('filename', 'unknown')}: {e}") |
| continue |
|
|
| |
| if context_id: |
| context = AgentContext.use(context_id) |
| if not context: |
| return Response('{"error": "Context not found"}', status=404, mimetype="application/json") |
|
|
| |
| if agent_profile and context.agent0.config.profile != agent_profile: |
| return Response('{"error": "Cannot override agent profile on existing context"}', status=400, mimetype="application/json") |
| |
|
|
| |
| existing_project = context.get_data(projects.CONTEXT_DATA_KEY_PROJECT) |
| if project_name and existing_project and existing_project != project_name: |
| return Response('{"error": "Project can only be set on first message"}', status=400, mimetype="application/json") |
| else: |
| config = initialize_agent(override_settings=override_settings) |
| context = AgentContext(config=config, type=AgentContextType.USER) |
| AgentContext.use(context.id) |
| context_id = context.id |
| |
| if project_name: |
| try: |
| activate_project(context_id, project_name) |
| except Exception as e: |
| |
| error_msg = str(e) |
| PrintStyle.error(f"Failed to activate project '{project_name}' for context '{context_id}': {error_msg}") |
| return Response( |
| f'{{"error": "Failed to activate project \\"{project_name}\\""}}', |
| status=500, |
| mimetype="application/json", |
| ) |
|
|
| |
| if project_name: |
| try: |
| projects.activate_project(context_id, project_name) |
| except Exception as e: |
| return Response(f'{{"error": "Failed to activate project: {str(e)}"}}', status=400, mimetype="application/json") |
|
|
| |
| with self._cleanup_lock: |
| self._chat_lifetimes[context_id] = datetime.now() + timedelta(hours=lifetime_hours) |
|
|
| |
| try: |
| |
| attachment_filenames = [os.path.basename(path) for path in attachment_paths] if attachment_paths else [] |
|
|
| PrintStyle( |
| background_color="#6C3483", font_color="white", bold=True, padding=True |
| ).print("External API message:") |
| PrintStyle(font_color="white", padding=False).print(f"> {message}") |
| if attachment_filenames: |
| PrintStyle(font_color="white", padding=False).print("Attachments:") |
| for filename in attachment_filenames: |
| PrintStyle(font_color="white", padding=False).print(f"- {filename}") |
|
|
| |
| msg_id = str(uuid.uuid4()) |
| context.log.log( |
| type="user", |
| heading="", |
| content=message, |
| kvps={"attachments": attachment_filenames}, |
| id=msg_id, |
| ) |
|
|
| |
| task = context.communicate(UserMessage(message=message, attachments=attachment_paths, id=msg_id)) |
| result = await task.result() |
|
|
| |
| self._cleanup_expired_chats() |
|
|
| return { |
| "context_id": context_id, |
| "response": result |
| } |
|
|
| except Exception as e: |
| PrintStyle.error(f"External API error: {e}") |
| return Response(f'{{"error": "{str(e)}"}}', status=500, mimetype="application/json") |
|
|
| @classmethod |
| def _cleanup_expired_chats(cls): |
| """Clean up expired chats""" |
| with cls._cleanup_lock: |
| now = datetime.now() |
| expired_contexts = [ |
| context_id for context_id, expiry in cls._chat_lifetimes.items() |
| if now > expiry |
| ] |
|
|
| for context_id in expired_contexts: |
| try: |
| context = AgentContext.get(context_id) |
| if context: |
| context.reset() |
| AgentContext.remove(context_id) |
| del cls._chat_lifetimes[context_id] |
| PrintStyle().print(f"Cleaned up expired chat: {context_id}") |
| except Exception as e: |
| PrintStyle.error(f"Failed to cleanup chat {context_id}: {e}") |
|
|