# apigateway/services/db_service/query_service.py
# jebin2 - db services - 50c20bf
"""
QueryService - Main entry point for database operations.
Factory class that provides access to SelectQuery, UpdateQuery, and DeleteQuery.
All query types inherit from BaseQuery for shared filtering logic.
"""
import os
from sqlalchemy.ext.asyncio import AsyncSession
from core.models import User
from services.db_service.select_query import SelectQuery
from services.db_service.update_query import UpdateQuery
from services.db_service.delete_query import DeleteQuery
class QueryService:
    """
    Factory for the database query helpers.

    An instance stores a user, a database session and a system-operation
    flag, and hands those same three values to every query object it builds:

    - select() -> SelectQuery: read operations (SELECT)
    - update() -> UpdateQuery: update operations (UPDATE)
    - delete() -> DeleteQuery: soft delete operations

    Usage (methods called on the returned query objects are defined in
    their own modules):

        qs = QueryService(user, db)

        # SELECT
        jobs = await qs.select().execute(select(GeminiJob))

        # UPDATE
        await qs.update().update(Job, {...}, id=123)

        # DELETE
        await qs.delete().soft_delete(Job, id=123)

    NOTE(review): earlier usage notes also showed ``qs.is_admin()``, but no
    such method is defined on this class - confirm where admin checks live.
    """

    def __init__(self, user: User, db: AsyncSession, is_system: bool = False):
        """
        Store the values shared by every query object this service creates.

        Args:
            user: Current authenticated user (or system user for background ops).
            db: Database session.
            is_system: True for system/background operations
                (OAuth, webhooks, workers). Defaults to False.
        """
        self.user, self.db, self.is_system = user, db, is_system

    def _context(self) -> tuple:
        """Return (user, db, is_system) in the order the query constructors take them."""
        return (self.user, self.db, self.is_system)

    def select(self) -> SelectQuery:
        """Return a new SelectQuery built from this service's user, session and flag."""
        return SelectQuery(*self._context())

    def update(self) -> UpdateQuery:
        """Return a new UpdateQuery built from this service's user, session and flag."""
        return UpdateQuery(*self._context())

    def delete(self) -> DeleteQuery:
        """Return a new DeleteQuery (soft delete) built from this service's user, session and flag."""
        return DeleteQuery(*self._context())
def get_query_service(user: User, db: AsyncSession) -> QueryService:
    """
    Build a QueryService for the given user and database session.

    The ``is_system`` flag is not passed, so the service is created with
    its default setting.

    Can be wired up as a FastAPI dependency:

        from services.db_service import get_query_service

        qs: QueryService = Depends(
            lambda user=Depends(get_current_user), db=Depends(get_db): get_query_service(user, db)
        )

    Args:
        user: User to bind the service to.
        db: Database session handed to the service.

    Returns:
        A new QueryService wrapping ``user`` and ``db``.
    """
    service = QueryService(user=user, db=db)
    return service