tlong-ds's picture
Update app.py
052cc57 verified
import chainlit as cl
import httpx
import asyncio
import os
API_URL = "https://tlong-ds-sentiment-analysis-api.hf.space"
# -----------------------------
# Chat Service (async HTTP calls)
# -----------------------------
class ChatService:
@staticmethod
async def respond_to_chat(message: str) -> str:
"""Send message to backend API and return formatted response"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{API_URL}/chat",
data={"message": message}
)
if response.status_code == 200:
raw_response = str(response.text).strip()
if raw_response:
if raw_response.startswith('"') and raw_response.endswith('"'):
raw_response = raw_response[1:-1]
formatted_response = (
raw_response.replace("\\n", "\n")
.replace('\\"', '"')
.replace("\\*", "*")
)
return formatted_response
else:
return "**No response received from the API.**"
else:
return f"**API Error {response.status_code}:**\n\n```\n{response.text}\n```"
except Exception as e:
return f"**Error:** {str(e)}"
# -----------------------------
# Typing animation
# -----------------------------
async def run_typing_animation(msg: cl.Message):
frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
idx = 0
try:
while True:
msg.content = f"{frames[idx % len(frames)]} Analyzing sentiment..."
await msg.update()
await asyncio.sleep(0.25)
idx += 1
except asyncio.CancelledError:
# Expected when we cancel after API finishes
raise
# -----------------------------
# Chainlit Handlers
# -----------------------------
@cl.on_chat_start
async def on_chat_start():
"""Show intro message when chat starts"""
await cl.Message(
content=(
"**Sentiment Analysis**\n\n"
"Hi there! 😊 I'm Sen, your friendly sentiment analysis assistant! "
"I'm here to help you understand the emotions and feelings behind your words. "
"Share anything with me and I'll cheerfully analyze the sentiment for you! 🌟\n\n"
"Feel free to ask me about the sentiment of your text. Let's get started! 🚀"
),
author="assistant"
).send()
@cl.on_message
async def main(message: cl.Message):
"""Handle user messages with animation + API call"""
# Placeholder message (starts typing animation)
msg = cl.Message(content="⠋ Analyzing sentiment...", author="assistant")
await msg.send()
# Run animation + API call concurrently
animation_task = asyncio.create_task(run_typing_animation(msg))
api_task = asyncio.create_task(ChatService.respond_to_chat(message.content))
try:
response = await api_task # wait for API result
finally:
animation_task.cancel() # stop animation
try:
await asyncio.wait_for(animation_task, timeout=0.1)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
# Update final content
msg.content = response
await msg.update()