GraphResearcher Admin
Hidden admin workspace for monitoring users, documents, system health, and developer tools.
Users
Documents
Conversations
Status
Output
{}
from pathlib import Path # Clean BOM for path in Path("app").rglob("*.py"): text = path.read_text(encoding="utf-8-sig") text = text.replace("\ufeff", "") path.write_text(text, encoding="utf-8") print("BOM cleanup completed.") # ===================================================== # 1. Minimal product_db fallback if missing # ===================================================== product_db_path = Path("app/product/product_db.py") if not product_db_path.exists(): product_db_path.write_text(r''' import os import sqlite3 from pathlib import Path from datetime import datetime, timezone from typing import Dict, Any, List, Optional from app.core.config import settings def utc_now() -> str: return datetime.now(timezone.utc).isoformat() def get_database_path() -> Path: env_path = os.getenv("APP_DATABASE_PATH") if env_path: db_path = Path(env_path) else: db_path = Path(settings.PROCESSED_DIR).parent / "product_app.sqlite3" db_path.parent.mkdir(parents=True, exist_ok=True) return db_path def get_connection(): conn = sqlite3.connect(str(get_database_path())) conn.row_factory = sqlite3.Row return conn def rows_to_dicts(rows): return [dict(row) for row in rows] def init_product_database() -> Dict[str, Any]: conn = get_connection() cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS users ( user_id TEXT PRIMARY KEY, email TEXT UNIQUE NOT NULL, name TEXT, role TEXT NOT NULL DEFAULT 'user', auth_provider TEXT DEFAULT 'local', is_active INTEGER DEFAULT 1, created_at TEXT NOT NULL, last_login_at TEXT ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS user_documents ( document_id TEXT PRIMARY KEY, owner_user_id TEXT, source_file_name TEXT, upload_status TEXT DEFAULT 'uploaded', index_status TEXT DEFAULT 'not_indexed', graph_status TEXT DEFAULT 'not_built', chunk_count INTEGER DEFAULT 0, entity_count INTEGER DEFAULT 0, relation_count INTEGER DEFAULT 0, created_at TEXT NOT NULL ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS conversations ( conversation_id TEXT PRIMARY KEY, owner_user_id TEXT, document_id TEXT, title TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS messages ( message_id TEXT PRIMARY KEY, conversation_id TEXT, role TEXT, content TEXT, created_at TEXT NOT NULL, metadata_json TEXT ) """) conn.commit() conn.close() return { "status": "success", "database_path": str(get_database_path()) } def get_database_status() -> Dict[str, Any]: init_product_database() conn = get_connection() cur = conn.cursor() tables = ["users", "user_documents", "conversations", "messages"] counts = {} for table in tables: cur.execute(f"SELECT COUNT(*) AS count FROM {table}") counts[table] = int(cur.fetchone()["count"]) conn.close() return { "status": "healthy", "database_path": str(get_database_path()), "table_counts": counts } def upsert_user( user_id: str, email: str, name: Optional[str] = None, role: str = "user", auth_provider: str = "local", avatar_url: Optional[str] = None ): init_product_database() now = utc_now() conn = get_connection() cur = conn.cursor() cur.execute(""" INSERT INTO users (user_id, email, name, role, auth_provider, is_active, created_at, last_login_at) VALUES (?, ?, ?, ?, ?, 1, ?, ?) ON CONFLICT(email) DO UPDATE SET name = excluded.name, role = excluded.role, auth_provider = excluded.auth_provider, last_login_at = excluded.last_login_at """, (user_id, email, name, role, auth_provider, now, now)) conn.commit() cur.execute("SELECT * FROM users WHERE email = ?", (email,)) user = dict(cur.fetchone()) conn.close() return user def list_users(limit: int = 100): init_product_database() conn = get_connection() cur = conn.cursor() cur.execute(""" SELECT user_id, email, name, role, auth_provider, is_active, created_at, last_login_at FROM users ORDER BY created_at DESC LIMIT ? """, (limit,)) rows = rows_to_dicts(cur.fetchall()) conn.close() return rows def list_documents(limit: int = 100): init_product_database() conn = get_connection() cur = conn.cursor() cur.execute(""" SELECT * FROM user_documents ORDER BY created_at DESC LIMIT ? """, (limit,)) rows = rows_to_dicts(cur.fetchall()) conn.close() return rows def list_conversations(limit: int = 100): init_product_database() conn = get_connection() cur = conn.cursor() cur.execute(""" SELECT * FROM conversations ORDER BY updated_at DESC LIMIT ? """, (limit,)) rows = rows_to_dicts(cur.fetchall()) conn.close() return rows ''', encoding="utf-8") print("Created fallback product_db.py") else: print("product_db.py already exists") # ===================================================== # 2. Auth service # ===================================================== Path("app/product/auth_service.py").write_text(r''' import os from typing import Dict, Any, Optional from fastapi import Request, HTTPException from app.product.product_db import upsert_user DEFAULT_ADMIN_EMAILS = { "2006yugb@gmail.com" } def get_admin_emails(): raw = os.getenv("ADMIN_EMAILS", "") emails = { email.strip().lower() for email in raw.split(",") if email.strip() } return emails | DEFAULT_ADMIN_EMAILS def normalize_email(email: Optional[str]) -> str: return str(email or "").strip().lower() def make_user_id(email: str) -> str: return "user_" + email.replace("@", "_").replace(".", "_") def infer_role(email: str) -> str: if normalize_email(email) in get_admin_emails(): return "admin" return "user" def get_current_user_from_request(request: Request) -> Dict[str, Any]: email = normalize_email(request.headers.get("x-user-email")) name = request.headers.get("x-user-name") if not email: return { "authenticated": False, "user_id": None, "email": None, "name": "Guest", "role": "guest", "auth_provider": "none" } role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider="header_dev" ) user["authenticated"] = True return user def require_authenticated_user(request: Request) -> Dict[str, Any]: user = get_current_user_from_request(request) if not user.get("authenticated"): raise HTTPException( status_code=401, detail="Authentication required." ) return user def require_admin_user(request: Request) -> Dict[str, Any]: user = require_authenticated_user(request) if user.get("role") != "admin": raise HTTPException( status_code=403, detail="Admin access required." ) return user def dev_login_user(email: str, name: Optional[str] = None) -> Dict[str, Any]: email = normalize_email(email) if not email: raise HTTPException(status_code=400, detail="email is required") role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider="dev_login" ) user["authenticated"] = True user["dev_header_hint"] = { "X-User-Email": email, "X-User-Name": name or email.split("@")[0] } return user ''', encoding="utf-8") # ===================================================== # 3. Admin service # ===================================================== Path("app/product/admin_service.py").write_text(r''' from typing import Dict, Any from app.product.product_db import ( get_database_status, list_users, list_documents, list_conversations ) def get_admin_status(current_admin: Dict[str, Any]) -> Dict[str, Any]: return { "status": "ok", "message": "Admin backend is available.", "admin": { "email": current_admin.get("email"), "role": current_admin.get("role") }, "database": get_database_status() } def get_admin_users(limit: int = 100) -> Dict[str, Any]: users = list_users(limit=limit) return { "count": len(users), "users": users } def get_admin_documents(limit: int = 100) -> Dict[str, Any]: documents = list_documents(limit=limit) return { "count": len(documents), "documents": documents } def get_admin_conversations(limit: int = 100) -> Dict[str, Any]: conversations = list_conversations(limit=limit) return { "count": len(conversations), "conversations": conversations } def get_admin_system_summary() -> Dict[str, Any]: db = get_database_status() return { "status": "ok", "database": db, "notes": [ "Admin tools are separated from the normal user app.", "Normal users should not see API docs or GraphRAG console links.", "Admin APIs are protected by backend role checks." ] } ''', encoding="utf-8") # ===================================================== # 4. Admin UI # ===================================================== Path("app/product/admin_ui.py").write_text(r''' def get_admin_panel_html() -> str: return """
Hidden admin workspace for monitoring users, documents, system health, and developer tools.
{}