from flask import Flask, render_template, request, Response, jsonify from mistralai import Mistral import logging import time import requests import re import threading import queue import json import os import trafilatura from bs4 import BeautifulSoup import random app = Flask(__name__) app.secret_key = 'super_secret_key' message_queue = queue.Queue() # Конфигурация Mistral MISTRAL_MODEL = "mistral-large-latest" N_CTX = 32768 MAX_RESULTS = 10 MIN_VALID_SOURCES = 3 MAX_SEARCH_ATTEMPTS = 3 MAX_CONTENT_LENGTH = 40000 # Максимальная длина контента на источник # Новый клиент Mistral mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY")) SYSTEM_PROMPT = """ Ты PrintMaster, сервисный инженер по печатной технике. Критически важные правила: 1. Формат ответа СТРОГО: **Проблема:** [только краткое описание проблемы] **Решение:** [пошаговые действия] 2. Для шагов решения используй ТОЛЬКО формат: [Цифра]. [Действие] - Подпункт 1 - Подпункт 2 3. Примечания ТОЛЬКО если есть: **Примечания:** - Пункт 1 - Пункт 2 ЖЕСТКИЕ ЗАПРЕТЫ: - Никогда не используй подзаголовки с ### - Никогда не добавляй разделы "Удалены шаги" или подобные - Начинай сразу с **Проблема:** без преамбул - Всегда основывай решение ТОЛЬКО на предоставленных источниках """ BLACKLISTED_DOMAINS = [ 'reddit.com', 'stackoverflow.com', 'quora.com', 'facebook.com', 'youtube.com', 'x.com', 'twitter.com', 'tiktok.com', 'instagram.com' ] USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15", "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15" ] logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("/tmp/printer_assistant.log"), logging.StreamHandler() ] ) def get_random_headers(): return { 'User-Agent': random.choice(USER_AGENTS), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Referer': 'https://www.google.com/', 'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1' } def extract_main_content(html, url): """Извлекает основной контент страницы с помощью trafilatura или BeautifulSoup""" try: # Пробуем trafilatura content = trafilatura.extract(html, include_links=False, include_tables=False) if content and len(content) > 500: return content[:MAX_CONTENT_LENGTH] except Exception as e: logging.error(f"Trafilatura error: {str(e)}") # Fallback на BeautifulSoup try: soup = BeautifulSoup(html, 'html.parser') # Удаляем ненужные элементы for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'form']): element.decompose() # Пытаемся найти основной контент main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile('content|main|article|post', re.I)) if main_content: text = main_content.get_text(separator='\n', strip=True) return text[:MAX_CONTENT_LENGTH] if text else None # Fallback: весь текст body return soup.body.get_text(separator='\n', strip=True)[:MAX_CONTENT_LENGTH] except Exception as e: logging.error(f"BeautifulSoup error: {str(e)}") return None def generate_search_query(prompt: str) -> dict: system_prompt = """ You are a technical expert. Extract structured data from the user's query and generate an English search query. Return data in strict JSON format with these fields: - brand: English brand name (HP, Canon, Konica Minolta, etc.) - model: equipment model (model name only) - error_code: error code (if present) - problem_description: brief English problem description (1-2 sentences) - search_query: full English search query Important rules: 1. All fields MUST be in English 2. For brands use official English names 3. Remove brand mentions and the word "error" from model name 4. If error code is specified - include it in search_query 5. Problem description should be concise technical terms (max 7 words) """ try: response = mistral_client.chat.complete( model=MISTRAL_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ], temperature=0.1, max_tokens=350, response_format={"type": "json_object"} ) json_data = json.loads(response.choices[0].message.content) required_fields = ['brand', 'model', 'error_code', 'problem_description', 'search_query'] for field in required_fields: if field not in json_data: json_data[field] = "" if json_data['brand'] and json_data['model']: json_data['model'] = re.sub( re.escape(json_data['brand']), '', json_data['model'], flags=re.IGNORECASE ).strip() if not json_data['search_query']: search_parts = [json_data['brand'], json_data['model']] if json_data['error_code']: search_parts.append(f"error {json_data['error_code']}") if json_data['problem_description']: search_parts.append(json_data['problem_description']) json_data['search_query'] = " ".join(search_parts).strip() return json_data except Exception as e: error_msg = f"❌ Ошибка извлечения данных: {str(e)}" message_queue.put(('log', error_msg)) return { 'brand': "", 'model': "", 'error_code': "", 'problem_description': "", 'search_query': prompt } def gather_sources(search_query: str) -> list: """Поиск источников с несколькими попытками и фильтрацией""" attempts = 0 all_sources = [] valid_sources = [] while attempts < MAX_SEARCH_ATTEMPTS and len(valid_sources) < MIN_VALID_SOURCES: attempts += 1 message_queue.put(('log', f"🔍 Попытка {attempts} поиска по запросу: {search_query}")) try: params = { "api_key": os.getenv("SERPAPI_KEY"), "engine": "google", "q": search_query, "hl": "en", "gl": "us", "num": 20, # Запрашиваем больше результатов "safe": "off", } response = requests.get("https://serpapi.com/search", params=params, timeout=15) response.raise_for_status() data = response.json() # Обработка organic results organic_results = data.get("organic_results", []) for res in organic_results: if len(valid_sources) >= MIN_VALID_SOURCES: break title = res.get("title", "Без заголовка") link = res.get("link", "#") snippet = res.get("snippet", "") or "" # Пропускаем нежелательные домены if any(domain in link for domain in BLACKLISTED_DOMAINS): continue # Пропускаем дубликаты if any(src['url'] == link for src in all_sources): continue # Загрузка полного контента content = None try: headers = get_random_headers() page_response = requests.get(link, headers=headers, timeout=8) if page_response.status_code == 200: content = extract_main_content(page_response.text, link) except Exception as e: logging.error(f"Ошибка загрузки {link}: {str(e)}") # Если контент не получен, используем сниппет if not content: content = snippet # Проверяем релевантность контента if content and len(content) > 100: source_data = { "title": title, "url": link, "content": content[:MAX_CONTENT_LENGTH] } all_sources.append(source_data) valid_sources.append(source_data) message_queue.put(('log', f"✅ Найден источник: {title}")) message_queue.put(('log', f"ℹ️ На попытке {attempts} найдено {len(valid_sources)} валидных источников")) # Изменяем запрос для следующей попытки if len(valid_sources) < MIN_VALID_SOURCES: search_query += " troubleshooting OR fix OR repair" except Exception as e: error_msg = f"❌ Ошибка поиска (попытка {attempts}): {str(e)}" message_queue.put(('log', error_msg)) return valid_sources[:MAX_RESULTS] # Возвращаем не более MAX_RESULTS источников def web_search(query: str) -> list: """Основная функция поиска с несколькими попытками""" start_time = time.time() sources = gather_sources(query) elapsed = time.time() - start_time if sources: message_queue.put(('log', f"✅ Поиск завершен за {elapsed:.2f}с. Найдено {len(sources)} источников.")) else: message_queue.put(('log', f"⚠️ Не удалось найти источники за {elapsed:.2f}с. Ответ будет основан на общих знаниях.")) return sources def clean_response(response: str) -> str: # Удаление служебных тегов response = re.sub(r'|<\|system\|>|', '', response, flags=re.IGNORECASE) # Удаление лишних разделителей response = re.sub(r'^-{3,}\s*', '', response) # Удаление дублирования разделов response = re.sub(r'(\*\*Проблема:\*\*.+?)(\*\*Проблема:\*\*)', r'\1', response, flags=re.DOTALL) response = re.sub(r'(\*\*Решение:\*\*.+?)(\*\*Решение:\*\*)', r'\1', response, flags=re.DOTALL) # Удаление лишних переносов response = re.sub(r'\n\s*\n', '\n\n', response) response = re.sub(r'[ \t]{2,}', ' ', response) # Удаление начальных фраз response = re.sub(r'^Вот исправленный ответ[^:]+:\s*', '', response) # Форматирование примечаний response = re.sub(r'^---\s*Примечания:\s*', '**Примечания:**\n', response) # Удаление лишних маркеров response = re.sub(r'^---\s*', '', response, flags=re.MULTILINE) # Очистка завершающих символов response = re.sub(r'\s*\.{3,}\s*$', '', response) return response.strip() def process_query(prompt: str): try: start_time = time.time() message_queue.put(('log', f"👤 Запрос: {prompt}")) message_queue.put(('log', f"⚙️ Извлекаю параметры из входящего запроса")) norm_data = generate_search_query(prompt) message_queue.put(('log', f"⏏️ Извлечено: {json.dumps(norm_data, ensure_ascii=False)}")) search_query = norm_data['search_query'] sources = web_search(search_query) # Если источников нет, используем fallback if not sources: message_queue.put(('log', "⚠️ Использую резервные данные для генерации ответа")) sources = [{ "title": "Общие знания о принтерах", "url": "", "content": f"Проблема: {norm_data['problem_description']}. Бренд: {norm_data['brand']}, Модель: {norm_data['model']}" }] message_queue.put(('log', f"📚 Найдено {len(sources)} источников")) # Формируем контекст для LLM context_content = "" for i, source in enumerate(sources): context_content += f"[[Источник {i+1}]] {source['title']}\n{source['content']}\n\n" context_content = context_content.strip() message_queue.put(('log', f"⚙️ Определяю проблему")) problem_response = mistral_client.chat.complete( model=MISTRAL_MODEL, messages=[ {"role": "system", "content": "Опиши СУТЬ проблемы в одном предложении. Только диагноз, без решений. Не более 12 слов. На русском."}, {"role": "user", "content": f"Запрос пользователя: {prompt}\nДанные из источников:\n{context_content}"} ], max_tokens=150, temperature=0.2 ) extracted_problem = problem_response.choices[0].message.content.strip() if not extracted_problem or len(extracted_problem) < 5: extracted_problem = f"Неисправность {norm_data['brand']} {norm_data['model']}" message_queue.put(('log', f"🧩 Определённая проблема: {extracted_problem}")) # Формируем промпт с источниками messages = [ {"role": "system", "content": SYSTEM_PROMPT + f""" Контекст: Бренд: {norm_data['brand']} Модель: {norm_data['model']} Ошибка: {norm_data['error_code']} Суть проблемы (на основе источников): {extracted_problem} Данные из источников: {context_content} """}, {"role": "user", "content": f"Проблема: {prompt}"} ] message_queue.put(('log', "🧠 Генерирую ответ на основе источников...")) message_queue.put(('response_start', "")) full_response = "" for chunk in mistral_client.chat.stream( model=MISTRAL_MODEL, messages=messages, max_tokens=2048, temperature=0.3 ): if chunk.data.choices[0].delta.content is not None: chunk_text = chunk.data.choices[0].delta.content full_response += chunk_text message_queue.put(('response_chunk', chunk_text)) # Очистка и форматирование ответа final_response = clean_response(full_response) # Добавляем источники в ответ if sources: final_response += "\n\n**Источники информации:**\n" for i, source in enumerate(sources): final_response += f"- [{source['title']}]({source['url']})\n" message_queue.put(('response_end', final_response)) message_queue.put(('sources', json.dumps(sources))) total_time = time.time() - start_time message_queue.put(('log', f"💡 Ответ сгенерирован за {total_time:.1f}с")) message_queue.put(('done', '')) except Exception as e: error_msg = f"❌ Ошибка: {str(e)}" message_queue.put(('log', error_msg)) message_queue.put(('response', "\n⚠️ Ошибка обработки запроса")) message_queue.put(('done', '')) @app.route('/') def index(): return render_template('index.html') @app.route('/ask', methods=['POST']) def ask(): user_input = request.form['message'] thread = threading.Thread(target=process_query, args=(user_input,)) thread.daemon = True thread.start() return jsonify({'status': 'processing'}) @app.route('/stream') def stream(): def generate(): while True: if not message_queue.empty(): msg_type, content = message_queue.get() data = json.dumps({"type": msg_type, "content": content}) yield f"data: {data}\n\n" else: time.sleep(0.1) return Response(generate(), mimetype='text/event-stream') if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=False)