#!/usr/bin/env python3
"""
mbok_dev - Main entry point
Public Space that loads private ver20 app dynamically
"""
import os
import sys
import time
import traceback
import logging
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
from pathlib import Path
from fastapi import FastAPI, Request, Depends, HTTPException, Form
from fastapi.responses import RedirectResponse, JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
import gradio as gr
from supabase import create_client, Client

# Import bootstrap to download private app
from bootstrap import download_private_app
from login import create_login_ui
from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source

# --- Startup Meta Info ---
# Secrets are reported only as set / NOT SET; their values are never printed.
print("=" * 80)
print("🚀 Starting mbok_dev")
print("=" * 80)
print(f"[STARTUP_META] Python version: {sys.version}")
print(f"[STARTUP_META] CWD: {os.getcwd()}")
print(f"[STARTUP_META] PORT: {os.environ.get('PORT', 'not set')}")
print(f"[STARTUP_META] SPACE_ID: {os.environ.get('SPACE_ID', 'not set')}")
print(f"[STARTUP_META] SPACE_HOST: {os.environ.get('SPACE_HOST', 'not set')}")
print(f"[STARTUP_META] GRADIO_SERVER_NAME: {os.environ.get('GRADIO_SERVER_NAME', 'not set')}")
print(f"[STARTUP_META] GRADIO_SERVER_PORT: {os.environ.get('GRADIO_SERVER_PORT', 'not set')}")
print(f"[STARTUP_META] HF_TOKEN: {'***set***' if os.environ.get('HF_TOKEN') else 'NOT SET'}")
print(f"[STARTUP_META] SUPABASE_URL: {'***set***' if os.environ.get('SUPABASE_URL') else 'NOT SET'}")
print(f"[STARTUP_META] SUPABASE_KEY: {'***set***' if os.environ.get('SUPABASE_KEY') else 'NOT SET'}")
print("=" * 80)

# --- Bootstrap: Download private app at startup ---
# A bootstrap failure is not fatal: the server still starts, with
# private_app_dir left as None so later code can skip the ver20 import.
print("[PHASE] bootstrap_start")
try:
    private_app_dir = download_private_app()

    # Add private app to Python path so we can import it
    private_app_path = str(private_app_dir.resolve())
    if private_app_path not in sys.path:
        sys.path.insert(0, private_app_path)

    print(f"[PHASE] bootstrap_end success=true path={private_app_path}")
except Exception as e:
    print("[PHASE] bootstrap_end success=false")
    print(f"[ERROR] Bootstrap failed: {e}")
    print(f"[TRACEBACK]\n{traceback.format_exc()}")
    print("⚠️ Application will start but /app/ route will not work")
    private_app_dir = None

# --- Supabase Setup ---
# Unlike bootstrap, missing credentials or a failed client init abort startup.
print("[PHASE] supabase_init_start")
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")

if not SUPABASE_URL or not SUPABASE_KEY:
    print("[ERROR] SUPABASE_URL and/or SUPABASE_KEY not set")
    raise ValueError(
        "SUPABASE_URL and SUPABASE_KEY must be set in environment variables. "
        "Please configure them in HF Space Secrets."
    )

try:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
    init_logger(supabase)
    print("[PHASE] supabase_init_end success=true")
except Exception as e:
    print("[PHASE] supabase_init_end success=false")
    print(f"[ERROR] Supabase init failed: {e}")
    print(f"[TRACEBACK]\n{traceback.format_exc()}")
    raise

# --- FastAPI App ---
print("[PHASE] fastapi_init_start")
app = FastAPI()

# Serve static files under /static (for assets such as logo.png).
# NOTE(review): this mounts the directory that contains this file, so every
# file in it (including the .py sources) is reachable under /static. Consider
# moving assets into a dedicated sub-directory and mounting only that.
_static_dir = Path(__file__).parent
app.mount("/static", StaticFiles(directory=str(_static_dir)), name="static")
print("[PHASE] fastapi_init_end")

# Cache of user_id -> profile dict (kept in-process; assumes the profile of a
# logged-in user does not change while the process is running).
_user_profile_cache: dict = {}

# Most recently resolved request source. Kept at process level because
# ContextVars do not propagate to the Gradio WebSocket threads.
_last_known_source: dict = {}

# Internal proxy hosts (requests arriving through them are not attributed
# to any source).
_INTERNAL_PROXY_DOMAINS = {
    "proxy.spaces.internal.huggingface.tech",
}

# Columns requested from the `profiles` table when building a user dict.
_PROFILE_COLUMNS = "email, org_id, role, display_name, organizations(name)"


