Rifqi Hafizuddin commited on
Commit
14ec882
·
2 Parent(s): 7da6db5a531fcc

Merge branch 'dev_new' into pr/6

Browse files
src/api/v1/db_client.py CHANGED
@@ -117,11 +117,122 @@ class DatabaseClientResponse(BaseModel):
117
  model_config = {"from_attributes": True}
118
 
119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  # ---------------------------------------------------------------------------
121
  # Endpoints
122
  # ---------------------------------------------------------------------------
123
 
124
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
  @router.post(
126
  "/database-clients",
127
  response_model=DatabaseClientResponse,
@@ -335,9 +446,14 @@ async def ingest_database_client(
335
  if client.user_id != user_id:
336
  raise HTTPException(status_code=403, detail="Access denied")
337
 
338
- creds = decrypt_credentials_dict(client.credentials)
 
 
 
 
339
 
340
  try:
 
341
  with db_pipeline_service.engine_scope(
342
  db_type=client.db_type,
343
  credentials=creds,
 
117
  model_config = {"from_attributes": True}
118
 
119
 
120
+ # ---------------------------------------------------------------------------
121
+ # Supported DB types registry
122
+ # ---------------------------------------------------------------------------
123
+
124
+ _DB_TYPES: List[Dict[str, Any]] = [
125
+ {
126
+ "db_type": "postgres",
127
+ "display_name": "PostgreSQL",
128
+ "logo": "postgres",
129
+ "status": "active",
130
+ "message": None,
131
+ "fields": [
132
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Hostname or IP address"},
133
+ {"name": "port", "type": "integer", "required": False, "default": 5432, "description": "Port number"},
134
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
135
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database username"},
136
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
137
+ {"name": "ssl_mode", "type": "select", "required": False, "default": "require", "description": "SSL mode", "options": ["disable", "require", "verify-ca", "verify-full"]},
138
+ ],
139
+ },
140
+ {
141
+ "db_type": "mysql",
142
+ "display_name": "MySQL",
143
+ "logo": "mysql",
144
+ "status": "active",
145
+ "message": None,
146
+ "fields": [
147
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Hostname or IP address"},
148
+ {"name": "port", "type": "integer", "required": False, "default": 3306, "description": "Port number"},
149
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
150
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database username"},
151
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
152
+ {"name": "ssl", "type": "boolean", "required": False, "default": True, "description": "Enable SSL"},
153
+ ],
154
+ },
155
+ {
156
+ "db_type": "supabase",
157
+ "display_name": "Supabase",
158
+ "logo": "supabase",
159
+ "status": "active",
160
+ "message": None,
161
+ "fields": [
162
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Supabase database host"},
163
+ {"name": "port", "type": "integer", "required": False, "default": 5432, "description": "Port number (5432 direct, 6543 pooler)"},
164
+ {"name": "database", "type": "string", "required": False, "default": "postgres", "description": "Database name"},
165
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database user"},
166
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
167
+ {"name": "ssl_mode", "type": "select", "required": False, "default": "require", "description": "SSL mode", "options": ["require", "verify-ca", "verify-full"]},
168
+ ],
169
+ },
170
+ {
171
+ "db_type": "sqlserver",
172
+ "display_name": "SQL Server",
173
+ "logo": "sqlserver",
174
+ "status": "inactive",
175
+ "message": "Coming soon",
176
+ "fields": [
177
+ {"name": "host", "type": "string", "required": True, "default": None, "description": "Hostname or IP address"},
178
+ {"name": "port", "type": "integer", "required": False, "default": 1433, "description": "Port number"},
179
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
180
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Database username"},
181
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Database password", "sensitive": True},
182
+ {"name": "driver", "type": "string", "required": False, "default": None, "description": "ODBC driver name"},
183
+ ],
184
+ },
185
+ {
186
+ "db_type": "bigquery",
187
+ "display_name": "BigQuery",
188
+ "logo": "bigquery",
189
+ "status": "inactive",
190
+ "message": "Coming soon",
191
+ "fields": [
192
+ {"name": "project_id", "type": "string", "required": True, "default": None, "description": "GCP project ID"},
193
+ {"name": "dataset_id", "type": "string", "required": True, "default": None, "description": "BigQuery dataset name"},
194
+ {"name": "location", "type": "string", "required": False, "default": "US", "description": "Dataset location/region"},
195
+ {"name": "service_account_json", "type": "string", "required": True, "default": None, "description": "GCP Service Account key JSON", "sensitive": True},
196
+ ],
197
+ },
198
+ {
199
+ "db_type": "snowflake",
200
+ "display_name": "Snowflake",
201
+ "logo": "snowflake",
202
+ "status": "inactive",
203
+ "message": "Coming soon",
204
+ "fields": [
205
+ {"name": "account", "type": "string", "required": True, "default": None, "description": "Snowflake account identifier"},
206
+ {"name": "warehouse", "type": "string", "required": True, "default": None, "description": "Virtual warehouse name"},
207
+ {"name": "database", "type": "string", "required": True, "default": None, "description": "Database name"},
208
+ {"name": "schema", "type": "string", "required": False, "default": "PUBLIC", "description": "Schema name"},
209
+ {"name": "username", "type": "string", "required": True, "default": None, "description": "Snowflake username"},
210
+ {"name": "password", "type": "string", "required": True, "default": None, "description": "Snowflake password", "sensitive": True},
211
+ {"name": "role", "type": "string", "required": False, "default": None, "description": "Snowflake role"},
212
+ ],
213
+ },
214
+ ]
215
+
216
+
217
  # ---------------------------------------------------------------------------
218
  # Endpoints
219
  # ---------------------------------------------------------------------------
220
 
221
 
222
+ @router.get(
223
+ "/database-clients/dbtypes",
224
+ summary="List supported database types",
225
+ response_description="All database types supported by DataEyond with their connection parameters.",
226
+ )
227
+ async def list_db_types():
228
+ """
229
+ Return every database type DataEyond can connect to, along with the
230
+ credential fields the frontend should render, a logo filename, and
231
+ an active/inactive status with an optional message.
232
+ """
233
+ return _DB_TYPES
234
+
235
+
236
  @router.post(
237
  "/database-clients",
238
  response_model=DatabaseClientResponse,
 
446
  if client.user_id != user_id:
447
  raise HTTPException(status_code=403, detail="Access denied")
448
 
449
+ if client.status != "active":
450
+ raise HTTPException(
451
+ status_code=status.HTTP_409_CONFLICT,
452
+ detail="Cannot ingest from an inactive database connection.",
453
+ )
454
 
455
  try:
456
+ creds = decrypt_credentials_dict(client.credentials)
457
  with db_pipeline_service.engine_scope(
458
  db_type=client.db_type,
459
  credentials=creds,
src/config/settings.py CHANGED
@@ -63,7 +63,7 @@ class Settings(BaseSettings):
63
 
64
  # DB credential encryption (Fernet key for user-registered database creds)
65
  dataeyond_db_credential_key: str = Field(
66
- alias="dataeyond__db__credential__key", default=""
67
  )
68
 
69
 
 
63
 
64
  # DB credential encryption (Fernet key for user-registered database creds)
65
  dataeyond_db_credential_key: str = Field(
66
+ alias="dataeyond__db__credential__key"
67
  )
68
 
69
 
src/database_client/database_client_service.py CHANGED
@@ -16,9 +16,46 @@ from src.utils.db_credential_encryption import (
16
  logger = get_logger("database_client_service")
17
 
18
 
 
 
 
 
 
 
 
 
 
 
 
19
  class DatabaseClientService:
20
  """Service for managing user-registered external database connections."""
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  async def create(
23
  self,
24
  db: AsyncSession,
@@ -29,8 +66,17 @@ class DatabaseClientService:
29
  ) -> DatabaseClient:
30
  """Register a new database client connection.
31
 
 
 
32
  Credentials are encrypted before being stored.
33
  """
 
 
 
 
 
 
 
34
  client = DatabaseClient(
35
  id=str(uuid.uuid4()),
36
  user_id=user_id,
 
16
  logger = get_logger("database_client_service")
17
 
18
 
19
+ # Fields that identify the same physical database per db_type.
20
+ _CONNECTION_IDENTITY_KEYS: dict[str, tuple[str, ...]] = {
21
+ "postgres": ("host", "port", "database"),
22
+ "supabase": ("host", "port", "database"),
23
+ "mysql": ("host", "port", "database"),
24
+ "sqlserver": ("host", "port", "database"),
25
+ "bigquery": ("project_id", "dataset_id"),
26
+ "snowflake": ("account", "warehouse", "database"),
27
+ }
28
+
29
+
30
  class DatabaseClientService:
31
  """Service for managing user-registered external database connections."""
32
 
33
+ async def _find_duplicate(
34
+ self,
35
+ db: AsyncSession,
36
+ user_id: str,
37
+ db_type: str,
38
+ credentials: dict,
39
+ ) -> Optional[DatabaseClient]:
40
+ """Return an existing client if it points to the same physical database."""
41
+ identity_keys = _CONNECTION_IDENTITY_KEYS.get(db_type, ())
42
+ if not identity_keys:
43
+ return None
44
+
45
+ result = await db.execute(
46
+ select(DatabaseClient).where(
47
+ DatabaseClient.user_id == user_id,
48
+ DatabaseClient.db_type == db_type,
49
+ )
50
+ )
51
+ for existing in result.scalars().all():
52
+ decrypted = decrypt_credentials_dict(existing.credentials)
53
+ if all(
54
+ decrypted.get(k) == credentials.get(k) for k in identity_keys
55
+ ):
56
+ return existing
57
+ return None
58
+
59
  async def create(
60
  self,
61
  db: AsyncSession,
 
66
  ) -> DatabaseClient:
67
  """Register a new database client connection.
68
 
69
+ If a connection to the same physical database already exists for this
70
+ user, the existing record is returned instead of creating a duplicate.
71
  Credentials are encrypted before being stored.
72
  """
73
+ existing = await self._find_duplicate(db, user_id, db_type, credentials)
74
+ if existing:
75
+ logger.info(
76
+ f"Duplicate connection detected, returning existing client {existing.id}"
77
+ )
78
+ return existing
79
+
80
  client = DatabaseClient(
81
  id=str(uuid.uuid4()),
82
  user_id=user_id,
src/pipeline/db_pipeline/db_pipeline_service.py CHANGED
@@ -128,7 +128,7 @@ class DbPipelineService:
128
  )
129
  return create_engine(url)
130
 
131
- raise ValueError(f"Unsupported db_type: {db_type}")
132
 
133
  @contextmanager
134
  def engine_scope(
 
128
  )
129
  return create_engine(url)
130
 
131
+ raise NotImplementedError(f"Unsupported db_type: {db_type}")
132
 
133
  @contextmanager
134
  def engine_scope(
src/pipeline/db_pipeline/extractor.py CHANGED
@@ -149,10 +149,10 @@ def profile_column(
149
  order_by="ORDER BY cnt DESC",
150
  )
151
  top = pd.read_sql(top_sql, engine)
152
- profile["top_values"] = list(zip(top[col_name].tolist(), top["cnt"].tolist()))
153
 
154
  sample = pd.read_sql(_head_query(engine, qc, qt, 5), engine)
155
- profile["sample_values"] = sample[col_name].tolist()
156
 
157
  return profile
158
 
 
149
  order_by="ORDER BY cnt DESC",
150
  )
151
  top = pd.read_sql(top_sql, engine)
152
+ profile["top_values"] = list(zip(top.iloc[:, 0].tolist(), top["cnt"].tolist()))
153
 
154
  sample = pd.read_sql(_head_query(engine, qc, qt, 5), engine)
155
+ profile["sample_values"] = sample.iloc[:, 0].tolist()
156
 
157
  return profile
158