import os
import json
import re
from contextlib import contextmanager
from logging import getLogger
from typing import List

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
from fastapi import HTTPException
from pydantic import BaseModel

from logging_log.logger import logger

# Load environment variables from the .env file
load_dotenv()

#logger = getLogger(__name__)

# User IDs: ASCII letters and digits only, at least five characters.
_USER_ID_PATTERN = re.compile(r"[a-zA-Z0-9]{5,}")
# Notebook IDs: one or more ASCII digits.
_NOTEBOOK_ID_PATTERN = re.compile(r"[0-9]+")

# Per-effort permission names, in the order they are reported to callers.
_ZONE_PERMISSIONS = ("can_create_zones", "can_modify_zones", "can_delete_zones")
_NOTEBOOK_PERMISSIONS = ("can_create_notebooks", "can_delete_notebooks")
_ALL_EFFORT_PERMISSIONS = (
    ("can_view_effort",) + _ZONE_PERMISSIONS + _NOTEBOOK_PERMISSIONS
)


# Pydantic model
class UserCreate(BaseModel):
    """Request payload describing a user to create."""

    user_id: str
    password: str
    ad_groups: List[str]
    is_admin: bool = False


# Database connection function
def get_db_connection():
    """Open a new psycopg2 connection configured from environment variables.

    Reads HOSTNAME, DATABASE, DB_USERNAME, DB_PASSWORD and PORT.

    Returns:
        An open psycopg2 connection. The caller is responsible for closing it.

    Raises:
        Exception: whatever ``psycopg2.connect`` raised, after logging it.
    """
    # NOTE(review): HOSTNAME and PORT are often already set by the shell or
    # container runtime, and load_dotenv() does not override existing
    # variables by default -- confirm the .env values are the ones used.
    try:
        conn = psycopg2.connect(
            host=os.getenv("HOSTNAME"),
            database=os.getenv("DATABASE"),
            user=os.getenv("DB_USERNAME"),
            password=os.getenv("DB_PASSWORD"),
            port=os.getenv("PORT"),
        )
    except Exception as e:
        logger.error(f"Error connecting to database: {str(e)}")
        raise
    logger.info("Database connection established.")
    return conn


@contextmanager
def _db_cursor(cursor_factory=None):
    """Yield ``(conn, cursor)``; roll back on error and always close both.

    Committing is left to the caller. psycopg2's own ``with connection:``
    block only ends the transaction and leaves the connection open, so this
    helper closes the connection explicitly.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=cursor_factory)
        try:
            yield conn, cursor
        except Exception:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # Keep the original failure visible even if rollback fails.
                logger.error(f"Rollback failed: {str(rollback_error)}")
            raise
        finally:
            cursor.close()
    finally:
        conn.close()


# Validate user ID
def validate_user_id(user_id: str):
    """Reject empty or malformed user IDs with an HTTP 400.

    Raises:
        HTTPException: 400 when ``user_id`` is empty, or when it is not made
            up solely of at least five ASCII letters/digits.
    """
    # The emptiness check must come first; an empty string also fails the
    # pattern and would otherwise never reach its own message.
    if not user_id:
        raise HTTPException(status_code=400, detail="User ID cannot be empty")
    # fullmatch (rather than match with "$") also rejects a trailing newline.
    if not isinstance(user_id, str) or not _USER_ID_PATTERN.fullmatch(user_id):
        raise HTTPException(status_code=400, detail="Invalid user ID format")


# Validate notebook ID
def validate_notebook_id(notebook_id: str):
    """Reject notebook IDs that are not a plain run of ASCII digits.

    Integer arguments are accepted by validating their string form.

    Raises:
        HTTPException: 406 when the ID is not composed of digits 0-9.
    """
    if not _NOTEBOOK_ID_PATTERN.fullmatch(str(notebook_id)):
        raise HTTPException(status_code=406, detail="Invalid notebook ID format")


# Create a new user in the DB
def create_user(user_id: str, password: str, ad_groups: List[str], is_admin: bool):
    """Insert a row into ``users``.

    Raises:
        Exception: any database error, after rollback and logging. Failures
            are propagated so callers cannot mistake them for success.
    """
    # NOTE(review): the password is stored exactly as received; presumably
    # the caller hashes it beforehand -- confirm.
    insert_query = """
        INSERT INTO users (user_id, password, ad_groups, is_admin)
        VALUES (%s, %s, %s, %s)
    """
    try:
        with _db_cursor() as (conn, curr):
            curr.execute(insert_query, (user_id, password, ad_groups, is_admin))
            conn.commit()
        logger.info(f"User {user_id} created")
    except Exception as e:
        logger.error(f"Error creating new user {user_id}: {str(e)}")
        raise


# Update an existing user in the DB
def update_user(user_id: str, password: str, ad_groups: List[str], is_admin: bool):
    """Overwrite password, AD groups and admin flag of an existing user.

    Raises:
        Exception: any database error, after rollback and logging.
    """
    update_query = """
        UPDATE users
        SET password = %s, ad_groups = %s, is_admin = %s
        WHERE user_id = %s
    """
    try:
        with _db_cursor() as (conn, curr):
            curr.execute(update_query, (password, ad_groups, is_admin, user_id))
            conn.commit()
        logger.info(f"User {user_id} updated")
    except Exception as e:
        logger.error(f"Error updating user {user_id}: {str(e)}")
        raise


# delete a user in the DB
def delete_user(user_id: str):
    """Delete the ``users`` row with the given ID.

    Raises:
        Exception: any database error, after rollback and logging.
    """
    delete_query = """
        DELETE FROM users WHERE user_id = %s;
    """
    try:
        with _db_cursor() as (conn, curr):
            curr.execute(delete_query, (user_id,))
            conn.commit()
        logger.info(f"User {user_id} deleted")
    except Exception as e:
        logger.error(f"Error deleting user {user_id}: {str(e)}")
        raise


def get_user_credentials(user_id: str):
    """Fetch one user's stored credentials.

    Returns:
        A dict with ``user_id``, ``password``, ``ad_groups`` and ``is_admin``
        keys, or ``{"error": "User not found"}`` when no row matches.

    Raises:
        Exception: any database error, after logging.
    """
    # Columns are named explicitly so the positional mapping below does not
    # depend on the table's physical column order.
    select_query = """
        SELECT user_id, password, ad_groups, is_admin
        FROM users WHERE user_id = %s;
    """
    try:
        with _db_cursor() as (_conn, curr):
            curr.execute(select_query, (user_id,))
            user = curr.fetchone()
        if not user:
            logger.warning(f"User {user_id} not found")
            return {"error": "User not found"}
        logger.info(f"User {user_id} fetched")
        return {
            "user_id": user[0],
            "password": user[1],
            "ad_groups": user[2],
            "is_admin": user[3],
        }
    except Exception as e:
        logger.error(f"Error fetching user {user_id}: {str(e)}")
        raise


def _add_unique(target, names):
    """Append each name to ``target`` unless it is already present."""
    for name in names:
        if name not in target:
            target.append(name)


def _register_effort(cursor, permissions, effort):
    """Record an effort's name, files, notebooks and zones in ``permissions``."""
    effort_id = effort["id"]
    permissions["efforts"].append({"name": effort["name"], "id": effort_id})
    permissions["files"][effort_id] = effort["files"] or []

    cursor.execute(
        "SELECT notebook_id, notebook_name FROM notebook_details WHERE effort_id=%s",
        (effort_id,),
    )
    permissions["notebooks"][effort_id] = [
        {"name": note["notebook_name"], "id": note["notebook_id"]}
        for note in cursor.fetchall()
    ]

    cursor.execute("SELECT id, name FROM zones WHERE effort_id=%s", (effort_id,))
    permissions["ezones"][effort_id] = [
        {"name": zone["name"], "id": zone["id"]} for zone in cursor.fetchall()
    ]


def _group_effort_permissions(permission_json, ad_groups):
    """Return the per-effort permission names granted to any of ``ad_groups``."""
    # NOTE(review): the original indentation was lost; the three checks are
    # treated as independent per group (zone/notebook rights do not require
    # the same group to hold can_view_effort) -- confirm intended nesting.
    rules = permission_json or {}  # a NULL permission column grants nothing
    granted = []
    for ad_group in ad_groups:
        if ad_group in (rules.get("can_view_effort") or []):
            _add_unique(granted, ("can_view_effort",))
        if ad_group in (rules.get("can_create_and_modify_zones") or []):
            _add_unique(granted, _ZONE_PERMISSIONS)
        if ad_group in (rules.get("can_create_and_modify_notebooks") or []):
            _add_unique(granted, _NOTEBOOK_PERMISSIONS)
    return granted


