| """API endpoints for user-registered database connections. |
| |
| Credential schemas (DbType, PostgresCredentials, etc.) live in |
| `src/models/credentials.py` — they are imported below (with noqa: F401) so |
| FastAPI/Swagger picks them up for OpenAPI schema generation even though they |
| are not referenced by name in this file. |
| """ |
|
|
| from typing import Any, Dict, List, Literal, Optional |
| from datetime import datetime |
|
|
| from fastapi import APIRouter, Depends, HTTPException, Query, Request, status |
| from pydantic import BaseModel, Field |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from src.database_client.database_client_service import database_client_service |
| from src.db.postgres.connection import get_db |
| from src.middlewares.logging import get_logger, log_execution |
| from src.middlewares.rate_limit import limiter |
| from src.models.credentials import ( |
| BigQueryCredentials, |
| CredentialSchemas, |
| DbType, |
| MysqlCredentials, |
| PostgresCredentials, |
| SnowflakeCredentials, |
| SqlServerCredentials, |
| SupabaseCredentials, |
| ) |
| from src.pipeline.db_pipeline import db_pipeline_service |
| from src.utils.db_credential_encryption import decrypt_credentials_dict |
|
|
# Logger shared by every endpoint in this module.
logger = get_logger("database_client_api")

# Router that groups the database-client endpoints under the /api/v1 prefix.
router = APIRouter(tags=["Database Clients"], prefix="/api/v1")
|
|
|
|
| |
| |
| |
|
|
|
|
class DatabaseClientCreate(BaseModel):
    """
    Request body for registering a new external database connection.

    The shape of the `credentials` object depends on `db_type`:

    | db_type     | Required fields                                                  |
    |-------------|------------------------------------------------------------------|
    | postgres    | host, port, database, username, password, ssl_mode               |
    | mysql       | host, port, database, username, password, ssl                    |
    | sqlserver   | host, port, database, username, password, driver?                |
    | supabase    | host, port, database, username, password, ssl_mode               |
    | bigquery    | project_id, dataset_id, location?, service_account_json          |
    | snowflake   | account, warehouse, database, schema?, username, password, role? |

    Sensitive fields (`password`, `service_account_json`) are encrypted
    at rest using Fernet symmetric encryption.
    """

    # Human-readable label for the connection.
    name: str = Field(
        ...,
        description="Display name for this connection.",
        examples=["Production DB"],
    )

    # Selects which credential shape applies to `credentials`.
    db_type: DbType = Field(
        ...,
        description="Type of the database engine.",
        examples=["postgres"],
    )

    # Accepted as a free-form mapping here; this model does not check it against db_type.
    credentials: Dict[str, Any] = Field(
        ...,
        description="Connection credentials. Shape depends on db_type. See schema descriptions above.",
        examples=[
            {
                "host": "db.example.com",
                "port": 5432,
                "database": "mydb",
                "username": "admin",
                "password": "s3cr3t!",
                "ssl_mode": "require",
            },
        ],
    )
|
|
|
|
class DatabaseClientUpdate(BaseModel):
    """
    Request body for updating an existing database connection.

    Every field is optional — only the fields that are provided are updated.
    When `credentials` is provided, it replaces the entire credentials object
    and sensitive fields are re-encrypted.
    """

    # New label for the connection; omitted means "leave unchanged".
    name: Optional[str] = Field(
        default=None,
        description="New display name for this connection.",
        examples=["Staging DB"],
    )

    # Full replacement credentials object (not a partial patch).
    credentials: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Updated credentials object. Replaces existing credentials entirely if provided.",
        examples=[
            {
                "host": "new-host.example.com",
                "port": 5432,
                "database": "mydb",
                "username": "admin",
                "password": "n3wP@ss!",
                "ssl_mode": "require",
            },
        ],
    )

    # Only the two literal values below are accepted.
    status: Optional[Literal["active", "inactive"]] = Field(
        default=None,
        description="Set to 'inactive' to soft-disable the connection without deleting it.",
        examples=["inactive"],
    )
|
|
|
|
class DatabaseClientResponse(BaseModel):
    """
    Database connection record as returned by the API.

    Credentials are **never** part of the response, for security reasons.
    """

    # Allows building the response straight from an attribute-bearing object
    # (the endpoints call `model_validate` on the record returned by the service).
    model_config = {"from_attributes": True}

    id: str = Field(
        ...,
        description="Unique identifier of the database connection.",
    )
    user_id: str = Field(
        ...,
        description="ID of the user who owns this connection.",
    )
    name: str = Field(
        ...,
        description="Display name of the connection.",
    )
    db_type: str = Field(
        ...,
        description="Database engine type.",
    )
    status: str = Field(
        ...,
        description="Connection status: 'active' or 'inactive'.",
    )
    created_at: datetime = Field(
        ...,
        description="Timestamp when the connection was registered.",
    )
    updated_at: Optional[datetime] = Field(
        default=None,
        description="Timestamp of the last update, if any.",
    )
