import os from typing import List, Dict, Optional from enum import Enum import chainlit as cl from huggingface_hub import InferenceClient from qdrant_client import QdrantClient from openai import OpenAI # ================================ # CONFIG # ================================ HF_TOKEN = os.getenv("HF_TOKEN") QDRANT_URL = os.getenv("QDRANT_URL") QDRANT_API_KEY = os.getenv("QDRANT_API_KEY") OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") MODEL_ID = "Qwen/Qwen2.5-72B-Instruct" QDRANT_COLLECTION = "sales_knowledge" HISTORY_KEEP = 20 HISTORY_SEND_LAST = 10 RAG_LIMIT = 4 RAG_SCORE_THRESHOLD = 0.27 RAG_MAX_CHARS = 2500 # ================================ # СТАДИИ ВОРОНКИ # ================================ class Stage(str, Enum): GREETING = "greeting" DISCOVERY = "discovery" QUALIFICATION = "qualification" SOLUTION = "solution" CLOSING = "closing" # ================================ # SYSTEM PROMPTS ПО СТАДИЯМ # ================================ BASE_CONTEXT = """ Ты — AI-консультант компании Alex.Dev. Специализация: чат-боты, 3D-аватары, AI-автоматизация для малого бизнеса. ТВОИ УСЛУГИ И ЦЕНЫ: - Telegram/WhatsApp бот с AI: 50,000 - 120,000 ₽ - Бот + интеграция с CRM: 80,000 - 180,000 ₽ - 3D-аватар для сайта/презентаций: 70,000 - 150,000 ₽ - Комплексная AI-автоматизация: 150,000 - 300,000 ₽ - Сроки: 2-6 недель в зависимости от сложности СТИЛЬ ОБЩЕНИЯ: - Уверенный эксперт, не продавец - Короткие ответы: 2-4 предложения - Без восклицательных знаков и эмодзи (кроме 👋 в приветствии) - Не задавай больше одного вопроса за раз - Если клиент задал вопрос — сначала ответь, потом можешь спросить """ STAGE_PROMPTS = { Stage.GREETING: BASE_CONTEXT + """ ТЕКУЩАЯ ЗАДАЧА: Установить контакт и понять, с чем пришел клиент. Если клиент уже описал задачу — переходи к уточнению деталей. Если просто поздоровался — спроси одним вопросом, какую задачу хочет решить. """, Stage.DISCOVERY: BASE_CONTEXT + """ ТЕКУЩАЯ ЗАДАЧА: Выявить боль клиента и понять контекст. ВЫЯСНИ (не всё сразу, по одному): - Какую проблему хочет решить - Что сейчас не работает / что теряет - Пробовал ли другие решения ПРИЁМ: Отражай боль клиента: "Понимаю, ручная обработка заявок съедает время..." После 2-3 обменов репликами — переходи к квалификации. """, Stage.QUALIFICATION: BASE_CONTEXT + """ ТЕКУЩАЯ ЗАДАЧА: Мягко квалифицировать клиента. ВЫЯСНИ (элегантно, не как анкету): - Размер бизнеса: "Решение оптимально для команд от 5 человек. Это ваш случай?" - Бюджет: "Обычно такие проекты стоят от X до Y. Это вписывается в ожидания?" - Срочность: "Когда хотели бы запустить?" - ЛПР: "Вы принимаете решение или нужно согласовать?" Если клиент квалифицирован (есть бюджет, потребность, срочность) — переходи к решению. Если не квалифицирован — вежливо предложи бесплатные материалы и завершай. """, Stage.SOLUTION: BASE_CONTEXT + """ ТЕКУЩАЯ ЗАДАЧА: Дать конкретное предложение с ценой. ФОРМУЛА ОТВЕТА: 1. "Для вашей задачи подойдет [решение]" 2. "Это стоит примерно [диапазон цен]" 3. "Включает: [2-3 ключевых пункта]" 4. "Точную стоимость и сроки обсудим на коротком созвоне" После презентации цены — переходи к закрытию. """, Stage.CLOSING: BASE_CONTEXT + """ ТЕКУЩАЯ ЗАДАЧА: Закрыть на встречу с менеджером. СКРИПТ ЗАКРЫТИЯ: "Предлагаю созвониться на 15-20 минут: покажу похожие кейсы, обсудим детали, дам точную оценку. Когда удобно — завтра или в четверг?" ЕСЛИ ВОЗРАЖАЕТ: - "Дорого" → "Понимаю. Давайте на созвоне разберем, что можно оптимизировать под ваш бюджет" - "Надо подумать" → "Конечно. Что именно хотите обдумать? Возможно, отвечу сейчас" - "Пришлите КП" → "КП готовлю после короткого брифа, чтобы цифры были точными. 15 минут созвона — и будет детальное предложение" ЦЕЛЬ: Получить согласие на созвон или контакт (телефон/email) для менеджера. """ } # ================================ # ОПРЕДЕЛЕНИЕ СТАДИИ # ================================ def detect_stage(history: List[Dict[str, str]], user_text: str) -> Stage: msg_count = len(history) if msg_count == 0: return Stage.GREETING full_text = " ".join([m["content"].lower() for m in history]) + " " + user_text.lower() closing_signals = ["созвон", "встреч", "позвон", "когда удобно", "давайте обсудим", "телефон", "почта", "email", "контакт"] if any(s in full_text for s in closing_signals) and msg_count > 4: return Stage.CLOSING price_signals = ["сколько стоит", "цена", "стоимость", "бюджет", "во сколько обойдется"] if any(s in full_text for s in price_signals) and msg_count > 2: return Stage.SOLUTION if msg_count > 4: return Stage.QUALIFICATION if msg_count > 0: return Stage.DISCOVERY return Stage.GREETING # ================================ # SAFETY CHECK # ================================ def check_env(): if not HF_TOKEN: raise ValueError("HF_TOKEN is missing!") if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY is missing!") # ================================ # RAG RETRIEVAL # ================================ def get_context( query: str, q_client: Optional[QdrantClient], openai_client: OpenAI, ) -> str: if not q_client: return "" try: response = openai_client.embeddings.create( model="text-embedding-3-small", input=query ) vector = response.data[0].embedding result = q_client.query_points( collection_name=QDRANT_COLLECTION, query=vector, limit=RAG_LIMIT, with_payload=True, ) good_chunks = [] for hit in result.points: score = getattr(hit, "score", 0.0) if score < RAG_SCORE_THRESHOLD: continue payload = hit.payload or {} text = payload.get("text") if text: good_chunks.append(text.strip()) if not good_chunks: return "" context = "\n\n---\n\n".join(good_chunks) if len(context) > RAG_MAX_CHARS: context = context[:RAG_MAX_CHARS] return context except Exception as e: print("RAG ERROR:", e) return "" # ================================ # CHAT START # ================================ @cl.on_chat_start async def start(): check_env() await cl.Message( content="👋 Привет! Я AI-консультант Alex.Dev. Помогаю бизнесу внедрять чат-боты и автоматизацию. Какую задачу хотите решить?" ).send() hf_client = InferenceClient(MODEL_ID, token=HF_TOKEN) q_client = None if QDRANT_URL and QDRANT_API_KEY: try: q_client = QdrantClient( url=QDRANT_URL, api_key=QDRANT_API_KEY, timeout=10 ) print("✅ Qdrant connected") except Exception as e: print("❌ Qdrant error:", e) openai_client = OpenAI(api_key=OPENAI_API_KEY) cl.user_session.set("hf_client", hf_client) cl.user_session.set("q_client", q_client) cl.user_session.set("openai_client", openai_client) cl.user_session.set("message_history", []) # ================================ # MAIN MESSAGE HANDLER # ================================ @cl.on_message async def main(message: cl.Message): hf_client: InferenceClient = cl.user_session.get("hf_client") q_client: Optional[QdrantClient] = cl.user_session.get("q_client") openai_client: OpenAI = cl.user_session.get("openai_client") history: List[Dict[str, str]] = cl.user_session.get("message_history") or [] user_text = (message.content or "").strip() if not user_text: await cl.Message(content="Напишите ваш вопрос").send() return # ========================= # ОПРЕДЕЛЯЕМ СТАДИЮ # ========================= stage = detect_stage(history, user_text) system_prompt = STAGE_PROMPTS[stage] print(f"📊 Stage: {stage.value}, Messages: {len(history)}") # ========================= # RAG # ========================= context = get_context(user_text, q_client, openai_client) # ========================= # BUILD MESSAGES # ========================= messages_payload = [] messages_payload.append({ "role": "system", "content": system_prompt }) if context: messages_payload.append({ "role": "system", "content": f""" РЕЛЕВАНТНАЯ ИНФОРМАЦИЯ ИЗ БАЗЫ ЗНАНИЙ: {context} Используй эти данные, если они отвечают на вопрос клиента. """ }) history_to_send = history[-HISTORY_SEND_LAST:] messages_payload.extend(history_to_send) messages_payload.append({ "role": "user", "content": user_text }) # ========================= # STREAM RESPONSE # ========================= msg = cl.Message(content="") await msg.send() full_response = "" try: stream = hf_client.chat_completion( messages=messages_payload, max_tokens=350, temperature=0.4, top_p=0.85, stream=True, ) for chunk in stream: if chunk.choices and chunk.choices[0].delta.content: token = chunk.choices[0].delta.content full_response += token await msg.stream_token(token) await msg.update() # ========================= # SAVE MEMORY # ========================= history.append({"role": "user", "content": user_text}) history.append({"role": "assistant", "content": full_response.strip()}) history = history[-HISTORY_KEEP:] cl.user_session.set("message_history", history) except Exception as e: await cl.Message(content=f"Произошла ошибка. Попробуйте еще раз или напишите нам напрямую: @alexdev").send() print(f"LLM Error: {e}")