File size: 9,531 Bytes
3927523 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
import boto3
import os
from datetime import datetime
# DynamoDB client
dynamodb = boto3.client("dynamodb", region_name="ap-south-1")
# Tables (set via env vars or fallback defaults)
COURSE_TABLE = os.environ.get("COURSE_TABLE", "CourseTracking")
SESSION_TABLE = os.environ.get("SESSION_TABLE", "SessionTracking")
def _utc_now():
"""Return ISO 8601 UTC timestamp with Z suffix."""
return datetime.utcnow().isoformat() + "Z"
def save_course(course_detail: dict, course_name: str):
"""Save a course (only once per course_id)."""
now = _utc_now()
# Enhanced course details with more information
course_detail_item = {
"course_id": {"N": str(course_detail["course_id"])},
"topic_id": {"N": str(course_detail.get("topic_id", 0))},
"chapter_id": {"N": str(course_detail.get("chapter_id", 0))},
"total_videos": {"N": str(course_detail.get("total_videos", 0))},
"programming_language": {"S": course_detail.get("programming_language", "Python")},
"target_language": {"S": course_detail.get("target_language", "english")},
"tts_voice": {"S": course_detail.get("tts_voice", "onyx")},
"tts_gender": {"S": course_detail.get("tts_gender", "male")}
}
dynamodb.put_item(
TableName=COURSE_TABLE,
Item={
"course_id": {"N": str(course_detail["course_id"])},
"course_name": {"S": course_name},
"course_detail": {"M": course_detail_item},
"total_videos": {"N": str(course_detail.get("total_videos", 0))},
"status": {"S": "active"},
"created_at": {"S": now},
"updated_at": {"S": now},
},
# Only create if course doesn't exist
ConditionExpression="attribute_not_exists(course_id)"
)
print(f"β
Saved course: {course_name} (Course ID: {course_detail['course_id']})")
def save_session(session_id, course_id, topic_id, topic_title, node, status, course_name=None):
"""Save a session record for a specific topic/video."""
now = _utc_now()
item = {
"session_id": {"S": session_id},
"course_id": {"N": str(course_id)},
"topic_id": {"N": str(topic_id)},
"topic_title": {"S": topic_title},
"status": {"S": status},
"node": {"S": node},
"created_at": {"S": now},
"updated_at": {"S": now},
}
# Add course name if provided
if course_name:
item["course_name"] = {"S": course_name}
dynamodb.put_item(TableName=SESSION_TABLE, Item=item)
print(f"β
Saved session {session_id} for course {course_id} - topic {topic_id}")
def update_session(session_id, status=None, node=None, video_url=None):
"""Update session progress (status, node, timestamp, video_url)."""
now = _utc_now()
expr_attr_names = {"#ut": "updated_at"}
expr_attr_values = {":u": {"S": now}}
update_expr = ["#ut = :u"]
if status:
expr_attr_names["#st"] = "status"
expr_attr_values[":s"] = {"S": status}
update_expr.append("#st = :s")
if node:
expr_attr_names["#nd"] = "node"
expr_attr_values[":n"] = {"S": node}
update_expr.append("#nd = :n")
if video_url:
expr_attr_names["#vu"] = "video_url"
expr_attr_values[":v"] = {"S": video_url}
update_expr.append("#vu = :v")
dynamodb.update_item(
TableName=SESSION_TABLE,
Key={"session_id": {"S": session_id}},
UpdateExpression="SET " + ", ".join(update_expr),
ExpressionAttributeNames=expr_attr_names,
ExpressionAttributeValues=expr_attr_values,
)
print(f"β
Updated session {session_id}: node={node}, status={status}, video_url={video_url}")
def update_course_status(course_id, status, completed_videos=None):
"""Update course status and progress."""
now = _utc_now()
expr_attr_names = {"#ut": "updated_at", "#st": "status"}
expr_attr_values = {":u": {"S": now}, ":s": {"S": status}}
update_expr = ["#ut = :u", "#st = :s"]
if completed_videos is not None:
expr_attr_names["#cv"] = "completed_videos"
expr_attr_values[":c"] = {"N": str(completed_videos)}
update_expr.append("#cv = :c")
dynamodb.update_item(
TableName=COURSE_TABLE,
Key={"course_id": {"N": str(course_id)}},
UpdateExpression="SET " + ", ".join(update_expr),
ExpressionAttributeNames=expr_attr_names,
ExpressionAttributeValues=expr_attr_values,
)
print(f"β
Updated course {course_id}: status={status}, completed_videos={completed_videos}")
# π§ Enhanced Helpers
def get_course(course_id: int):
"""Fetch a course record by course_id."""
response = dynamodb.get_item(
TableName=COURSE_TABLE,
Key={"course_id": {"N": str(course_id)}}
)
return response.get("Item")
def get_session(session_id: str):
"""Fetch a session record by session_id."""
response = dynamodb.get_item(
TableName=SESSION_TABLE,
Key={"session_id": {"S": session_id}}
)
return response.get("Item")
def get_sessions_by_course(course_id: int):
"""Get all sessions for a specific course."""
response = dynamodb.scan(
TableName=SESSION_TABLE,
FilterExpression="course_id = :cid",
ExpressionAttributeValues={":cid": {"N": str(course_id)}}
)
return response.get("Items", [])
def get_course_progress(course_id: int):
"""Get course progress statistics."""
sessions = get_sessions_by_course(course_id)
if not sessions:
return {"total": 0, "completed": 0, "processing": 0, "failed": 0, "progress_percent": 0}
total = len(sessions)
completed = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() == "completed")
processing = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() in ["processing", "in_progress", "running"])
failed = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() in ["failed", "error"])
progress_percent = (completed / total) * 100 if total > 0 else 0
return {
"total": total,
"completed": completed,
"processing": processing,
"failed": failed,
"progress_percent": round(progress_percent, 1)
}
def list_all_courses():
"""List all courses with their basic info."""
response = dynamodb.scan(TableName=COURSE_TABLE)
return response.get("Items", [])
def list_recent_sessions(limit=10):
"""List recent sessions across all courses."""
response = dynamodb.scan(TableName=SESSION_TABLE)
sessions = response.get("Items", [])
# Sort by updated_at descending
sessions.sort(key=lambda x: x.get("updated_at", {}).get("S", ""), reverse=True)
return sessions[:limit]
def delete_course(course_id: int):
"""Delete a course and all its associated sessions."""
try:
# Delete all sessions for this course
sessions = get_sessions_by_course(course_id)
for session in sessions:
session_id = session.get("session_id", {}).get("S")
if session_id:
dynamodb.delete_item(
TableName=SESSION_TABLE,
Key={"session_id": {"S": session_id}}
)
# Delete the course
dynamodb.delete_item(
TableName=COURSE_TABLE,
Key={"course_id": {"N": str(course_id)}}
)
print(f"β
Deleted course {course_id} and {len(sessions)} associated sessions")
return True
except Exception as e:
print(f"β Error deleting course {course_id}: {e}")
return False
def _dict_to_dynamodb(data):
"""Recursively convert dict/list into DynamoDB structure."""
if isinstance(data, dict):
return {"M": {k: _dict_to_dynamodb(v) for k, v in data.items()}}
elif isinstance(data, list):
return {"L": [_dict_to_dynamodb(v) for v in data]}
elif isinstance(data, str):
return {"S": data}
elif isinstance(data, bool):
return {"BOOL": data}
elif isinstance(data, (int, float)):
return {"N": str(data)}
elif data is None:
return {"NULL": True}
else:
raise TypeError(f"Unsupported type: {type(data)}")
# Course name mapping helper
COURSE_ID_TO_NAME = {
47: "AI Ready Course",
48: "MERN",
49: "Machine Learning",
50: "Data Science",
51: "Flask",
52: "Django"
}
def get_course_name_by_id(course_id):
"""Get course name by course ID."""
return COURSE_ID_TO_NAME.get(int(course_id), f"Course {course_id}")
# Batch operations
def save_course_with_sessions(course_detail, course_name, session_data_list):
"""Save course and multiple sessions in sequence."""
try:
# Save course first
save_course(course_detail, course_name)
# Save all sessions
for session_data in session_data_list:
save_session(
session_data["session_id"],
session_data["course_id"],
session_data["topic_id"],
session_data["topic_title"],
session_data.get("node", "initialization"),
session_data.get("status", "created"),
course_name
)
print(f"β
Saved course {course_name} with {len(session_data_list)} sessions")
return True
except Exception as e:
print(f"β Error saving course with sessions: {e}")
return False |