# Fetch user details
def get_user_details(User_ID):
    """Build the permissions/efforts/notebooks/zones/files view for a user.

    Admins receive every effort with every per-effort permission. Other
    users receive, per effort, whatever their AD groups are granted by the
    effort's ``permission`` JSON; an effort is listed only when one of the
    groups has ``can_view_effort``.

    Returns:
        A dict with keys ``permissions``, ``efforts``, ``notebooks``,
        ``ezones`` and ``files``.

    Raises:
        HTTPException: 400 for an invalid ID, 404 when the user is unknown.
        Exception: any database error, after logging.
    """
    validate_user_id(User_ID)
    with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as (_conn, cursor):
        try:
            cursor.execute(
                "SELECT ad_groups, is_admin FROM users WHERE user_id=%s",
                (User_ID,),
            )
            user_row = cursor.fetchone()
            if not user_row:
                raise HTTPException(status_code=404, detail="User not found")
            ad_groups, is_admin = user_row
            ad_groups = ad_groups or []  # tolerate a NULL ad_groups column

            cursor.execute("SELECT id, name, files, permission FROM efforts")
            efforts_data = cursor.fetchall()

            permissions = {
                "permissions": [],
                "efforts": [],
                "notebooks": {},
                "ezones": {},
                "files": {},
            }
            # A disabled draft that derived the global effort permissions
            # from a `user_permissions` mapping used to live here as a string
            # literal; it was never executed and has been removed.

            if is_admin:
                permissions["permissions"].extend(
                    [
                        "can_create_efforts",
                        "can_view_efforts",
                        "can_revoke_efforts",
                        "can_modify_efforts",
                    ]
                )
                for effort in efforts_data:
                    _register_effort(cursor, permissions, effort)
                    permissions["permissions"].append(
                        {effort["id"]: list(_ALL_EFFORT_PERMISSIONS)}
                    )
            else:
                permissions["permissions"].append("can_view_efforts")
                for effort in efforts_data:
                    effort_permissions = _group_effort_permissions(
                        effort["permission"], ad_groups
                    )
                    # Register once per effort, however many groups match.
                    if "can_view_effort" in effort_permissions:
                        _register_effort(cursor, permissions, effort)
                    if effort_permissions:
                        permissions["permissions"].append(
                            {effort["id"]: effort_permissions}
                        )

            return permissions
        except Exception as e:
            logger.error(f"Error fetching user details: {str(e)}")
            raise


# Fetch notebook details
def get_notebook_details(notebook_id):
    """Return a notebook's metadata, chat history and owning effort name.

    Returns:
        A dict with keys ``id``, ``name``, ``chat``, ``Created at`` and
        ``effort_name`` (``None`` when the effort row is missing).

    Raises:
        HTTPException: 406 for a malformed ID, 404 when the notebook does not
            exist, 500 for any other failure while querying.
    """
    validate_notebook_id(notebook_id)
    with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as (_conn, cursor):
        try:
            cursor.execute(
                "SELECT notebook_id, notebook_name, chat_history, created_at, effort_id FROM notebook_details WHERE notebook_id=%s",
                (notebook_id,),
            )
            notebook = cursor.fetchone()
            if not notebook:
                logger.warning(f"Notebook {notebook_id} not found")
                raise HTTPException(status_code=404, detail="Notebook not found")

            cursor.execute(
                "SELECT name FROM efforts WHERE id=%s", (notebook["effort_id"],)
            )
            effort = cursor.fetchone()
            return {
                "id": notebook["notebook_id"],
                "name": notebook["notebook_name"],
                "chat": notebook["chat_history"],
                "Created at": notebook["created_at"],
                "effort_name": effort["name"] if effort else None,
            }
        except HTTPException:
            # Let the deliberate 404 through instead of rewrapping it as 500.
            raise
        except Exception as e:
            logger.error(f"Error fetching notebook details: {str(e)}")
            raise HTTPException(
                status_code=500,
                detail=f"Error fetching notebook details: {str(e)}",
            ) from e


# Effort Management
# Create effort
def create_effort(name, permission):
    """Insert an effort with the given name and JSON permission document.

    Raises:
        Exception: any database error, after rollback and logging.
    """
    try:
        with _db_cursor() as (conn, cursor):
            cursor.execute(
                "INSERT INTO efforts (name, permission) VALUES (%s, %s)",
                (name, psycopg2.extras.Json(permission)),
            )
            conn.commit()
        logger.info(f"Effort '{name}' created.")
    except Exception as e:
        logger.error(f"Error creating effort '{name}': {str(e)}")
        raise


# Delete effort
def delete_effort(effort_id):
    """Delete an effort by ID.

    Raises:
        ValueError: when no effort has that ID.
        Exception: any database error, after rollback and logging.
    """
    try:
        with _db_cursor() as (conn, cursor):
            cursor.execute("DELETE FROM efforts WHERE id = %s", (effort_id,))
            if cursor.rowcount == 0:
                raise ValueError(f"Effort with ID {effort_id} not found")
            conn.commit()
        logger.info(f"Effort with ID {effort_id} deleted successfully.")
    except Exception as e:
        logger.error(f"Error deleting effort with ID {effort_id}: {str(e)}")
        raise


# Modify effort
def modify_effort(effort_id, name, permission):
    """Replace an effort's name and permission document.

    Raises:
        ValueError: when no effort has that ID.
        Exception: any database error, after rollback and logging.
    """
    try:
        with _db_cursor() as (conn, cursor):
            cursor.execute(
                "UPDATE efforts SET name = %s, permission = %s WHERE id = %s",
                (name, psycopg2.extras.Json(permission), effort_id),
            )
            if cursor.rowcount == 0:
                raise ValueError(f"Effort with ID {effort_id} not found")
            conn.commit()
        logger.info(f"Effort with ID {effort_id} updated successfully.")
    except Exception as e:
        logger.error(f"Error modifying effort with ID {effort_id}: {str(e)}")
        raise


# List effort details
def list_effort_details(effort_id):
    """Return every column of one effort as a plain dict.

    Raises:
        ValueError: when no effort has that ID.
        Exception: any database error, after logging.
    """
    try:
        with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as (_conn, cursor):
            cursor.execute("SELECT * FROM efforts WHERE id = %s", (effort_id,))
            effort_details = cursor.fetchone()
        if not effort_details:
            raise ValueError(f"Effort with ID {effort_id} not found")
        logger.info(f"Effort details fetched for ID {effort_id}")
        return dict(effort_details)
    except Exception as e:
        logger.error(f"Error fetching effort details for ID {effort_id}: {str(e)}")
        raise


def _build_zone_config(vector_db, llm_model, chunking_strategy, embedding_model,
                       indexer, top_k, overlap, chunk_size):
    """Assemble the config document stored in ``zones.config``."""
    return {
        "llm_model": llm_model,
        "embedding_model": embedding_model,
        "vector_db": vector_db,
        "chunking_strategy": chunking_strategy,
        "indexer": indexer,
        "top_k": top_k,
        "overlap": overlap,
        "chunk_size": chunk_size,
    }


# Zone Management
# Delete experiment zone
def delete_experiment_zone(zone_id: int):
    """Delete a zone by ID.

    Raises:
        ValueError: when no zone has that ID.
        Exception: any database error, after rollback and logging.
    """
    try:
        with _db_cursor() as (conn, cursor):
            cursor.execute("DELETE FROM zones WHERE id = %s", (zone_id,))
            if cursor.rowcount == 0:
                raise ValueError(f"Zone with ID {zone_id} not found")
            conn.commit()
        logger.info(f"Experiment zone with ID {zone_id} deleted successfully.")
    except Exception as e:
        logger.error(f"Error while deleting experiment zone: {str(e)}")
        raise


