ishaq101 sofhiaazzhr commited on
Commit
c7d59cf
·
1 Parent(s): 7da6db5

[NOTICKET][DB] add endpoints for dbtypes (#6)

Browse files

- [noticket] add gitignore (c87f27f458854787102eebc110b178c2572ef8a6)
- [NOTICKET]: add document pipeline, simplify document API (fb871f3d7db9c5a9c95a012761c4180a432e7e4b)
- [NOTICKET]: update folder document_pipelines after pipelines (a4cf97ab3890e35831c0043a1d411c17d84f374b)
- [NOTICKET][DB] refactor code to new repo (7f3bb9782bb9b6ace69e6be35377c12757a2f81d)
- [KM-441] add mean and median (9b5933420821187e73bb72a896e28d7adf6d6137)
- [NOTICKET] new metadata format for cleaner code (6b590d94c398ac4966a0e376f2809aa52c5ba022)
- update document (5a69e0eecf80dd807565dbc369374e41450c37fc)
- delete duplicate file (3848d7b262745b758069e3ab8089437ced42a45f)
- edit document for new pipeline (425e021053c4de65e2598b0f5554b7c1044a904c)
- [NOTICKET]: add CSV and XLSX file type (31920c3bc6f9faf9e07095a02fc1faf08adc246e)
- [DB] fix/rename db_pipeline.py (d913315c356d741cc7a5a5b1f8e36ba41397f2ab)
- [NOTICKET][DB] menyesuaikan format struktur db_pipeline sesuai dengan file lain (e13a9017e0f7fd26ef9b8ad5981d6c0799c01661)
- [NOTICKET][DB] pisahin db credential ke folder model. add ingestion endpoint at db_client to use db pipeline. add router db_client di main. (347a73aa5597cb28d1f43940eb60db619e4a10be)
- [NOTICKET]: use tesseract for extract PDF (6b9a13d417f6ffffed61a44567a3acbee822e173)
- [NOTICKET]: add Tesseract and Poppler binaries via Git LFS (0a9101a16c713c3facd8c06279da1f01403306a0)
- [NOTICKET]: update uv.lock (bb79f64b69c3bdcb2c98867c5d4945bf2311394d)
- [NOTICKET][DB] update credential & databaseclient. update settings (0e079550f8524da66df365638167dfe42b70ae62)
- [NOTICKET] update settings (65a5c6b1c1539de5d66e700aa5bef011e3c2b2f6)
- [KM-437][DB] add mysql, sqlserver, bigquery, snowflake connections (43539293eab9328d1d6f22f2f837c0a0085485a5)
- [NOTICKET]: adjusted pyproject.toml for OCR PDF (a00e2ad5306306ff73c30d085f126fab7bd1e78c)
- [NOTICKET]: fix merge conflict (6c8734607da68151ab574d0d186b17ac0d8ec460)
- [NOTICKET][DB] fix mysql pipeline (060c8cc81d4de2e85d2f6b4d3de178307e211ad0)
- [NOTICKET] edit imports (b145c06e354030dc38a550cd53030ba4864870ae)
- [NOTICKET] minor code refactor (52415b6a52f08900d27b548459a15c1113c1b892)
- [NOTICKET] add duplicate check for storing database (d310770ffcbcf75cb6b952f86bc6947ab2c7bb6a)
- [NOTICKET][DB] add supported dbtype for frontend (a531fcc7f53fe56a634ebba3b0fd4ff7ad51a33d)
- Merge branch 'dev_new' into pr/6 (14ec882dd774b73a44ba4521f35c78b04c8c3e73)


Co-authored-by: Sofhia Az-Zahra <sofhiaazzhr@users.noreply.huggingface.co>

src/api/v1/db_client.py CHANGED
@@ -117,11 +117,122 @@ class DatabaseClientResponse(BaseModel):
117
  model_config = {"from_attributes": True}
118
 
119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  # ---------------------------------------------------------------------------
121
  # Endpoints
122
  # ---------------------------------------------------------------------------
123
 
124
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
  @router.post(
126
  "/database-clients",
127
  response_model=DatabaseClientResponse,
@@ -335,9 +446,14 @@ async def ingest_database_client(
335
  if client.user_id != user_id:
336
  raise HTTPException(status_code=403, detail="Access denied")
337
 
338
- creds = decrypt_credentials_dict(client.credentials)
 
 
 
 
339
 
340
  try:
 
341
  with db_pipeline_service.engine_scope(
342
  db_type=client.db_type,
343
  credentials=creds,
 
117
  model_config = {"from_attributes": True}
118
 
119
 
120
+ # ---------------------------------------------------------------------------
121
+ # Supported DB types registry
122
+ # ---------------------------------------------------------------------------
123
+
124
+ _DB_TYPES: List[Dict[str, Any]] = [
125
+ {
126
+ "db_type": "postgres",
127
+ "display_name": "PostgreSQL",
128
+ "logo": "postgres",
129
+ "status": "active",
130
+ "message": None,
131
+ "fields": [
132
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Hostname or IP address"},
133
+ {"name": "port", "type": "integer", "required": False, "default": 5432, "description": "Port number"},
134
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
135
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database username"},
136
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
137
+ {"name": "ssl_mode", "type": "select", "required": False, "default": "require", "description": "SSL mode", "options": ["disable", "require", "verify-ca", "verify-full"]},
138
+ ],
139
+ },
140
+ {
141
+ "db_type": "mysql",
142
+ "display_name": "MySQL",
143
+ "logo": "mysql",
144
+ "status": "active",
145
+ "message": None,
146
+ "fields": [
147
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Hostname or IP address"},
148
+ {"name": "port", "type": "integer", "required": False, "default": 3306, "description": "Port number"},
149
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
150
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database username"},
151
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
152
+ {"name": "ssl", "type": "boolean", "required": False, "default": True, "description": "Enable SSL"},
153
+ ],
154
+ },
155
+ {
156
+ "db_type": "supabase",
157
+ "display_name": "Supabase",
158
+ "logo": "supabase",
159
+ "status": "active",
160
+ "message": None,
161
+ "fields": [
162
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Supabase database host"},
163
+ {"name": "port", "type": "integer", "required": False, "default": 5432, "description": "Port number (5432 direct, 6543 pooler)"},
164
+ {"name": "database", "type": "string", "required": False, "default": "postgres", "description": "Database name"},
165
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database user"},
166
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
167
+ {"name": "ssl_mode", "type": "select", "required": False, "default": "require", "description": "SSL mode", "options": ["require", "verify-ca", "verify-full"]},
168
+ ],
169
+ },
170
+ {
171
+ "db_type": "sqlserver",
172
+ "display_name": "SQL Server",
173
+ "logo": "sqlserver",
174
+ "status": "inactive",
175
+ "message": "Coming soon",
176
+ "fields": [
177
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Hostname or IP address"},
178
+ {"name": "port", "type": "integer", "required": False, "default": 1433, "description": "Port number"},
179
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
180
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database username"},
181
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
182
+ {"name": "driver", "type": "string", "required": False, "default": None, "description": "ODBC driver name"},
183
+ ],
184
+ },
185
+ {
186
+ "db_type": "bigquery",
187
+ "display_name": "BigQuery",
188
+ "logo": "bigquery",
189
+ "status": "inactive",
190
+ "message": "Coming soon",
191
+ "fields": [
192
+ {"name": "project_id", "type": "string", "required": True, "default": None, "description": "GCP project ID"},
193
+ {"name": "dataset_id", "type": "string", "required": True, "default": None, "description": "BigQuery dataset name"},
194
+ {"name": "location", "type": "string", "required": False, "default": "US", "description": "Dataset location/region"},
195
+ {"name": "service_account_json", "type": "string", "required": True, "default": None, "description": "GCP Service Account key JSON", "sensitive": True},
196
+ ],
197
+ },
198
+ {
199
+ "db_type": "snowflake",
200
+ "display_name": "Snowflake",
201
+ "logo": "snowflake",
202
+ "status": "inactive",
203
+ "message": "Coming soon",
204
+ "fields": [
205
+ {"name": "account", "type": "string", "required": True, "default": None, "description": "Snowflake account identifier"},
206
+ {"name": "warehouse", "type": "string", "required": True, "default": None, "description": "Virtual warehouse name"},
207
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
208
+ {"name": "schema", "type": "string", "required": False, "default": "PUBLIC", "description": "Schema name"},
209
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Snowflake username"},
210
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Snowflake password", "sensitive": True},
211
+ {"name": "role", "type": "string", "required": False, "default": None, "description": "Snowflake role"},
212
+ ],
213
+ },
214
+ ]
215
+
216
+
217
  # ---------------------------------------------------------------------------
218
  # Endpoints
219
  # ---------------------------------------------------------------------------
220
 
221
 
222
+ @router.get(
223
+ "/database-clients/dbtypes",
224
+ summary="List supported database types",
225
+ response_description="All database types supported by DataEyond with their connection parameters.",
226
+ )
227
+ async def list_db_types():
228
+ """
229
+ Return every database type DataEyond can connect to, along with the
230
+ credential fields the frontend should render, a logo filename, and
231
+ an active/inactive status with an optional message.
232
+ """
233
+ return _DB_TYPES
234
+
235
+
236
  @router.post(
237
  "/database-clients",
238
  response_model=DatabaseClientResponse,
 
446
  if client.user_id != user_id:
447
  raise HTTPException(status_code=403, detail="Access denied")
448
 
449
+ if client.status != "active":
450
+ raise HTTPException(
451
+ status_code=status.HTTP_409_CONFLICT,
452
+ detail="Cannot ingest from an inactive database connection.",
453
+ )
454
 
455
  try:
456
+ creds = decrypt_credentials_dict(client.credentials)
457
  with db_pipeline_service.engine_scope(
458
  db_type=client.db_type,
459
  credentials=creds,
src/config/settings.py CHANGED
@@ -63,7 +63,7 @@ class Settings(BaseSettings):
63
 
64
  # DB credential encryption (Fernet key for user-registered database creds)
65
  dataeyond_db_credential_key: str = Field(
66
- alias="dataeyond__db__credential__key", default=""
67
  )
68
 
69
 
 
63
 
64
  # DB credential encryption (Fernet key for user-registered database creds)
65
  dataeyond_db_credential_key: str = Field(
66
+ alias="dataeyond__db__credential__key"
67
  )
68
 
69
 
src/database_client/database_client_service.py CHANGED
@@ -16,9 +16,46 @@ from src.utils.db_credential_encryption import (
16
  logger = get_logger("database_client_service")
17
 
18
 
 
 
 
 
 
 
 
 
 
 
 
19
  class DatabaseClientService:
20
  """Service for managing user-registered external database connections."""
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  async def create(
23
  self,
24
  db: AsyncSession,
@@ -29,8 +66,17 @@ class DatabaseClientService:
29
  ) -> DatabaseClient:
30
  """Register a new database client connection.
31
 
 
 
32
  Credentials are encrypted before being stored.
33
  """
 
 
 
 
 
 
 
34
  client = DatabaseClient(
35
  id=str(uuid.uuid4()),
36
  user_id=user_id,
 
16
  logger = get_logger("database_client_service")
17
 
18
 
19
+ # Fields that identify the same physical database per db_type.
20
+ _CONNECTION_IDENTITY_KEYS: dict[str, tuple[str, ...]] = {
21
+ "postgres": ("host", "port", "database"),
22
+ "supabase": ("host", "port", "database"),
23
+ "mysql": ("host", "port", "database"),
24
+ "sqlserver": ("host", "port", "database"),
25
+ "bigquery": ("project_id", "dataset_id"),
26
+ "snowflake": ("account", "warehouse", "database"),
27
+ }
28
+
29
+
30
  class DatabaseClientService:
31
  """Service for managing user-registered external database connections."""
32
 
33
+ async def _find_duplicate(
34
+ self,
35
+ db: AsyncSession,
36
+ user_id: str,
37
+ db_type: str,
38
+ credentials: dict,
39
+ ) -> Optional[DatabaseClient]:
40
+ """Return an existing client if it points to the same physical database."""
41
+ identity_keys = _CONNECTION_IDENTITY_KEYS.get(db_type, ())
42
+ if not identity_keys:
43
+ return None
44
+
45
+ result = await db.execute(
46
+ select(DatabaseClient).where(
47
+ DatabaseClient.user_id == user_id,
48
+ DatabaseClient.db_type == db_type,
49
+ )
50
+ )
51
+ for existing in result.scalars().all():
52
+ decrypted = decrypt_credentials_dict(existing.credentials)
53
+ if all(
54
+ decrypted.get(k) == credentials.get(k) for k in identity_keys
55
+ ):
56
+ return existing
57
+ return None
58
+
59
  async def create(
60
  self,
61
  db: AsyncSession,
 
66
  ) -> DatabaseClient:
67
  """Register a new database client connection.
68
 
69
+ If a connection to the same physical database already exists for this
70
+ user, the existing record is returned instead of creating a duplicate.
71
  Credentials are encrypted before being stored.
72
  """
73
+ existing = await self._find_duplicate(db, user_id, db_type, credentials)
74
+ if existing:
75
+ logger.info(
76
+ f"Duplicate connection detected, returning existing client {existing.id}"
77
+ )
78
+ return existing
79
+
80
  client = DatabaseClient(
81
  id=str(uuid.uuid4()),
82
  user_id=user_id,
src/pipeline/db_pipeline/db_pipeline_service.py CHANGED
@@ -128,7 +128,7 @@ class DbPipelineService:
128
  )
129
  return create_engine(url)
130
 
131
- raise ValueError(f"Unsupported db_type: {db_type}")
132
 
133
  @contextmanager
134
  def engine_scope(
 
128
  )
129
  return create_engine(url)
130
 
131
+ raise NotImplementedError(f"Unsupported db_type: {db_type}")
132
 
133
  @contextmanager
134
  def engine_scope(
src/pipeline/db_pipeline/extractor.py CHANGED
@@ -149,10 +149,10 @@ def profile_column(
149
  order_by="ORDER BY cnt DESC",
150
  )
151
  top = pd.read_sql(top_sql, engine)
152
- profile["top_values"] = list(zip(top[col_name].tolist(), top["cnt"].tolist()))
153
 
154
  sample = pd.read_sql(_head_query(engine, qc, qt, 5), engine)
155
- profile["sample_values"] = sample[col_name].tolist()
156
 
157
  return profile
158
 
 
149
  order_by="ORDER BY cnt DESC",
150
  )
151
  top = pd.read_sql(top_sql, engine)
152
+ profile["top_values"] = list(zip(top.iloc[:, 0].tolist(), top["cnt"].tolist()))
153
 
154
  sample = pd.read_sql(_head_query(engine, qc, qt, 5), engine)
155
+ profile["sample_values"] = sample.iloc[:, 0].tolist()
156
 
157
  return profile
158