|
|
|
|
| |
| |
| |
|
|
def _credential_field(
    name: str,
    kind: str,
    description: str,
    *,
    required: bool = True,
    default: Any = None,
    **extras: Any,
) -> Dict[str, Any]:
    """Build one credential-field descriptor for the `_DB_TYPES` catalogue.

    Keys are emitted in a fixed order (name, type, required, default,
    description) followed by any extras such as `sensitive` or `options`.
    """
    descriptor: Dict[str, Any] = {
        "name": name,
        "type": kind,
        "required": required,
        "default": default,
        "description": description,
    }
    descriptor.update(extras)
    return descriptor


# Static catalogue returned by the "dbtypes" endpoint: one entry per database
# engine, with the credential fields a client form should render.
_DB_TYPES: List[Dict[str, Any]] = [
    {
        "db_type": "postgres",
        "display_name": "PostgreSQL",
        "logo": "postgres",
        "status": "active",
        "message": None,
        "fields": [
            _credential_field("host", "string", "Hostname or IP address"),
            _credential_field("port", "integer", "Port number", required=False, default=5432),
            _credential_field("database", "string", "Database name"),
            _credential_field("username", "string", "Database username"),
            _credential_field("password", "string", "Database password", sensitive=True),
            _credential_field(
                "ssl_mode",
                "select",
                "SSL mode",
                required=False,
                default="require",
                options=["disable", "require", "verify-ca", "verify-full"],
            ),
        ],
    },
    {
        "db_type": "mysql",
        "display_name": "MySQL",
        "logo": "mysql",
        "status": "active",
        "message": None,
        "fields": [
            _credential_field("host", "string", "Hostname or IP address"),
            _credential_field("port", "integer", "Port number", required=False, default=3306),
            _credential_field("database", "string", "Database name"),
            _credential_field("username", "string", "Database username"),
            _credential_field("password", "string", "Database password", sensitive=True),
            _credential_field("ssl", "boolean", "Enable SSL", required=False, default=True),
        ],
    },
    {
        "db_type": "supabase",
        "display_name": "Supabase",
        "logo": "supabase",
        "status": "active",
        "message": None,
        "fields": [
            _credential_field("host", "string", "Supabase database host"),
            _credential_field(
                "port",
                "integer",
                "Port number (5432 direct, 6543 pooler)",
                required=False,
                default=5432,
            ),
            _credential_field("database", "string", "Database name", required=False, default="postgres"),
            _credential_field("username", "string", "Database user"),
            _credential_field("password", "string", "Database password", sensitive=True),
            _credential_field(
                "ssl_mode",
                "select",
                "SSL mode",
                required=False,
                default="require",
                options=["require", "verify-ca", "verify-full"],
            ),
        ],
    },
    {
        "db_type": "sqlserver",
        "display_name": "SQL Server",
        "logo": "sqlserver",
        "status": "inactive",
        "message": "Coming soon",
        "fields": [
            _credential_field("host", "string", "Hostname or IP address"),
            _credential_field("port", "integer", "Port number", required=False, default=1433),
            _credential_field("database", "string", "Database name"),
            _credential_field("username", "string", "Database username"),
            _credential_field("password", "string", "Database password", sensitive=True),
            _credential_field("driver", "string", "ODBC driver name", required=False),
        ],
    },
    {
        "db_type": "bigquery",
        "display_name": "BigQuery",
        "logo": "bigquery",
        "status": "inactive",
        "message": "Coming soon",
        "fields": [
            _credential_field("project_id", "string", "GCP project ID"),
            _credential_field("dataset_id", "string", "BigQuery dataset name"),
            _credential_field("location", "string", "Dataset location/region", required=False, default="US"),
            _credential_field(
                "service_account_json",
                "string",
                "GCP Service Account key JSON",
                sensitive=True,
            ),
        ],
    },
    {
        "db_type": "snowflake",
        "display_name": "Snowflake",
        "logo": "snowflake",
        "status": "inactive",
        "message": "Coming soon",
        "fields": [
            _credential_field("account", "string", "Snowflake account identifier"),
            _credential_field("warehouse", "string", "Virtual warehouse name"),
            _credential_field("database", "string", "Database name"),
            _credential_field("schema", "string", "Schema name", required=False, default="PUBLIC"),
            _credential_field("username", "string", "Snowflake username"),
            _credential_field("password", "string", "Snowflake password", sensitive=True),
            _credential_field("role", "string", "Snowflake role", required=False),
        ],
    },
]
|
|
|
|
| |
| |
| |
|
|
|
|
@router.get(
    "/database-clients/dbtypes",
    summary="List supported database types",
    response_description="All database types supported by DataEyond with their connection parameters.",
)
async def list_db_types():
    """
    List every database type DataEyond can connect to.

    Each entry carries the credential fields the frontend should render,
    a logo filename, and an active/inactive status with an optional message.
    """
    # The catalogue is a static module-level constant; it is returned as-is.
    supported_types = _DB_TYPES
    return supported_types
