Spaces:
Runtime error
Runtime error
| import logging | |
| from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Response | |
| from fastapi_pagination import Page | |
| from fastapi_pagination.ext.sqlalchemy import apaginate | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src import crud, schemas | |
| from src.config import settings | |
| from src.dependencies import db | |
| from src.deriver.enqueue import enqueue_deletion, enqueue_dream | |
| from src.exceptions import AuthenticationException | |
| from src.security import JWTParams, require_auth | |
| from src.utils.search import search | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter( | |
| prefix="/workspaces", | |
| tags=["workspaces"], | |
| ) | |
| async def get_or_create_workspace( | |
| response: Response, | |
| workspace: schemas.WorkspaceCreate = Body( | |
| ..., description="Workspace creation parameters" | |
| ), | |
| jwt_params: JWTParams = Depends(require_auth()), | |
| db: AsyncSession = db, | |
| ): | |
| """ | |
| Get a Workspace by ID. | |
| If workspace_id is provided as a query parameter, it uses that (must match JWT workspace_id). | |
| Otherwise, it uses the workspace_id from the JWT. | |
| """ | |
| # If workspace_id provided in query, check if it matches jwt or user is admin | |
| if workspace.name: | |
| if not jwt_params.ad and jwt_params.w != workspace.name: | |
| raise AuthenticationException("Unauthorized access to resource") | |
| else: | |
| # Use workspace_id from JWT | |
| if not jwt_params.w: | |
| raise AuthenticationException( | |
| "Workspace ID not found in query parameter or JWT" | |
| ) | |
| workspace.name = jwt_params.w | |
| result = await crud.get_or_create_workspace(db, workspace=workspace) | |
| await db.commit() | |
| await result.post_commit() | |
| response.status_code = 201 if result.created else 200 | |
| return result.resource | |
| async def get_all_workspaces( | |
| options: schemas.WorkspaceGet | None = Body( | |
| None, description="Filtering and pagination options for the workspaces list" | |
| ), | |
| db: AsyncSession = db, | |
| ): | |
| """Get all Workspaces, paginated with optional filters.""" | |
| filter_param = None | |
| if options and hasattr(options, "filters"): | |
| filter_param = options.filters | |
| if filter_param == {}: | |
| filter_param = None | |
| return await apaginate( | |
| db, | |
| await crud.get_all_workspaces(filters=filter_param), | |
| ) | |
| async def update_workspace( | |
| workspace_id: str = Path(...), | |
| workspace: schemas.WorkspaceUpdate = Body( | |
| ..., description="Updated workspace parameters" | |
| ), | |
| db: AsyncSession = db, | |
| ): | |
| """Update Workspace metadata and/or configuration.""" | |
| # ResourceNotFoundException will be caught by global handler if workspace not found | |
| honcho_workspace = await crud.update_workspace( | |
| db, workspace_name=workspace_id, workspace=workspace | |
| ) | |
| return honcho_workspace | |
| async def delete_workspace( | |
| workspace_id: str = Path(...), | |
| db: AsyncSession = db, | |
| ): | |
| """ | |
| Delete a Workspace. This accepts the deletion request and processes it in the background, | |
| permanently deleting all peers, messages, conclusions, and other resources associated | |
| with the workspace. | |
| Returns 409 Conflict if the workspace contains active sessions. | |
| Delete all sessions first, then delete the workspace. | |
| This action cannot be undone. | |
| """ | |
| # Verify workspace exists | |
| await crud.get_workspace(db, workspace_name=workspace_id) | |
| # Check for active sessions before accepting | |
| await crud.check_no_active_sessions(db, workspace_name=workspace_id) | |
| # Enqueue for background deletion | |
| await enqueue_deletion(workspace_id, "workspace", workspace_id, db_session=db) | |
| await db.commit() | |
| return {"message": "Workspace deletion accepted"} | |
| async def search_workspace( | |
| workspace_id: str = Path(...), | |
| body: schemas.MessageSearchOptions = Body( | |
| ..., description="Message search parameters" | |
| ), | |
| ): | |
| """ | |
| Search messages in a Workspace using optional filters. Use `limit` to control the number of | |
| results returned. | |
| """ | |
| # take user-provided filter and add workspace_id to it | |
| filters = body.filters or {} | |
| filters["workspace_id"] = workspace_id | |
| return await search(body.query, filters=filters, limit=body.limit) | |
| async def get_queue_status( | |
| workspace_id: str = Path(...), | |
| observer_id: str | None = Query( | |
| None, description="Optional observer ID to filter by" | |
| ), | |
| sender_id: str | None = Query(None, description="Optional sender ID to filter by"), | |
| session_id: str | None = Query( | |
| None, description="Optional session ID to filter by" | |
| ), | |
| db: AsyncSession = db, | |
| ): | |
| """ | |
| Get the processing queue status for a Workspace, optionally scoped to an observer, sender, | |
| and/or session. | |
| Only tracks user-facing task types (representation, summary, dream). | |
| Internal infrastructure tasks (reconciler, webhook, deletion) are excluded. | |
| Note: completed counts reflect items since the last periodic queue cleanup, | |
| not lifetime totals. | |
| """ | |
| try: | |
| return await crud.get_queue_status( | |
| db, | |
| workspace_name=workspace_id, | |
| session_name=session_id, | |
| observer=observer_id, | |
| observed=sender_id, | |
| ) | |
| except ValueError as e: | |
| logger.warning(f"Invalid request parameters: {str(e)}") | |
| raise HTTPException(status_code=400, detail=str(e)) from e | |
| async def schedule_dream( | |
| workspace_id: str = Path(...), | |
| request: schemas.ScheduleDreamRequest = Body( | |
| ..., description="Dream scheduling parameters" | |
| ), | |
| ): | |
| """ | |
| Manually schedule a dream task for a specific collection. | |
| This endpoint bypasses all automatic dream conditions (document threshold, | |
| minimum hours between dreams) and schedules the dream task for a future execution. | |
| Currently this endpoint only supports scheduling immediate dreams. In the future, | |
| users may pass a cron-style expression to schedule dreams at specific times. | |
| """ | |
| # Check if dreams are enabled | |
| if not settings.DREAM.ENABLED: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Dreams are not enabled in the system configuration", | |
| ) | |
| # Default observed to observer if not provided | |
| observer = request.observer | |
| observed = request.observed if request.observed is not None else request.observer | |
| dream_type = request.dream_type | |
| await enqueue_dream( | |
| workspace_id, | |
| observer=observer, | |
| observed=observed, | |
| dream_type=dream_type, | |
| session_name=request.session_id, | |
| ) | |
| logger.info( | |
| "Manually scheduled dream: %s for %s/%s/%s (session: %s)", | |
| dream_type.value, | |
| workspace_id, | |
| observer, | |
| observed, | |
| request.session_id, | |
| ) | |