# Fetch zone details
def get_zone_details(zone_id: int):
    """Return the ``id``, ``name``, ``config`` and ``created_at`` of a zone.

    Returns:
        The fetched row (a psycopg2 ``DictRow``).

    Raises:
        ValueError: when no zone has that ID.
        Exception: any database error, after logging.
    """
    try:
        with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as (_conn, cursor):
            cursor.execute(
                "SELECT id, name, config, created_at FROM zones WHERE id = %s",
                (zone_id,),
            )
            zone = cursor.fetchone()
        if not zone:
            logger.warning(f"No experiment zone found with ID {zone_id}.")
            raise ValueError(f"Zone with ID {zone_id} not found")
        logger.info(f"Details for experiment zone ID {zone_id} fetched successfully.")
        return zone
    except Exception as e:
        logger.error(f"Error while fetching zone details: {str(e)}")
        raise


# Create experiment zone
def create_experiment_zone(effort_id: int, name: str, vector_db: str, llm_model: str,
                           chunking_strategy: str, embedding_model: str, indexer: str,
                           top_k: int, overlap: int, chunk_size: int):
    """Insert a zone under an effort and return its new ID.

    Returns:
        ``{"zone_id": <new id>}``.

    Raises:
        HTTPException: 400 when ``name`` is empty.
        Exception: any database error, after rollback and logging.
    """
    try:
        # Validate before opening a connection so bad input costs nothing.
        if not name:
            raise HTTPException(status_code=400, detail="Zone name cannot be empty")
        config = _build_zone_config(
            vector_db, llm_model, chunking_strategy, embedding_model,
            indexer, top_k, overlap, chunk_size,
        )
        with _db_cursor() as (conn, cursor):
            cursor.execute(
                "INSERT INTO zones (effort_id, name, config) VALUES (%s, %s, %s) RETURNING id",
                (effort_id, name, json.dumps(config)),
            )
            zone_id = cursor.fetchone()[0]
            conn.commit()
        logger.info(f"Experiment zone '{name}' created with ID {zone_id}.")
        return {"zone_id": zone_id}
    except Exception as e:
        logger.error(f"Error while creating experiment zone: {str(e)}")
        raise


# Modify experiment zone
def modify_experiment_zone(zone_id: int, name: str, vector_db: str, llm_model: str,
                           chunking_strategy: str, embedding_model: str, indexer: str,
                           top_k: int, overlap: int, chunk_size: int):
    """Replace a zone with a freshly inserted one and drop its derived data.

    The old zone row and its ``chunk_embeddings`` / ``file_chunks`` rows are
    deleted and a new zone row (same effort, new config) is inserted, all in
    one transaction.

    Returns:
        The ID of the newly inserted zone.

    Raises:
        HTTPException: 400 when ``name`` is empty.
        ValueError: when no zone has ``zone_id``.
        Exception: any database error, after rollback and logging.
    """
    try:
        if not name:
            raise HTTPException(status_code=400, detail="Zone name cannot be empty")
        config_json = json.dumps(
            _build_zone_config(
                vector_db, llm_model, chunking_strategy, embedding_model,
                indexer, top_k, overlap, chunk_size,
            )
        )
        with _db_cursor() as (conn, cursor):
            cursor.execute("SELECT effort_id FROM zones WHERE id = %s", (zone_id,))
            row = cursor.fetchone()
            if row is None:
                raise ValueError(f"Experiment zone with ID {zone_id} not found")
            effort_id = row[0]

            # NOTE(review): the schema is not visible here. Dependants are
            # removed before the zone, and the old zone before the new one is
            # inserted, so the sequence works whether or not foreign-key or
            # uniqueness constraints exist on these tables.
            cursor.execute(
                "DELETE FROM chunk_embeddings WHERE zone_id = %s", (zone_id,)
            )
            cursor.execute("DELETE FROM file_chunks WHERE zone_id = %s", (zone_id,))
            cursor.execute("DELETE FROM zones WHERE id = %s", (zone_id,))

            cursor.execute(
                "INSERT INTO zones (effort_id, name, config) VALUES (%s, %s, %s) RETURNING id",
                (effort_id, name, config_json),
            )
            new_zone_id = cursor.fetchone()[0]
            conn.commit()
        logger.info(
            f"Experiment zone with ID {zone_id} modified successfully. New zone ID is {new_zone_id}"
        )
        return new_zone_id
    except Exception as e:
        logger.error(f"Error while modifying experiment zone: {str(e)}")
        raise

# -----------------------------------------------------------------------------