| from fastapi import FastAPI, HTTPException, Query as QueryParam, Request |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
| from typing import Optional, List, Dict, Any |
| import os |
| import json |
| import redis |
| import uuid |
| from datetime import datetime |
| from dotenv import load_dotenv |
| from fastapi import APIRouter |
|
|
| load_dotenv() |
|
|
|
|
|
|
| |
| Redis_session_router_old = APIRouter(prefix="/Redis", tags=["Redis_Management_old"]) |
|
|
| |
|
|
| |
| REDIS_URL = os.getenv("REDIS_URL") |
| REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1") |
| REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) |
| REDIS_PASSWORD = os.getenv("REDIS_PASSWORD") |
|
|
| |
|
|
| def get_redis_client(): |
| """Initialize Redis client with fallback to local Redis""" |
| try: |
| if REDIS_URL: |
| |
| redis_client = redis.from_url( |
| REDIS_URL, |
| decode_responses=True, |
| socket_connect_timeout=5, |
| socket_timeout=5 |
| ) |
| |
| redis_client.ping() |
| print(f"β
Connected to deployed Redis: {REDIS_URL}") |
| return redis_client |
| else: |
| |
| redis_client = redis.StrictRedis( |
| host=REDIS_HOST, |
| port=REDIS_PORT, |
| password=REDIS_PASSWORD, |
| decode_responses=True, |
| socket_connect_timeout=5, |
| socket_timeout=5 |
| ) |
| |
| redis_client.ping() |
| print(f"β
Connected to local Redis: {REDIS_HOST}:{REDIS_PORT}") |
| return redis_client |
| except Exception as e: |
| print(f"β Redis connection failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Redis connection failed: {str(e)}") |
|
|
| |
| |
|
|
| |
|
|
| class SessionResponse(BaseModel): |
| session_id: str |
| userLoginId: int |
| orgId: int |
| created_at: str |
| status: str |
| title: Optional[str] = "New Chat" |
|
|
| class MessageResponse(BaseModel): |
| message_id: str |
| session_id: str |
| role: str |
| message: str |
| timestamp: str |
|
|
| class ChatHistoryResponse(BaseModel): |
| session_id: str |
| messages: List[MessageResponse] |
| total_messages: int |
|
|
| class UpdateSessionTitleRequest(BaseModel): |
| new_title: str |
|
|
| class CreateSessionRequest(BaseModel): |
| userLoginId: int |
| orgId: int |
| auth_token: str |
|
|
| class AddMessageRequest(BaseModel): |
| session_id: str |
| role: str |
| message: str |
|
|
| |
|
|
| def create_session(userLoginId: int, orgId: int, auth_token: str) -> dict: |
| """Create a new chat session""" |
| session_id = str(uuid.uuid4()) |
| session_data = { |
| "session_id": session_id, |
| "userLoginId": userLoginId, |
| "orgId": orgId, |
| "auth_token": auth_token, |
| "created_at": datetime.now().isoformat(), |
| "status": "active", |
| "title": "New Chat" |
| } |
| |
| |
| redis_client.setex( |
| f"session:{session_id}", |
| 86400, |
| json.dumps(session_data) |
| ) |
| |
| |
| redis_client.setex( |
| f"chat:{session_id}", |
| 86400, |
| json.dumps([]) |
| ) |
| |
| |
| redis_client.setex( |
| f"memory:{session_id}", |
| 86400, |
| json.dumps([]) |
| ) |
| |
| return session_data |
|
|
| def get_session(session_id: str) -> dict: |
| """Get session data from Redis""" |
| session_data = redis_client.get(f"session:{session_id}") |
| if not session_data: |
| raise HTTPException(status_code=404, detail="Session not found or expired") |
| return json.loads(session_data) |
|
|
| def add_message_to_session(session_id: str, role: str, message: str) -> str: |
| """Add message to session chat history""" |
| message_id = str(uuid.uuid4()) |
| message_data = { |
| "message_id": message_id, |
| "session_id": session_id, |
| "role": role, |
| "message": message, |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| |
| chat_history = redis_client.get(f"chat:{session_id}") |
| if chat_history: |
| messages = json.loads(chat_history) |
| else: |
| messages = [] |
| |
| |
| messages.Redis_session_routerend(message_data) |
| |
| |
| redis_client.setex( |
| f"chat:{session_id}", |
| 86400, |
| json.dumps(messages) |
| ) |
| |
| return message_id |
|
|
| def get_session_memory(session_id: str) -> List[Dict]: |
| """Get conversation memory for session""" |
| memory_data = redis_client.get(f"memory:{session_id}") |
| if memory_data: |
| return json.loads(memory_data) |
| return [] |
|
|
| def update_session_memory(session_id: str, messages: List[Dict]): |
| """Update conversation memory for session""" |
| redis_client.setex( |
| f"memory:{session_id}", |
| 86400, |
| json.dumps(messages) |
| ) |
|
|
| def generate_session_title(session_id: str) -> str: |
| """Generate a title for the session based on chat history""" |
| try: |
| |
| session_data = redis_client.get(f"session:{session_id}") |
| if session_data: |
| session = json.loads(session_data) |
| if "user_title" in session: |
| |
| return session["user_title"] |
|
|
| |
| chat_data = redis_client.get(f"chat:{session_id}") |
| if not chat_data: |
| return "New Chat" |
|
|
| messages = json.loads(chat_data) |
| if not messages: |
| return "New Chat" |
|
|
| |
| first_user_message = next( |
| (msg["message"] for msg in messages if msg["role"] == "user"), None |
| ) |
|
|
| if not first_user_message: |
| return "New Chat" |
|
|
| |
| words = first_user_message.split()[:6] |
| title = " ".join(words) + ("..." if len(first_user_message.split()) > 6 else "") |
|
|
| |
| if session_data: |
| session["generated_title"] = title |
| if not session.get("user_title"): |
| session["title"] = title |
| redis_client.setex(f"session:{session_id}", 86400, json.dumps(session)) |
|
|
| return title |
|
|
| except Exception as e: |
| print(f"Error in generate_session_title: {e}") |
| return "New Chat" |
|
|
| def update_session_title(session_id: str): |
| """Update session title after first message""" |
| try: |
| |
| session_data = redis_client.get(f"session:{session_id}") |
| if not session_data: |
| return |
| |
| session = json.loads(session_data) |
| |
| |
| if session.get("title", "New Chat") == "New Chat": |
| new_title = generate_session_title(session_id) |
| session["title"] = new_title |
| |
| |
| redis_client.setex( |
| f"session:{session_id}", |
| 86400, |
| json.dumps(session) |
| ) |
| |
| except Exception as e: |
| print(f"Error updating session title: {e}") |
| pass |
|
|
| def get_user_sessions(userLoginId: int) -> List[dict]: |
| """Get all sessions for a user with generated titles""" |
| sessions = [] |
| |
| for key in redis_client.scan_iter(match="session:*"): |
| session_data = redis_client.get(key) |
| if session_data: |
| session = json.loads(session_data) |
| if session["userLoginId"] == userLoginId: |
| |
| session["title"] = generate_session_title(session["session_id"]) |
| sessions.Redis_session_routerend(session) |
| |
| |
| |
| sessions.sort(key=lambda x: x["created_at"], reverse=True) |
|
|
| return sessions |
| def get_user_sessions(userLoginId: int) -> List[dict]: |
| """Get all sessions for a user with generated titles""" |
| sessions = [] |
| |
| for key in redis_client.scan_iter(match="session:*"): |
| session_data = redis_client.get(key) |
| if session_data: |
| session = json.loads(session_data) |
| if session["userLoginId"] == userLoginId: |
| |
| session["title"] = generate_session_title(session["session_id"]) |
| sessions.append(session) |
| |
| |
| sessions.sort(key=lambda x: x["created_at"], reverse=True) |
| return sessions |
|
|
| def delete_session(session_id: str): |
| """Delete session and associated data""" |
| |
| redis_client.delete(f"session:{session_id}") |
| |
| redis_client.delete(f"chat:{session_id}") |
| |
| redis_client.delete(f"memory:{session_id}") |
|
|
| |
|
|
|
|
|
|
| |
|
|
| @Redis_session_router_old.post("/sessions", response_model=SessionResponse) |
| def create_new_session(request: CreateSessionRequest): |
| """Create a new chat session""" |
| try: |
| session_data = create_session(request.userLoginId, request.orgId, request.auth_token) |
| return SessionResponse(**session_data) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error creating session: {str(e)}") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Redis_session_router_old.get("/sessions") |
| def list_user_sessions(userLoginId: int): |
| """List all sessions for a user""" |
| try: |
| sessions = get_user_sessions(userLoginId) |
| return { |
| "userLoginId": userLoginId, |
| "total_sessions": len(sessions), |
| "sessions": sessions |
| } |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error fetching sessions: {str(e)}") |
|
|
| @Redis_session_router_old.get("/sessions/{session_id}") |
| def get_session_details(session_id: str): |
| """Get details of a specific session""" |
| try: |
| session_data = get_session(session_id) |
| return session_data |
| except Exception as e: |
| raise HTTPException(status_code=404, detail=f"Session not found: {str(e)}") |
|
|
| @Redis_session_router_old.delete("/sessions/{session_id}") |
| def delete_user_session(session_id: str): |
| """Delete/close a session""" |
| try: |
| |
| get_session(session_id) |
| |
| |
| delete_session(session_id) |
| |
| return { |
| "message": f"Session {session_id} deleted successfully", |
| "session_id": session_id |
| } |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error deleting session: {str(e)}") |
|
|
| @Redis_session_router_old.get("/sessions/{session_id}/history", response_model=ChatHistoryResponse) |
| def get_session_history( |
| session_id: str, |
| n: int = QueryParam(50, description="Number of recent messages to return") |
| ): |
| """Get chat history for a session""" |
| try: |
| |
| get_session(session_id) |
| |
| |
| chat_data = redis_client.get(f"chat:{session_id}") |
| if not chat_data: |
| return ChatHistoryResponse( |
| session_id=session_id, |
| messages=[], |
| total_messages=0 |
| ) |
| |
| messages = json.loads(chat_data) |
| |
| |
| recent_messages = messages[-n:] if len(messages) > n else messages |
| |
| |
| message_responses = [MessageResponse(**msg) for msg in recent_messages] |
| |
| return ChatHistoryResponse( |
| session_id=session_id, |
| messages=message_responses, |
| total_messages=len(messages) |
| ) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error fetching chat history: {str(e)}") |
|
|
| @Redis_session_router_old.post("/sessions/{session_id}/messages") |
| def add_message(session_id: str, request: AddMessageRequest): |
| """Add a message to a session""" |
| try: |
| |
| get_session(session_id) |
| |
| |
| if request.role not in ["user", "assistant"]: |
| raise HTTPException(status_code=400, detail="Role must be 'user' or 'assistant'") |
| |
| |
| message_id = add_message_to_session(session_id, request.role, request.message) |
| |
| |
| if request.role == "user": |
| update_session_title(session_id) |
| |
| return { |
| "message_id": message_id, |
| "session_id": session_id, |
| "role": request.role, |
| "message": request.message, |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error adding message: {str(e)}") |
|
|
| @Redis_session_router_old.put("/sessions/{session_id}/title") |
| def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest): |
| """Update the user-defined title of an existing session""" |
| try: |
| session_data = redis_client.get(f"session:{session_id}") |
| if not session_data: |
| raise HTTPException(status_code=404, detail="Session not found or expired") |
|
|
| session = json.loads(session_data) |
|
|
| new_title = request.new_title.strip() |
| if not new_title: |
| raise HTTPException(status_code=400, detail="New title cannot be empty") |
| if len(new_title) > 100: |
| raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters") |
|
|
| old_title = session.get("title", "New Chat") |
| session["user_title"] = new_title |
| session["title"] = new_title |
| session["last_updated"] = datetime.now().isoformat() |
|
|
| redis_client.setex(f"session:{session_id}", 86400, json.dumps(session)) |
|
|
| return { |
| "message": "Session title updated successfully", |
| "session_id": session_id, |
| "old_title": old_title, |
| "new_title": new_title |
| } |
|
|
| except HTTPException: |
| raise |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}") |
|
|
| @Redis_session_router_old.put("/sessions/{session_id}/refresh-title") |
| def refresh_session_title(session_id: str): |
| """Manually refresh/regenerate session title""" |
| try: |
| |
| session_data = get_session(session_id) |
| |
| |
| new_title = generate_session_title(session_id) |
| |
| |
| session_data["title"] = new_title |
| redis_client.setex( |
| f"session:{session_id}", |
| 86400, |
| json.dumps(session_data) |
| ) |
| |
| return { |
| "session_id": session_id, |
| "new_title": new_title, |
| "message": "Session title updated successfully" |
| } |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}") |
|
|
| @Redis_session_router_old.get("/sessions/{session_id}/memory") |
| def get_session_memory_endpoint(session_id: str): |
| """Get conversation memory for a session""" |
| try: |
| |
| get_session(session_id) |
| |
| memory = get_session_memory(session_id) |
| |
| return { |
| "session_id": session_id, |
| "memory": memory, |
| "total_items": len(memory) |
| } |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error fetching memory: {str(e)}") |
|
|
| |
|
|
| @Redis_session_router_old.get("/redis-info") |
| def redis_info(): |
| """Get Redis connection information""" |
| try: |
| info = redis_client.info() |
| return { |
| "redis_connected": True, |
| "redis_version": info.get("redis_version"), |
| "used_memory": info.get("used_memory_human"), |
| "connected_clients": info.get("connected_clients"), |
| "total_keys": redis_client.dbsize() |
| } |
| except Exception as e: |
| return { |
| "redis_connected": False, |
| "error": str(e) |
| } |
|
|
| @Redis_session_router_old.get("/health") |
| def health(): |
| """System health check""" |
| try: |
| redis_client.ping() |
| redis_status = "connected" |
| except: |
| redis_status = "disconnected" |
| |
| total_sessions = 0 |
| if redis_status == "connected": |
| try: |
| total_sessions = len(list(redis_client.scan_iter(match="session:*"))) |
| except: |
| pass |
| |
| return { |
| "status": "ok", |
| "redis_status": redis_status, |
| "session_management": "enabled", |
| "total_sessions": total_sessions, |
| "ttl": "24 hours" |
| } |
|
|
| @Redis_session_router_old.get("/") |
| def root(): |
| """Root endpoint with API information""" |
| return { |
| "service": "Redis Session Management API", |
| "version": "1.0.0", |
| "endpoints": { |
| "sessions": { |
| "POST /sessions": "Create new session", |
| "GET /sessions?userLoginId={id}": "List user sessions", |
| "GET /sessions/{id}": "Get session details", |
| "DELETE /sessions/{id}": "Delete session", |
| "GET /sessions/{id}/history": "Get chat history", |
| "POST /sessions/{id}/messages": "Add message to session", |
| "PUT /sessions/{id}/title": "Update session title", |
| "PUT /sessions/{id}/refresh-title": "Refresh session title", |
| "GET /sessions/{id}/memory": "Get session memory" |
| }, |
| "system": { |
| "GET /health": "Health check", |
| "GET /redis-info": "Redis connection info" |
| } |
| } |
| } |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |