Spaces:
Runtime error
Runtime error
| from sqlalchemy import select, delete, update | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from utils.error_handlers import handle_error, not_found_error, no_entries_found, handle_exception | |
| from fastapi.responses import JSONResponse | |
| from typing import List, Type, Optional | |
class BaseQuery:
    """Small CRUD helper around a SQLAlchemy-style session, bound to one user.

    Every method takes the session (``db``) explicitly. Read helpers return
    the fetched value(s); write helpers return ``None`` on success or the
    value produced by ``handle_exception`` / ``handle_error`` on failure
    (methods below treat such a value as a ``JSONResponse``).
    """

    def __init__(self, user):
        """Store the acting user and cache its ``id``.

        Args:
            user: Mapping-like object; only ``user.get("id")`` is read here.
        """
        self.user = user
        self.user_id = user.get("id")

    @staticmethod
    def _conditions(model, id=None, filter_conditions=None):
        """Collect the WHERE criteria shared by a lookup and its follow-up statement.

        Args:
            model: Mapped class exposing an ``id`` column.
            id: Primary-key value; ``None`` means "do not filter by id".
            filter_conditions: Optional iterable of extra criteria.

        Returns:
            A new list of criteria (possibly empty).
        """
        conditions = []
        # ``is not None`` rather than truthiness so that id == 0 is honoured.
        if id is not None:
            conditions.append(model.id == id)
        if filter_conditions:
            conditions.extend(filter_conditions)
        return conditions

    def _fetch(self, db, query, not_found_message, multiple: bool = False):
        """Execute ``query`` and return a single scalar or a list of rows.

        Args:
            db: Session-like object exposing ``execute``.
            query: Statement to execute.
            not_found_message: Kept for call-site compatibility; it is not
                used, a miss is reported as ``[]``.
            multiple: When true return ``.all()``, otherwise
                ``.scalar_one_or_none()``.

        Returns:
            The rows / scalar, ``[]`` when nothing matched, or the value
            returned by ``handle_error`` if executing the query raised.
        """
        try:
            if multiple:
                return db.execute(query).all() or []
            value = db.execute(query).scalar_one_or_none()
            # Only ``None`` means "no row": 0, "" and False are real values
            # when a single column is selected.
            return [] if value is None else value
        except Exception as e:
            return handle_error(
                e,
                "Failed to fetch entries" if multiple else "Failed to fetch entry",
            )

    def _handle_commit(self, db):
        """Commit the session, rolling back on failure.

        Returns:
            ``None`` on success, otherwise the handler's return value.
        """
        try:
            db.commit()
        except SQLAlchemyError as e:
            db.rollback()
            return handle_exception(e)
        except Exception as e:
            db.rollback()
            return handle_error(e, "Operation failed")

    def _execute_and_commit(self, db, statement):
        """Execute a mutating statement and commit it.

        A failure while executing is handled exactly like a failure while
        committing, so the session is never left in a failed transaction.

        Returns:
            ``None`` on success, otherwise the handler's return value.
        """
        try:
            db.execute(statement)
        except SQLAlchemyError as e:
            db.rollback()
            return handle_exception(e)
        except Exception as e:
            db.rollback()
            return handle_error(e, "Operation failed")
        return self._handle_commit(db)

    def add(self, db, instance):
        """Add a new entry and commit."""
        db.add(instance)
        return self._handle_commit(db)

    def insert_entries(self, db, entries):
        """Add several entries and commit them together."""
        db.add_all(entries)
        return self._handle_commit(db)

    def delete(self, db, model, id=None, filter_conditions=None):
        """Delete the entry selected by ``id`` and/or ``filter_conditions``.

        The entry is looked up first; if that lookup yields a
        ``JSONResponse`` it is returned and nothing is deleted. The SELECT
        and the DELETE use the same criteria.

        Returns:
            ``None`` on success, otherwise an error response.
        """
        conditions = self._conditions(model, id, filter_conditions)

        lookup = select(model)
        if conditions:
            lookup = lookup.where(*conditions)
        entry = self._fetch(db, lookup, f"Entry with ID {id} not found.", multiple=False)
        if isinstance(entry, JSONResponse):
            return entry

        if not conditions:
            # Without any criterion there is no single entry to target; never
            # issue an unscoped DELETE from here (delete_all() does that).
            return self._handle_commit(db)

        # NOTE: inside a method body ``delete`` resolves to the module-level
        # SQLAlchemy construct, not to this method.
        return self._execute_and_commit(db, delete(model).where(*conditions))

    def delete_all(self, db, model, filter_conditions=None):
        """Delete every entry of ``model``, or only those matching the filters."""
        statement = delete(model)
        if filter_conditions:
            statement = statement.where(*filter_conditions)
        return self._execute_and_commit(db, statement)

    def update(self, db, model, id, update_data, filter_conditions=None):
        """Update the entry with the given ``id``.

        ``filter_conditions`` (if any) restrict both the lookup and the
        UPDATE, so a row that does not satisfy them is left untouched.

        Returns:
            ``None`` on success, otherwise an error response.
        """
        conditions = [model.id == id]
        if filter_conditions:
            conditions.extend(filter_conditions)

        not_found_message = f"Entry with ID {id} not found."
        entry = self._fetch(
            db, select(model).where(*conditions), not_found_message, multiple=False
        )
        if isinstance(entry, JSONResponse):
            return entry

        # One statement carrying all criteria; an extra id-only UPDATE would
        # bypass the filters.
        statement = update(model).where(*conditions).values(update_data)
        return self._execute_and_commit(db, statement)

    def update_entries(self, db, model, update_data, filter_conditions=None):
        """Update all entries of ``model``, or only those matching the filters.

        Returns:
            ``None`` on success, otherwise an error response.
        """
        lookup = select(model)
        statement = update(model)
        if filter_conditions:
            lookup = lookup.where(*filter_conditions)
            statement = statement.where(*filter_conditions)

        not_found_message = "No entries found matching the filter conditions."
        results = self._fetch(db, lookup, not_found_message, multiple=True)
        if isinstance(results, JSONResponse):
            return results

        return self._execute_and_commit(db, statement.values(update_data))

    def get(
        self,
        db,
        model: Optional[Type] = None,
        id: Optional[int] = None,
        filter_conditions=None,
        columns: Optional[List[str]] = None,
        multiple: bool = False,
    ):
        """Get one or multiple entries, filtered by ID and/or conditions.

        Args:
            db: Session-like object.
            model: Mapped class; required unless ``columns`` is given and
                ``id`` is ``None``.
            id: Optional primary-key value.
            filter_conditions: Optional iterable of criteria.
            columns: Optional expressions passed straight to ``select``.
            multiple: Return a list of rows instead of a single scalar.

        Returns:
            See ``_fetch``.
        """
        query = select(*columns) if columns else select(model)
        conditions = self._conditions(model, id, filter_conditions)
        if conditions:
            query = query.where(*conditions)
        return self._fetch(
            db,
            query,
            "No entries found." if multiple else "Entry not found.",
            multiple=multiple,
        )

    def get_with_joins(
        self,
        db,
        join_models: List[Type],
        join_conditions: Optional[List] = None,
        model: Optional[Type] = None,
        filter_conditions=None,
        columns: Optional[List[str]] = None,
        multiple: bool = False,
    ):
        """Get one or multiple entries with joins and optional filters.

        Args:
            db: Session-like object.
            join_models: Targets to join, in order.
            join_conditions: Optional ON clauses, paired positionally with
                ``join_models``; without them each join relies on the ON
                clause SQLAlchemy can infer.
            model: Root entity, used when ``columns`` is not given.
            filter_conditions: Optional iterable of criteria.
            columns: Optional expressions passed straight to ``select``.
            multiple: Return a list of rows instead of a single scalar.

        Returns:
            See ``_fetch``.
        """
        if columns:
            query = select(*columns)
        else:
            query = select(model, *join_models).select_from(model)

        if join_conditions:
            for join_model, join_condition in zip(join_models, join_conditions):
                query = query.join(join_model, join_condition)
        else:
            # join() accepts ONE target per call (its second positional
            # argument is the ON clause), so chain the joins one by one.
            for join_model in join_models:
                query = query.join(join_model)

        if filter_conditions:
            query = query.where(*filter_conditions)

        return self._fetch(
            db,
            query,
            "No entries found." if multiple else "Entry not found.",
            multiple=multiple,
        )

    def get_columns(
        self,
        db,
        columns: List[str],
        model=None,
        filter_conditions=None,
        id: Optional[int] = None,
        multiple: bool = False,
    ):
        """Get specific columns by ID and/or filtering.

        Args:
            db: Session-like object.
            columns: Expressions passed straight to ``select``.
            model: Optional FROM target; required when ``id`` is given.
            filter_conditions: Optional iterable of criteria.
            id: Optional primary-key value.
            multiple: Return a list of rows instead of a single scalar.

        Returns:
            See ``_fetch``.
        """
        query = select(*columns)
        # Only pin the FROM clause when a model was actually supplied.
        if model is not None:
            query = query.select_from(model)
        conditions = self._conditions(model, id, filter_conditions)
        if conditions:
            query = query.where(*conditions)
        return self._fetch(
            db,
            query,
            "No entries found." if multiple else "Entry not found.",
            multiple=multiple,
        )