nl2sql-copilot / app /services /nl2sql_service.py
github-actions[bot]
Sync from GitHub main
82e122c
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from typing import Any, Optional
from pathlib import Path
from nl2sql.pipeline import FinalResult
from nl2sql.pipeline_factory import pipeline_from_config_with_adapter
from adapters.db.sqlite_adapter import SQLiteAdapter
from adapters.db.postgres_adapter import PostgresAdapter
from app import state
from app.settings import Settings
from app.errors import (
AppError,
DbNotFound,
SchemaRequired,
SchemaDeriveError,
PipelineConfigError,
PipelineRunError,
)
Adapter = Any # You can replace this with a Protocol later
@dataclass
class NL2SQLService:
"""
Application-level service for the NL2SQL use-case.
Responsibilities:
- Choose the right DB adapter based on db_mode + db_id.
- Derive or accept schema preview.
- Build and run the pipeline for a given query.
"""
settings: Settings
def _select_adapter(self, db_id: Optional[str]) -> Adapter:
mode = self.settings.db_mode.lower()
if mode == "postgres":
dsn = (self.settings.postgres_dsn or "").strip()
if not dsn:
raise PipelineConfigError("Postgres DSN is not configured")
return PostgresAdapter(dsn=dsn)
if db_id:
state.cleanup_stale_dbs()
path = state.get_db_path(db_id)
if not path:
raise DbNotFound(f"Could not resolve DB for db_id={db_id!r}")
return SQLiteAdapter(path=path)
default_path = self.settings.default_sqlite_path
if not Path(default_path).exists():
raise DbNotFound(f"SQLite database path does not exist: {default_path!r}")
return SQLiteAdapter(path=default_path)
def _introspect_sqlite_schema(self, adapter: Adapter) -> str:
"""
Build a lightweight textual schema preview for a SQLite database.
This is a straight port of the previous sqlite3 logic, but contained
inside the service instead of the router.
"""
# Try to locate the underlying .db path from the adapter
db_path = getattr(adapter, "db_path", None) or getattr(adapter, "path", None)
if not db_path:
raise RuntimeError(
"SQLite adapter must expose a .db_path or .path attribute"
)
if not Path(db_path).exists():
raise FileNotFoundError(f"SQLite database path does not exist: {db_path}")
lines: list[str] = []
conn = sqlite3.connect(db_path)
try:
cur = conn.cursor()
cur.execute(
"SELECT name FROM sqlite_master WHERE type='table' "
"AND name NOT LIKE 'sqlite_%' ORDER BY name"
)
tables = [row[0] for row in cur.fetchall()]
for table in tables:
cur.execute(f"PRAGMA table_info({table})")
cols = [row[1] for row in cur.fetchall()]
if cols:
lines.append(f"{table}({', '.join(cols)})")
finally:
conn.close()
return "\n".join(lines)
def get_schema_preview(
self,
db_id: Optional[str],
override: Optional[str],
) -> str:
"""
Decide which schema preview to use.
- If override is provided by the client β†’ use it.
- Else, in sqlite mode β†’ introspect the DB.
- In postgres mode without override β†’ fail fast, the caller can map
this to a proper HTTP error.
"""
if override:
return override
mode = self.settings.db_mode.lower()
if mode == "postgres":
raise SchemaRequired("schema_preview is required in postgres mode")
try:
adapter = self._select_adapter(db_id)
return self._introspect_sqlite_schema(adapter)
except DbNotFound:
raise
except Exception as exc:
raise SchemaDeriveError("failed to derive schema preview") from exc
def run_query(
self,
*,
query: str,
db_id: Optional[str],
schema_preview: str,
) -> FinalResult:
"""Build a pipeline for the given DB and run the query through it."""
try:
adapter = self._select_adapter(db_id)
except AppError:
raise
except Exception as exc:
raise PipelineRunError("failed to select adapter") from exc
try:
pipeline = pipeline_from_config_with_adapter(
self.settings.pipeline_config_path,
adapter=adapter,
)
except FileNotFoundError as exc:
raise PipelineConfigError(
f"Pipeline config not found at {self.settings.pipeline_config_path!r}"
) from exc
except Exception as exc:
raise PipelineConfigError(
f"Failed to build pipeline from {self.settings.pipeline_config_path!r}: {exc}"
) from exc
try:
result = pipeline.run(user_query=query, schema_preview=schema_preview)
except AppError:
raise
except Exception as exc:
raise PipelineRunError("pipeline crashed during execution") from exc
return result