File size: 3,151 Bytes
e9e68a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import ipaddress
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel, Field

from core.config import (
    SUPABASE_ADMIN_SYNC_TOKEN,
    SUPABASE_SYNC_ALLOWED_IPS,
    SUPABASE_SYNC_ALLOW_PRIVATE_NETWORK,
)

router = APIRouter(prefix="/admin/sync", tags=["admin-sync"])


class SyncNotifyRequest(BaseModel):
    event: str = "notify"
    folder_key: Optional[str] = None
    object_paths: List[str] = Field(default_factory=list)
    source: Optional[str] = None


def _extract_client_ip(request: Request) -> str:
    forwarded_for = (request.headers.get("x-forwarded-for") or "").strip()
    if forwarded_for:
        first_hop = forwarded_for.split(",", 1)[0].strip()
        if first_hop:
            return first_hop

    if request.client and request.client.host:
        return str(request.client.host)

    return ""


def _is_private_or_loopback(ip_value: str) -> bool:
    try:
        parsed_ip = ipaddress.ip_address(ip_value)
    except ValueError:
        return False

    return bool(parsed_ip.is_private or parsed_ip.is_loopback)


def _is_ip_allowed(ip_value: str) -> bool:
    normalized_ip = (ip_value or "").strip()
    if not normalized_ip:
        return False

    if SUPABASE_SYNC_ALLOW_PRIVATE_NETWORK and _is_private_or_loopback(normalized_ip):
        return True

    return normalized_ip in set(SUPABASE_SYNC_ALLOWED_IPS)


async def verify_admin_sync_access(request: Request) -> None:
    if not SUPABASE_ADMIN_SYNC_TOKEN:
        raise HTTPException(status_code=503, detail="Admin sync token is not configured.")

    incoming_token = (request.headers.get("x-internal-token") or "").strip()
    if incoming_token != SUPABASE_ADMIN_SYNC_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid internal token.")

    request_ip = _extract_client_ip(request)
    if not _is_ip_allowed(request_ip):
        raise HTTPException(status_code=403, detail="Request source IP is not allowed.")


@router.post("/notify")
async def notify_sync(
    payload: SyncNotifyRequest,
    request: Request,
    _: None = Depends(verify_admin_sync_access),
):
    coordinator = getattr(request.app.state, "supabase_sync_coordinator", None)
    if coordinator is None:
        raise HTTPException(status_code=503, detail="Supabase sync coordinator is not available.")

    result = await coordinator.request_event_sync(
        event_name=payload.event,
        payload={
            "folder_key": payload.folder_key,
            "object_paths": payload.object_paths,
            "source": payload.source,
        },
    )

    return {
        "status": "accepted",
        "event": payload.event,
        "sync": result,
    }


@router.get("/health")
async def sync_health(
    request: Request,
    _: None = Depends(verify_admin_sync_access),
):
    coordinator = getattr(request.app.state, "supabase_sync_coordinator", None)
    if coordinator is None:
        raise HTTPException(status_code=503, detail="Supabase sync coordinator is not available.")

    return {
        "status": "ok",
        "sync": coordinator.get_health_snapshot(),
    }