managerChat / app.py
LevinAleksey's picture
Update app.py
9ba6fc3 verified
import os
from typing import List, Dict, Optional
from enum import Enum
import chainlit as cl
from huggingface_hub import InferenceClient
from qdrant_client import QdrantClient
from openai import OpenAI
# ================================
# CONFIG
# ================================
HF_TOKEN = os.getenv("HF_TOKEN")
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_ID = "Qwen/Qwen2.5-72B-Instruct"
QDRANT_COLLECTION = "sales_knowledge"
HISTORY_KEEP = 20
HISTORY_SEND_LAST = 10
RAG_LIMIT = 4
RAG_SCORE_THRESHOLD = 0.27
RAG_MAX_CHARS = 2500
# ================================
# СТАДИИ ВОРОНКИ
# ================================
class Stage(str, Enum):
GREETING = "greeting"
DISCOVERY = "discovery"
QUALIFICATION = "qualification"
SOLUTION = "solution"
CLOSING = "closing"
# ================================
# SYSTEM PROMPTS ПО СТАДИЯМ
# ================================
BASE_CONTEXT = """
Ты — AI-консультант компании Alex.Dev. Специализация: чат-боты, 3D-аватары, AI-автоматизация для малого бизнеса.
ТВОИ УСЛУГИ И ЦЕНЫ:
- Telegram/WhatsApp бот с AI: 50,000 - 120,000 ₽
- Бот + интеграция с CRM: 80,000 - 180,000 ₽
- 3D-аватар для сайта/презентаций: 70,000 - 150,000 ₽
- Комплексная AI-автоматизация: 150,000 - 300,000 ₽
- Сроки: 2-6 недель в зависимости от сложности
СТИЛЬ ОБЩЕНИЯ:
- Уверенный эксперт, не продавец
- Короткие ответы: 2-4 предложения
- Без восклицательных знаков и эмодзи (кроме 👋 в приветствии)
- Не задавай больше одного вопроса за раз
- Если клиент задал вопрос — сначала ответь, потом можешь спросить
"""
STAGE_PROMPTS = {
Stage.GREETING: BASE_CONTEXT + """
ТЕКУЩАЯ ЗАДАЧА: Установить контакт и понять, с чем пришел клиент.
Если клиент уже описал задачу — переходи к уточнению деталей.
Если просто поздоровался — спроси одним вопросом, какую задачу хочет решить.
""",
Stage.DISCOVERY: BASE_CONTEXT + """
ТЕКУЩАЯ ЗАДАЧА: Выявить боль клиента и понять контекст.
ВЫЯСНИ (не всё сразу, по одному):
- Какую проблему хочет решить
- Что сейчас не работает / что теряет
- Пробовал ли другие решения
ПРИЁМ: Отражай боль клиента: "Понимаю, ручная обработка заявок съедает время..."
После 2-3 обменов репликами — переходи к квалификации.
""",
Stage.QUALIFICATION: BASE_CONTEXT + """
ТЕКУЩАЯ ЗАДАЧА: Мягко квалифицировать клиента.
ВЫЯСНИ (элегантно, не как анкету):
- Размер бизнеса: "Решение оптимально для команд от 5 человек. Это ваш случай?"
- Бюджет: "Обычно такие проекты стоят от X до Y. Это вписывается в ожидания?"
- Срочность: "Когда хотели бы запустить?"
- ЛПР: "Вы принимаете решение или нужно согласовать?"
Если клиент квалифицирован (есть бюджет, потребность, срочность) — переходи к решению.
Если не квалифицирован — вежливо предложи бесплатные материалы и завершай.
""",
Stage.SOLUTION: BASE_CONTEXT + """
ТЕКУЩАЯ ЗАДАЧА: Дать конкретное предложение с ценой.
ФОРМУЛА ОТВЕТА:
1. "Для вашей задачи подойдет [решение]"
2. "Это стоит примерно [диапазон цен]"
3. "Включает: [2-3 ключевых пункта]"
4. "Точную стоимость и сроки обсудим на коротком созвоне"
После презентации цены — переходи к закрытию.
""",
Stage.CLOSING: BASE_CONTEXT + """
ТЕКУЩАЯ ЗАДАЧА: Закрыть на встречу с менеджером.
СКРИПТ ЗАКРЫТИЯ:
"Предлагаю созвониться на 15-20 минут: покажу похожие кейсы, обсудим детали, дам точную оценку. Когда удобно — завтра или в четверг?"
ЕСЛИ ВОЗРАЖАЕТ:
- "Дорого" → "Понимаю. Давайте на созвоне разберем, что можно оптимизировать под ваш бюджет"
- "Надо подумать" → "Конечно. Что именно хотите обдумать? Возможно, отвечу сейчас"
- "Пришлите КП" → "КП готовлю после короткого брифа, чтобы цифры были точными. 15 минут созвона — и будет детальное предложение"
ЦЕЛЬ: Получить согласие на созвон или контакт (телефон/email) для менеджера.
"""
}
# ================================
# ОПРЕДЕЛЕНИЕ СТАДИИ
# ================================
def detect_stage(history: List[Dict[str, str]], user_text: str) -> Stage:
msg_count = len(history)
if msg_count == 0:
return Stage.GREETING
full_text = " ".join([m["content"].lower() for m in history]) + " " + user_text.lower()
closing_signals = ["созвон", "встреч", "позвон", "когда удобно", "давайте обсудим",
"телефон", "почта", "email", "контакт"]
if any(s in full_text for s in closing_signals) and msg_count > 4:
return Stage.CLOSING
price_signals = ["сколько стоит", "цена", "стоимость", "бюджет", "во сколько обойдется"]
if any(s in full_text for s in price_signals) and msg_count > 2:
return Stage.SOLUTION
if msg_count > 4:
return Stage.QUALIFICATION
if msg_count > 0:
return Stage.DISCOVERY
return Stage.GREETING
# ================================
# SAFETY CHECK
# ================================
def check_env():
if not HF_TOKEN:
raise ValueError("HF_TOKEN is missing!")
if not OPENAI_API_KEY:
raise ValueError("OPENAI_API_KEY is missing!")
# ================================
# RAG RETRIEVAL
# ================================
def get_context(
query: str,
q_client: Optional[QdrantClient],
openai_client: OpenAI,
) -> str:
if not q_client:
return ""
try:
response = openai_client.embeddings.create(
model="text-embedding-3-small",
input=query
)
vector = response.data[0].embedding
result = q_client.query_points(
collection_name=QDRANT_COLLECTION,
query=vector,
limit=RAG_LIMIT,
with_payload=True,
)
good_chunks = []
for hit in result.points:
score = getattr(hit, "score", 0.0)
if score < RAG_SCORE_THRESHOLD:
continue
payload = hit.payload or {}
text = payload.get("text")
if text:
good_chunks.append(text.strip())
if not good_chunks:
return ""
context = "\n\n---\n\n".join(good_chunks)
if len(context) > RAG_MAX_CHARS:
context = context[:RAG_MAX_CHARS]
return context
except Exception as e:
print("RAG ERROR:", e)
return ""
# ================================
# CHAT START
# ================================
@cl.on_chat_start
async def start():
check_env()
await cl.Message(
content="👋 Привет! Я AI-консультант Alex.Dev. Помогаю бизнесу внедрять чат-боты и автоматизацию. Какую задачу хотите решить?"
).send()
hf_client = InferenceClient(MODEL_ID, token=HF_TOKEN)
q_client = None
if QDRANT_URL and QDRANT_API_KEY:
try:
q_client = QdrantClient(
url=QDRANT_URL,
api_key=QDRANT_API_KEY,
timeout=10
)
print("✅ Qdrant connected")
except Exception as e:
print("❌ Qdrant error:", e)
openai_client = OpenAI(api_key=OPENAI_API_KEY)
cl.user_session.set("hf_client", hf_client)
cl.user_session.set("q_client", q_client)
cl.user_session.set("openai_client", openai_client)
cl.user_session.set("message_history", [])
# ================================
# MAIN MESSAGE HANDLER
# ================================
@cl.on_message
async def main(message: cl.Message):
hf_client: InferenceClient = cl.user_session.get("hf_client")
q_client: Optional[QdrantClient] = cl.user_session.get("q_client")
openai_client: OpenAI = cl.user_session.get("openai_client")
history: List[Dict[str, str]] = cl.user_session.get("message_history") or []
user_text = (message.content or "").strip()
if not user_text:
await cl.Message(content="Напишите ваш вопрос").send()
return
# =========================
# ОПРЕДЕЛЯЕМ СТАДИЮ
# =========================
stage = detect_stage(history, user_text)
system_prompt = STAGE_PROMPTS[stage]
print(f"📊 Stage: {stage.value}, Messages: {len(history)}")
# =========================
# RAG
# =========================
context = get_context(user_text, q_client, openai_client)
# =========================
# BUILD MESSAGES
# =========================
messages_payload = []
messages_payload.append({
"role": "system",
"content": system_prompt
})
if context:
messages_payload.append({
"role": "system",
"content": f"""
РЕЛЕВАНТНАЯ ИНФОРМАЦИЯ ИЗ БАЗЫ ЗНАНИЙ:
{context}
Используй эти данные, если они отвечают на вопрос клиента.
"""
})
history_to_send = history[-HISTORY_SEND_LAST:]
messages_payload.extend(history_to_send)
messages_payload.append({
"role": "user",
"content": user_text
})
# =========================
# STREAM RESPONSE
# =========================
msg = cl.Message(content="")
await msg.send()
full_response = ""
try:
stream = hf_client.chat_completion(
messages=messages_payload,
max_tokens=350,
temperature=0.4,
top_p=0.85,
stream=True,
)
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
full_response += token
await msg.stream_token(token)
await msg.update()
# =========================
# SAVE MEMORY
# =========================
history.append({"role": "user", "content": user_text})
history.append({"role": "assistant", "content": full_response.strip()})
history = history[-HISTORY_KEEP:]
cl.user_session.set("message_history", history)
except Exception as e:
await cl.Message(content=f"Произошла ошибка. Попробуйте еще раз или напишите нам напрямую: @alexdev").send()
print(f"LLM Error: {e}")