# Scraped file-viewer header: Space status "Sleeping"; file size 7,030 bytes; ref f871fed.
from fastapi import APIRouter, HTTPException
from loguru import logger
from surreal_commands import get_command_status
from api.command_service import CommandService
from api.models import (
RebuildProgress,
RebuildRequest,
RebuildResponse,
RebuildStats,
RebuildStatusResponse,
)
from open_notebook.database.repository import repo_query
router = APIRouter()
# SurrealQL used to estimate how many items a rebuild will touch, keyed by item
# type. Each entry is (query for mode "existing", query for any other mode).
_REBUILD_COUNT_QUERIES = {
    "sources": (
        # Distinct sources that already have at least one non-empty embedding.
        """
        SELECT VALUE count(array::distinct(
            SELECT VALUE source.id
            FROM source_embedding
            WHERE embedding != none AND array::len(embedding) > 0
        )) as count FROM {}
        """,
        # All sources that have text content.
        "SELECT VALUE count() as count FROM source WHERE full_text != none GROUP ALL",
    ),
    "notes": (
        "SELECT VALUE count() as count FROM note WHERE embedding != none AND array::len(embedding) > 0 GROUP ALL",
        "SELECT VALUE count() as count FROM note WHERE content != none GROUP ALL",
    ),
    "insights": (
        "SELECT VALUE count() as count FROM source_insight WHERE embedding != none AND array::len(embedding) > 0 GROUP ALL",
        "SELECT VALUE count() as count FROM source_insight GROUP ALL",
    ),
}


def _extract_count(result) -> int:
    """Return the integer count carried by the first row of a query result.

    The first row may be either a bare integer or a dict with a ``count``
    key; both shapes are accepted. An empty/missing result, or a count that
    is not an integer (e.g. null), yields 0 so that a malformed estimate row
    cannot abort the request.
    """
    if not result:
        return 0
    first_row = result[0]
    if isinstance(first_row, dict):
        first_row = first_row.get("count", 0)
    return first_row if isinstance(first_row, int) else 0


@router.post("/rebuild", response_model=RebuildResponse)
async def start_rebuild(request: RebuildRequest):
    """
    Start a background job to rebuild embeddings.

    - **mode**: "existing" (re-embed items with embeddings) or "all" (embed everything)
    - **include_sources**: Include sources in rebuild (default: true)
    - **include_notes**: Include notes in rebuild (default: true)
    - **include_insights**: Include insights in rebuild (default: true)

    Returns command ID to track progress and estimated item count.
    """
    # Any failure below (import, count query, job submission) is logged once
    # with its traceback and surfaced to the client as an HTTP 500.
    try:
        logger.info(f"Starting rebuild request: mode={request.mode}")

        # Import commands to ensure they're registered
        import commands.embedding_commands  # noqa: F401

        # Rough estimate of the work ahead, computed with quick count queries
        # before the command runs. Item types are counted in a fixed order:
        # sources, notes, insights.
        requested_types = (
            ("sources", request.include_sources),
            ("notes", request.include_notes),
            ("insights", request.include_insights),
        )
        total_estimate = 0
        for item_type, included in requested_types:
            if not included:
                continue
            existing_query, all_query = _REBUILD_COUNT_QUERIES[item_type]
            # Only the literal mode "existing" selects the re-embed query;
            # every other mode counts all eligible items.
            query = existing_query if request.mode == "existing" else all_query
            total_estimate += _extract_count(await repo_query(query))

        logger.info(f"Estimated {total_estimate} items to process")

        # Submit command
        command_id = await CommandService.submit_command_job(
            "open_notebook",
            "rebuild_embeddings",
            {
                "mode": request.mode,
                "include_sources": request.include_sources,
                "include_notes": request.include_notes,
                "include_insights": request.include_insights,
            },
        )
        logger.info(f"Submitted rebuild command: {command_id}")

        return RebuildResponse(
            command_id=command_id,
            total_items=total_estimate,
            message=f"Rebuild operation started. Estimated {total_estimate} items to process.",
        )
    except Exception as e:
        # logger.exception records the message and the traceback in one entry.
        logger.exception(f"Failed to start rebuild: {e}")
        raise HTTPException(
            status_code=500, detail=f"Failed to start rebuild operation: {str(e)}"
        ) from e
@router.get("/rebuild/{command_id}/status", response_model=RebuildStatusResponse)
async def get_rebuild_status(command_id: str):
    """
    Get the status of a rebuild operation.

    Returns:
    - **status**: queued, running, completed, failed
    - **progress**: processed count, total count, percentage
    - **stats**: breakdown by type (sources, notes, insights, failed)
    - **timestamps**: started_at, completed_at
    """
    # Raises HTTP 404 when no status is found for the command id, and HTTP 500
    # for any other failure while building the response.
    try:
        # Get command status from surreal_commands
        status = await get_command_status(command_id)
        if not status:
            raise HTTPException(status_code=404, detail="Rebuild command not found")

        # Build response based on status
        response = RebuildStatusResponse(
            command_id=command_id,
            status=status.status,
        )

        # Metadata is only read from a non-empty dict result.
        result = status.result if isinstance(status.result, dict) else None

        if result:
            # Progress is reported only when both counters are present.
            if "total_items" in result and "processed_items" in result:
                # Treat null counters as 0 so the comparison and division
                # below cannot raise TypeError and fail the status poll.
                total = result["total_items"] or 0
                processed = result["processed_items"] or 0
                percentage = round(processed / total * 100, 2) if total > 0 else 0
                response.progress = RebuildProgress(
                    processed=processed,
                    total=total,
                    percentage=percentage,
                )

            # Per-type breakdown; missing keys default to 0.
            response.stats = RebuildStats(
                sources=result.get("sources_processed", 0),
                notes=result.get("notes_processed", 0),
                insights=result.get("insights_processed", 0),
                failed=result.get("failed_items", 0),
            )

        # Timestamps are optional attributes on the status object.
        created = getattr(status, "created", None)
        if created:
            response.started_at = str(created)
        # NOTE(review): completed_at is filled from `updated` whatever the job
        # status is, so a running job also reports it — confirm clients
        # expect this.
        updated = getattr(status, "updated", None)
        if updated:
            response.completed_at = str(updated)

        # Add error message if failed
        if status.status == "failed" and result:
            response.error_message = result.get("error_message", "Unknown error")

        return response
    except HTTPException:
        # Let the 404 above pass through untouched.
        raise
    except Exception as e:
        # logger.exception records the message and the traceback in one entry.
        logger.exception(f"Failed to get rebuild status: {e}")
        raise HTTPException(
            status_code=500, detail=f"Failed to get rebuild status: {str(e)}"
        ) from e
|