agent_backend / app /main.py
GoutamSachdev's picture
Update app/main.py
9714fb7 verified
Raw
History Blame Contribute Delete
41.8 kB
from __future__ import annotations
from typing import Any, AsyncIterator,List
from pydantic import BaseModel
from agents import RunConfig, Runner, SQLiteSession
from agents.model_settings import ModelSettings
from chatkit.agents import AgentContext, stream_agent_response
from chatkit.server import ChatKitServer, StreamingResult
import os
from chatkit.types import (
Attachment,
ClientToolCallItem,
ThreadMetadata,
ThreadStreamEvent,
UserMessageItem,
AssistantMessageItem
)
from .MultiAgent import build_sugguestion_information_agent, build_kimi_information_agent, build_summarizer_agent,build_google_information_agent
from chatkit.types import AssistantMessageItem as AssistantMsg
from fastapi import Depends, FastAPI, Query, Request, HTTPException, BackgroundTasks, Header, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, StreamingResponse
from openai.types.responses import ResponseInputContentParam
from starlette.responses import JSONResponse
import json
import asyncio
from collections import defaultdict
import random
import string
import traceback
from datetime import datetime, timezone, timedelta
from .sqlite_store import SQLiteStore
from .memory_store import MemoryStore
from .user_state import UserStateManager
from dotenv import load_dotenv
load_dotenv(dotenv_path="./.env.local")
DEFAULT_THREAD_ID = "demo_default_thread"
def _user_message_text(item: UserMessageItem) -> str:
parts: list[str] = []
for part in item.content:
text = getattr(part, "text", None)
if text:
parts.append(text)
return " ".join(parts).strip()
def _is_tool_completion_item(item: Any) -> bool:
return isinstance(item, ClientToolCallItem)
class deepseek_CustomerSupportServer(ChatKitServer[dict[str, Any]]):
def __init__(
self,
agent_state: UserStateManager,
) -> None:
store = SQLiteStore(db_path=os.getenv("CHATKIT_DB_PATH", "chatkit_threads.db"))
super().__init__(store)
self.store = store
self.agent_state = agent_state
self.information_agent = build_sugguestion_information_agent()
self.summarizer_agent = build_summarizer_agent()
def _resolve_thread_id(self, thread: ThreadMetadata | None) -> str:
return thread.id if thread and thread.id else DEFAULT_THREAD_ID
async def prepare_conversation_context(self, thread_key: str, message_text: str, status:str) -> str:
# Await handle_history which is now fast because summarization is backgrounded
summary_text, recent_context = await self.handle_history(thread_key)
user_data = self.agent_state.get_user(thread_key)
customer_context = (
"Customer context:\n"
f"- Name: {user_data.customer_name or ''}\n"
f"- Email: {user_data.customer_email or ''}\n"
f"- Phone: {user_data.customer_phone or ''}\n"
f"- Timezone: {user_data.Timezone or ''}\n"
)
if status.lower() == "offline":
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"If the user asks to talk to a human sales agent, respond: "
f"-This Company you are representing for : Sunmarke School\n"
f"Current request: {message_text}\n"
)
return combined_prompt
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"-This Company you are representing for : Sunmarke School\n"
f"Current request: {message_text}\n"
)
return combined_prompt
# Await handle_history which is now fast because summarization is backgrounded
summary_text, recent_context = await self.handle_history(thread_key)
user_data = self.agent_state.get_user(thread_key)
customer_context = (
"Customer context:\n"
f"- Name: {user_data.customer_name or ''}\n"
f"- Email: {user_data.customer_email or ''}\n"
f"- Phone: {user_data.customer_phone or ''}\n"
f"- Company: {user_data.company_name or ''}\n"
f"- Timezone: {user_data.Timezone or ''}\n"
)
if status.lower() == "offline":
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"If the user asks to talk to a human sales agent, respond: "
f'\"Our human sales agent is currently offline, May i help in book an appointment for you.\" '
f"Current request: {message_text}\n"
)
return combined_prompt
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"Current request: {message_text}\n"
)
return combined_prompt
async def _async_summarize(self, thread_key: str, user_messages: list, previous_summary: str):
"""Background task to perform summarization without blocking the main flow."""
try:
to_summarize = user_messages[:-5]
combined_text = "\n".join(
f"{'User' if isinstance(i, UserMessageItem) else 'Assistant'}: {_user_message_text(i)}"
for i in to_summarize
)
summarizer_prompt = (
f"Previous summary:\n{previous_summary}\n\n"
f"Add the following messages into the summary:\n{combined_text}\n"
f"Return a concise updated summary of the entire conversation."
)
session = SQLiteSession(thread_key)
result = await Runner.run(
self.summarizer_agent,
summarizer_prompt,
session=session,
)
self.agent_state.set_summary(thread_key, result.final_output)
print(f"🧠 [BACKGROUND] Summary updated for thread: {thread_key}")
except Exception as e:
print(f"⚠️ [BACKGROUND] Summarization failed for thread {thread_key}: {e}")
async def handle_history(self, thread_key: str) -> tuple[str, str]:
"""Handles message history, returns current state, and triggers summarization in background if needed."""
# 1. Fetch history from store (Fast)
history = self.store._items(thread_key)
user_messages = [i for i in history if isinstance(i, (UserMessageItem, AssistantMessageItem))]
# Keep context within limits
if len(user_messages) > 15:
user_messages = user_messages[-15:]
# 2. Get existing summary from persistence (Fast)
summary_text = self.agent_state.get_summary(thread_key)
# 3. Trigger summarization in background if criteria met (Non-blocking)
if len(user_messages) >= 5 and len(user_messages) % 5 == 0:
print(f"🧠 [HISTORY] Triggering background summarization for {thread_key}")
asyncio.create_task(self._async_summarize(thread_key, user_messages, summary_text))
# 4. Compute recent context for the prompt (Fast)
last_five = user_messages[-10:]
recent_context = "\n".join(
f"{'User' if isinstance(i, UserMessageItem) else 'Assistant'}: {_user_message_text(i)}"
for i in last_five
)
return summary_text, recent_context
async def respond(
self,
thread: ThreadMetadata,
item: UserMessageItem | None,
context: dict[str, Any],
) -> AsyncIterator[ThreadStreamEvent]:
if item is None:
return
message_text = _user_message_text(item)
if not message_text:
return
thread_key = self._resolve_thread_id(thread)
try:
user_data = self.agent_state.get_user(thread_key)
request_context_enriched = {
**(context or {}),
}
except Exception:
request_context_enriched = context or {}
session = SQLiteSession(thread_key)
agent_context = AgentContext(thread=thread, store=self.store, request_context=request_context_enriched)
combined_prompt = await self.prepare_conversation_context(thread_key, message_text, 'offline')
result_stream = Runner.run_streamed(self.information_agent, combined_prompt, context=agent_context, session=session)
async for event in stream_agent_response(agent_context, result_stream):
yield event
class kimi_CustomerSupportServer(ChatKitServer[dict[str, Any]]):
def __init__(
self,
agent_state: UserStateManager,
) -> None:
store = SQLiteStore(db_path="chatkit_threads.db")
super().__init__(store)
self.store = store
self.agent_state = agent_state
self.summarizer_agent = build_summarizer_agent()
self.information_agent = build_kimi_information_agent()
def _resolve_thread_id(self, thread: ThreadMetadata | None) -> str:
return thread.id if thread and thread.id else DEFAULT_THREAD_ID
async def prepare_conversation_context(self, thread_key: str, message_text: str, status:str) -> str:
# Await handle_history which is now fast because summarization is backgrounded
summary_text, recent_context = await self.handle_history(thread_key)
user_data = self.agent_state.get_user(thread_key)
customer_context = (
"Customer context:\n"
f"- Name: {user_data.customer_name or ''}\n"
f"- Email: {user_data.customer_email or ''}\n"
f"- Phone: {user_data.customer_phone or ''}\n"
f"- Timezone: {user_data.Timezone or ''}\n"
)
if status.lower() == "offline":
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"If the user asks to talk to a human sales agent, respond: "
f"-This Company you are representing for : Sunmarke School\n"
f"Current request: {message_text}\n"
)
return combined_prompt
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"-This Company you are representing for : Sunmarke School\n"
f"Current request: {message_text}\n"
)
return combined_prompt
# Await handle_history which is now fast because summarization is backgrounded
summary_text, recent_context = await self.handle_history(thread_key)
user_data = self.agent_state.get_user(thread_key)
customer_context = (
"Customer context:\n"
f"- Name: {user_data.customer_name or ''}\n"
f"- Email: {user_data.customer_email or ''}\n"
f"- Phone: {user_data.customer_phone or ''}\n"
f"- Company: {user_data.company_name or ''}\n"
f"- Timezone: {user_data.Timezone or ''}\n"
)
if status.lower() == "offline":
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"If the user asks to talk to a human sales agent, respond: "
f'\"Our human sales agent is currently offline, May i help in book an appointment for you.\" '
f"Current request: {message_text}\n"
)
return combined_prompt
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"Current request: {message_text}\n"
)
return combined_prompt
async def _async_summarize(self, thread_key: str, user_messages: list, previous_summary: str):
"""Background task to perform summarization without blocking the main flow."""
try:
to_summarize = user_messages[:-5]
combined_text = "\n".join(
f"{'User' if isinstance(i, UserMessageItem) else 'Assistant'}: {_user_message_text(i)}"
for i in to_summarize
)
summarizer_prompt = (
f"Previous summary:\n{previous_summary}\n\n"
f"Add the following messages into the summary:\n{combined_text}\n"
f"Return a concise updated summary of the entire conversation."
)
session = SQLiteSession(thread_key)
result = await Runner.run(
self.summarizer_agent,
summarizer_prompt,
session=session,
)
self.agent_state.set_summary(thread_key, result.final_output)
print(f"🧠 [BACKGROUND] Summary updated for thread: {thread_key}")
except Exception as e:
print(f"⚠️ [BACKGROUND] Summarization failed for thread {thread_key}: {e}")
async def handle_history(self, thread_key: str) -> tuple[str, str]:
"""Handles message history, returns current state, and triggers summarization in background if needed."""
# 1. Fetch history from store (Fast)
history = self.store._items(thread_key)
user_messages = [i for i in history if isinstance(i, (UserMessageItem, AssistantMessageItem))]
# Keep context within limits
if len(user_messages) > 15:
user_messages = user_messages[-15:]
# 2. Get existing summary from persistence (Fast)
summary_text = self.agent_state.get_summary(thread_key)
# 3. Trigger summarization in background if criteria met (Non-blocking)
if len(user_messages) >= 5 and len(user_messages) % 5 == 0:
print(f"🧠 [HISTORY] Triggering background summarization for {thread_key}")
asyncio.create_task(self._async_summarize(thread_key, user_messages, summary_text))
# 4. Compute recent context for the prompt (Fast)
last_five = user_messages[-10:]
recent_context = "\n".join(
f"{'User' if isinstance(i, UserMessageItem) else 'Assistant'}: {_user_message_text(i)}"
for i in last_five
)
return summary_text, recent_context
async def respond(
self,
thread: ThreadMetadata,
item: UserMessageItem | None,
context: dict[str, Any],
) -> AsyncIterator[ThreadStreamEvent]:
if item is None:
return
message_text = _user_message_text(item)
if not message_text:
return
thread_key = self._resolve_thread_id(thread)
try:
user_data = self.agent_state.get_user(thread_key)
request_context_enriched = {
**(context or {}),
}
except Exception:
request_context_enriched = context or {}
session = SQLiteSession(thread_key)
agent_context = AgentContext(thread=thread, store=self.store, request_context=request_context_enriched)
combined_prompt = await self.prepare_conversation_context(thread_key, message_text, 'offline')
result_stream = Runner.run_streamed(self.information_agent, combined_prompt, context=agent_context, session=session)
async for event in stream_agent_response(agent_context, result_stream):
yield event
class google_CustomerSupportServer(ChatKitServer[dict[str, Any]]):
def __init__(
self,
agent_state: UserStateManager,
) -> None:
store = SQLiteStore(db_path="chatkit_threads.db")
super().__init__(store)
self.store = store
self.agent_state = agent_state
self.information_agent = build_google_information_agent()
self.summarizer_agent = build_summarizer_agent()
def _resolve_thread_id(self, thread: ThreadMetadata | None) -> str:
return thread.id if thread and thread.id else DEFAULT_THREAD_ID
async def prepare_conversation_context(self, thread_key: str, message_text: str, status:str) -> str:
# Await handle_history which is now fast because summarization is backgrounded
summary_text, recent_context = await self.handle_history(thread_key)
user_data = self.agent_state.get_user(thread_key)
customer_context = (
"Customer context:\n"
f"- Name: {user_data.customer_name or ''}\n"
f"- Email: {user_data.customer_email or ''}\n"
f"- Phone: {user_data.customer_phone or ''}\n"
f"- Timezone: {user_data.Timezone or ''}\n"
)
if status.lower() == "offline":
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"If the user asks to talk to a human sales agent, respond: "
f"-This Company you are representing for : Sunmarke School\n"
f"Current request: {message_text}\n"
)
return combined_prompt
combined_prompt = (
f"{customer_context}\n"
f"Previous summary:\n{summary_text}\n\n"
f"Recent conversation (last 5 messages):\n{recent_context}\n\n"
f"-This Company you are representing for : Sunmarke School\n"
f"Current request: {message_text}\n"
)
return combined_prompt
async def _async_summarize(self, thread_key: str, user_messages: list, previous_summary: str):
"""Background task to perform summarization without blocking the main flow."""
try:
to_summarize = user_messages[:-5]
combined_text = "\n".join(
f"{'User' if isinstance(i, UserMessageItem) else 'Assistant'}: {_user_message_text(i)}"
for i in to_summarize
)
summarizer_prompt = (
f"Previous summary:\n{previous_summary}\n\n"
f"Add the following messages into the summary:\n{combined_text}\n"
f"Return a concise updated summary of the entire conversation."
)
session = SQLiteSession(thread_key)
result = await Runner.run(
self.summarizer_agent,
summarizer_prompt,
session=session,
)
self.agent_state.set_summary(thread_key, result.final_output)
print(f"🧠 [BACKGROUND] Summary updated for thread: {thread_key}")
except Exception as e:
print(f"⚠️ [BACKGROUND] Summarization failed for thread {thread_key}: {e}")
async def handle_history(self, thread_key: str) -> tuple[str, str]:
"""Handles message history, returns current state, and triggers summarization in background if needed."""
# 1. Fetch history from store (Fast)
history = self.store._items(thread_key)
user_messages = [i for i in history if isinstance(i, (UserMessageItem, AssistantMessageItem))]
# Keep context within limits
if len(user_messages) > 15:
user_messages = user_messages[-15:]
# 2. Get existing summary from persistence (Fast)
summary_text = self.agent_state.get_summary(thread_key)
# 3. Trigger summarization in background if criteria met (Non-blocking)
if len(user_messages) >= 5 and len(user_messages) % 5 == 0:
print(f"🧠 [HISTORY] Triggering background summarization for {thread_key}")
asyncio.create_task(self._async_summarize(thread_key, user_messages, summary_text))
# 4. Compute recent context for the prompt (Fast)
last_five = user_messages[-10:]
recent_context = "\n".join(
f"{'User' if isinstance(i, UserMessageItem) else 'Assistant'}: {_user_message_text(i)}"
for i in last_five
)
return summary_text, recent_context
async def respond(
self,
thread: ThreadMetadata,
item: UserMessageItem | None,
context: dict[str, Any],
) -> AsyncIterator[ThreadStreamEvent]:
if item is None:
return
message_text = _user_message_text(item)
if not message_text:
return
thread_key = self._resolve_thread_id(thread)
try:
user_data = self.agent_state.get_user(thread_key)
request_context_enriched = {
**(context or {}),
}
except Exception:
request_context_enriched = context or {}
session = SQLiteSession(thread_key)
agent_context = AgentContext(thread=thread, store=self.store, request_context=request_context_enriched)
combined_prompt = await self.prepare_conversation_context(thread_key, message_text, 'offline')
result_stream = Runner.run_streamed(self.information_agent, combined_prompt, context=agent_context, session=session)
async for event in stream_agent_response(agent_context, result_stream):
yield event
state_manager = UserStateManager(db_path=os.getenv("USER_STATE_DB_PATH", "user_state.db"))
support_server = deepseek_CustomerSupportServer(agent_state=state_manager)
kimi_support_server = kimi_CustomerSupportServer(agent_state=state_manager)
google_support_server = google_CustomerSupportServer(agent_state=state_manager)
app = FastAPI(title="ChatKit Customer Support API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def get_server() -> deepseek_CustomerSupportServer:
return support_server
def get_kimi_server() -> kimi_CustomerSupportServer:
return kimi_support_server
def get_google_server() -> google_CustomerSupportServer:
return google_support_server
@app.post("/deepseek/support/chatkit")
async def chatkit_endpoint(
request: Request, server: deepseek_CustomerSupportServer = Depends(get_server)
) -> Response:
payload = await request.body()
result = await server.process(payload, {"request": request})
if isinstance(result, StreamingResult):
return StreamingResponse(result, media_type="text/event-stream")
if hasattr(result, "json"):
return Response(content=result.json, media_type="application/json")
return JSONResponse(result)
@app.post("/kimi/support/chatkit")
async def chatkit_endpoint(
request: Request, server: kimi_CustomerSupportServer = Depends(get_kimi_server)
) -> Response:
payload = await request.body()
result = await server.process(payload, {"request": request})
if isinstance(result, StreamingResult):
return StreamingResponse(result, media_type="text/event-stream")
if hasattr(result, "json"):
return Response(content=result.json, media_type="application/json")
return JSONResponse(result)
@app.post("/google/support/chatkit")
async def chatkit_endpoint(
request: Request, server: google_CustomerSupportServer = Depends(get_google_server)
) -> Response:
payload = await request.body()
result = await server.process(payload, {"request": request})
if isinstance(result, StreamingResult):
return StreamingResponse(result, media_type="text/event-stream")
if hasattr(result, "json"):
return Response(content=result.json, media_type="application/json")
return JSONResponse(result)
@app.post("/api/chat/debug")
async def chat_debug(request: Request):
body = await request.body()
print("RAW BODY RECEIVED:", body)
return {"received": body.decode()}
def _thread_param(thread_id: str | None) -> str:
return thread_id or DEFAULT_THREAD_ID
@app.get("/deepseek/support/customer")
async def deepseek_customer_snapshot(
thread_id: str | None = Query(None, description="ChatKit thread identifier"),
server: deepseek_CustomerSupportServer = Depends(get_server),
) -> dict[str, Any]:
key = _thread_param(thread_id)
data = server.agent_state.to_dict(key)
print("data", data)
return {"customer": data}
@app.get("/kimi/support/customer")
async def kimi_customer_snapshot(
thread_id: str | None = Query(None, description="ChatKit thread identifier"),
server: kimi_CustomerSupportServer = Depends(get_kimi_server),
) -> dict[str, Any]:
key = _thread_param(thread_id)
data = server.agent_state.to_dict(key)
print("data", data)
return {"customer": data}
@app.get("/google/support/customer")
async def google_customer_snapshot(
thread_id: str | None = Query(None, description="ChatKit thread identifier"),
server: google_CustomerSupportServer = Depends(get_google_server),
) -> dict[str, Any]:
key = _thread_param(thread_id)
data = server.agent_state.to_dict(key)
print("data", data)
return {"customer": data}
@app.get("/support/customer")
async def customer_snapshot(
thread_id: str | None = Query(None, description="ChatKit thread identifier"),
server: google_CustomerSupportServer = Depends(get_google_server),
) -> dict[str, Any]:
key = _thread_param(thread_id)
data = server.agent_state.to_dict(key)
print("data", data)
return {"customer": data}
@app.post("/support/transcribe")
async def transcribe_audio(file: UploadFile = File(...)):
"""Transcribe audio using Groq Whisper model"""
import tempfile
import os
from groq import Groq
try:
# Initialize Groq client with API key from environment
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
raise HTTPException(status_code=500, detail="GROQ_API_KEY not found in environment")
client = Groq(api_key=groq_api_key)
# Read audio file
audio_data = await file.read()
# Create temporary file with original extension or default to .webm
file_extension = os.path.splitext(file.filename)[1] if file.filename else ".webm"
with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
temp_file.write(audio_data)
temp_file_path = temp_file.name
# Transcribe with Groq Whisper
with open(temp_file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
file=audio_file,
model="whisper-large-v3-turbo",
response_format="verbose_json",
timestamp_granularities=["word", "segment"],
language="en",
temperature=0.0
)
# Cleanup temporary file
os.unlink(temp_file_path)
# Return transcription text and full details
return {
"text": transcription.text,
"details": json.loads(json.dumps(transcription, default=str))
}
except Exception as e:
# Cleanup temp file if it exists
if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
print(f"❌ Transcription error: {str(e)}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
@app.post("/deepseek/support/transcribe")
async def deepseek_transcribe_audio(file: UploadFile = File(...)):
"""Transcribe audio using Groq Whisper model"""
import tempfile
import os
from groq import Groq
try:
# Initialize Groq client with API key from environment
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
raise HTTPException(status_code=500, detail="GROQ_API_KEY not found in environment")
client = Groq(api_key=groq_api_key)
# Read audio file
audio_data = await file.read()
# Create temporary file with original extension or default to .webm
file_extension = os.path.splitext(file.filename)[1] if file.filename else ".webm"
with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
temp_file.write(audio_data)
temp_file_path = temp_file.name
# Transcribe with Groq Whisper
with open(temp_file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
file=audio_file,
model="whisper-large-v3-turbo",
response_format="verbose_json",
timestamp_granularities=["word", "segment"],
language="en",
temperature=0.0
)
# Cleanup temporary file
os.unlink(temp_file_path)
# Return transcription text and full details
return {
"text": transcription.text,
"details": json.loads(json.dumps(transcription, default=str))
}
except Exception as e:
# Cleanup temp file if it exists
if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
print(f"❌ Transcription error: {str(e)}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
@app.post("/google/support/transcribe")
async def google_transcribe_audio(file: UploadFile = File(...)):
"""Transcribe audio using Groq Whisper model"""
import tempfile
import os
from groq import Groq
try:
# Initialize Groq client with API key from environment
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
raise HTTPException(status_code=500, detail="GROQ_API_KEY not found in environment")
client = Groq(api_key=groq_api_key)
# Read audio file
audio_data = await file.read()
# Create temporary file with original extension or default to .webm
file_extension = os.path.splitext(file.filename)[1] if file.filename else ".webm"
with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
temp_file.write(audio_data)
temp_file_path = temp_file.name
# Transcribe with Groq Whisper
with open(temp_file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
file=audio_file,
model="whisper-large-v3-turbo",
response_format="verbose_json",
timestamp_granularities=["word", "segment"],
language="en",
temperature=0.0
)
# Cleanup temporary file
os.unlink(temp_file_path)
# Return transcription text and full details
return {
"text": transcription.text,
"details": json.loads(json.dumps(transcription, default=str))
}
except Exception as e:
# Cleanup temp file if it exists
if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
print(f"❌ Transcription error: {str(e)}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
@app.post("/kimi/support/transcribe")
async def kimi_transcribe_audio(file: UploadFile = File(...)):
"""Transcribe audio using Groq Whisper model"""
import tempfile
import os
from groq import Groq
try:
# Initialize Groq client with API key from environment
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
raise HTTPException(status_code=500, detail="GROQ_API_KEY not found in environment")
client = Groq(api_key=groq_api_key)
# Read audio file
audio_data = await file.read()
# Create temporary file with original extension or default to .webm
file_extension = os.path.splitext(file.filename)[1] if file.filename else ".webm"
with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
temp_file.write(audio_data)
temp_file_path = temp_file.name
# Transcribe with Groq Whisper
with open(temp_file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
file=audio_file,
model="whisper-large-v3-turbo",
response_format="verbose_json",
timestamp_granularities=["word", "segment"],
language="en",
temperature=0.0
)
# Cleanup temporary file
os.unlink(temp_file_path)
# Return transcription text and full details
return {
"text": transcription.text,
"details": json.loads(json.dumps(transcription, default=str))
}
except Exception as e:
# Cleanup temp file if it exists
if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
print(f"❌ Transcription error: {str(e)}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
@app.get("/deepseek/support/threads/{thread_id}/messages")
async def deepseek_get_thread_messages(
thread_id: str,
server: deepseek_CustomerSupportServer = Depends(get_server)
):
"""Get last 10 messages for a specific thread."""
try:
# Get all items from the thread
items = server.store._items(thread_id)
# Filter to only UserMessageItem and AssistantMessageItem
messages = []
for item in items:
if isinstance(item, (UserMessageItem, AssistantMessageItem)):
message_dict = item.model_dump()
# Ensure created_at is a string
if hasattr(message_dict.get('created_at'), 'isoformat'):
message_dict['created_at'] = message_dict['created_at'].isoformat()
elif message_dict.get('created_at'):
message_dict['created_at'] = str(message_dict['created_at'])
messages.append(message_dict)
# Get last 10 messages
last_10_messages = messages[-10:] if len(messages) > 10 else messages
return {
"thread_id": thread_id,
"total_message_count": len(messages),
"returned_message_count": len(last_10_messages),
"messages": last_10_messages
}
except Exception as e:
print(f"❌ Error fetching messages for thread {thread_id}: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to fetch messages: {str(e)}")
@app.get("/support/threads/{thread_id}/messages")
async def get_thread_messages(
thread_id: str,
server: deepseek_CustomerSupportServer = Depends(get_server)
):
"""Get last 10 messages for a specific thread."""
try:
# Get all items from the thread
items = server.store._items(thread_id)
# Filter to only UserMessageItem and AssistantMessageItem
messages = []
for item in items:
if isinstance(item, (UserMessageItem, AssistantMessageItem)):
message_dict = item.model_dump()
# Ensure created_at is a string
if hasattr(message_dict.get('created_at'), 'isoformat'):
message_dict['created_at'] = message_dict['created_at'].isoformat()
elif message_dict.get('created_at'):
message_dict['created_at'] = str(message_dict['created_at'])
messages.append(message_dict)
# Get last 10 messages
last_10_messages = messages[-10:] if len(messages) > 10 else messages
return {
"thread_id": thread_id,
"total_message_count": len(messages),
"returned_message_count": len(last_10_messages),
"messages": last_10_messages
}
except Exception as e:
print(f"❌ Error fetching messages for thread {thread_id}: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to fetch messages: {str(e)}")
@app.get("/kimi/support/threads/{thread_id}/messages")
async def kimi_get_thread_messages(
thread_id: str,
server:kimi_CustomerSupportServer = Depends(get_kimi_server)
):
"""Get last 10 messages for a specific thread."""
try:
# Get all items from the thread
items = server.store._items(thread_id)
# Filter to only UserMessageItem and AssistantMessageItem
messages = []
for item in items:
if isinstance(item, (UserMessageItem, AssistantMessageItem)):
message_dict = item.model_dump()
# Ensure created_at is a string
if hasattr(message_dict.get('created_at'), 'isoformat'):
message_dict['created_at'] = message_dict['created_at'].isoformat()
elif message_dict.get('created_at'):
message_dict['created_at'] = str(message_dict['created_at'])
messages.append(message_dict)
# Get last 10 messages
last_10_messages = messages[-10:] if len(messages) > 10 else messages
return {
"thread_id": thread_id,
"total_message_count": len(messages),
"returned_message_count": len(last_10_messages),
"messages": last_10_messages
}
except Exception as e:
print(f"❌ Error fetching messages for thread {thread_id}: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to fetch messages: {str(e)}")
@app.get("/google/support/threads/{thread_id}/messages")
async def google_get_thread_messages(
thread_id: str,
server:google_CustomerSupportServer = Depends(get_google_server)
):
"""Get last 10 messages for a specific thread."""
try:
# Get all items from the thread
items = server.store._items(thread_id)
# Filter to only UserMessageItem and AssistantMessageItem
messages = []
for item in items:
if isinstance(item, (UserMessageItem, AssistantMessageItem)):
message_dict = item.model_dump()
# Ensure created_at is a string
if hasattr(message_dict.get('created_at'), 'isoformat'):
message_dict['created_at'] = message_dict['created_at'].isoformat()
elif message_dict.get('created_at'):
message_dict['created_at'] = str(message_dict['created_at'])
messages.append(message_dict)
# Get last 10 messages
last_10_messages = messages[-10:] if len(messages) > 10 else messages
return {
"thread_id": thread_id,
"total_message_count": len(messages),
"returned_message_count": len(last_10_messages),
"messages": last_10_messages
}
except Exception as e:
print(f"❌ Error fetching messages for thread {thread_id}: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Failed to fetch messages: {str(e)}")
@app.post("/support/customer")
async def customer_update(
request: Request,
thread_id: str | None = Query(None, description="ChatKit thread identifier"),
server: deepseek_CustomerSupportServer = Depends(get_server),
) -> dict[str, str]:
try:
payload = await request.json()
except Exception:
payload = {}
key = _thread_param(thread_id)
try:
print(f"payload: {payload}")
name = (payload.get("name") or "").strip()
email = (payload.get("email") or "").strip()
phone = (payload.get("phone") or "").strip()
company_name = (payload.get("company_name") or payload.get("company") or "").strip()
timezone = (payload.get("timeZone") or payload.get("timezone") or "").strip()
server.agent_state.set_customer_info(
key,
name=name or None,
email=email or None,
phone=phone or None,
company_name=company_name or None,
)
if timezone:
server.agent_state.set_timezone(key, timezone)
# 🔥 Preload vector index for company to avoid 19s delay on first message
return {"status": "ok"}
except Exception:
return {"status": "error"}
@app.get("/support/health")
async def health_check() -> dict[str, str]:
return {"status": "healthy"}
@app.get("/")
def root():
return {
"status": "ok",
"message": "ChatKit backend is running 🚀"
}