Spaces:
Sleeping
Sleeping
| from sqlalchemy.orm import Session | |
| from .database import Base, KeyCategory, APIKey # Import models directly from database module | |
| # from . import schemas # Schemas will be created later | |
# --- CRUD for KeyCategory ---
def create_key_category(db: Session, name: str, type: str, tags: list = None, metadata: dict = None):
    """Create a KeyCategory, commit it, and return the refreshed instance.

    Args:
        db: Session the new object is added to and committed on.
        name: Value for the category's ``name`` attribute.
        type: Value for the category's ``type`` attribute. The parameter name
            shadows the builtin; it is kept so keyword callers keep working.
        tags: Optional value for ``tags``; passed through unchanged.
        metadata: Optional value stored on the model's ``metadata_`` attribute.

    Returns:
        The newly created ``KeyCategory``.

    Raises:
        Exception: Any error raised while adding or committing is re-raised
            unchanged after the session has been rolled back.
    """
    db_category = KeyCategory(name=name, type=type, tags=tags, metadata_=metadata)
    try:
        db.add(db_category)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until rollback().
        db.rollback()
        raise
    db.refresh(db_category)
    return db_category
def get_key_category(db: Session, category_id: int):
    """Return the KeyCategory whose id equals ``category_id`` (first match)."""
    by_id = db.query(KeyCategory).filter(KeyCategory.id == category_id)
    return by_id.first()
def get_key_category_by_name(db: Session, name: str):
    """Return the first KeyCategory whose name equals ``name``."""
    by_name = db.query(KeyCategory).filter(KeyCategory.name == name)
    return by_name.first()
def get_key_categories(db: Session, skip: int = 0, limit: int = 100):
    """Return a list of KeyCategory rows, offset by ``skip`` and capped at ``limit``."""
    page = db.query(KeyCategory).offset(skip).limit(limit)
    return page.all()
def update_key_category(db: Session, category_id: int, name: str = None, type: str = None, tags: list = None, metadata: dict = None):
    """Update the supplied fields of a KeyCategory and commit the change.

    Only arguments that are not ``None`` are applied, so a field cannot be
    reset to ``None`` through this function.

    Args:
        db: Session used for the lookup and the commit.
        category_id: Id of the category to update.
        name: New ``name`` value, or ``None`` to leave it unchanged.
        type: New ``type`` value, or ``None`` to leave it unchanged.
        tags: New ``tags`` value, or ``None`` to leave it unchanged.
        metadata: New value for ``metadata_``, or ``None`` to leave it unchanged.

    Returns:
        The refreshed category, or the falsy lookup result when no category
        with ``category_id`` was found.

    Raises:
        Exception: Any error raised by the commit is re-raised unchanged
            after the session has been rolled back.
    """
    db_category = get_key_category(db, category_id)
    if not db_category:
        return db_category

    # Maps model attribute -> requested value; ``metadata`` lands on ``metadata_``.
    requested = {"name": name, "type": type, "tags": tags, "metadata_": metadata}
    for attribute, value in requested.items():
        if value is not None:
            setattr(db_category, attribute, value)

    try:
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until rollback().
        db.rollback()
        raise
    db.refresh(db_category)
    return db_category
def delete_key_category(db: Session, category_id: int):
    """Delete the KeyCategory with the given id and commit.

    Args:
        db: Session used for the lookup, delete and commit.
        category_id: Id of the category to remove.

    Returns:
        The deleted category object, or the falsy lookup result when no
        category with ``category_id`` was found (nothing is committed then).

    Raises:
        Exception: Any error raised by the delete or commit is re-raised
            unchanged after the session has been rolled back.
    """
    db_category = get_key_category(db, category_id)
    if not db_category:
        return db_category

    try:
        db.delete(db_category)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until rollback().
        db.rollback()
        raise
    return db_category
# --- CRUD for APIKey ---
def create_api_key(db: Session, value: str, category_id: int, status: str = "active", metadata: dict = None):
    """Create an APIKey, commit it, and return the refreshed instance.

    Args:
        db: Session the new object is added to and committed on.
        value: Value for the key's ``value`` attribute.
        category_id: Id stored in the key's ``category_id`` attribute.
        status: Initial status; defaults to ``"active"``.
        metadata: Optional value stored on the model's ``metadata_`` attribute.

    Returns:
        The newly created ``APIKey``.

    Raises:
        Exception: Any error raised while adding or committing is re-raised
            unchanged after the session has been rolled back.
    """
    db_key = APIKey(value=value, category_id=category_id, status=status, metadata_=metadata)
    try:
        db.add(db_key)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until rollback().
        db.rollback()
        raise
    db.refresh(db_key)
    return db_key
def get_api_key(db: Session, key_id: int):
    """Return the APIKey whose id equals ``key_id`` (first match)."""
    by_id = db.query(APIKey).filter(APIKey.id == key_id)
    return by_id.first()
def get_api_keys(db: Session, category_id: int = None, status: str = None, skip: int = 0, limit: int = 100):
    """Return a page of APIKey rows, optionally narrowed by category and/or status.

    A filter is only applied for an argument that is not ``None``; the result
    is then offset by ``skip`` and capped at ``limit``.
    """
    selection = db.query(APIKey)
    optional_filters = (
        (APIKey.category_id, category_id),
        (APIKey.status, status),
    )
    for column, wanted in optional_filters:
        if wanted is not None:
            selection = selection.filter(column == wanted)
    return selection.offset(skip).limit(limit).all()
def update_api_key(db: Session, key_id: int, value: str = None, category_id: int = None, status: str = None, metadata: dict = None):
    """Update the supplied fields of an APIKey and commit the change.

    Only arguments that are not ``None`` are applied, so a field cannot be
    reset to ``None`` through this function.

    Args:
        db: Session used for the lookup and the commit.
        key_id: Id of the key to update.
        value: New ``value``, or ``None`` to leave it unchanged.
        category_id: New ``category_id``, or ``None`` to leave it unchanged.
        status: New ``status``, or ``None`` to leave it unchanged.
        metadata: New value for ``metadata_``, or ``None`` to leave it unchanged.

    Returns:
        The refreshed key, or the falsy lookup result when no key with
        ``key_id`` was found.

    Raises:
        Exception: Any error raised by the commit is re-raised unchanged
            after the session has been rolled back.
    """
    db_key = get_api_key(db, key_id)
    if not db_key:
        return db_key

    # Maps model attribute -> requested value; ``metadata`` lands on ``metadata_``.
    requested = {
        "value": value,
        "category_id": category_id,
        "status": status,
        "metadata_": metadata,
    }
    for attribute, new_value in requested.items():
        if new_value is not None:
            setattr(db_key, attribute, new_value)

    try:
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until rollback().
        db.rollback()
        raise
    db.refresh(db_key)
    return db_key
def delete_api_key(db: Session, key_id: int):
    """Delete the APIKey with the given id and commit.

    Args:
        db: Session used for the lookup, delete and commit.
        key_id: Id of the key to remove.

    Returns:
        The deleted key object, or the falsy lookup result when no key with
        ``key_id`` was found (nothing is committed then).

    Raises:
        Exception: Any error raised by the delete or commit is re-raised
            unchanged after the session has been rolled back.
    """
    db_key = get_api_key(db, key_id)
    if not db_key:
        return db_key

    try:
        db.delete(db_key)
        db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until rollback().
        db.rollback()
        raise
    return db_key
# --- Key Selection Logic (Basic Placeholder) ---
# This will be expanded later based on the selection strategy (round-robin, etc.).
def get_available_keys_for_category(db: Session, category_id: int):
    """Return every APIKey in the given category whose status is "active"."""
    in_category = APIKey.category_id == category_id
    is_active = APIKey.status == "active"
    return db.query(APIKey).filter(in_category, is_active).all()
# Placeholder for the selection strategy; round-robin etc. still needs to be implemented.
def select_key_from_pool(db: Session, category_id: int):
    """Pick one active key for the category, or None when the pool is empty.

    Currently a placeholder strategy: the first key returned by
    ``get_available_keys_for_category`` is chosen.
    """
    pool = get_available_keys_for_category(db, category_id)
    # TODO: Implement round-robin or other selection strategy.
    # A later version may also record usage stats on the chosen key
    # (e.g. increment usage_count, update last_used) and commit them.
    return pool[0] if pool else None