| from sqlalchemy.orm import Session |
| from sqlalchemy import and_, or_, desc |
| from database.models.acl import ACLs, Scope, AclRole |
| from database.models.watch_channel import WatchChannel |
| from schemas.acl import ACLRuleInput, PatchACLRuleInput, Channel, ACLListResponse, ACLRule, ScopeInput, ScopeOutput |
| from services.notification_service import get_notification_service |
| from uuid import uuid4 |
| from datetime import datetime, timedelta |
| from typing import Dict, Any, Optional |
| import logging |
| import json |
| import uuid |
| import base64 |
| from database.models.calendar import Calendar |
| from database.models.user import User |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class ACLManager: |
| def __init__(self, db: Session, user_id: str): |
| self.db = db |
| self.user_id = user_id |
|
|
| def validate_calendar_id(self, calendar_id, user_id): |
| calendar = self.db.query(Calendar).filter( |
| Calendar.calendar_id == calendar_id, |
| Calendar.user_id == user_id, |
| Calendar.deleted == False |
| ).first() |
| if not calendar: |
| return False |
| return True |
|
|
| def list_rules(self, calendar_id: str, max_results: int = 100, |
| page_token: Optional[str] = None, show_deleted: bool = False, |
| sync_token: Optional[str] = None) -> ACLListResponse: |
| """ |
| List ACL rules for a calendar with pagination and filtering support. |
| |
| Args: |
| calendar_id: The calendar ID |
| max_results: Maximum number of entries per page (1-250, default 100) |
| page_token: Token for pagination continuation |
| show_deleted: Whether to include deleted ACLs (role == "none") |
| sync_token: Token for incremental synchronization |
| |
| Returns: |
| ACLListResponse with items and pagination tokens |
| """ |
| try: |
| |
| if sync_token: |
| return self._handle_sync_request(calendar_id, sync_token, max_results, show_deleted) |
| |
| |
| query = self.db.query(ACLs).filter( |
| ACLs.calendar_id == calendar_id, |
| ACLs.user_id == self.user_id |
| ) |
| |
| |
| if not show_deleted: |
| query = query.filter(ACLs.role != AclRole.none) |
| |
| |
| query = query.order_by(ACLs.created_at, ACLs.id) |
| |
| |
| offset = 0 |
| if page_token: |
| try: |
| offset = int(base64.b64decode(page_token).decode('utf-8')) |
| except (ValueError, TypeError): |
| raise ValueError("Invalid pageToken") |
| |
| |
| items = query.offset(offset).limit(max_results + 1).all() |
| |
| |
| has_next_page = len(items) > max_results |
| if has_next_page: |
| items = items[:max_results] |
| next_page_token = base64.b64encode(str(offset + max_results).encode('utf-8')).decode('utf-8') |
| else: |
| next_page_token = None |
| |
| |
| latest_updated = self.db.query(ACLs.updated_at).filter( |
| ACLs.calendar_id == calendar_id, |
| ACLs.user_id == self.user_id |
| ).order_by(desc(ACLs.updated_at)).first() |
| |
| next_sync_token = None |
| if latest_updated and latest_updated[0]: |
| sync_data = { |
| 'calendar_id': calendar_id, |
| 'timestamp': latest_updated[0].isoformat() |
| } |
| next_sync_token = base64.b64encode(json.dumps(sync_data).encode('utf-8')).decode('utf-8') |
| |
| |
| etag = f'"{uuid4()}"' |
| |
| |
| acl_rules = [] |
| for item in items: |
| if item.scope.type == "default": |
| scope = ScopeOutput(type=item.scope.type) |
| else: |
| scope = ScopeOutput( |
| type=item.scope.type, |
| value=item.scope.value |
| ) |
| acl_rule = ACLRule( |
| id=item.id, |
| calendar_id=item.calendar_id, |
| user_id=item.user_id, |
| role=item.role, |
| etag=item.etag, |
| scope=scope |
| ) |
| acl_rules.append(acl_rule) |
|
|
| return ACLListResponse( |
| etag=etag, |
| items=acl_rules, |
| nextPageToken=next_page_token, |
| nextSyncToken=next_sync_token |
| ) |
| |
| except Exception as e: |
| logger.error(f"Error listing ACL rules: {e}") |
| raise |
| |
| def _handle_sync_request(self, calendar_id: str, sync_token: str, |
| max_results: int, show_deleted: bool) -> ACLListResponse: |
| """ |
| Handle incremental synchronization request. |
| |
| Args: |
| calendar_id: The calendar ID |
| sync_token: The sync token from previous request |
| max_results: Maximum number of entries per page |
| show_deleted: Whether to include deleted ACLs |
| |
| Returns: |
| ACLListResponse with changes since the sync token |
| """ |
| try: |
| |
| sync_data = json.loads(base64.b64decode(sync_token).decode('utf-8')) |
| last_sync_time = datetime.fromisoformat(sync_data['timestamp']) |
| |
| |
| if sync_data.get('calendar_id') != calendar_id: |
| raise ValueError("Sync token calendar ID mismatch") |
| |
| |
| if (datetime.utcnow() - last_sync_time).days > 7: |
| from fastapi import HTTPException |
| raise HTTPException(status_code=410, detail="Sync token expired") |
| |
| |
| query = self.db.query(ACLs).filter( |
| ACLs.calendar_id == calendar_id, |
| ACLs.user_id == self.user_id, |
| ACLs.updated_at > last_sync_time |
| ) |
| |
| |
| |
| query = query.order_by(ACLs.updated_at, ACLs.id) |
| |
| |
| items = query.limit(max_results + 1).all() |
| |
| |
| has_more = len(items) > max_results |
| if has_more: |
| items = items[:max_results] |
| |
| |
| |
| |
| next_sync_token = None |
| if items: |
| latest_time = max(item.updated_at for item in items) |
| sync_data = { |
| 'calendar_id': calendar_id, |
| 'timestamp': latest_time.isoformat() |
| } |
| next_sync_token = base64.b64encode(json.dumps(sync_data).encode('utf-8')).decode('utf-8') |
| else: |
| |
| next_sync_token = sync_token |
| |
| |
| etag = f'"{uuid4()}"' |
| |
| |
| acl_rules = [] |
| for item in items: |
| scope = ScopeInput( |
| type=item.scope.type, |
| value=item.scope.value |
| ) |
| acl_rule = ACLRule( |
| id=item.id, |
| calendar_id=item.calendar_id, |
| user_id=item.user_id, |
| role=item.role, |
| etag=item.etag, |
| scope=scope |
| ) |
| acl_rules.append(acl_rule) |
| |
| return ACLListResponse( |
| etag=etag, |
| items=acl_rules, |
| nextPageToken=None, |
| nextSyncToken=next_sync_token |
| ) |
| |
| except json.JSONDecodeError: |
| raise ValueError("Invalid sync token format") |
| except Exception as e: |
| logger.error(f"Error handling sync request: {e}") |
| raise |
|
|
| def get_rule(self, calendar_id: str, rule_id: str): |
| """ |
| Retrieve a specific ACL rule by calendar ID and rule ID (must be owned). |
| """ |
| return self.db.query(ACLs).filter_by( |
| id=rule_id, |
| calendar_id=calendar_id, |
| user_id=self.user_id |
| ).first() |
|
|
| def insert_rule(self, calendar_id: str, rule: ACLRuleInput, send_notifications: bool = True): |
| """ |
| Insert a new ACL rule after validating scope existence. |
| |
| Args: |
| calendar_id: The calendar ID |
| rule: The ACL rule input data |
| send_notifications: Whether to send notifications about the calendar sharing change (default: True) |
| |
| Returns the inserted ACL rule. |
| """ |
| |
| if rule.scope.type == "default": |
| scope = self.db.query(Scope).filter(Scope.type == rule.scope.type).first() |
| else: |
| if rule.scope.value is None: |
| scope = self.db.query(Scope).filter(Scope.type == rule.scope.type).first() |
| else: |
| scope = ( |
| self.db.query(Scope) |
| .filter(Scope.type == rule.scope.type, Scope.value == rule.scope.value) |
| .first() |
| ) |
|
|
| if not scope: |
| raise ValueError(f"Scope ({rule.scope.type}, {rule.scope.value}) not found") |
|
|
| |
| rule_id = f"{uuid4()}" |
| db_rule = ACLs( |
| id=rule_id, |
| calendar_id=calendar_id, |
| user_id=self.user_id, |
| role=rule.role, |
| scope_id=scope.id, |
| etag=f'"{uuid4()}"', |
| created_at=datetime.utcnow(), |
| updated_at=datetime.utcnow() |
| ) |
| self.db.add(db_rule) |
| self.db.commit() |
| self.db.refresh(db_rule) |
| |
| |
| if send_notifications: |
| self._send_acl_notification(calendar_id, "insert", { |
| "id": db_rule.id, |
| "calendar_id": db_rule.calendar_id, |
| "user_id": db_rule.user_id, |
| "role": db_rule.role.value, |
| "scope": scope.as_dict(), |
| "etag": db_rule.etag |
| }) |
|
|
| response = { |
| "kind": "calendar#aclRule", |
| "etag": db_rule.etag, |
| "id": db_rule.id, |
| "scope":{}, |
| "role": db_rule.role.value |
| } |
| scope_dict = scope.as_dict() |
| response["scope"]["type"] = scope_dict.get("type") |
| if scope_dict.get("value") != "public": |
| response["scope"]["value"] = scope_dict.get("value") |
| return response |
|
|
| def update_rule(self, calendar_id: str, rule_id: str, rule: ACLRuleInput, send_notifications: bool = True): |
| """ |
| Fully replace an existing ACL rule's role and scope. |
| |
| Returns the updated rule or None if not found. |
| """ |
| db_rule = self.get_rule(calendar_id, rule_id) |
|
|
| if not db_rule: |
| return None |
|
|
| if rule.role is not None: |
| db_rule.role = rule.role |
| |
| if rule.scope is not None: |
| if db_rule.scope: |
| db_rule.scope.type = rule.scope.type |
| if rule.scope.value is not None and rule.scope.type != "default": |
| if rule.scope.type in ["user", "group"]: |
| |
| user = self.db.query(User).filter(User.email == rule.scope.value, User.is_active == True).first() |
| if user is None: |
| raise ValueError("Invalid data in 'value field'. Please enter an existing email id in 'value' field") |
| db_rule.scope.value = rule.scope.value |
| else: |
| raise ValueError("ACL rule has no associated scope object to update.") |
| db_rule.updated_at = datetime.utcnow() |
| self.db.commit() |
| self.db.refresh(db_rule) |
| |
| |
| if send_notifications: |
| self._send_acl_notification(calendar_id, "update", { |
| "id": db_rule.id, |
| "calendar_id": db_rule.calendar_id, |
| "user_id": db_rule.user_id, |
| "role": db_rule.role.value, |
| "scope": db_rule.scope.as_dict(), |
| "etag": db_rule.etag |
| }) |
| |
| response = { |
| "kind": "calendar#aclRule", |
| "etag": db_rule.etag, |
| "id": db_rule.id, |
| "scope":{}, |
| "role": db_rule.role.value |
| } |
| scope_dict = db_rule.scope.as_dict() |
| response["scope"]["type"] = scope_dict.get("type") |
| if scope_dict.get("value") != "public": |
| response["scope"]["value"] = scope_dict.get("value") |
| return response |
|
|
| def patch_rule(self, calendar_id: str, rule_id: str, rule: PatchACLRuleInput, send_notifications: bool = True): |
| """ |
| Partially update an ACL rule's role or scope if provided. |
| |
| Returns the updated rule or None if not found. |
| """ |
| db_rule = self.get_rule(calendar_id, rule_id) |
| if not db_rule: |
| return None |
|
|
| if rule.role is not None: |
| db_rule.role = rule.role |
|
|
| if rule.scope is not None: |
| |
| if db_rule.scope: |
| db_rule.scope.type = rule.scope.type |
| if rule.scope.value is not None and rule.scope.type != "default": |
| if rule.scope.type in ["user", "group"]: |
| |
| user = self.db.query(User).filter(User.email == rule.scope.value, User.is_active == True).first() |
| if user is None: |
| raise ValueError("Invalid data in 'value field'. Please enter an existing email id in 'value' field") |
| db_rule.scope.value = rule.scope.value |
| else: |
| raise ValueError("ACL rule has no associated scope object to patch.") |
|
|
| db_rule.updated_at = datetime.utcnow() |
| self.db.commit() |
| self.db.refresh(db_rule) |
| |
| |
| if send_notifications: |
| self._send_acl_notification(calendar_id, "update", { |
| "id": db_rule.id, |
| "calendar_id": db_rule.calendar_id, |
| "user_id": db_rule.user_id, |
| "role": db_rule.role.value, |
| "scope": db_rule.scope.as_dict(), |
| "etag": db_rule.etag |
| }) |
| |
| response = { |
| "kind": "calendar#aclRule", |
| "etag": db_rule.etag, |
| "id": db_rule.id, |
| "scope":{}, |
| "role": db_rule.role.value |
| } |
| scope_dict = db_rule.scope.as_dict() |
| response["scope"]["type"] = scope_dict.get("type") |
| if scope_dict.get("value") != "public": |
| response["scope"]["value"] = scope_dict.get("value") |
| return response |
|
|
| def delete_rule(self, calendar_id: str, rule_id: str) -> bool: |
| """ |
| Delete an ACL rule by ID and calendar ID. Only the calendar owner can delete ACLs. |
| |
| Returns: |
| True if deleted, False if rule not found. |
| Raises: |
| Exception if DB operation fails. |
| """ |
| session = self.db |
| try: |
| from database.models import Calendar |
| rule = session.query(ACLs).join(Calendar).filter( |
| ACLs.id == rule_id, |
| ACLs.calendar_id == calendar_id, |
| Calendar.user_id == self.user_id |
| ).first() |
|
|
| if not rule: |
| return False |
|
|
| |
| rule_data = { |
| "id": rule.id, |
| "calendar_id": rule.calendar_id, |
| "user_id": rule.user_id, |
| "role": rule.role.value, |
| "scope": rule.scope.as_dict() if rule.scope else {}, |
| "etag": rule.etag |
| } |
|
|
| session.delete(rule) |
| session.commit() |
| |
| |
| self._send_acl_notification(calendar_id, "delete", rule_data) |
| |
| return True |
|
|
| except Exception as e: |
| session.rollback() |
| raise |
|
|
| def watch_acl( |
| self, |
| user_id:str, |
| calendar_id: str, |
| watch_request: Dict[str, Any] |
| ) -> Channel: |
| """ |
| Set up watch notifications for ACL changes. |
| |
| POST /calendars/{calendarId}/acl/watch |
| """ |
| try: |
| session = self.db |
|
|
| |
| resource_id = f"acl-{calendar_id}-{uuid.uuid4().hex[:8]}" |
| resource_uri = f"/calendars/{calendar_id}/acl" |
| |
| |
| |
| expires_at = datetime.utcnow() + timedelta(hours=24) |
|
|
| |
| calendar = session.query(Calendar).filter( |
| Calendar.calendar_id == calendar_id, |
| Calendar.user_id == user_id |
| ).first() |
| if not calendar: |
| raise ValueError(f"Calendar {calendar_id} not found for user {user_id}") |
|
|
| if session.query(WatchChannel).filter(WatchChannel.id == watch_request.id).first(): |
| raise ValueError(f"Channel with Id {watch_request.id} already exists") |
|
|
| |
| watch_channel = WatchChannel( |
| id=watch_request.id, |
| resource_id=resource_id, |
| resource_uri=resource_uri, |
| resource_type="acl", |
| calendar_id=calendar_id, |
| user_id=user_id, |
| webhook_address=watch_request.address, |
| webhook_token=watch_request.token, |
| webhook_type=watch_request.type, |
| params=json.dumps(watch_request.params.model_dump()) if watch_request.params else None, |
| created_at=datetime.utcnow(), |
| expires_at=expires_at, |
| is_active="true", |
| notification_count=0 |
| ) |
| |
| |
| session.add(watch_channel) |
| session.commit() |
| |
| logger.info(f"Created settings watch channel {watch_request.id} for user {user_id}") |
|
|
|
|
| |
| |
| channel = Channel( |
| id=watch_channel.id, |
| resourceId=resource_id, |
| resourceUri=watch_channel.resource_uri, |
| token=watch_channel.webhook_token, |
| expiration=watch_channel.expires_at.isoformat() + "Z" if watch_channel.expires_at else None |
| ) |
| |
| logger.info(f"Set up watch channel {watch_channel.id} for ACL changes in calendar {calendar_id}") |
| return channel |
| |
| except Exception as e: |
| logger.error(f"Error setting up ACL watch for calendar {calendar_id}: {e}") |
| self.db.rollback() |
| raise |
|
|
| def cleanup_expired_channels(self) -> int: |
| """ |
| Clean up expired watch channels for this user |
| |
| Returns: |
| Number of channels cleaned up |
| """ |
| try: |
| current_time = datetime.utcnow() |
| |
| |
| expired_channels = self.db.query(WatchChannel).filter( |
| WatchChannel.user_id == self.user_id, |
| WatchChannel.expires_at < current_time, |
| WatchChannel.is_active == "true" |
| ).all() |
| |
| cleanup_count = 0 |
| for channel in expired_channels: |
| channel.is_active = "false" |
| cleanup_count += 1 |
| |
| if cleanup_count > 0: |
| self.db.commit() |
| logger.info(f"Cleaned up {cleanup_count} expired watch channels for user {self.user_id}") |
| |
| return cleanup_count |
| |
| except Exception as e: |
| logger.error(f"Error cleaning up expired channels: {e}") |
| self.db.rollback() |
| return 0 |
|
|
| def _send_acl_notification(self, calendar_id: str, change_type: str, acl_data: Dict[str, Any]): |
| """ |
| Send ACL change notification to all active watch channels for the calendar |
| |
| Args: |
| calendar_id: The calendar ID |
| change_type: Type of change ("insert", "update", "delete") |
| acl_data: The ACL rule data |
| """ |
| try: |
| notification_service = get_notification_service(self.db) |
| notifications_sent = notification_service.notify_acl_change( |
| calendar_id, |
| change_type, |
| acl_data |
| ) |
| |
| if notifications_sent > 0: |
| logger.debug(f"Sent {notifications_sent} notifications for ACL {change_type} in calendar {calendar_id}") |
| |
| except Exception as e: |
| logger.error(f"Error sending ACL notification: {e}") |
| |
|
|
|
|
| def get_acl_manager(db: Session, user_id: str) -> ACLManager: |
| return ACLManager(db, user_id) |
|
|