mvp_2_hf_dev_clone / Redis /sessions_old.py
srivatsavdamaraju's picture
Upload 82 files
e947927 verified
from fastapi import FastAPI, HTTPException, Query as QueryParam, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import os
import json
import redis
import uuid
from datetime import datetime
from dotenv import load_dotenv
from fastapi import APIRouter
load_dotenv()
# Redis_session_router_old = FastAPI(title="Redis Session Management API")
Redis_session_router_old = APIRouter(prefix="/Redis", tags=["Redis_Management_old"])
# ==================== CONFIGURATION ====================
# Redis Configuration
REDIS_URL = os.getenv("REDIS_URL")
REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
# ==================== REDIS CLIENT INITIALIZATION ====================
def get_redis_client():
"""Initialize Redis client with fallback to local Redis"""
try:
if REDIS_URL:
# Use deployed Redis URL
redis_client = redis.from_url(
REDIS_URL,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
# Test connection
redis_client.ping()
print(f"βœ… Connected to deployed Redis: {REDIS_URL}")
return redis_client
else:
# Use local Redis
redis_client = redis.StrictRedis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
# Test connection
redis_client.ping()
print(f"βœ… Connected to local Redis: {REDIS_HOST}:{REDIS_PORT}")
return redis_client
except Exception as e:
print(f"❌ Redis connection failed: {e}")
raise HTTPException(status_code=500, detail=f"Redis connection failed: {str(e)}")
# Initialize Redis client
# redis_client = get_redis_client()
# ==================== PYDANTIC MODELS ====================
class SessionResponse(BaseModel):
session_id: str
userLoginId: int
orgId: int
created_at: str
status: str
title: Optional[str] = "New Chat"
class MessageResponse(BaseModel):
message_id: str
session_id: str
role: str # "user" or "assistant"
message: str
timestamp: str
class ChatHistoryResponse(BaseModel):
session_id: str
messages: List[MessageResponse]
total_messages: int
class UpdateSessionTitleRequest(BaseModel):
new_title: str
class CreateSessionRequest(BaseModel):
userLoginId: int
orgId: int
auth_token: str
class AddMessageRequest(BaseModel):
session_id: str
role: str
message: str
# ==================== SESSION MANAGEMENT FUNCTIONS ====================
def create_session(userLoginId: int, orgId: int, auth_token: str) -> dict:
"""Create a new chat session"""
session_id = str(uuid.uuid4())
session_data = {
"session_id": session_id,
"userLoginId": userLoginId,
"orgId": orgId,
"auth_token": auth_token,
"created_at": datetime.now().isoformat(),
"status": "active",
"title": "New Chat"
}
# Store session in Redis with 24 hour TTL
redis_client.setex(
f"session:{session_id}",
86400, # 24 hours
json.dumps(session_data)
)
# Initialize empty chat history
redis_client.setex(
f"chat:{session_id}",
86400, # 24 hours
json.dumps([])
)
# Initialize conversation memory
redis_client.setex(
f"memory:{session_id}",
86400, # 24 hours
json.dumps([])
)
return session_data
def get_session(session_id: str) -> dict:
"""Get session data from Redis"""
session_data = redis_client.get(f"session:{session_id}")
if not session_data:
raise HTTPException(status_code=404, detail="Session not found or expired")
return json.loads(session_data)
def add_message_to_session(session_id: str, role: str, message: str) -> str:
"""Add message to session chat history"""
message_id = str(uuid.uuid4())
message_data = {
"message_id": message_id,
"session_id": session_id,
"role": role,
"message": message,
"timestamp": datetime.now().isoformat()
}
# Get current chat history
chat_history = redis_client.get(f"chat:{session_id}")
if chat_history:
messages = json.loads(chat_history)
else:
messages = []
# Add new message
messages.Redis_session_routerend(message_data)
# Update chat history in Redis with extended TTL
redis_client.setex(
f"chat:{session_id}",
86400, # 24 hours
json.dumps(messages)
)
return message_id
def get_session_memory(session_id: str) -> List[Dict]:
"""Get conversation memory for session"""
memory_data = redis_client.get(f"memory:{session_id}")
if memory_data:
return json.loads(memory_data)
return []
def update_session_memory(session_id: str, messages: List[Dict]):
"""Update conversation memory for session"""
redis_client.setex(
f"memory:{session_id}",
86400, # 24 hours
json.dumps(messages)
)
def generate_session_title(session_id: str) -> str:
"""Generate a title for the session based on chat history"""
try:
# Check session
session_data = redis_client.get(f"session:{session_id}")
if session_data:
session = json.loads(session_data)
if "user_title" in session:
# Don't override user-defined titles
return session["user_title"]
# Get chat history
chat_data = redis_client.get(f"chat:{session_id}")
if not chat_data:
return "New Chat"
messages = json.loads(chat_data)
if not messages:
return "New Chat"
# Get first user message
first_user_message = next(
(msg["message"] for msg in messages if msg["role"] == "user"), None
)
if not first_user_message:
return "New Chat"
# Create a simple title from first message
words = first_user_message.split()[:6]
title = " ".join(words) + ("..." if len(first_user_message.split()) > 6 else "")
# Save to session
if session_data:
session["generated_title"] = title
if not session.get("user_title"):
session["title"] = title
redis_client.setex(f"session:{session_id}", 86400, json.dumps(session))
return title
except Exception as e:
print(f"Error in generate_session_title: {e}")
return "New Chat"
def update_session_title(session_id: str):
"""Update session title after first message"""
try:
# Get session data
session_data = redis_client.get(f"session:{session_id}")
if not session_data:
return
session = json.loads(session_data)
# Only update if current title is "New Chat"
if session.get("title", "New Chat") == "New Chat":
new_title = generate_session_title(session_id)
session["title"] = new_title
# Update session in Redis
redis_client.setex(
f"session:{session_id}",
86400, # 24 hours
json.dumps(session)
)
except Exception as e:
print(f"Error updating session title: {e}")
pass
def get_user_sessions(userLoginId: int) -> List[dict]:
"""Get all sessions for a user with generated titles"""
sessions = []
# Scan for all session keys
for key in redis_client.scan_iter(match="session:*"):
session_data = redis_client.get(key)
if session_data:
session = json.loads(session_data)
if session["userLoginId"] == userLoginId:
# Generate title based on chat history
session["title"] = generate_session_title(session["session_id"])
sessions.Redis_session_routerend(session)
# Sort sessions by created_at (most recent first)
sessions.sort(key=lambda x: x["created_at"], reverse=True)
return sessions
def get_user_sessions(userLoginId: int) -> List[dict]:
"""Get all sessions for a user with generated titles"""
sessions = []
# Scan for all session keys
for key in redis_client.scan_iter(match="session:*"):
session_data = redis_client.get(key)
if session_data:
session = json.loads(session_data)
if session["userLoginId"] == userLoginId:
# Generate title based on chat history
session["title"] = generate_session_title(session["session_id"])
sessions.append(session)
# Sort sessions by created_at (most recent first)
sessions.sort(key=lambda x: x["created_at"], reverse=True)
return sessions
def delete_session(session_id: str):
"""Delete session and associated data"""
# Delete session data
redis_client.delete(f"session:{session_id}")
# Delete chat history
redis_client.delete(f"chat:{session_id}")
# Delete memory
redis_client.delete(f"memory:{session_id}")
# ==================== MIDDLEWARE ====================
# ==================== API ENDPOINTS ====================
@Redis_session_router_old.post("/sessions", response_model=SessionResponse)
def create_new_session(request: CreateSessionRequest):
"""Create a new chat session"""
try:
session_data = create_session(request.userLoginId, request.orgId, request.auth_token)
return SessionResponse(**session_data)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error creating session: {str(e)}")
# @Redis_session_router_old.get("/sessions")
# def list_user_sessions(userLoginId: int):
# """List all sessions for a user"""
# try:
# sessions = get_user_sessions(userLoginId)
# print(sessions)
# return {
# "userLoginId": userLoginId,
# "total_sessions": len(sessions),
# "sessions": sessions
# }
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Error fetching sessions: {str(e)}")
@Redis_session_router_old.get("/sessions")
def list_user_sessions(userLoginId: int):
"""List all sessions for a user"""
try:
sessions = get_user_sessions(userLoginId)
return {
"userLoginId": userLoginId,
"total_sessions": len(sessions),
"sessions": sessions
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching sessions: {str(e)}")
@Redis_session_router_old.get("/sessions/{session_id}")
def get_session_details(session_id: str):
"""Get details of a specific session"""
try:
session_data = get_session(session_id)
return session_data
except Exception as e:
raise HTTPException(status_code=404, detail=f"Session not found: {str(e)}")
@Redis_session_router_old.delete("/sessions/{session_id}")
def delete_user_session(session_id: str):
"""Delete/close a session"""
try:
# Verify session exists
get_session(session_id)
# Delete session
delete_session(session_id)
return {
"message": f"Session {session_id} deleted successfully",
"session_id": session_id
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error deleting session: {str(e)}")
@Redis_session_router_old.get("/sessions/{session_id}/history", response_model=ChatHistoryResponse)
def get_session_history(
session_id: str,
n: int = QueryParam(50, description="Number of recent messages to return")
):
"""Get chat history for a session"""
try:
# Verify session exists
get_session(session_id)
# Get chat history
chat_data = redis_client.get(f"chat:{session_id}")
if not chat_data:
return ChatHistoryResponse(
session_id=session_id,
messages=[],
total_messages=0
)
messages = json.loads(chat_data)
# Get the last n messages (or all if less than n)
recent_messages = messages[-n:] if len(messages) > n else messages
# Convert to MessageResponse objects
message_responses = [MessageResponse(**msg) for msg in recent_messages]
return ChatHistoryResponse(
session_id=session_id,
messages=message_responses,
total_messages=len(messages)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching chat history: {str(e)}")
@Redis_session_router_old.post("/sessions/{session_id}/messages")
def add_message(session_id: str, request: AddMessageRequest):
"""Add a message to a session"""
try:
# Verify session exists
get_session(session_id)
# Validate role
if request.role not in ["user", "assistant"]:
raise HTTPException(status_code=400, detail="Role must be 'user' or 'assistant'")
# Add message
message_id = add_message_to_session(session_id, request.role, request.message)
# Update title if it's the first user message
if request.role == "user":
update_session_title(session_id)
return {
"message_id": message_id,
"session_id": session_id,
"role": request.role,
"message": request.message,
"timestamp": datetime.now().isoformat()
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error adding message: {str(e)}")
@Redis_session_router_old.put("/sessions/{session_id}/title")
def update_session_title_endpoint(session_id: str, request: UpdateSessionTitleRequest):
"""Update the user-defined title of an existing session"""
try:
session_data = redis_client.get(f"session:{session_id}")
if not session_data:
raise HTTPException(status_code=404, detail="Session not found or expired")
session = json.loads(session_data)
new_title = request.new_title.strip()
if not new_title:
raise HTTPException(status_code=400, detail="New title cannot be empty")
if len(new_title) > 100:
raise HTTPException(status_code=400, detail="Title cannot exceed 100 characters")
old_title = session.get("title", "New Chat")
session["user_title"] = new_title
session["title"] = new_title
session["last_updated"] = datetime.now().isoformat()
redis_client.setex(f"session:{session_id}", 86400, json.dumps(session))
return {
"message": "Session title updated successfully",
"session_id": session_id,
"old_title": old_title,
"new_title": new_title
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
@Redis_session_router_old.put("/sessions/{session_id}/refresh-title")
def refresh_session_title(session_id: str):
"""Manually refresh/regenerate session title"""
try:
# Verify session exists
session_data = get_session(session_id)
# Generate new title
new_title = generate_session_title(session_id)
# Update session
session_data["title"] = new_title
redis_client.setex(
f"session:{session_id}",
86400, # 24 hours
json.dumps(session_data)
)
return {
"session_id": session_id,
"new_title": new_title,
"message": "Session title updated successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error updating session title: {str(e)}")
@Redis_session_router_old.get("/sessions/{session_id}/memory")
def get_session_memory_endpoint(session_id: str):
"""Get conversation memory for a session"""
try:
# Verify session exists
get_session(session_id)
memory = get_session_memory(session_id)
return {
"session_id": session_id,
"memory": memory,
"total_items": len(memory)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching memory: {str(e)}")
# ==================== SYSTEM ENDPOINTS ====================
@Redis_session_router_old.get("/redis-info")
def redis_info():
"""Get Redis connection information"""
try:
info = redis_client.info()
return {
"redis_connected": True,
"redis_version": info.get("redis_version"),
"used_memory": info.get("used_memory_human"),
"connected_clients": info.get("connected_clients"),
"total_keys": redis_client.dbsize()
}
except Exception as e:
return {
"redis_connected": False,
"error": str(e)
}
@Redis_session_router_old.get("/health")
def health():
"""System health check"""
try:
redis_client.ping()
redis_status = "connected"
except:
redis_status = "disconnected"
total_sessions = 0
if redis_status == "connected":
try:
total_sessions = len(list(redis_client.scan_iter(match="session:*")))
except:
pass
return {
"status": "ok",
"redis_status": redis_status,
"session_management": "enabled",
"total_sessions": total_sessions,
"ttl": "24 hours"
}
@Redis_session_router_old.get("/")
def root():
"""Root endpoint with API information"""
return {
"service": "Redis Session Management API",
"version": "1.0.0",
"endpoints": {
"sessions": {
"POST /sessions": "Create new session",
"GET /sessions?userLoginId={id}": "List user sessions",
"GET /sessions/{id}": "Get session details",
"DELETE /sessions/{id}": "Delete session",
"GET /sessions/{id}/history": "Get chat history",
"POST /sessions/{id}/messages": "Add message to session",
"PUT /sessions/{id}/title": "Update session title",
"PUT /sessions/{id}/refresh-title": "Refresh session title",
"GET /sessions/{id}/memory": "Get session memory"
},
"system": {
"GET /health": "Health check",
"GET /redis-info": "Redis connection info"
}
}
}
# ==================== RUN SERVER ====================
# if __name__ == "__main__":
# import uvicorn
# try:
# uvicorn.run(Redis_session_router_old, host="0.0.0.0", port=8000)
# except KeyboardInterrupt:
# print("\nπŸ›‘ Server stopped gracefully")
# except Exception as e:
# print(f"❌ Server error: {e}")