fastapi-v2 / app /main.py
BMCVRN's picture
[FEATURE] Add Logic to Calculate and Get Life Score (#1)
e2224db verified
raw
history blame
37.2 kB
import json
import logging
import logging.config
import logging.handlers
import os
import sys
import time
from datetime import datetime
from typing import List, Optional, Callable
from uuid import UUID

import openai
import psycopg2
import regex as re
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Security, Query, status, Request
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.security import APIKeyHeader
from openai import OpenAI
from psycopg2 import sql
from pydantic import BaseModel, Field
from starlette.middleware.base import BaseHTTPMiddleware

from app.user import User
from app.utils import get_api_key, get_user_info, get_growth_guide_session, pop_cache, print_log, update_user, upload_file_to_s3, get_user, upload_mementos_to_db, get_user_summary, get_user_life_status, get_life_score
# Pull settings from a .env file (if any) before the rest of the module reads
# environment variables.
load_dotenv()

# Create required folders
os.makedirs('logs', exist_ok=True)
os.makedirs(os.path.join('logs', 'users'), exist_ok=True)

_user_data_dir = os.path.join('users', 'data')
if os.path.exists(_user_data_dir):
    # Folder exists, we want to clear all current user data
    for _stale_name in os.listdir(_user_data_dir):
        os.remove(os.path.join(_user_data_dir, _stale_name))
else:
    os.makedirs(_user_data_dir)

# Staging folders for files that are waiting to be uploaded.
for _upload_root in ('users', 'mementos'):
    _upload_dir = os.path.join(_upload_root, 'to_upload')
    if not os.path.exists(_upload_dir):
        os.makedirs(_upload_dir)
# Custom filter for user-specific logs
class UserFilter(logging.Filter):
    """Let through only records that carry a ``user_id`` other than "no-user"."""

    def filter(self, record):
        # A missing attribute is treated exactly like the "no-user" marker.
        return getattr(record, 'user_id', "no-user") != "no-user"
class NoUserFilter(logging.Filter):
    """Let through only records without a ``user_id`` or tagged "no-user".

    This is the exact complement of the user-specific filter above it.
    """

    def filter(self, record):
        # A missing attribute is treated exactly like the "no-user" marker.
        return getattr(record, 'user_id', "no-user") == "no-user"
class UserLogHandler(logging.Handler):
    """Fan log records out to one rotating log file per user.

    Keyword arguments (all optional, supplied through ``dictConfig``):
        base_path: directory that holds the ``<user_id>.log`` files
            (default ``'logs/users'``); created if missing.
        maxBytes: size at which a user file is rotated (default 10485760).
        backupCount: number of rotated files kept per user (default 3).
    """

    def __init__(self, **kwargs):
        super().__init__()
        self.base_path = kwargs.get('base_path', 'logs/users')
        self.maxBytes = kwargs.get('maxBytes', 10485760)
        self.backupCount = kwargs.get('backupCount', 3)
        # user_id -> RotatingFileHandler, created lazily on first record.
        self.handlers = {}
        # Ensure base path exists
        os.makedirs(self.base_path, exist_ok=True)

    def emit(self, record):
        """Write ``record`` to the file of the user it is tagged with.

        Records without a ``user_id`` attribute, or tagged "no-user", are
        ignored. Any failure is reported through ``handleError`` instead of
        propagating into the code that issued the log call.
        """
        if not hasattr(record, 'user_id') or record.user_id == "no-user":
            return
        try:
            # Remove brackets from filename
            if record.user_id:
                user_id = record.user_id.strip('[]').strip()
            else:
                user_id = "no-user"
            # NOTE(review): user_id becomes part of a file name unchecked;
            # confirm callers never pass path separators.
            if user_id not in self.handlers:
                handler = logging.handlers.RotatingFileHandler(
                    filename=os.path.join(self.base_path, f'{user_id}.log'),
                    maxBytes=self.maxBytes,
                    # Was self.maxBytes, which kept ~10 million backups.
                    backupCount=self.backupCount,
                    encoding='utf-8'
                )
                formatter = logging.Formatter('%(asctime)s [%(levelname)s] [%(endpoint)s] [%(user_id)s]: %(message)s')
                handler.setFormatter(formatter)
                self.handlers[user_id] = handler
            self.handlers[user_id].emit(record)
        except Exception:
            self.handleError(record)

    def close(self):
        """Close every per-user file handler, then this handler itself."""
        self.acquire()
        try:
            for handler in list(self.handlers.values()):
                handler.close()
            self.handlers.clear()
        finally:
            self.release()
        super().close()
class ConditionalFormatter(logging.Formatter):
    """Formatter whose layout adapts to the extras present on each record.

    The timestamp, level and message are always emitted; the endpoint,
    user id and duration sections appear only when the record carries a
    truthy value for them.
    """

    def format(self, record):
        optional_sections = (
            ('endpoint', ' [%(endpoint)s]'),
            ('user_id', ' [%(user_id)s]'),
            ('duration', ' [Duration: %(duration).3fs]'),
        )
        pieces = ['%(asctime)s [%(levelname)s]']
        pieces.extend(
            fragment
            for attr, fragment in optional_sections
            if getattr(record, attr, None)
        )
        pieces.append(': %(message)s')
        # Swap the format string of the underlying style object for this record.
        self._style._fmt = ''.join(pieces)
        return super().format(record)
# Filter that keeps only records tagged with an endpoint path
class EndpointFilter(logging.Filter):
    """Let through only records whose ``endpoint`` extra starts with '/'."""

    def filter(self, record):
        # Records without the attribute fall back to '' and are rejected.
        return getattr(record, 'endpoint', '').startswith('/')
# Configure logging
def _build_logging_config():
    """Return the ``dictConfig`` schema for console, app-file and per-user logs."""
    formatters = {
        'conditional': {
            '()': ConditionalFormatter,
            'datefmt': '%Y-%m-%d %H:%M:%S',
        },
    }
    filters = {
        'userfilter': {'()': UserFilter},
        'nouserfilter': {'()': NoUserFilter},
        'endpointfilter': {'()': EndpointFilter},
    }
    console_handler = {
        'level': 'INFO',
        'formatter': 'conditional',
        'class': 'logging.StreamHandler',
        'stream': sys.stdout,  # Use stdout instead of stderr
        'filters': ['nouserfilter'],
    }
    app_file_handler = {
        'level': 'INFO',
        'formatter': 'conditional',
        'class': 'logging.handlers.RotatingFileHandler',
        'filename': 'logs/app.log',
        'maxBytes': 10485760,  # 10MB
        'backupCount': 5,
        'encoding': 'utf-8',
        'filters': ['endpointfilter'],  # Only log endpoints
    }
    per_user_handler = {
        'level': 'INFO',
        'formatter': 'conditional',
        '()': UserLogHandler,  # custom factory, hence '()' instead of 'class'
        'base_path': 'logs/users',
        'maxBytes': 10485760,
        'backupCount': 3,
        'filters': ['userfilter'],
    }
    return {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': formatters,
        'filters': filters,
        'handlers': {
            'default': console_handler,
            'file': app_file_handler,
            'userfile': per_user_handler,
        },
        'loggers': {
            '': {  # root logger
                'handlers': ['default', 'file', 'userfile'],
                'level': 'INFO',
                'propagate': True,
            },
        },
    }


logging_config = _build_logging_config()
logging.config.dictConfig(logging_config)
logger = logging.getLogger(__name__)

# Suppress verbose logs from external libraries
for _noisy_logger_name in ("httpx", "urllib3"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)
# Request logging middleware
class LoggingMiddleware(BaseHTTPMiddleware):
    """Log the start, completion (with duration) and failure of every request.

    Each record is tagged with the request path and, when it can be found,
    the ``user_id`` taken from the query string or from a JSON POST body.
    """

    @staticmethod
    async def _extract_user_id(request: Request):
        """Return the request's ``user_id`` or ``None`` when it has none."""
        if "user_id" in request.query_params:
            return request.query_params["user_id"]
        if request.method != "POST":
            return None
        # NOTE(review): reading the body here relies on Starlette keeping it
        # available for the downstream handler - confirm for the pinned version.
        try:
            body = await request.json()
        except Exception:
            # Best effort only: an empty or non-JSON body just means no user id.
            # Unlike a bare ``except``, task cancellation is not swallowed.
            return None
        # Some endpoints accept a JSON list; only objects can carry a user_id.
        if isinstance(body, dict):
            return body.get("user_id")
        return None

    async def dispatch(self, request: Request, call_next: Callable):
        """Run the request through ``call_next`` and log around it.

        Exceptions from downstream are logged and re-raised unchanged.
        """
        # Monotonic clock so the duration is immune to wall-clock adjustments.
        start_time = time.perf_counter()
        endpoint = request.url.path
        user_id = await self._extract_user_id(request)
        # Log start of request
        logger.info(
            "[start]: Request received",
            extra={
                "user_id": user_id,
                "endpoint": endpoint,
            }
        )
        try:
            response = await call_next(request)
            duration = time.perf_counter() - start_time
            # Log end of request with duration
            logger.info(
                f"Request completed with status {response.status_code}",
                extra={
                    "user_id": user_id,
                    "endpoint": endpoint,
                    "duration": duration
                }
            )
            return response
        except Exception as e:
            duration = time.perf_counter() - start_time
            logger.error(
                f"Request failed with error: {str(e)}",
                extra={
                    "user_id": user_id,
                    "endpoint": endpoint,
                    "duration": duration
                }
            )
            raise
# Value of OPENAI_GENERAL_ASSISTANT from the environment; handed to User()
# when a new user is created (presumably the general assistant's id).
GENERAL_ASSISTANT = os.getenv('OPENAI_GENERAL_ASSISTANT')

# FastAPI App
app = FastAPI(
    title="Ourcoach AI API",
    description="A FastAPI app for ourcoach's chatbot",
    version="0.1.0",
)
# Every request passes through the logging middleware defined in this module.
app.add_middleware(LoggingMiddleware)
# Pydantic Models

# Request body that identifies a user and nothing else; used by
# /refresh_user, /create_user and /reset_user_messages.
class CreateUserItem(BaseModel):
    user_id: str
# Request body pairing a user with a message text; used by /chat and
# /add_ai_message.
class ChatItem(BaseModel):
    user_id: str
    message: str
# Request body pairing a user with a date string; used by /change_date and
# /do_micro. The date format is not validated here - it is passed through
# to the User object as-is.
class ChangeDateItem(BaseModel):
    user_id: str
    date: str
# Request body for /process_gg_session: the growth guide session to process
# and the user it belongs to.
class GGItem(BaseModel):
    gg_session_id: str
    user_id: str
# Error payload: fixed "error" status, numeric code, message and the time the
# payload was created.
class ErrorResponse(BaseModel):
    status: str = "error"
    code: int
    message: str
    # default_factory is evaluated per instance; a plain ``datetime.now()``
    # default would be computed once at import and shared by every response.
    timestamp: datetime = Field(default_factory=datetime.now)
@app.get("/ok")
def ok_endpoint():
    """Health check: log the call and reply with a fixed "ok" message."""
    print_log("INFO", "health check endpoint")
    health_extra = {"endpoint": "/ok"}
    logger.info("Health check endpoint called", extra=health_extra)
    return dict(message="ok")
@app.post("/set_intro_done")
def set_intro_done(user_id: str, api_key: str = Security(get_api_key)):
    """Flag the user's intro as done via ``User.set_intro_done`` and reply "ok"."""
    get_user(user_id).set_intro_done()
    log_extra = {"user_id": user_id, "endpoint": "/set_intro_done"}
    logger.info("Intro done", extra=log_extra)
    return dict(response="ok")
@app.post("/set_goal")
def set_goal(user_id: str, goal: str, api_key: str = Security(get_api_key)):
    """Store ``goal`` on the user via ``User.set_goal`` and reply "ok"."""
    get_user(user_id).set_goal(goal)
    log_extra = {"user_id": user_id, "endpoint": "/set_goal"}
    logger.info(f"Goal set: {goal}", extra=log_extra)
    return dict(response="ok")
@app.post("/do_micro")
def do_micro(request: ChangeDateItem, day: int, api_key: str = Security(get_api_key)):
    """Run ``User.do_micro`` for the given date and day and return its response.

    If OpenAI rejects the call with ``BadRequestError`` and the user has a
    recent run, that run is cancelled and the user's most recent message is
    resubmitted. Without a recent run the original error is re-raised.

    Returns:
        ``{"response": <assistant response>}``.
    """
    def log_ctx():
        # Fresh dict per log call, tagged with this route (it used to say "/chat").
        return {"user_id": request.user_id, "endpoint": "/do_micro"}

    print_log("INFO", "do_micro endpoint")
    logger.info("do_micro endpoint called", extra={"endpoint": "/do_micro"})
    # get user
    user = get_user(request.user_id)
    try:
        response = user.do_micro(request.date, day)
    except openai.BadRequestError:
        # Check if there is an active run for the thread id
        recent_run = user.get_recent_run()
        print_log("INFO", f"Recent run: {recent_run}", extra=log_ctx())
        logger.info(f"Recent run: {recent_run}", extra=log_ctx())
        if not recent_run:
            # Nothing to cancel, so there is no response to report. Surface the
            # real error rather than failing later on an unbound ``response``.
            raise
        # There is an active run: cancel it and resubmit the previous message
        user.cancel_run(recent_run)
        response = user.send_message(user.get_recent_message())
    print_log("INFO", f"Assistant: {response['content']}", extra=log_ctx())
    logger.info(f"Assistant: {response['content']}", extra=log_ctx())
    return {"response": response}
# endpoint to change user assistant using user.change_to_latest_assistant()
@app.get("/change_assistant")
def change_assistant(user_id: str, api_key: str = Security(get_api_key)):
    """Switch the user to the latest assistant and return the new assistant id."""
    def log_ctx():
        return {"user_id": user_id, "endpoint": "/change_assistant"}

    print_log("INFO", "Changing assistant", extra=log_ctx())
    logger.info("Changing assistant", extra=log_ctx())
    assistant_id = get_user(user_id).change_to_latest_assistant()
    logger.info(f"Assistant changed to {assistant_id}", extra=log_ctx())
    return dict(assistant_id=assistant_id)
@app.get("/get_user")
def get_user_by_id(user_id: str, api_key: str = Security(get_api_key)):
    """Return a snapshot of the user.

    The payload contains the user's string form, messages, the ids of the
    'general' and 'intro' assistants, the goal (or a placeholder list when
    unset) and the current day of the growth plan.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    def log_ctx():
        return {"user_id": user_id, "endpoint": "/get_user"}

    print_log("INFO", "Getting user", extra=log_ctx())
    logger.info("Getting user", extra=log_ctx())
    try:
        user = get_user(user_id)
        print_log("INFO", "Successfully retrieved user", extra=log_ctx())
        logger.info("Successfully retrieved user", extra=log_ctx())
        api_response = {
            "user": str(user),
            "user_messages": user.get_messages(),
            "general_assistant": user.conversations.assistants['general'].id,
            "intro_assistant": user.conversations.assistants['intro'].id,
        }
        api_response["goal"] = user.goal if user.goal else ["Goal is not set yet"]
        api_response["current_day"] = user.growth_plan.current()['day']
        return api_response
    except LookupError:
        print_log("ERROR", "User not found", extra=log_ctx())
        logger.error("User not found", extra=log_ctx())
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )
        raise not_found
    except Exception as e:
        print_log("ERROR", f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        server_error = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise server_error
@app.get("/get_user_life_score")
def life_score_by_id(user_id: str, api_key: str = Security(get_api_key)):
    """Return whatever ``get_life_score`` produces for the user.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    def log_ctx():
        return {"user_id": user_id, "endpoint": "/get_user_life_score"}

    print_log("INFO", "Getting user life score", extra=log_ctx())
    logger.info("Getting user life score", extra=log_ctx())
    try:
        life_score = get_life_score(user_id)
        print_log("INFO", "Successfully retrieved user life score", extra=log_ctx())
        logger.info("Successfully retrieved user life score", extra=log_ctx())
        return life_score
    except LookupError:
        print_log("ERROR", "User not found", extra=log_ctx())
        logger.error("User not found", extra=log_ctx())
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )
        raise not_found
    except Exception as e:
        print_log("ERROR", f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        server_error = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise server_error
@app.post("/add_ai_message")
def add_ai_message(request: ChatItem, api_key: str = Security(get_api_key)):
    """Append an AI-authored message to the user, then save and sync the user.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    user_id, message = request.user_id, request.message

    def log_ctx():
        return {"user_id": user_id, "endpoint": "/add_ai_message"}

    logger.info("Adding AI response", extra=log_ctx())
    print_log("INFO", "Adding AI response", extra=log_ctx())
    try:
        user = get_user(user_id)
        user.add_ai_message(message)
        # Persist locally first, then push the update through update_user.
        user.save_user()
        update_user(user)
        print_log("INFO", "AI response added", extra=log_ctx())
        return dict(response="ok")
    except LookupError:
        print_log("ERROR", "User not found", extra=log_ctx())
        logger.error("User not found", extra=log_ctx())
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )
        raise not_found
    except Exception as e:
        print_log("ERROR", f"Error adding AI response: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error adding AI response: {str(e)}", extra=log_ctx(), exc_info=True)
        server_error = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise server_error
@app.post("/process_gg_session")
def process_gg_session(request: GGItem, api_key: str = Security(get_api_key)):
    """Fetch a growth guide session and let the user object process it.

    Returns:
        ``{"response": <result of User.process_growth_guide_session>}``.
    """
    user_id, session_id = request.user_id, request.gg_session_id

    def log_ctx():
        return {"user_id": user_id, "endpoint": "/process_gg_session"}

    logger.info(f"Processing GG session: {session_id}", extra=log_ctx())
    print_log("INFO", f"Processing GG session: {session_id}", extra=log_ctx())
    # Load the user before fetching the session, as the original did.
    user = get_user(user_id)
    session_data = get_growth_guide_session(user_id, session_id)
    response = user.process_growth_guide_session(session_data)
    logger.info(f"GG session processed: {session_id}, response: {response}", extra=log_ctx())
    return dict(response=response)
@app.get("/user_daily_messages")
def get_daily_message(user_id: str, api_key: str = Security(get_api_key)):
    """Return the result of ``User.get_daily_messages`` for the user."""
    log_extra = {"user_id": user_id, "endpoint": "/user_daily_messages"}
    logger.info("Getting daily messages", extra=log_extra)
    return dict(response=get_user(user_id).get_daily_messages())
@app.post("/batch_refresh_users")
def refresh_multiple_users(user_ids: List[str], api_key: str = Security(get_api_key)):
    """Refresh each listed user, collecting the ids that failed.

    A failure for one user is logged and does not stop the batch.

    Returns:
        ``{"status": "success" | "partial", "failed_users": [...]}``.
    """
    batch_endpoint = "/batch_refresh_users"
    logger.info("Refreshing multiple users", extra={"endpoint": batch_endpoint})
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    failed_users = []
    total = len(user_ids)
    for position, user_id in enumerate(user_ids, start=1):
        try:
            refreshed = get_user(user_id).refresh(client)
            refreshed.save_user()
            update_user(refreshed)
            logger.info(
                f"Successfully refreshed user {position}/{total}",
                extra={"user_id": user_id, "endpoint": batch_endpoint},
            )
        except Exception as e:
            logger.error(
                f"Failed to refresh user: {str(e)}",
                extra={"user_id": user_id, "endpoint": batch_endpoint},
            )
            failed_users.append(user_id)
    outcome = "partial" if failed_users else "success"
    return {"status": outcome, "failed_users": failed_users}
@app.post("/refresh_user")
def refresh_user(request: CreateUserItem, api_key: str = Security(get_api_key)):
    """Rebuild the user via ``User.refresh``, then save and sync the result."""
    def log_ctx():
        return {"user_id": request.user_id, "endpoint": "/refresh_user"}

    print_log("INFO", "Refreshing user", extra=log_ctx())
    logger.info("Refreshing user", extra=log_ctx())
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    user = get_user(request.user_id).refresh(client)
    user.save_user()
    update_user(user)
    print_log("INFO", "User refreshed", extra=log_ctx())
    logger.info(f"User refreshed -> {user}", extra=log_ctx())
    return dict(response="ok")
@app.post("/create_user")
def create_user(request: CreateUserItem, api_key: str = Security(get_api_key)):
    """Create a user from DB information, unless one already exists.

    Steps: return early if ``users/data/<user_id>.pkl`` exists; fetch the user
    information; build and save the ``User``; create the user's temporary
    memento folder; finally call ``pop_cache`` for the user.

    Returns:
        ``{"message": "[OK] ..."}`` for both the created and the
        already-exists case.

    Raises:
        HTTPException: 400 when the user information cannot be fetched, 500
            when saving or the final ``pop_cache`` step fails, and 500 for
            any unexpected error.
    """
    def log_ctx():
        return {"user_id": request.user_id, "endpoint": "/create_user"}

    print_log("INFO", "Creating new user", extra=log_ctx())
    logger.info("Creating new user", extra=log_ctx())
    try:
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        # check if user exists by looking for pickle file in users/data
        if os.path.exists(f'users/data/{request.user_id}.pkl'):
            print_log("INFO", f"User already exists: {request.user_id}", extra=log_ctx())
            logger.info(f"User already exists: {request.user_id}", extra=log_ctx())
            return {"message": f"[OK] User already exists: {request.user_id}"}

        user_info, _ = get_user_info(request.user_id)
        if not user_info:
            print_log("ERROR", f"Could not fetch user information from DB {request.user_id}", extra=log_ctx())
            logger.error(f"Could not fetch user information from DB {request.user_id}", extra=log_ctx())
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Could not fetch user information from DB"
            )

        user = User(request.user_id, user_info, client, GENERAL_ASSISTANT)
        if not user.save_user():
            print_log("ERROR", "Failed to create (user.save_user()) pickle file", extra=log_ctx())
            logger.error("Failed to create (user.save_user()) pickle file", extra=log_ctx())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to create user pickle file"
            )
        print_log("INFO", "Created pickle file for user", extra=log_ctx())
        logger.info("Created pickle file for user", extra=log_ctx())

        # create memento folder for user (no-op when it already exists)
        folder_path = os.path.join("mementos", "to_upload", request.user_id)
        os.makedirs(folder_path, exist_ok=True)
        print_log("INFO", "Created temp memento folder for user", extra=log_ctx())
        logger.info("Created temp memento folder for user", extra=log_ctx())

        # upload user pickle file to s3 bucket
        # NOTE(review): the upload is presumably performed inside pop_cache - confirm.
        try:
            pop_cache(request.user_id)
        except Exception as upload_error:
            print_log("ERROR", "Failed to upload user pickle to S3", extra=log_ctx())
            logger.error("Failed to upload user pickle to S3", extra=log_ctx(), exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to upload user pickle to S3"
            ) from upload_error

        print_log("INFO", "Successfully created user", extra=log_ctx())
        logger.info("Successfully created user", extra=log_ctx())
        return {"message": f"[OK] User created: {user.user_id}"}
    except HTTPException:
        # Deliberate HTTP errors above must keep their status code and detail;
        # the generic handler below would turn the 400 into a 500.
        raise
    except Exception as e:
        print_log("ERROR", f"Failed to create user: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Failed to create user: {str(e)}", extra=log_ctx(), exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
@app.post("/chat")
def chat(request: ChatItem, api_key: str = Security(get_api_key)):
    """Send the user's message to the assistant and return the response.

    If OpenAI rejects the call with ``BadRequestError`` and the user has a
    recent run, that run is cancelled and the user's most recent message is
    resubmitted. Without a recent run the original error is re-raised and
    reported as a 500.

    Returns:
        ``{"response": <assistant response>}``.

    Raises:
        HTTPException: 404 on ``LookupError``, 400 on ``ReferenceError``
            (user pickle creation still ongoing), 500 on any other error.
    """
    def log_ctx():
        return {"user_id": request.user_id, "endpoint": "/chat"}

    print_log("INFO", "Processing chat request", extra=log_ctx())
    logger.info("Processing chat request", extra=log_ctx())
    try:
        # get user
        user = get_user(request.user_id)
        try:
            response = user.send_message(request.message)
        except openai.BadRequestError as e:
            logger.warning(f"OpenAI rejected the request: {e}", extra=log_ctx())
            # Check if there is an active run for the thread id
            recent_run = user.get_recent_run()
            print_log("INFO", f"Recent run: {recent_run}", extra=log_ctx())
            logger.info(f"Recent run: {recent_run}", extra=log_ctx())
            if not recent_run:
                # Nothing to cancel, so there is no response to report. Surface
                # the real error rather than an unbound ``response`` failure.
                raise
            # There is an active run: cancel it and resubmit the previous message
            user.cancel_run(recent_run)
            response = user.send_message(user.get_recent_message())
        print_log("INFO", f"Assistant: {response['content']}", extra=log_ctx())
        logger.info(f"Assistant: {response['content']}", extra=log_ctx())
        return {"response": response}
    except LookupError:
        print_log("ERROR", f"User not found for chat: {request.user_id}", extra=log_ctx())
        logger.error(f"User not found for chat: {request.user_id}", extra=log_ctx())
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {request.user_id} not found"
        )
    except ReferenceError:
        logger.warning(f"User pickle creation still ongoing for user: {request.user_id}", extra=log_ctx())
        print_log("WARNING", f"User pickle creation still ongoing for user: {request.user_id}", extra=log_ctx())
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User pickle creation still ongoing"
        )
    except Exception as e:
        print_log("ERROR", f"Chat error for user {request.user_id}: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Chat error for user {request.user_id}: {str(e)}", extra=log_ctx(), exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
@app.post("/change_date")
def change_date(request: ChangeDateItem, api_key: str = Security(get_api_key)):
    """Move the user to a new date.

    Steps: infer memento follow-ups, upload the user's mementos to the DB,
    apply ``User.change_date``, then save and sync the user.

    Returns:
        ``{"response": <change_date result with "user_id" added>}``.

    Raises:
        HTTPException: 400 on ``ValueError``, 404 on ``LookupError``, 500
            when the memento upload, the DB connection or the user update
            fails, and 500 for any other error.
    """
    def log_ctx():
        return {"user_id": request.user_id, "endpoint": "/change_date"}

    print_log("INFO", f"Processing date change request, new date: {request.date}", extra=log_ctx())
    logger.info(f"Processing date change request, new date: {request.date}", extra=log_ctx())
    try:
        user_id = request.user_id
        user = get_user(user_id)
        logger.info(f"User: {user}", extra=log_ctx())

        # infer follow_up dates
        user.infer_memento_follow_ups()

        # Push users mementos to DB
        try:
            upload = upload_mementos_to_db(user_id)
        except ConnectionError as e:
            print_log("ERROR", f"Failed to connect to DB for user: {user_id}", extra=log_ctx())
            logger.error(f"Failed to connect to DB for user: {user_id}", extra=log_ctx())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to connect to DB for user: {user_id}"
            ) from e
        if not upload:
            print_log("ERROR", f"Failed to upload mementos to DB for user: {user_id}", extra=log_ctx())
            logger.error(f"Failed to upload mementos to DB for user: {user_id}", extra=log_ctx())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to upload mementos to DB for user: {user_id}"
            )
        print_log("INFO", f"Uploaded mementos to DB for user: {user_id}", extra=log_ctx())
        logger.info(f"Uploaded mementos to DB for user: {user_id}", extra=log_ctx())

        response = user.change_date(request.date)
        response['user_id'] = user_id
        print_log("INFO", f"Date changed successfully for user: {user_id}", extra=log_ctx())
        logger.info(f"Date changed successfully for user: {user_id}", extra=log_ctx())
        print_log("DEBUG", f"Change date response: {response}", extra=log_ctx())
        logger.debug(f"Change date response: {response}", extra=log_ctx())

        # Update user
        user.save_user()
        if not update_user(user):
            print_log("ERROR", f"Failed to update user pickle in S3: {user_id}", extra=log_ctx())
            logger.error(f"Failed to update user pickle in S3: {user_id}", extra=log_ctx())
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to update user pickle in S3 {user_id}"
            )
        return {"response": response}
    except HTTPException:
        # Deliberate HTTP errors above must keep their detail; the generic
        # handler below would rewrite it as "500: ...".
        raise
    except ValueError as e:
        print_log("ERROR", f"Invalid date format for user {request.user_id}: {str(e)}", extra=log_ctx())
        logger.error(f"Invalid date format for user {request.user_id}: {str(e)}", extra=log_ctx())
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )
    except LookupError:
        print_log("ERROR", f"User not found for date change: {request.user_id}", extra=log_ctx())
        logger.error(f"User not found for date change: {request.user_id}", extra=log_ctx())
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {request.user_id} not found"
        )
    except Exception as e:
        print_log("ERROR", f"Error changing date for user {request.user_id}: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error changing date for user {request.user_id}: {str(e)}", extra=log_ctx(), exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
@app.post("/reset_user_messages")
def reset_user_messages(request: CreateUserItem, api_key: str = Security(get_api_key)):
    """Reset the user's conversations, then save and sync the user.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    def log_ctx():
        # Tagged with the real route; the records used to say "/reset_user".
        return {"user_id": request.user_id, "endpoint": "/reset_user_messages"}

    print_log("INFO", "Resetting messages", extra=log_ctx())
    logger.info("Resetting messages", extra=log_ctx())
    try:
        user = get_user(request.user_id)
        user.reset_conversations()
        print_log("INFO", f"Successfully reset messages for user: {request.user_id}", extra=log_ctx())
        logger.info(f"Successfully reset messages for user: {request.user_id}", extra=log_ctx())
        user.save_user()
        update_user(user)
        print_log("INFO", f"Successfully updated user pickle: {request.user_id}", extra=log_ctx())
        logger.info(f"Successfully updated user pickle: {request.user_id}", extra=log_ctx())
        return {"response": "ok"}
    except LookupError:
        print_log("ERROR", f"User not found for reset: {request.user_id}", extra=log_ctx())
        logger.error(f"User not found for reset: {request.user_id}", extra=log_ctx())
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {request.user_id} not found"
        )
    except Exception as e:
        print_log("ERROR", f"Error resetting user {request.user_id}: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error resetting user {request.user_id}: {str(e)}", extra=log_ctx(), exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
@app.get("/get_logs")
def get_logs(user_id: str = Query(default="", description="User ID to fetch logs for")):
    """Stream a log file as a plain-text attachment.

    With a ``user_id`` the user's file under ``logs/users`` is served,
    otherwise ``logs/app.log``.

    Raises:
        HTTPException: 400 when ``user_id`` contains path separators, 404
            when the requested log file does not exist.
    """
    # NOTE(review): unlike the other endpoints this one takes no API key -
    # confirm that exposing logs unauthenticated is intentional.
    if user_id:
        # user_id comes straight from the query string; refuse anything that
        # could point outside logs/users.
        if any(marker in user_id for marker in ("/", "\\", "\0")):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid user ID"
            )
        log_file_path = os.path.join('logs', 'users', f'{user_id}.log')
        if not os.path.exists(log_file_path):
            print_log("INFO", f"Log file not found for user: {user_id}", extra={"user_id": user_id, "endpoint": "/get_logs"})
            logger.error(f"Log file not found for user: {user_id}", extra={"user_id": user_id, "endpoint": "/get_logs"})
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Log file for user {user_id} not found"
            )
    else:
        log_file_path = 'logs/app.log'
        # Check before streaming: a missing file would otherwise only fail
        # inside the generator, after the response has started.
        if not os.path.isfile(log_file_path):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Application log file not found"
            )

    def file_iterator():
        # Read in 8 KiB chunks so large logs are never loaded whole.
        with open(log_file_path, 'rb') as f:
            while chunk := f.read(8192):
                yield chunk

    return StreamingResponse(
        file_iterator(),
        media_type='text/plain',
        headers={'Content-Disposition': f'attachment; filename="{os.path.basename(log_file_path)}"'}
    )