|
|
|
|
@router.post(
    "/database-clients",
    response_model=DatabaseClientResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Register a new database connection",
    response_description="The newly created database connection record (credentials excluded).",
    responses={
        201: {"description": "Connection registered successfully."},
        422: {"description": "Validation error — check the credentials shape for the given db_type."},
        500: {"description": "Internal server error."},
    },
)
@limiter.limit("10/minute")
@log_execution(logger)
async def create_database_client(
    request: Request,
    payload: DatabaseClientCreate,
    user_id: str = Query(..., description="ID of the user registering the connection."),
    db: AsyncSession = Depends(get_db),
):
    """
    Register a new external database connection for a user.

    The `credentials` object must match the shape for the chosen `db_type`
    (see **CredentialSchemas** in the schema section below for exact fields).
    Sensitive fields (`password`, `service_account_json`) are encrypted
    before being persisted — they are never returned in any response.

    Responds with **500** if the connection cannot be created.
    """
    # NOTE(review): `user_id` is taken from the query string as supplied by the
    # caller; no authentication is visible in this module — confirm an upstream
    # layer verifies it.
    # `request` is not used in the body; presumably the rate limiter decorator
    # needs it in the signature — TODO confirm.
    try:
        client = await database_client_service.create(
            db=db,
            user_id=user_id,
            name=payload.name,
            db_type=payload.db_type,
            credentials=payload.credentials,
        )
        return DatabaseClientResponse.model_validate(client)
    except HTTPException:
        # An HTTP error raised deliberately further down must reach the client
        # unchanged instead of being rewrapped as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Failed to create database client for user {user_id}", error=str(e))
        # NOTE(review): the exception text is echoed to the client; confirm it
        # can never contain credential material.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create database client: {e}",
        ) from e
|
|
|
|
@router.get(
    "/database-clients/{user_id}",
    response_model=List[DatabaseClientResponse],
    summary="List all database connections for a user",
    response_description="List of database connections (credentials excluded).",
    responses={
        200: {"description": "Returns an empty list if the user has no connections."},
    },
)
@log_execution(logger)
async def list_database_clients(
    user_id: str,
    db: AsyncSession = Depends(get_db),
):
    """
    List every database connection registered by the given user,
    ordered by creation date (newest first).

    The response never contains credentials.
    """
    # Ordering is whatever the service returns; this handler does not re-sort.
    owned_clients = await database_client_service.get_user_clients(db, user_id)
    # Convert each stored record into the credential-free response model.
    return list(map(DatabaseClientResponse.model_validate, owned_clients))
|
|
|
|
@router.get(
    "/database-clients/{user_id}/{client_id}",
    response_model=DatabaseClientResponse,
    summary="Get a single database connection",
    response_description="Database connection detail (credentials excluded).",
    responses={
        404: {"description": "Connection not found."},
        403: {"description": "Access denied — user_id does not own this connection."},
    },
)
@log_execution(logger)
async def get_database_client(
    user_id: str,
    client_id: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Return the detail of a single database connection.

    Returns **404** if no connection exists for `client_id`, and **403** if
    the `user_id` in the path does not match the owner of the requested
    connection.
    """
    client = await database_client_service.get(db, client_id)

    # Existence is checked before ownership, so an unknown id always yields 404.
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    return DatabaseClientResponse.model_validate(client)
|
|
|
|
@router.put(
    "/database-clients/{client_id}",
    response_model=DatabaseClientResponse,
    summary="Update a database connection",
    response_description="Updated database connection record (credentials excluded).",
    responses={
        404: {"description": "Connection not found."},
        403: {"description": "Access denied — user_id does not own this connection."},
    },
)
@log_execution(logger)
async def update_database_client(
    client_id: str,
    payload: DatabaseClientUpdate,
    user_id: str = Query(..., description="ID of the user who owns the connection."),
    db: AsyncSession = Depends(get_db),
):
    """
    Update an existing database connection.

    Only fields present in the request body are updated.
    If `credentials` is provided it **replaces** the entire credentials object
    and sensitive fields are re-encrypted automatically.

    Returns **404** if no connection exists for `client_id`, and **403** if
    `user_id` does not match the owner of the connection.
    """
    client = await database_client_service.get(db, client_id)

    # Existence is checked before ownership, so an unknown id always yields 404.
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    # Omitted payload fields arrive here as None; presumably the service treats
    # None as "leave unchanged" — verify against the service implementation.
    updated = await database_client_service.update(
        db=db,
        client_id=client_id,
        name=payload.name,
        credentials=payload.credentials,
        status=payload.status,
    )
    return DatabaseClientResponse.model_validate(updated)
|
|
|
|
@router.delete(
    "/database-clients/{client_id}",
    status_code=status.HTTP_200_OK,
    summary="Delete a database connection",
    responses={
        200: {"description": "Connection deleted successfully."},
        404: {"description": "Connection not found."},
        403: {"description": "Access denied — user_id does not own this connection."},
    },
)
@log_execution(logger)
async def delete_database_client(
    client_id: str,
    user_id: str = Query(..., description="ID of the user who owns the connection."),
    db: AsyncSession = Depends(get_db),
):
    """
    Permanently delete a database connection.

    This action is irreversible. The stored credentials are also removed.

    Returns **404** if no connection exists for `client_id`, and **403** if
    `user_id` does not match the owner of the connection.
    """
    client = await database_client_service.get(db, client_id)

    # Existence is checked before ownership, so an unknown id always yields 404.
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    await database_client_service.delete(db, client_id)
    return {"status": "success", "message": "Database client deleted successfully"}
|
|
|
|
@router.post(
    "/database-clients/{client_id}/ingest",
    status_code=status.HTTP_200_OK,
    summary="Ingest schema from a registered database into the vector store",
    response_description="Count of chunks ingested.",
    responses={
        200: {"description": "Ingestion completed successfully."},
        403: {"description": "Access denied — user_id does not own this connection."},
        404: {"description": "Connection not found."},
        409: {"description": "The connection is inactive and cannot be ingested."},
        501: {"description": "The connection's db_type is not yet supported by the pipeline."},
        500: {"description": "Ingestion failed (connection error, profiling error, etc.)."},
    },
)
@limiter.limit("5/minute")
@log_execution(logger)
async def ingest_database_client(
    request: Request,
    client_id: str,
    user_id: str = Query(..., description="ID of the user who owns the connection."),
    db: AsyncSession = Depends(get_db),
):
    """
    Decrypt the stored credentials, connect to the user's database, introspect
    its schema, profile each column, embed the descriptions, and store them in
    the shared PGVector collection tagged with `source_type="database"`.

    Chunks become retrievable via the same retriever used for document chunks.

    Returns **404** if the connection does not exist, **403** if `user_id`
    does not own it, **409** if the connection is not active, **501** if its
    `db_type` is not supported by the pipeline, and **500** on any other
    ingestion failure.
    """
    # `request` is not used in the body; presumably the rate limiter decorator
    # needs it in the signature — TODO confirm.
    client = await database_client_service.get(db, client_id)

    # Existence is checked before ownership, so an unknown id always yields 404.
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    if client.status != "active":
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Cannot ingest from an inactive database connection.",
        )

    try:
        # Credentials are decrypted only for the duration of this request.
        creds = decrypt_credentials_dict(client.credentials)
        with db_pipeline_service.engine_scope(
            db_type=client.db_type,
            credentials=creds,
        ) as engine:
            total = await db_pipeline_service.run(
                user_id=user_id,
                client_id=client_id,
                engine=engine,
            )
    except NotImplementedError as e:
        # Raised when the pipeline has no support for this db_type yet.
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail=str(e),
        ) from e
    except HTTPException:
        # An HTTP error raised deliberately further down must reach the client
        # unchanged instead of being rewrapped as a generic 500.
        raise
    except Exception as e:
        logger.error(
            f"Ingestion failed for client {client_id}", user_id=user_id, error=str(e)
        )
        # NOTE(review): the exception text is echoed to the client; confirm it
        # can never contain connection details or credential material.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Ingestion failed: {e}",
        ) from e

    return {"status": "success", "client_id": client_id, "chunks_ingested": total}
|
|