"""Error logging service: records application errors to the database and summarizes them."""
import logging
import traceback
import uuid
from collections import Counter
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from services.github_service import GitHubService
from services.supabase_service import db
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Subsystem/source an error is attributed to.

    The string value is what gets persisted in the ``category`` column of the
    ``error_logs`` table (see ``ErrorService.log_error``).
    """

    GITHUB_API = "github_api"
    GEMINI_API = "gemini_api"
    SUPABASE = "supabase"
    FIREBASE_AUTH = "firebase_auth"
    WEBHOOK = "webhook"
    AGENT_TASK = "agent_task"
    CONTEXT_BUILD = "context_build"
    TEMPLATE = "template"
    VERSIONING = "versioning"
    EXTERNAL_API = "external_api"
class ErrorService:
    """Persist application errors to the ``error_logs`` table and surface
    critical ones (currently: logging; GitHub-issue creation is stubbed)."""

    def __init__(self):
        self.github_service = GitHubService()

    async def log_error(
        self,
        category: ErrorCategory,
        error: Exception,
        context: Dict[str, Any],
        repo_id: Optional[str] = None,
        task_id: Optional[str] = None,
        severity: str = "error"
    ) -> Optional[Dict[str, Any]]:
        """Log an error to the database and optionally create a GitHub issue.

        Args:
            category: Subsystem in which the error occurred.
            error: The exception to record.
            context: Debugging details stored alongside the log entry.
            repo_id: Repo the error relates to, if any.
            task_id: Agent task the error relates to, if any.
            severity: Severity label; ``"critical"`` together with ``repo_id``
                triggers the GitHub-issue path below.

        Returns:
            The inserted ``error_logs`` row, or ``None`` if persistence failed.
        """
        error_msg = str(error)
        # Format the traceback of the *passed* exception object. The previous
        # traceback.format_exc() only reflects the exception currently being
        # handled, and yields "NoneType: None" when log_error is called outside
        # an except block (or for a different exception instance).
        tb = "".join(traceback.format_exception(type(error), error, error.__traceback__))
        # Lazy %-style args: messages are rendered only if the level is enabled.
        logger.error("[ERROR_SVC] Logging %s error in %s: %s", severity, category.value, error_msg)
        if repo_id:
            logger.error("[ERROR_SVC] Repo ID: %s", repo_id)
        if task_id:
            logger.error("[ERROR_SVC] Task ID: %s", task_id)
        log_data = {
            "repo_id": repo_id,
            "task_id": task_id,
            "category": category.value,
            "severity": severity,
            "message": error_msg,
            "traceback": tb,
            "context": context
        }
        # Save to Supabase; without a persisted row there is nothing to return.
        try:
            result = db.client.table("error_logs").insert(log_data).execute()
            log_record = result.data[0] if result.data else None
            logger.info(
                "[ERROR_SVC] Error logged to database with ID: %s",
                log_record.get("id") if log_record else "unknown",
            )
        except Exception as e:
            logger.error("[ERROR_SVC] Failed to log error to Supabase: %s", e)
            return None
        # If critical and we have a repo_id, attempt the GitHub-issue path.
        if severity == "critical" and repo_id:
            logger.warning(
                "[ERROR_SVC] Critical error detected for repo %s. Attempting to create GitHub issue.",
                repo_id,
            )
            try:
                repo = await db.get_repo_by_id(repo_id)
                if repo and repo.get("github_full_name"):
                    issue_title = f"[ContriBot Error] {category.value}: {error_msg[:50]}..."
                    # Kept for the intended (currently disabled) issue creation.
                    issue_body = (
                        "\n## ContriBot Critical Error\n"
                        "An automated critical error was detected by ContriBot.\n"
                        f"**Category:** {category.value}\n"
                        "**Context:**\n"
                        f"```json\n{context}\n```\n"
                        "**Traceback:**\n"
                        f"```python\n{tb}\n```\n"
                    )
                    logger.info(
                        "[ERROR_SVC] Would create GitHub issue on %s: %s",
                        repo["github_full_name"],
                        issue_title,
                    )
                    # Issue creation needs the user's installation token, which
                    # is not available here yet; for now we only log the intent.
                    # await self.github_service.create_issue(
                    #     installation_id, repo["github_full_name"],
                    #     issue_title, issue_body, labels=["contribot-error"])
            except Exception as e:
                logger.error("[ERROR_SVC] Failed to create GitHub issue for critical error: %s", e)
        return log_record

    async def get_errors(
        self,
        repo_id: Optional[str] = None,
        category: Optional[str] = None,
        since_hours: int = 24
    ) -> List[Dict[str, Any]]:
        """Return errors from the last ``since_hours`` hours, newest first.

        Args:
            repo_id: If given, restrict results to this repo.
            category: If given, restrict results to this category value.
            since_hours: Look-back window in hours (default 24).
        """
        # datetime.utcnow() is deprecated (Python 3.12); use an aware UTC
        # timestamp. NOTE(review): the ISO string now carries "+00:00" —
        # assumed fine for a timestamptz column; confirm against the schema.
        time_threshold = (datetime.now(timezone.utc) - timedelta(hours=since_hours)).isoformat()
        query = (
            db.client.table("error_logs")
            .select("*")
            .gte("created_at", time_threshold)
            .order("created_at", desc=True)
        )
        if repo_id:
            query = query.eq("repo_id", repo_id)
        if category:
            query = query.eq("category", category)
        result = query.execute()
        # Normalize a None payload so callers always receive a list.
        return result.data or []

    async def get_error_summary(self) -> Dict[str, Any]:
        """Aggregate the last 24 hours of errors: totals, unresolved count,
        critical count, and a per-category breakdown."""
        time_threshold = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
        # Fetch only the columns the summary needs.
        result = (
            db.client.table("error_logs")
            .select("category, severity, resolved")
            .gte("created_at", time_threshold)
            .execute()
        )
        errors = result.data or []
        return {
            "total_24h": len(errors),
            "unresolved": sum(1 for e in errors if not e.get("resolved")),
            "critical": sum(1 for e in errors if e.get("severity") == "critical"),
            # Counter replaces the previous manual per-category tally.
            "by_category": dict(Counter(e.get("category") for e in errors)),
        }

    async def clear_old_errors(self, days: int = 30) -> None:
        """Delete error_logs rows older than ``days`` days (default 30)."""
        time_threshold = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
        db.client.table("error_logs").delete().lt("created_at", time_threshold).execute()

    async def mark_resolved(self, error_id: str, resolved: bool = True) -> Optional[Dict[str, Any]]:
        """Set the ``resolved`` flag on an error row.

        Returns the updated row, or ``None`` if no row matched ``error_id``.
        """
        result = db.client.table("error_logs").update({"resolved": resolved}).eq("id", error_id).execute()
        return result.data[0] if result.data else None
# Module-level singleton instance of ErrorService for importers of this module.
error_service = ErrorService()