Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, Security, Query, status, Request | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from fastapi.security import APIKeyHeader | |
| import openai | |
| from pydantic import BaseModel | |
| from uuid import UUID | |
| import os | |
| import logging | |
| import json | |
| import regex as re | |
| from datetime import datetime | |
| from app.user import User | |
| from typing import List, Optional, Callable | |
| from openai import OpenAI | |
| import psycopg2 | |
| from psycopg2 import sql | |
| import os | |
| from app.utils import get_api_key, get_user_info, get_growth_guide_session, pop_cache, print_log, update_user, upload_file_to_s3, get_user, upload_mementos_to_db, get_user_summary, get_user_life_status, get_life_score | |
| from dotenv import load_dotenv | |
| import logging.config | |
| import time | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| import sys | |
| load_dotenv() | |
| # Create required folders | |
| os.makedirs('logs', exist_ok=True) | |
| os.makedirs(os.path.join('logs', 'users'), exist_ok=True) | |
| if not os.path.exists(os.path.join('users', 'data')): | |
| os.makedirs(os.path.join('users', 'data')) | |
| else: | |
| # Folder exists, we want to clear all current user data | |
| for file in os.listdir(os.path.join('users', 'data')): | |
| os.remove(os.path.join('users', 'data', file)) | |
| if not os.path.exists(os.path.join('users', 'to_upload')): | |
| os.makedirs(os.path.join('users', 'to_upload')) | |
| if not os.path.exists(os.path.join('mementos', 'to_upload')): | |
| os.makedirs(os.path.join('mementos', 'to_upload')) | |
| # Custom filter for user-specific logs | |
| class UserFilter(logging.Filter): | |
| def filter(self, record): | |
| return hasattr(record, 'user_id') and record.user_id != "no-user" | |
| class NoUserFilter(logging.Filter): | |
| def filter(self, record): | |
| return not (hasattr(record, 'user_id') and record.user_id != "no-user") | |
| class UserLogHandler(logging.Handler): | |
| def __init__(self, **kwargs): | |
| super().__init__() | |
| self.base_path = kwargs.get('base_path', 'logs/users') | |
| self.maxBytes = kwargs.get('maxBytes', 10485760) | |
| self.backupCount = kwargs.get('backupCount', 3) | |
| self.handlers = {} | |
| # Ensure base path exists | |
| os.makedirs(self.base_path, exist_ok=True) | |
| def emit(self, record): | |
| if hasattr(record, 'user_id') and record.user_id != "no-user": | |
| # Remove brackets from filename | |
| if record.user_id: | |
| user_id = record.user_id.strip('[]').strip() | |
| else: | |
| user_id = "no-user" | |
| if user_id not in self.handlers: | |
| handler = logging.handlers.RotatingFileHandler( | |
| filename=os.path.join(self.base_path, f'{user_id}.log'), | |
| maxBytes=self.maxBytes, | |
| backupCount=self.maxBytes, | |
| encoding='utf-8' | |
| ) | |
| formatter = logging.Formatter('%(asctime)s [%(levelname)s] [%(endpoint)s] [%(user_id)s]: %(message)s') | |
| handler.setFormatter(formatter) | |
| self.handlers[user_id] = handler | |
| try: | |
| self.handlers[user_id].emit(record) | |
| except Exception: | |
| self.handleError(record) | |
| class ConditionalFormatter(logging.Formatter): | |
| def format(self, record): | |
| format_string = '%(asctime)s [%(levelname)s]' | |
| if getattr(record, 'endpoint', None): | |
| format_string += ' [%(endpoint)s]' | |
| if getattr(record, 'user_id', None): | |
| format_string += ' [%(user_id)s]' | |
| if getattr(record, 'duration', None): | |
| format_string += ' [Duration: %(duration).3fs]' | |
| format_string += ': %(message)s' | |
| self._style._fmt = format_string | |
| return super().format(record) | |
| # Add new filter class after existing filter classes | |
| class EndpointFilter(logging.Filter): | |
| def filter(self, record): | |
| return hasattr(record, 'endpoint') and record.endpoint.startswith('/') | |
| # Configure logging | |
| logging_config = { | |
| 'version': 1, | |
| 'disable_existing_loggers': False, | |
| 'formatters': { | |
| 'conditional': { | |
| '()': ConditionalFormatter, | |
| 'datefmt': '%Y-%m-%d %H:%M:%S', | |
| }, | |
| }, | |
| 'filters': { | |
| 'userfilter': { | |
| '()': UserFilter | |
| }, | |
| 'nouserfilter': { | |
| '()': NoUserFilter | |
| }, | |
| 'endpointfilter': { | |
| '()': EndpointFilter | |
| } | |
| }, | |
| 'handlers': { | |
| 'default': { | |
| 'level': 'INFO', | |
| 'formatter': 'conditional', | |
| 'class': 'logging.StreamHandler', | |
| 'stream': sys.stdout, # Use stdout instead of stderr | |
| 'filters': ['nouserfilter'] | |
| }, | |
| 'file': { | |
| 'level': 'INFO', | |
| 'formatter': 'conditional', | |
| 'class': 'logging.handlers.RotatingFileHandler', | |
| 'filename': 'logs/app.log', | |
| 'maxBytes': 10485760, # 10MB | |
| 'backupCount': 5, | |
| 'encoding': 'utf-8', # Add UTF-8 encoding | |
| 'filters': ['endpointfilter'] # Only log endpoints | |
| }, | |
| 'userfile': { | |
| 'level': 'INFO', | |
| 'formatter': 'conditional', | |
| '()': UserLogHandler, # Changed from 'class' to '()' | |
| 'base_path': 'logs/users', | |
| 'maxBytes': 10485760, | |
| 'backupCount': 3, | |
| 'filters': ['userfilter'] | |
| } | |
| }, | |
| 'loggers': { | |
| '': { # root logger | |
| 'handlers': ['default', 'file', 'userfile'], | |
| 'level': 'INFO', | |
| 'propagate': True | |
| } | |
| } | |
| } | |
| logging.config.dictConfig(logging_config) | |
| logger = logging.getLogger(__name__) | |
| # Suppress verbose logs from external libraries | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("urllib3").setLevel(logging.WARNING) | |
| # Request logging middleware | |
| class LoggingMiddleware(BaseHTTPMiddleware): | |
| async def dispatch(self, request: Request, call_next: Callable): | |
| start_time = time.time() | |
| endpoint = request.url.path | |
| user_id = None | |
| if "user_id" in request.query_params: | |
| user_id = request.query_params["user_id"] | |
| elif request.method == "POST": | |
| try: | |
| body = await request.json() | |
| user_id = body.get("user_id") | |
| except: | |
| pass | |
| # Log start of request | |
| logger.info( | |
| "[start]: Request received", | |
| extra={ | |
| "user_id": user_id, | |
| "endpoint": endpoint, | |
| } | |
| ) | |
| try: | |
| response = await call_next(request) | |
| duration = time.time() - start_time | |
| # Log end of request with duration | |
| logger.info( | |
| f"Request completed with status {response.status_code}", | |
| extra={ | |
| "user_id": user_id, | |
| "endpoint": endpoint, | |
| "duration": duration | |
| } | |
| ) | |
| return response | |
| except Exception as e: | |
| duration = time.time() - start_time | |
| logger.error( | |
| f"Request failed with error: {str(e)}", | |
| extra={ | |
| "user_id": user_id, | |
| "endpoint": endpoint, | |
| "duration": duration | |
| } | |
| ) | |
| raise | |
| # OpenAI Client | |
| GENERAL_ASSISTANT = os.getenv('OPENAI_GENERAL_ASSISTANT') | |
| # Initialize Logging (optional) | |
| # logging.basicConfig(filename='app.log', level=logging.INFO) | |
| # FastAPI App | |
| app = FastAPI(title="Ourcoach AI API", description="A FastAPI app for ourcoach's chatbot", version="0.1.0") | |
| app.add_middleware(LoggingMiddleware) | |
| # Pydantic Models | |
| class CreateUserItem(BaseModel): | |
| user_id: str | |
| class ChatItem(BaseModel): | |
| user_id: str | |
| message: str | |
| class ChangeDateItem(BaseModel): | |
| user_id: str | |
| date: str | |
| class GGItem(BaseModel): | |
| gg_session_id: str | |
| user_id: str | |
| class ErrorResponse(BaseModel): | |
| status: str = "error" | |
| code: int | |
| message: str | |
| timestamp: datetime = datetime.now() | |
| def ok_endpoint(): | |
| print_log("INFO", "health check endpoint") | |
| logger.info("Health check endpoint called", extra={"endpoint": "/ok"}) | |
| return {"message": "ok"} | |
| def set_intro_done(user_id: str, api_key: str = Security(get_api_key)): | |
| user = get_user(user_id) | |
| user.set_intro_done() | |
| logger.info("Intro done", extra={"user_id": user_id, "endpoint": "/set_intro_done"}) | |
| return {"response": "ok"} | |
| def set_goal(user_id: str, goal: str, api_key: str = Security(get_api_key)): | |
| user = get_user(user_id) | |
| user.set_goal(goal) | |
| logger.info(f"Goal set: {goal}", extra={"user_id": user_id, "endpoint": "/set_goal"}) | |
| return {"response": "ok"} | |
| def do_micro(request: ChangeDateItem, day: int, api_key: str = Security(get_api_key)): | |
| print_log("INFO", "do_micro endpoint") | |
| logger.info("do_micro endpoint called", extra={"endpoint": "/do_micro"}) | |
| # get user | |
| user = get_user(request.user_id) | |
| try: | |
| response = user.do_micro(request.date, day) | |
| except openai.BadRequestError: | |
| # Check if there is an active run for the thread id | |
| recent_run = user.get_recent_run() | |
| print_log("INFO",f"Recent run: {recent_run}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| logger.info(f"Recent run: {recent_run}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| # If there is an active run, cancel it and resubmit the previous message | |
| if recent_run: | |
| user.cancel_run(recent_run) | |
| response = user.send_message(user.get_recent_message()) | |
| print_log("INFO",f"Assistant: {response['content']}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| logger.info(f"Assistant: {response['content']}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| return {"response": response} | |
| # endpoint to change user assistant using user.change_to_latest_assistant() | |
| def change_assistant(user_id: str, api_key: str = Security(get_api_key)): | |
| print_log("INFO", "Changing assistant", extra={"user_id": user_id, "endpoint": "/change_assistant"}) | |
| logger.info("Changing assistant", extra={"user_id": user_id, "endpoint": "/change_assistant"}) | |
| user = get_user(user_id) | |
| assistant_id = user.change_to_latest_assistant() | |
| logger.info(f"Assistant changed to {assistant_id}", extra={"user_id": user_id, "endpoint": "/change_assistant"}) | |
| return {"assistant_id": assistant_id} | |
| def get_user_by_id(user_id: str, api_key: str = Security(get_api_key)): | |
| print_log("INFO", "Getting user", extra={"user_id": user_id, "endpoint": "/get_user"}) | |
| logger.info("Getting user", extra={"user_id": user_id, "endpoint": "/get_user"}) | |
| try: | |
| user = get_user(user_id) | |
| print_log("INFO", "Successfully retrieved user", extra={"user_id": user_id, "endpoint": "/get_user"}) | |
| logger.info("Successfully retrieved user", extra={"user_id": user_id, "endpoint": "/get_user"}) | |
| api_response = {"user": str(user), "user_messages": user.get_messages(), "general_assistant": user.conversations.assistants['general'].id, "intro_assistant": user.conversations.assistants['intro'].id} | |
| if user.goal: | |
| api_response["goal"] = user.goal | |
| else: | |
| api_response["goal"] = ["Goal is not set yet"] | |
| api_response["current_day"] = user.growth_plan.current()['day'] | |
| return api_response | |
| except LookupError: | |
| print_log("ERROR", "User not found", extra={"user_id": user_id, "endpoint": "/get_user"}) | |
| logger.error("User not found", extra={"user_id": user_id, "endpoint": "/get_user"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_user"}, exc_info=True) | |
| logger.error(f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_user"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def life_score_by_id(user_id: str, api_key: str = Security(get_api_key)): | |
| print_log("INFO", "Getting user life score", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}) | |
| logger.info("Getting user life score", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}) | |
| try: | |
| life_score = get_life_score(user_id) | |
| print_log("INFO", "Successfully retrieved user life score", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}) | |
| logger.info("Successfully retrieved user life score", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}) | |
| return life_score | |
| except LookupError: | |
| print_log("ERROR", "User not found", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}) | |
| logger.error("User not found", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}, exc_info=True) | |
| logger.error(f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_user_life_score"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def add_ai_message(request: ChatItem, api_key: str = Security(get_api_key)): | |
| user_id = request.user_id | |
| message = request.message | |
| logger.info("Adding AI response", extra={"user_id": user_id, "endpoint": "/add_ai_message"}) | |
| print_log("INFO", "Adding AI response", extra={"user_id": user_id, "endpoint": "/add_ai_message"}) | |
| try: | |
| user = get_user(user_id) | |
| user.add_ai_message(message) | |
| user.save_user() | |
| update_user(user) | |
| print_log("INFO", "AI response added", extra={"user_id": user_id, "endpoint": "/add_ai_message"}) | |
| return {"response": "ok"} | |
| except LookupError: | |
| print_log("ERROR", "User not found", extra={"user_id": user_id, "endpoint": "/add_ai_message"}) | |
| logger.error("User not found", extra={"user_id": user_id, "endpoint": "/add_ai_message"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error adding AI response: {str(e)}", extra={"user_id": user_id, "endpoint": "/add_ai_message"}, exc_info=True) | |
| logger.error(f"Error adding AI response: {str(e)}", extra={"user_id": user_id, "endpoint": "/add_ai_message"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def process_gg_session(request: GGItem, api_key: str = Security(get_api_key)): | |
| session_id = request.gg_session_id | |
| user_id = request.user_id | |
| logger.info(f"Processing GG session: {session_id}", extra={"user_id": user_id, "endpoint": "/process_gg_session"}) | |
| print_log("INFO", f"Processing GG session: {session_id}", extra={"user_id": user_id, "endpoint": "/process_gg_session"}) | |
| # get user | |
| user = get_user(user_id) | |
| # get the session_data | |
| session_data = get_growth_guide_session(user_id, session_id) | |
| # update user | |
| response = user.process_growth_guide_session(session_data) | |
| logger.info(f"GG session processed: {session_id}, response: {response}", extra={"user_id": user_id, "endpoint": "/process_gg_session"}) | |
| return {"response": response} | |
| def get_daily_message(user_id: str, api_key: str = Security(get_api_key)): | |
| logger.info("Getting daily messages", extra={"user_id": user_id, "endpoint": "/user_daily_messages"}) | |
| user = get_user(user_id) | |
| daily_messages = user.get_daily_messages() | |
| return {"response": daily_messages} | |
| def refresh_multiple_users(user_ids: List[str], api_key: str = Security(get_api_key)): | |
| logger.info("Refreshing multiple users", extra={"endpoint": "/batch_refresh_users"}) | |
| client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) | |
| failed_users = [] | |
| for i,user_id in enumerate(user_ids): | |
| try: | |
| old_user = get_user(user_id) | |
| user = old_user.refresh(client) | |
| user.save_user() | |
| update_user(user) | |
| logger.info(f"Successfully refreshed user {i+1}/{len(user_ids)}", extra={"user_id": user_id, "endpoint": "/batch_refresh_users"}) | |
| except Exception as e: | |
| logger.error(f"Failed to refresh user: {str(e)}", extra={"user_id": user_id, "endpoint": "/batch_refresh_users"}) | |
| failed_users.append(user_id) | |
| if failed_users: | |
| return {"status": "partial", "failed_users": failed_users} | |
| return {"status": "success", "failed_users": []} | |
| def refresh_user(request: CreateUserItem, api_key: str = Security(get_api_key)): | |
| print_log("INFO","Refreshing user", extra={"user_id": request.user_id, "endpoint": "/refresh_user"}) | |
| logger.info("Refreshing user", extra={"user_id": request.user_id, "endpoint": "/refresh_user"}) | |
| client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) | |
| old_user = get_user(request.user_id) | |
| user = old_user.refresh(client) | |
| user.save_user() | |
| update_user(user) | |
| print_log("INFO","User refreshed", extra={"user_id": request.user_id, "endpoint": "/refresh_user"}) | |
| logger.info(f"User refreshed -> {user}", extra={"user_id": request.user_id, "endpoint": "/refresh_user"}) | |
| return {"response": "ok"} | |
| def create_user(request: CreateUserItem, api_key: str = Security(get_api_key)): | |
| print_log("INFO","Creating new user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.info("Creating new user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| try: | |
| client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) | |
| # check if user exists by looking for pickle file in users/data | |
| if os.path.exists(f'users/data/{request.user_id}.pkl'): | |
| print_log("INFO",f"User already exists: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.info(f"User already exists: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| return {"message": f"[OK] User already exists: {request.user_id}"} | |
| user_info, _ = get_user_info(request.user_id) | |
| if not user_info: | |
| print_log("ERROR",f"Could not fetch user information from DB {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.error(f"Could not fetch user information from DB {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Could not fetch user information from DB" | |
| ) | |
| user = User(request.user_id, user_info, client, GENERAL_ASSISTANT) | |
| save = user.save_user() | |
| if save: | |
| print_log("INFO",f"Created pickle file for user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.info(f"Created pickle file for user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| else: | |
| print_log("ERROR",f"Failed to create (user.save_user()) pickle file", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.error(f"Failed to create (user.save_user()) pickle file", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to create user pickle file" | |
| ) | |
| # create memento folder for user | |
| folder_path = os.path.join("mementos", "to_upload", request.user_id) | |
| # create folder if not exists | |
| os.makedirs(folder_path, exist_ok=True) | |
| print_log("INFO",f"Created temp memento folder for user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.info(f"Created temp memento folder for user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| # upload user pickle file to s3 bucket | |
| try: | |
| # value.save_user() | |
| pop_cache(request.user_id) | |
| upload = True | |
| except: | |
| upload = False | |
| if upload == True: | |
| print_log("INFO",f"Successfully created user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.info(f"Successfully created user", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| return {"message": f"[OK] User created: {user.user_id}"} | |
| else: | |
| print_log("ERROR",f"Failed to upload user pickle to S3", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| logger.error(f"Failed to upload user pickle to S3", extra={"user_id": request.user_id, "endpoint": "/create_user"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to upload user pickle to S3" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Failed to create user: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/create_user"}, exc_info=True) | |
| logger.error(f"Failed to create user: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/create_user"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def chat(request: ChatItem, api_key: str = Security(get_api_key)): | |
| print_log("INFO","Processing chat request", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| logger.info("Processing chat request", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| try: | |
| # get user | |
| user = get_user(request.user_id) | |
| try: | |
| response = user.send_message(request.message) | |
| except openai.BadRequestError as e: | |
| print(e) | |
| # Check if there is an active run for the thread id | |
| recent_run = user.get_recent_run() | |
| print_log("INFO",f"Recent run: {recent_run}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| logger.info(f"Recent run: {recent_run}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| # If there is an active run, cancel it and resubmit the previous message | |
| if recent_run: | |
| user.cancel_run(recent_run) | |
| response = user.send_message(user.get_recent_message()) | |
| print_log("INFO",f"Assistant: {response['content']}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| logger.info(f"Assistant: {response['content']}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| return {"response": response} | |
| except LookupError: | |
| print_log("ERROR",f"User not found for chat: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| logger.error(f"User not found for chat: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {request.user_id} not found" | |
| ) | |
| except ReferenceError: | |
| logger.warning(f"User pickle creation still ongoing for user: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| print_log("WARNING",f"User pickle creation still ongoing for user: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/chat"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="User pickle creation still ongoing" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Chat error for user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/chat"}, exc_info=True) | |
| logger.error(f"Chat error for user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/chat"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def change_date(request: ChangeDateItem, api_key: str = Security(get_api_key)): | |
| print_log("INFO",f"Processing date change request, new date: {request.date}", | |
| extra={"user_id": request.user_id, "endpoint": "/change_date"}) | |
| logger.info(f"Processing date change request, new date: {request.date}", | |
| extra={"user_id": request.user_id, "endpoint": "/change_date"}) | |
| try: | |
| user_id = request.user_id | |
| user = get_user(user_id) | |
| logger.info(f"User: {user}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| # infer follow_up dates | |
| user.infer_memento_follow_ups() | |
| # Push users mementos to DB | |
| try: | |
| upload = upload_mementos_to_db(user_id) | |
| if upload: | |
| print_log("INFO",f"Uploaded mementos to DB for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| logger.info(f"Uploaded mementos to DB for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| else: | |
| print_log("ERROR",f"Failed to upload mementos to DB for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| logger.error(f"Failed to upload mementos to DB for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to upload mementos to DB for user: {user_id}" | |
| ) | |
| except ConnectionError as e: | |
| print_log("ERROR",f"Failed to connect to DB for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| logger.error(f"Failed to connect to DB for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to connect to DB for user: {user_id}" | |
| ) | |
| response = user.change_date(request.date) | |
| response['user_id'] = user_id | |
| print_log("INFO",f"Date changed successfully for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| logger.info(f"Date changed successfully for user: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| print_log("DEBUG",f"Change date response: {response}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| logger.debug(f"Change date response: {response}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| # Update user | |
| user.save_user() | |
| update = update_user(user) | |
| if not update: | |
| print_log("ERROR",f"Failed to update user pickle in S3: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| logger.error(f"Failed to update user pickle in S3: {user_id}", extra={"user_id": user_id, "endpoint": "/change_date"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to update user pickle in S3 {user_id}" | |
| ) | |
| return {"response": response} | |
| except ValueError as e: | |
| print_log("ERROR",f"Invalid date format for user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/change_date"}) | |
| logger.error(f"Invalid date format for user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/change_date"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| except LookupError: | |
| print_log("ERROR",f"User not found for date change: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/change_date"}) | |
| logger.error(f"User not found for date change: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/change_date"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {request.user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error changing date for user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/change_date"}, exc_info=True) | |
| logger.error(f"Error changing date for user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/change_date"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def reset_user_messages(request: CreateUserItem, api_key: str = Security(get_api_key)): | |
| print_log("INFO","Resetting messages", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| logger.info("Resetting messages", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| try: | |
| user = get_user(request.user_id) | |
| user.reset_conversations() | |
| print_log("INFO",f"Successfully reset messages for user: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| logger.info(f"Successfully reset messages for user: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| user.save_user() | |
| update_user(user) | |
| print_log("INFO",f"Successfully updated user pickle: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| logger.info(f"Successfully updated user pickle: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| return {"response": "ok"} | |
| except LookupError: | |
| print_log("ERROR",f"User not found for reset: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| logger.error(f"User not found for reset: {request.user_id}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {request.user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error resetting user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}, exc_info=True) | |
| logger.error(f"Error resetting user {request.user_id}: {str(e)}", extra={"user_id": request.user_id, "endpoint": "/reset_user"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def get_logs(user_id: str = Query(default="", description="User ID to fetch logs for")): | |
| if (user_id): | |
| log_file_path = os.path.join('logs', 'users', f'{user_id}.log') | |
| if not os.path.exists(log_file_path): | |
| print_log("INFO",f"Log file not found for user: {user_id}", extra={"user_id": user_id, "endpoint": "/get_logs"}) | |
| logger.error(f"Log file not found for user: {user_id}", extra={"user_id": user_id, "endpoint": "/get_logs"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"Log file for user {user_id} not found" | |
| ) | |
| else: | |
| log_file_path = 'logs/app.log' | |
| def file_iterator(): | |
| with open(log_file_path, 'rb') as f: | |
| while chunk := f.read(8192): | |
| yield chunk | |
| return StreamingResponse( | |
| file_iterator(), | |
| media_type='text/plain', | |
| headers={'Content-Disposition': f'attachment; filename="{os.path.basename(log_file_path)}"'} | |
| ) | |
| def is_user_responsive(user_id: str, api_key: str = Security(get_api_key)): | |
| logger.info("Checking if user is responsive", extra={"user_id": user_id, "endpoint": "/is_user_responsive"}) | |
| try: | |
| user = get_user(user_id) | |
| messages = user.get_messages() | |
| if len(messages) >= 3 and messages[-1]['role'] == 'assistant' and messages[-2]['role'] == 'assistant': | |
| return {"response": False} | |
| else: | |
| return {"response": True} | |
| except LookupError: | |
| logger.error(f"User not found: {user_id}", extra={"user_id": user_id, "endpoint": "/is_user_responsive"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {user_id} not found" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error checking user responsiveness: {str(e)}", extra={"user_id": user_id, "endpoint": "/is_user_responsive"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def get_summary_by_id(user_id: str, api_key: str = Security(get_api_key)): | |
| print_log("INFO", "Getting user's summary", extra={"user_id": user_id, "endpoint": "/get_user_summary"}) | |
| logger.info("Getting user's summary", extra={"user_id": user_id, "endpoint": "/get_user_summary"}) | |
| try: | |
| user_summary = get_user_summary(user_id) | |
| print_log("INFO", "Successfully generated summary", extra={"user_id": user_id, "endpoint": "/get_user_summary"}) | |
| logger.info("Successfully generated summary", extra={"user_id": user_id, "endpoint": "/get_user_summary"}) | |
| return user_summary | |
| except LookupError: | |
| print_log("ERROR", "User not found", extra={"user_id": user_id, "endpoint": "/get_user_summary"}) | |
| logger.error("User not found", extra={"user_id": user_id, "endpoint": "/get_user_summary"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_user_summary"}, exc_info=True) | |
| logger.error(f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_user_summary"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def get_life_status_by_id(user_id: str, api_key: str = Security(get_api_key)): | |
| print_log("INFO", "Getting user's life status", extra={"user_id": user_id, "endpoint": "/get_life_status"}) | |
| logger.info("Getting user's life status", extra={"user_id": user_id, "endpoint": "/get_life_status"}) | |
| try: | |
| life_status = get_user_life_status(user_id) | |
| print_log("INFO", "Successfully generated life status", extra={"user_id": user_id, "endpoint": "/get_life_status"}) | |
| logger.info("Successfully generated life status", extra={"user_id": user_id, "endpoint": "/get_life_status"}) | |
| return life_status | |
| except LookupError: | |
| print_log("ERROR", "User not found", extra={"user_id": user_id, "endpoint": "/get_life_status"}) | |
| logger.error("User not found", extra={"user_id": user_id, "endpoint": "/get_life_status"}) | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=f"User with ID {user_id} not found" | |
| ) | |
| except Exception as e: | |
| print_log("ERROR",f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_life_status"}, exc_info=True) | |
| logger.error(f"Error getting user: {str(e)}", extra={"user_id": user_id, "endpoint": "/get_life_status"}, exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| def add_booking_point_by_user(user_id: str, api_key: str = Security(get_api_key)): | |
| user = get_user(user_id) | |
| user.add_point_for_booking() | |
| return {"response": "ok"} | |
| def add_session_completion_point_by_user(user_id: str, api_key: str = Security(get_api_key)): | |
| user = get_user(user_id) | |
| user.add_point_for_completing_session() | |
| return {"response": "ok"} |