Spaces:
Sleeping
Sleeping
| import os | |
| from datetime import datetime, timedelta | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pymongo import MongoClient | |
| from bson import ObjectId | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| app = FastAPI(title="QuickTask Analytics Service") | |
| # CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # MongoDB connection | |
| client = MongoClient(os.getenv("MONGO_URI")) | |
| db = client.quicktask | |
| tasks_collection = db.tasks | |
| async def root(): | |
| return {"message": "QuickTask Analytics Service API"} | |
| async def get_user_stats(user_id: str): | |
| try: | |
| user_oid = ObjectId(user_id) | |
| # Total tasks | |
| total_tasks = tasks_collection.count_documents({"user": user_oid}) | |
| if total_tasks == 0: | |
| return { | |
| "total_tasks": 0, | |
| "avg_completion_time_hrs": 0, | |
| "overdue_tasks": 0, | |
| "productivity_score": 0 | |
| } | |
| # Completed tasks | |
| completed_tasks = list(tasks_collection.find({"user": user_oid, "status": "completed"})) | |
| completed_count = len(completed_tasks) | |
| # Average completion time | |
| total_completion_time_sec = 0 | |
| for task in completed_tasks: | |
| # Re-calculating from timestamps | |
| created_at = task.get("createdAt") | |
| updated_at = task.get("updatedAt") | |
| if created_at and updated_at: | |
| diff = (updated_at - created_at).total_seconds() | |
| total_completion_time_sec += diff | |
| avg_completion_time = (total_completion_time_sec / completed_count / 3600) if completed_count > 0 else 0 | |
| # Overdue tasks | |
| now = datetime.now() | |
| overdue_tasks = tasks_collection.count_documents({ | |
| "user": user_oid, | |
| "status": {"$ne": "completed"}, | |
| "dueDate": {"$lt": now} | |
| }) | |
| # Productivity score | |
| productivity_score = (completed_count / total_tasks * 100) | |
| return { | |
| "total_tasks": total_tasks, | |
| "avg_completion_time_hrs": round(avg_completion_time, 2), | |
| "overdue_tasks": overdue_tasks, | |
| "productivity_score": round(productivity_score, 2) | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_productivity_trends(user_id: str, days: int = Query(7, ge=1, le=30)): | |
| try: | |
| user_oid = ObjectId(user_id) | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(days=days) | |
| # Aggregate tasks completed per day | |
| pipeline = [ | |
| { | |
| "$match": { | |
| "user": user_oid, | |
| "status": "completed", | |
| "updatedAt": {"$gte": start_date, "$lte": end_date} | |
| } | |
| }, | |
| { | |
| "$group": { | |
| "_id": { | |
| "$dateToString": {"format": "%Y-%m-%d", "date": "$updatedAt"} | |
| }, | |
| "count": {"$sum": 1} | |
| } | |
| }, | |
| {"$sort": {"_id": 1}} | |
| ] | |
| results = list(tasks_collection.aggregate(pipeline)) | |
| # Fill in missing dates | |
| trend_data = [] | |
| current_date = start_date | |
| results_dict = {item["_id"]: item["count"] for item in results} | |
| while current_date <= end_date: | |
| date_str = current_date.strftime("%Y-%m-%d") | |
| trend_data.append({ | |
| "date": date_str, | |
| "count": results_dict.get(date_str, 0) | |
| }) | |
| current_date += timedelta(days=1) | |
| return trend_data | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.getenv("PORT", 8000)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |