Spaces:
Running
Running
| # ============================================================ | |
| # app/models/call.py - Call Model | |
| # ============================================================ | |
| from datetime import datetime | |
| from typing import Optional | |
| class Call: | |
| """Call model for MongoDB""" | |
| def create_document( | |
| conversation_id: str, | |
| call_type: str, | |
| caller_id: str, | |
| receiver_id: str, | |
| webrtc_session_id: str, | |
| ) -> dict: | |
| """Create call document for insertion""" | |
| now = datetime.utcnow() | |
| return { | |
| "conversation_id": conversation_id, | |
| "call_type": call_type, # "voice" or "video" | |
| "caller_id": caller_id, | |
| "receiver_id": receiver_id, | |
| "status": "ringing", # "ringing", "ongoing", "completed", "missed", "declined" | |
| "started_at": None, | |
| "ended_at": None, | |
| "duration": 0, # In seconds | |
| "webrtc_session_id": webrtc_session_id, | |
| "created_at": now, | |
| } | |
| def format_response(call_doc: dict) -> dict: | |
| """Format call document for API response""" | |
| if not call_doc: | |
| return None | |
| return { | |
| "id": str(call_doc.get("_id", "")), | |
| "conversation_id": call_doc.get("conversation_id"), | |
| "call_type": call_doc.get("call_type"), | |
| "caller_id": call_doc.get("caller_id"), | |
| "receiver_id": call_doc.get("receiver_id"), | |
| "status": call_doc.get("status"), | |
| "started_at": call_doc.get("started_at"), | |
| "ended_at": call_doc.get("ended_at"), | |
| "duration": call_doc.get("duration", 0), | |
| "webrtc_session_id": call_doc.get("webrtc_session_id"), | |
| "created_at": call_doc.get("created_at"), | |
| } | |