samchun-gemini / lib /job_manager.py
JHyeok5's picture
Upload folder using huggingface_hub
f583944 verified
"""
Job status management for course_generation_jobs table
Provides helpers to update job progress from HuggingFace Space backend
"""
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from db import get_supabase
logger = logging.getLogger(__name__)
async def update_job_progress(
    job_id: str,
    progress: int,
    message: str
) -> bool:
    """
    Write a progress update for one row of course_generation_jobs.

    Every call stores progress, progress_message and updated_at. When
    progress <= 5 the same update also sets status to 'processing' and
    stamps started_at.

    NOTE: the progress <= 5 branch runs on every such call, not only the
    first one, so repeated low-progress updates rewrite started_at.

    Args:
        job_id: Job UUID
        progress: Progress percentage (0-100)
        message: Progress message to display

    Returns:
        bool: True if the update returned at least one row; False if no row
        matched the id or the update raised.
    """
    try:
        supabase = get_supabase()
        # Read the clock once so updated_at and started_at written by the
        # same update carry exactly the same value.
        now_iso = datetime.now(timezone.utc).isoformat()
        update_data = {
            "progress": progress,
            "progress_message": message,
            "updated_at": now_iso,
        }
        # First progress update: transition from 'pending' to 'processing'
        if progress <= 5:
            update_data["status"] = "processing"
            update_data["started_at"] = now_iso
        # NOTE(review): execute() is not awaited; if the client performs
        # blocking network I/O this holds the event loop - confirm.
        result = supabase.table("course_generation_jobs").update(
            update_data
        ).eq("id", job_id).execute()
        if not result.data:
            logger.warning(f"[job_manager] No job found with id {job_id}")
            return False
        logger.info(f"[job_manager] Job {job_id} progress: {progress}% - {message}")
        return True
    except Exception as e:
        # Best-effort boundary: callers only see False, so keep the full
        # traceback in the log instead of just str(e).
        logger.exception(f"[job_manager] Failed to update job progress: {e}")
        return False
async def complete_job(
    job_id: str,
    result: Dict[str, Any]
) -> bool:
    """
    Mark job as completed.

    Sets status to 'completed', stores the result payload, forces progress
    to 100 with progress_message 'completed', and stamps completed_at and
    updated_at with the same timestamp.

    Args:
        job_id: Job UUID
        result: Result data (e.g., {"courses": [...], "reasoning": "..."})

    Returns:
        bool: True if the update returned at least one row; False if no row
        matched the id or the update raised.

    Example:
        await complete_job(
            job_id="123e4567-e89b-12d3-a456-426614174000",
            result={"courses": [course1, course2], "reasoning": "..."}
        )
    """
    try:
        supabase = get_supabase()
        # Single clock reading: completed_at and updated_at describe the
        # same event and should not differ by a few microseconds.
        now_iso = datetime.now(timezone.utc).isoformat()
        # NOTE(review): execute() is not awaited; if the client performs
        # blocking network I/O this holds the event loop - confirm.
        update_result = supabase.table("course_generation_jobs").update({
            "status": "completed",
            "result": result,
            "progress": 100,
            "progress_message": "completed",
            "completed_at": now_iso,
            "updated_at": now_iso,
        }).eq("id", job_id).execute()
        if not update_result.data:
            logger.warning(f"[job_manager] No job found with id {job_id}")
            return False
        logger.info(f"[job_manager] Job {job_id} completed successfully")
        return True
    except Exception as e:
        # Best-effort boundary: callers only see False, so keep the full
        # traceback in the log instead of just str(e).
        logger.exception(f"[job_manager] Failed to complete job: {e}")
        return False
async def fail_job(
    job_id: str,
    error: str
) -> bool:
    """
    Mark job as failed.

    Sets status to 'failed', stores the error text, sets progress_message
    to 'failed: <error>', and stamps completed_at and updated_at with the
    same timestamp. The progress column is not modified.

    Args:
        job_id: Job UUID
        error: Error message

    Returns:
        bool: True if the update returned at least one row; False if no row
        matched the id or the update raised.

    Example:
        await fail_job(
            job_id="123e4567-e89b-12d3-a456-426614174000",
            error="Gemini API timeout"
        )
    """
    try:
        supabase = get_supabase()
        # Single clock reading: completed_at and updated_at describe the
        # same event and should not differ by a few microseconds.
        now_iso = datetime.now(timezone.utc).isoformat()
        # NOTE(review): execute() is not awaited; if the client performs
        # blocking network I/O this holds the event loop - confirm.
        update_result = supabase.table("course_generation_jobs").update({
            "status": "failed",
            "error": error,
            "progress_message": f"failed: {error}",
            "completed_at": now_iso,
            "updated_at": now_iso,
        }).eq("id", job_id).execute()
        if not update_result.data:
            logger.warning(f"[job_manager] No job found with id {job_id}")
            return False
        # Logged at warning level: the job failed, but recording it worked.
        logger.warning(f"[job_manager] Job {job_id} failed: {error}")
        return True
    except Exception as e:
        # Best-effort boundary: callers only see False, so keep the full
        # traceback in the log instead of just str(e).
        logger.exception(f"[job_manager] Failed to mark job as failed: {e}")
        return False
async def get_job_status(job_id: str) -> Optional[Dict[str, Any]]:
    """
    Fetch the stored row for one job (for debugging/monitoring).

    Args:
        job_id: Job UUID

    Returns:
        Optional[Dict]: The first row whose id matches, or None when no row
        matches or the lookup raises.

    Example:
        status = await get_job_status("123e4567-e89b-12d3-a456-426614174000")
        print(status["status"], status["progress"])
    """
    try:
        jobs_table = get_supabase().table("course_generation_jobs")
        response = jobs_table.select("*").eq("id", job_id).execute()
        rows = response.data
        if rows:
            # Filtering on id: only the first returned row is handed back.
            return rows[0]
        logger.warning(f"[job_manager] No job found with id {job_id}")
        return None
    except Exception as e:
        logger.error(f"[job_manager] Failed to get job status: {e}")
        return None