|
|
import boto3 |
|
|
import os |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
dynamodb = boto3.client("dynamodb", region_name="ap-south-1") |
|
|
|
|
|
|
|
|
COURSE_TABLE = os.environ.get("COURSE_TABLE", "CourseTracking") |
|
|
SESSION_TABLE = os.environ.get("SESSION_TABLE", "SessionTracking") |
|
|
|
|
|
|
|
|
def _utc_now(): |
|
|
"""Return ISO 8601 UTC timestamp with Z suffix.""" |
|
|
return datetime.utcnow().isoformat() + "Z" |
|
|
|
|
|
|
|
|
def save_course(course_detail: dict, course_name: str): |
|
|
"""Save a course (only once per course_id).""" |
|
|
now = _utc_now() |
|
|
|
|
|
|
|
|
course_detail_item = { |
|
|
"course_id": {"N": str(course_detail["course_id"])}, |
|
|
"topic_id": {"N": str(course_detail.get("topic_id", 0))}, |
|
|
"chapter_id": {"N": str(course_detail.get("chapter_id", 0))}, |
|
|
"total_videos": {"N": str(course_detail.get("total_videos", 0))}, |
|
|
"programming_language": {"S": course_detail.get("programming_language", "Python")}, |
|
|
"target_language": {"S": course_detail.get("target_language", "english")}, |
|
|
"tts_voice": {"S": course_detail.get("tts_voice", "onyx")}, |
|
|
"tts_gender": {"S": course_detail.get("tts_gender", "male")} |
|
|
} |
|
|
|
|
|
dynamodb.put_item( |
|
|
TableName=COURSE_TABLE, |
|
|
Item={ |
|
|
"course_id": {"N": str(course_detail["course_id"])}, |
|
|
"course_name": {"S": course_name}, |
|
|
"course_detail": {"M": course_detail_item}, |
|
|
"total_videos": {"N": str(course_detail.get("total_videos", 0))}, |
|
|
"status": {"S": "active"}, |
|
|
"created_at": {"S": now}, |
|
|
"updated_at": {"S": now}, |
|
|
}, |
|
|
|
|
|
ConditionExpression="attribute_not_exists(course_id)" |
|
|
) |
|
|
print(f"β
Saved course: {course_name} (Course ID: {course_detail['course_id']})") |
|
|
|
|
|
|
|
|
def save_session(session_id, course_id, topic_id, topic_title, node, status, course_name=None): |
|
|
"""Save a session record for a specific topic/video.""" |
|
|
now = _utc_now() |
|
|
|
|
|
item = { |
|
|
"session_id": {"S": session_id}, |
|
|
"course_id": {"N": str(course_id)}, |
|
|
"topic_id": {"N": str(topic_id)}, |
|
|
"topic_title": {"S": topic_title}, |
|
|
"status": {"S": status}, |
|
|
"node": {"S": node}, |
|
|
"created_at": {"S": now}, |
|
|
"updated_at": {"S": now}, |
|
|
} |
|
|
|
|
|
|
|
|
if course_name: |
|
|
item["course_name"] = {"S": course_name} |
|
|
|
|
|
dynamodb.put_item(TableName=SESSION_TABLE, Item=item) |
|
|
print(f"β
Saved session {session_id} for course {course_id} - topic {topic_id}") |
|
|
|
|
|
|
|
|
def update_session(session_id, status=None, node=None, video_url=None): |
|
|
"""Update session progress (status, node, timestamp, video_url).""" |
|
|
now = _utc_now() |
|
|
|
|
|
expr_attr_names = {"#ut": "updated_at"} |
|
|
expr_attr_values = {":u": {"S": now}} |
|
|
update_expr = ["#ut = :u"] |
|
|
|
|
|
if status: |
|
|
expr_attr_names["#st"] = "status" |
|
|
expr_attr_values[":s"] = {"S": status} |
|
|
update_expr.append("#st = :s") |
|
|
|
|
|
if node: |
|
|
expr_attr_names["#nd"] = "node" |
|
|
expr_attr_values[":n"] = {"S": node} |
|
|
update_expr.append("#nd = :n") |
|
|
|
|
|
if video_url: |
|
|
expr_attr_names["#vu"] = "video_url" |
|
|
expr_attr_values[":v"] = {"S": video_url} |
|
|
update_expr.append("#vu = :v") |
|
|
|
|
|
dynamodb.update_item( |
|
|
TableName=SESSION_TABLE, |
|
|
Key={"session_id": {"S": session_id}}, |
|
|
UpdateExpression="SET " + ", ".join(update_expr), |
|
|
ExpressionAttributeNames=expr_attr_names, |
|
|
ExpressionAttributeValues=expr_attr_values, |
|
|
) |
|
|
print(f"β
Updated session {session_id}: node={node}, status={status}, video_url={video_url}") |
|
|
|
|
|
|
|
|
def update_course_status(course_id, status, completed_videos=None): |
|
|
"""Update course status and progress.""" |
|
|
now = _utc_now() |
|
|
|
|
|
expr_attr_names = {"#ut": "updated_at", "#st": "status"} |
|
|
expr_attr_values = {":u": {"S": now}, ":s": {"S": status}} |
|
|
update_expr = ["#ut = :u", "#st = :s"] |
|
|
|
|
|
if completed_videos is not None: |
|
|
expr_attr_names["#cv"] = "completed_videos" |
|
|
expr_attr_values[":c"] = {"N": str(completed_videos)} |
|
|
update_expr.append("#cv = :c") |
|
|
|
|
|
dynamodb.update_item( |
|
|
TableName=COURSE_TABLE, |
|
|
Key={"course_id": {"N": str(course_id)}}, |
|
|
UpdateExpression="SET " + ", ".join(update_expr), |
|
|
ExpressionAttributeNames=expr_attr_names, |
|
|
ExpressionAttributeValues=expr_attr_values, |
|
|
) |
|
|
print(f"β
Updated course {course_id}: status={status}, completed_videos={completed_videos}") |
|
|
|
|
|
|
|
|
|
|
|
def get_course(course_id: int): |
|
|
"""Fetch a course record by course_id.""" |
|
|
response = dynamodb.get_item( |
|
|
TableName=COURSE_TABLE, |
|
|
Key={"course_id": {"N": str(course_id)}} |
|
|
) |
|
|
return response.get("Item") |
|
|
|
|
|
|
|
|
def get_session(session_id: str): |
|
|
"""Fetch a session record by session_id.""" |
|
|
response = dynamodb.get_item( |
|
|
TableName=SESSION_TABLE, |
|
|
Key={"session_id": {"S": session_id}} |
|
|
) |
|
|
return response.get("Item") |
|
|
|
|
|
|
|
|
def get_sessions_by_course(course_id: int): |
|
|
"""Get all sessions for a specific course.""" |
|
|
response = dynamodb.scan( |
|
|
TableName=SESSION_TABLE, |
|
|
FilterExpression="course_id = :cid", |
|
|
ExpressionAttributeValues={":cid": {"N": str(course_id)}} |
|
|
) |
|
|
return response.get("Items", []) |
|
|
|
|
|
|
|
|
def get_course_progress(course_id: int): |
|
|
"""Get course progress statistics.""" |
|
|
sessions = get_sessions_by_course(course_id) |
|
|
|
|
|
if not sessions: |
|
|
return {"total": 0, "completed": 0, "processing": 0, "failed": 0, "progress_percent": 0} |
|
|
|
|
|
total = len(sessions) |
|
|
completed = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() == "completed") |
|
|
processing = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() in ["processing", "in_progress", "running"]) |
|
|
failed = sum(1 for s in sessions if s.get("status", {}).get("S", "").lower() in ["failed", "error"]) |
|
|
|
|
|
progress_percent = (completed / total) * 100 if total > 0 else 0 |
|
|
|
|
|
return { |
|
|
"total": total, |
|
|
"completed": completed, |
|
|
"processing": processing, |
|
|
"failed": failed, |
|
|
"progress_percent": round(progress_percent, 1) |
|
|
} |
|
|
|
|
|
|
|
|
def list_all_courses(): |
|
|
"""List all courses with their basic info.""" |
|
|
response = dynamodb.scan(TableName=COURSE_TABLE) |
|
|
return response.get("Items", []) |
|
|
|
|
|
|
|
|
def list_recent_sessions(limit=10): |
|
|
"""List recent sessions across all courses.""" |
|
|
response = dynamodb.scan(TableName=SESSION_TABLE) |
|
|
sessions = response.get("Items", []) |
|
|
|
|
|
|
|
|
sessions.sort(key=lambda x: x.get("updated_at", {}).get("S", ""), reverse=True) |
|
|
|
|
|
return sessions[:limit] |
|
|
|
|
|
|
|
|
def delete_course(course_id: int): |
|
|
"""Delete a course and all its associated sessions.""" |
|
|
try: |
|
|
|
|
|
sessions = get_sessions_by_course(course_id) |
|
|
for session in sessions: |
|
|
session_id = session.get("session_id", {}).get("S") |
|
|
if session_id: |
|
|
dynamodb.delete_item( |
|
|
TableName=SESSION_TABLE, |
|
|
Key={"session_id": {"S": session_id}} |
|
|
) |
|
|
|
|
|
|
|
|
dynamodb.delete_item( |
|
|
TableName=COURSE_TABLE, |
|
|
Key={"course_id": {"N": str(course_id)}} |
|
|
) |
|
|
|
|
|
print(f"β
Deleted course {course_id} and {len(sessions)} associated sessions") |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"β Error deleting course {course_id}: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
def _dict_to_dynamodb(data): |
|
|
"""Recursively convert dict/list into DynamoDB structure.""" |
|
|
if isinstance(data, dict): |
|
|
return {"M": {k: _dict_to_dynamodb(v) for k, v in data.items()}} |
|
|
elif isinstance(data, list): |
|
|
return {"L": [_dict_to_dynamodb(v) for v in data]} |
|
|
elif isinstance(data, str): |
|
|
return {"S": data} |
|
|
elif isinstance(data, bool): |
|
|
return {"BOOL": data} |
|
|
elif isinstance(data, (int, float)): |
|
|
return {"N": str(data)} |
|
|
elif data is None: |
|
|
return {"NULL": True} |
|
|
else: |
|
|
raise TypeError(f"Unsupported type: {type(data)}") |
|
|
|
|
|
|
|
|
|
|
|
COURSE_ID_TO_NAME = { |
|
|
47: "AI Ready Course", |
|
|
48: "MERN", |
|
|
49: "Machine Learning", |
|
|
50: "Data Science", |
|
|
51: "Flask", |
|
|
52: "Django" |
|
|
} |
|
|
|
|
|
def get_course_name_by_id(course_id): |
|
|
"""Get course name by course ID.""" |
|
|
return COURSE_ID_TO_NAME.get(int(course_id), f"Course {course_id}") |
|
|
|
|
|
|
|
|
|
|
|
def save_course_with_sessions(course_detail, course_name, session_data_list): |
|
|
"""Save course and multiple sessions in sequence.""" |
|
|
try: |
|
|
|
|
|
save_course(course_detail, course_name) |
|
|
|
|
|
|
|
|
for session_data in session_data_list: |
|
|
save_session( |
|
|
session_data["session_id"], |
|
|
session_data["course_id"], |
|
|
session_data["topic_id"], |
|
|
session_data["topic_title"], |
|
|
session_data.get("node", "initialization"), |
|
|
session_data.get("status", "created"), |
|
|
course_name |
|
|
) |
|
|
|
|
|
print(f"β
Saved course {course_name} with {len(session_data_list)} sessions") |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"β Error saving course with sessions: {e}") |
|
|
return False |