@app.get("/is_user_responsive")
def is_user_responsive(user_id: str, api_key: str = Security(get_api_key)):
    """Report whether the user is still replying.

    The user counts as unresponsive when there are at least three messages
    and the last two both have the 'assistant' role.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    def log_ctx():
        return {"user_id": user_id, "endpoint": "/is_user_responsive"}

    logger.info("Checking if user is responsive", extra=log_ctx())
    try:
        messages = get_user(user_id).get_messages()
        assistant_spoke_last_twice = (
            len(messages) >= 3
            and messages[-1]['role'] == 'assistant'
            and messages[-2]['role'] == 'assistant'
        )
        return {"response": not assistant_spoke_last_twice}
    except LookupError:
        logger.error(f"User not found: {user_id}", extra=log_ctx())
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )
        raise not_found
    except Exception as e:
        logger.error(f"Error checking user responsiveness: {str(e)}", extra=log_ctx(), exc_info=True)
        server_error = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise server_error
@app.get("/get_user_summary")
def get_summary_by_id(user_id: str, api_key: str = Security(get_api_key)):
    """Return whatever ``get_user_summary`` produces for the user.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    def log_ctx():
        return {"user_id": user_id, "endpoint": "/get_user_summary"}

    print_log("INFO", "Getting user's summary", extra=log_ctx())
    logger.info("Getting user's summary", extra=log_ctx())
    try:
        user_summary = get_user_summary(user_id)
        print_log("INFO", "Successfully generated summary", extra=log_ctx())
        logger.info("Successfully generated summary", extra=log_ctx())
        return user_summary
    except LookupError:
        print_log("ERROR", "User not found", extra=log_ctx())
        logger.error("User not found", extra=log_ctx())
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )
        raise not_found
    except Exception as e:
        print_log("ERROR", f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        server_error = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise server_error
@app.get("/get_life_status")
def get_life_status_by_id(user_id: str, api_key: str = Security(get_api_key)):
    """Return whatever ``get_user_life_status`` produces for the user.

    Raises:
        HTTPException: 404 on ``LookupError``, 500 on any other error.
    """
    def log_ctx():
        return {"user_id": user_id, "endpoint": "/get_life_status"}

    print_log("INFO", "Getting user's life status", extra=log_ctx())
    logger.info("Getting user's life status", extra=log_ctx())
    try:
        life_status = get_user_life_status(user_id)
        print_log("INFO", "Successfully generated life status", extra=log_ctx())
        logger.info("Successfully generated life status", extra=log_ctx())
        return life_status
    except LookupError:
        print_log("ERROR", "User not found", extra=log_ctx())
        logger.error("User not found", extra=log_ctx())
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )
        raise not_found
    except Exception as e:
        print_log("ERROR", f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        logger.error(f"Error getting user: {str(e)}", extra=log_ctx(), exc_info=True)
        server_error = HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
        raise server_error
@app.post("/add_booking_point")
def add_booking_point_by_user(user_id: str, api_key: str = Security(get_api_key)):
    """Call ``User.add_point_for_booking`` for the user and reply "ok"."""
    get_user(user_id).add_point_for_booking()
    return dict(response="ok")
@app.post("/add_session_completion_point")
def add_session_completion_point_by_user(user_id: str, api_key: str = Security(get_api_key)):
    """Call ``User.add_point_for_completing_session`` for the user and reply "ok"."""
    get_user(user_id).add_point_for_completing_session()
    return dict(response="ok")