import boto3 import os from datetime import datetime # DynamoDB client dynamodb = boto3.client("dynamodb", region_name="ap-south-1") # Tables (set via env vars or fallback defaults) COURSE_TABLE = os.environ.get("COURSE_TABLE", "CourseTracking") SESSION_TABLE = os.environ.get("SESSION_TABLE", "SessionTracking") def _utc_now(): """Return ISO 8601 UTC timestamp with Z suffix.""" return datetime.utcnow().isoformat() + "Z" def save_course(course_detail: dict, course_name: str): """Save a course (only once per course_id).""" now = _utc_now() # Enhanced course details with more information course_detail_item = { "course_id": {"N": str(course_detail["course_id"])}, "topic_id": {"N": str(course_detail.get("topic_id", 0))}, "chapter_id": {"N": str(course_detail.get("chapter_id", 0))}, "total_videos": {"N": str(course_detail.get("total_videos", 0))}, "programming_language": {"S": course_detail.get("programming_language", "Python")}, "target_language": {"S": course_detail.get("target_language", "english")}, "tts_voice": {"S": course_detail.get("tts_voice", "onyx")}, "tts_gender": {"S": course_detail.get("tts_gender", "male")} } dynamodb.put_item( TableName=COURSE_TABLE, Item={ "course_id": {"N": str(course_detail["course_id"])}, "course_name": {"S": course_name}, "course_detail": {"M": course_detail_item}, "total_videos": {"N": str(course_detail.get("total_videos", 0))}, "status": {"S": "active"}, "created_at": {"S": now}, "updated_at": {"S": now}, }, # Only create if course doesn't exist ConditionExpression="attribute_not_exists(course_id)" ) print(f"✅ Saved course: {course_name} (Course ID: {course_detail['course_id']})") def save_session(session_id, course_id, topic_id, topic_title, node, status, course_name=None): """Save a session record for a specific topic/video.""" now = _utc_now() item = { "session_id": {"S": session_id}, "course_id": {"N": str(course_id)}, "topic_id": {"N": str(topic_id)}, "topic_title": {"S": topic_title}, "status": {"S": status}, "node": {"S": node}, "created_at": {"S": now}, "updated_at": {"S": now}, } # Add course name if provided if course_name: item["course_name"] = {"S": course_name} dynamodb.put_item(TableName=SESSION_TABLE, Item=item) print(f"✅ Saved session {session_id} for course {course_id} - topic {topic_id}") def update_session(session_id, status=None, node=None, video_url=None): """Update session progress (status, node, timestamp, video_url).""" now = _utc_now() expr_attr_names = {"#ut": "updated_at"} expr_attr_values = {":u": {"S": now}} update_expr = ["#ut = :u"] if status: expr_attr_names["#st"] = "status" expr_attr_values[":s"] = {"S": status} update_expr.append("#st = :s") if node: expr_attr_names["#nd"] = "node" expr_attr_values[":n"] = {"S": node} update_expr.append("#nd = :n") if video_url: expr_attr_names["#vu"] = "video_url" expr_attr_values[":v"] = {"S": video_url} update_expr.append("#vu = :v") dynamodb.update_item( TableName=SESSION_TABLE, Key={"session_id": {"S": session_id}}, UpdateExpression="SET " + ", ".join(update_expr), ExpressionAttributeNames=expr_attr_names, ExpressionAttributeValues=expr_attr_values, ) print(f"✅ Updated session {session_id}: node={node}, status={status}, video_url={video_url}") def update_course_status(course_id, status, completed_videos=None): """Update course status and progress.""" now = _utc_now() expr_attr_names = {"#ut": "updated_at", "#st": "status"} expr_attr_values = {":u": {"S": now}, ":s": {"S": status}} update_expr = ["#ut = :u", "#st = :s"] if completed_videos is not None: expr_attr_names["#cv"] = "completed_videos" expr_attr_values[":c"] = {"N": str(completed_videos)} update_expr.append("#cv = :c") dynamodb.update_item( TableName=COURSE_TABLE, Key={"course_id": {"N": str(course_id)}}, UpdateExpression="SET " + ", ".join(update_expr), ExpressionAttributeNames=expr_attr_names, ExpressionAttributeValues=expr_attr_values, ) print(f"✅ Updated course {course_id}: status={status}, completed_videos={completed_videos}") # 🔧 Enhanced Helpers def get_course(course_id: int): """Fetch a course record by course_id.""" response = dynamodb.get_item( TableName=COURSE_TABLE, Key={"course_id": {"N": str(course_id)}} ) return response.get("Item") def get_session(session_id: str): """Fetch a session record by session_id.""" response = dynamodb.get_item( TableName=SESSION_TABLE, Key={"session_id": {"S": session_id}} ) return response.get("Item") def get_sessions_by_course(course_id: int): """Get all sessions for a specific course.""" response = dynamodb.scan( TableName=SESSION_TABLE, FilterExpression="course_id = :cid", ExpressionAttributeValues={":cid": {"N": str(course_id)}} ) return response.get("Items", []) def get_course_progress(course_id: int): """Get course progress statistics.""" sessions = get_sessions_by_course(course_id) if not sessions: return {"total": 0, "completed": 0, "processing": 0, "failed": 0, "progress_percent": 0} total = len(sessions) completed = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() == "completed") processing = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() in ["processing", "in_progress", "running"]) failed = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() in ["failed", "error"]) progress_percent = (completed / total) * 100 if total > 0 else 0 return { "total": total, "completed": completed, "processing": processing, "failed": failed, "progress_percent": round(progress_percent, 1) } def list_all_courses(): """List all courses with their basic info.""" response = dynamodb.scan(TableName=COURSE_TABLE) return response.get("Items", []) def list_recent_sessions(limit=10): """List recent sessions across all courses.""" response = dynamodb.scan(TableName=SESSION_TABLE) sessions = response.get("Items", []) # Sort by updated_at descending sessions.sort(key=lambda x: x.get("updated_at", {}).get("S", ""), reverse=True) return sessions[:limit] def delete_course(course_id: int): """Delete a course and all its associated sessions.""" try: # Delete all sessions for this course sessions = get_sessions_by_course(course_id) for session in sessions: session_id = session.get("session_id", {}).get("S") if session_id: dynamodb.delete_item( TableName=SESSION_TABLE, Key={"session_id": {"S": session_id}} ) # Delete the course dynamodb.delete_item( TableName=COURSE_TABLE, Key={"course_id": {"N": str(course_id)}} ) print(f"✅ Deleted course {course_id} and {len(sessions)} associated sessions") return True except Exception as e: print(f"❌ Error deleting course {course_id}: {e}") return False def _dict_to_dynamodb(data): """Recursively convert dict/list into DynamoDB structure.""" if isinstance(data, dict): return {"M": {k: _dict_to_dynamodb(v) for k, v in data.items()}} elif isinstance(data, list): return {"L": [_dict_to_dynamodb(v) for v in data]} elif isinstance(data, str): return {"S": data} elif isinstance(data, bool): return {"BOOL": data} elif isinstance(data, (int, float)): return {"N": str(data)} elif data is None: return {"NULL": True} else: raise TypeError(f"Unsupported type: {type(data)}") # Course name mapping helper COURSE_ID_TO_NAME = { 47: "AI Ready Course", 48: "MERN", 49: "Machine Learning", 50: "Data Science", 51: "Flask", 52: "Django" } def get_course_name_by_id(course_id): """Get course name by course ID.""" return COURSE_ID_TO_NAME.get(int(course_id), f"Course {course_id}") # Batch operations def save_course_with_sessions(course_detail, course_name, session_data_list): """Save course and multiple sessions in sequence.""" try: # Save course first save_course(course_detail, course_name) # Save all sessions for session_data in session_data_list: save_session( session_data["session_id"], session_data["course_id"], session_data["topic_id"], session_data["topic_title"], session_data.get("node", "initialization"), session_data.get("status", "created"), course_name ) print(f"✅ Saved course {course_name} with {len(session_data_list)} sessions") return True except Exception as e: print(f"❌ Error saving course with sessions: {e}") return False