# Paths of Gradio background traffic (used to suppress unauthenticated noise)
def _is_gradio_background_path(path: str) -> bool:
    """Return True when *path* is a request Gradio sends automatically.

    Heartbeat and queue-join requests arrive in large numbers even from
    unauthenticated clients, so callers use this to keep them out of the logs.
    """
    return (
        path.startswith("/app/gradio_api/heartbeat/")
        or path == "/app/gradio_api/queue/join"
        or path.startswith("/app/gradio_api/queue/join/")
    )


# --- Request Logging Middleware ---
def _resolve_source(request: Request) -> dict | None:
    """Derive the inbound channel of a request from its headers.

    Decision order:
      1. Host is an internal proxy -> ``None`` (no source recorded).
      2. Referer contains ``huggingface.co/spaces/`` ->
         ``{"source_domain": "huggingface.co"}``.
      3. Host ends with ``.hf.space`` -> ``{"source_domain": host}``.
      4. Otherwise -> ``{"source_domain": host or None}``.

    The host is taken from ``X-Forwarded-Host`` when present, else ``Host``.
    Only ``request.headers`` is read, so any object exposing a mapping-like
    ``headers`` attribute (e.g. ``gr.Request``) can be passed.
    """
    headers = request.headers
    referer = (headers.get("referer") or "").lower()
    host = (headers.get("x-forwarded-host") or headers.get("host") or "").lower()

    if host in _INTERNAL_PROXY_DOMAINS:
        return None

    if "huggingface.co/spaces/" in referer:
        return {"source_domain": "huggingface.co"}

    if host.endswith(".hf.space"):
        return {"source_domain": host}

    return {"source_domain": host or None}


def _load_user_profile(user_id: str, fallback_email) -> tuple[dict, Exception | None]:
    """Build the user dict for *user_id* from the ``profiles`` table.

    Args:
        user_id: Supabase auth user id (string form).
        fallback_email: Email from the auth user, used when the profile row
            has no email or cannot be fetched.

    Returns:
        ``(user_info, error)``. ``error`` is ``None`` when the profile row was
        read successfully. Otherwise it is the exception that occurred and
        ``user_info`` carries only ``user_id`` and the fallback email, with
        every other field ``None``. Callers should cache only when ``error``
        is ``None`` so that a transient failure is retried on a later request.
    """
    user_info = {
        "user_id": user_id,
        "email": fallback_email,
        "display_name": None,
        "role": None,
        "org_id": None,
        "org_name": None,
    }
    try:
        profile_res = (
            supabase.from_("profiles")
            .select(_PROFILE_COLUMNS)
            .eq("id", user_id)
            .single()
            .execute()
        )
        d = profile_res.data or {}
        loaded = {
            "email": d.get("email") or fallback_email,
            "display_name": d.get("display_name"),
            "role": d.get("role"),
            "org_id": d.get("org_id"),
            "org_name": (d.get("organizations") or {}).get("name"),
        }
    except Exception as exc:
        return user_info, exc

    user_info.update(loaded)
    return user_info, None


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Attach user/source context to each request and log error responses."""

    async def dispatch(self, request: Request, call_next):
        """Process one request.

        Resolves the user (from the ``sb_access_token`` cookie) and the
        request source, stores both in the logger's context for the duration
        of the request, and logs any response with status >= 400 (except 401
        on Gradio background paths). Exceptions raised downstream are logged
        and re-raised. The context is always cleared afterwards.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        start_time = time.perf_counter()
        path = request.url.path
        method = request.method

        # Resolve user from cookie and store in contextvars
        user_info = self._resolve_user(request)
        set_user_context(user_info)
        user_tag = f" user={user_info['email']}" if user_info else ""

        # Resolve source (flow-in channel) and store in contextvars
        source_info = _resolve_source(request)
        set_request_source(source_info)
        if source_info is not None:
            _last_known_source.update(source_info)

        try:
            response = await call_next(request)
            duration = time.perf_counter() - start_time
            if response.status_code >= 400:
                # A 401 on Gradio background traffic is unauthenticated
                # noise, so it is not recorded.
                if not (response.status_code == 401 and _is_gradio_background_path(path)):
                    log_event(
                        "error",
                        "http_response_error",
                        level="WARNING",
                        metadata={
                            "method": method,
                            "path": path,
                            "status": response.status_code,
                            "duration": round(duration, 3),
                        },
                    )
            return response
        except Exception as e:
            duration = time.perf_counter() - start_time
            print(f"[RESPONSE] method={method} path={path} status=500 duration={duration:.3f}s error={e}{user_tag}")
            log_event(
                "error",
                "http_response_error",
                level="ERROR",
                metadata={
                    "method": method,
                    "path": path,
                    "status": 500,
                    "duration": round(duration, 3),
                    "error": str(e),
                },
            )
            raise
        finally:
            set_user_context(None)
            set_request_source(None)

    @staticmethod
    def _resolve_user(request: Request):
        """Resolve the user dict from the ``sb_access_token`` cookie.

        The full profile (incl. org_id) is fetched on first sight of a user
        and cached by user_id. If the profile query fails, a minimal dict
        (user_id + auth email) is returned for this request but NOT cached,
        so a transient failure does not pin ``org_id``/``role`` to ``None``
        for the lifetime of the process.

        Returns:
            The user dict, or ``None`` when there is no cookie or the token
            cannot be verified.
        """
        token = request.cookies.get("sb_access_token")
        if not token:
            return None
        try:
            res = supabase.auth.get_user(token)
            user_id = str(res.user.id)

            # Cache hit: skip the profile query.
            cached = _user_profile_cache.get(user_id)
            if cached is not None:
                return cached

            user_info, error = _load_user_profile(user_id, res.user.email)
            if error is not None:
                print(f"[ORG_CONTEXT] _resolve_user: profile fetch failed: {error}")

            print(
                f"[ORG_CONTEXT] _resolve_user: first fetch user_id={user_id} "
                f"email={user_info['email']} org_id={user_info['org_id']!r} "
                f"org_name={user_info['org_name']!r}"
            )

            if error is None:
                _user_profile_cache[user_id] = user_info
            return user_info
        except Exception:
            # Best effort: an unverifiable token is treated as "no user".
            return None


app.add_middleware(RequestLoggingMiddleware)
print("[MIDDLEWARE] RequestLoggingMiddleware added")


# --- Authentication Handler (for login UI) ---
def handle_login(request: gr.Request, email, password):
    """Handle a login attempt via Supabase.

    Args:
        request: Gradio request, used only to resolve the request source.
        email: Email entered in the login form.
        password: Password entered in the login form.

    Returns:
        A 3-tuple for the login UI outputs. On success: two ``gr.update``
        objects (hide first component, show the success message) and the
        access token. On any failure: a no-op update, an update carrying the
        error message, and ``None``. The function always returns three
        values, including when Supabase answers without a session.
    """
    print(f"[AUTH] Login attempt for: {email}")
    source = _resolve_source(request)
    log_event("auth", "login_attempt", user_override={"email": email}, source=source)
    try:
        # NOTE(review): this signs in on the process-wide client; presumably
        # that also replaces the client's current session - confirm whether a
        # per-request client is needed for concurrent users.
        res = supabase.auth.sign_in_with_password({"email": email, "password": password})
        if not res.session:
            # Without this, the handler would fall through and return None,
            # which does not match the three expected outputs.
            raise RuntimeError("Sign-in returned no session")

        print(f"[AUTH] Login successful: {email}")
        user_ctx = {"user_id": str(res.user.id), "email": email}
        log_event(
            "auth",
            "login_success",
            user_override=user_ctx,
            source=source,
        )
        return (
            gr.update(visible=False),
            gr.update(visible=True, value=f"### ✅ ログイン成功: {email}"),
            res.session.access_token,
        )
    except Exception as e:
        print(f"[AUTH] Login failed for {email}: {e}")
        log_event(
            "auth",
            "login_failure",
            level="WARNING",
            user_override={"email": email},
            metadata={"error": str(e)},
            source=source,
        )
        return gr.update(), gr.update(value=f"❌ エラー: {str(e)}"), None


# --- Authentication Dependency ---
def get_current_user(request: Request):
    """Verify the cookie token and return the user profile dict.

    Uses ``_user_profile_cache`` (normally already filled by the middleware).
    On a cache miss the profile is loaded with the same helper and fallback
    rules as the middleware: the auth email is used when the profile has no
    email, and a failed profile query yields a minimal, uncached dict.

    Returns:
        The user dict, or ``None`` when the cookie is missing or the token
        cannot be verified.
    """
    token = request.cookies.get("sb_access_token")
    if not token:
        print("[AUTH_CHECK] No sb_access_token cookie – unauthenticated access")
        if not _is_gradio_background_path(str(request.url.path)):
            log_event("auth", "unauthenticated_access", level="INFO", metadata={"path": str(request.url.path)})
        return None
    try:
        res = supabase.auth.get_user(token)
        user_id = str(res.user.id)

        # Return the cached profile when present (the middleware usually
        # fetched it earlier in the same request).
        cached = _user_profile_cache.get(user_id)
        if cached is not None:
            return cached

        # Not cached yet (e.g. direct access): load it here.
        user_dict, error = _load_user_profile(user_id, res.user.email)
        if error is not None:
            print(f"[AUTH_CHECK] Profile fetch failed: {error}")
        else:
            _user_profile_cache[user_id] = user_dict
        return user_dict
    except Exception as e:
        print(f"[AUTH_CHECK] Token verify failed: {e}")
        log_event("auth", "token_verify_fail", level="WARNING", metadata={"error": str(e)})
        return None


# --- Create UI instances ---
print("[PHASE] create_ui_start")
login_ui = create_login_ui(handle_login) print("[PHASE] create_ui_end component=login_ui") # Import ver20 app (Gradio Blocks) print("[PHASE] import_ver20_start") ver20_app = None VER20_CSS = None if private_app_dir: try: # Import app module from downloaded private app from app import app as ver20_blocks # --- Inject Logging Callback --- try: from lib.logging import set_logger_callback def bridge_logger(event_type: str, message: str, metadata=None): """Ver20からのログイベントをSupabaseに転送""" user_override = None session_id = None clean_metadata = None if metadata: clean_metadata = dict(metadata) user_ctx = clean_metadata.pop("_user_context", None) if user_ctx and isinstance(user_ctx, dict): user_override = user_ctx session_id = clean_metadata.pop("session_id", None) log_event(event_type, message, metadata=clean_metadata, user_override=user_override, session_id=session_id, source=dict(_last_known_source) if _last_known_source else None) set_logger_callback(bridge_logger) print("[LOGGING] Connected ver20 logging to Supabase") except ImportError as e: print(f"[LOGGING] Could not import lib.logging or set_logger_callback: {e}") # ------------------------------- # --- Inject Org Context Getter (for HF dataset namespace) --- try: from lib.hf_storage import set_org_context_getter def get_org_for_storage(): """プロセスレベルの _user_profile_cache から org_id/org_name を返す。 ContextVar (get_user_context) は FastAPI リクエストスレッドでのみ有効で Gradio WebSocket キュースレッドでは伝播しないため、プロセス共有の _user_profile_cache(ログイン時にセットされる)を参照する。 シングルユーザー運用前提; session_org_map が優先されるため マルチユーザー時もStep実行後は正しいorgが使われる。 """ if _user_profile_cache: last_user = next(iter(_user_profile_cache.values())) org_id = last_user.get("org_id") org_name = last_user.get("org_name") if org_id or org_name: return {"org_id": org_id, "org_name": org_name} return None set_org_context_getter(get_org_for_storage) print("[ORG_CONTEXT] Connected org_context getter to hf_storage (cache-based)") except ImportError as e: print(f"[ORG_CONTEXT] Could not inject 
org_context getter: {e}") # --------------------------------------------------------- ver20_app = ver20_blocks # theme/css を mount_gradio_app に渡すために ver20 の定数を取得 try: from app import CUSTOM_CSS as VER20_CSS except ImportError: VER20_CSS = None print(f"[PHASE] import_ver20_end success=true type={type(ver20_app)}") except Exception as e: print(f"[PHASE] import_ver20_end success=false") print(f"[ERROR] Failed to import ver20 app: {e}") print(f"[TRACEBACK]\n{traceback.format_exc()}") else: print(f"[PHASE] import_ver20_end success=false reason=bootstrap_failed") # --- Routes --- @app.get("/") async def root(user=Depends(get_current_user)): """Root route - redirect to login or app based on auth status""" print(f"[ROUTE] / accessed, user_authenticated={isinstance(user, dict) and user.get('user_id')}") if isinstance(user, dict) and user.get("user_id"): return RedirectResponse(url="/app/") return RedirectResponse(url="/login/") @app.get("/logout") async def logout(request: Request): """Logout route - clear cookie and redirect to login. Also serves as force-logout endpoint when session is expired/invalid. """ user = get_user_context() token = request.cookies.get("sb_access_token") forced = request.query_params.get("forced", "0") print(f"[ROUTE] /logout accessed forced={forced}") if forced == "1": log_event("auth", "force_logout", user_override=user, metadata={"reason": "session_expired"}) else: log_event("auth", "logout", user_override=user) # Supabase セッション失効(トークンがある場合) if token: try: supabase.auth.sign_out() except Exception: pass response = RedirectResponse(url="/login/") response.delete_cookie("sb_access_token", path="/", samesite="none") return response @app.get("/healthz") async def healthz(): """Health check endpoint""" status = { "ok": True, "ver20_loaded": ver20_app is not None, "private_app_dir": str(private_app_dir) if private_app_dir else None } print(f"[HEALTHZ] {status}") return JSONResponse(content=status) _RESET_PASSWORD_HTML = """