import json
import logging
import os
import queue
import random
import re
import threading
import time
from urllib.parse import urlparse

import requests
import trafilatura
from bs4 import BeautifulSoup
from flask import Flask, render_template, request, Response, jsonify
from mistralai import Mistral
|
|
# The Flask application object that the route handlers in this file are
# registered on.
app = Flask(__name__)
# Session-signing key. A literal committed to source control lets anyone forge
# signed cookies, so the key is read from the FLASK_SECRET_KEY environment
# variable. Without it a random per-process key is used, which means signed
# data does not survive a restart and is not shared between worker processes.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(32)
|
|
# Process-wide FIFO of (event_type, payload) tuples: process_query() and the
# search helpers put events here, and the /stream endpoint drains them.
# NOTE(review): there is one queue for the whole process, so events produced by
# concurrent requests are interleaved and are delivered to whichever /stream
# consumer reads them first - confirm that single-user use is intended.
message_queue = queue.Queue()
|
|
| |
# --- Language model ---------------------------------------------------------
# Model name passed to every Mistral chat call in this file.
MISTRAL_MODEL = "mistral-large-latest"
# Context size in tokens (32768); not referenced by the code visible here.
N_CTX = 32 * 1024

# --- Web search limits ------------------------------------------------------
# Upper bound on search rounds performed by gather_sources().
MAX_SEARCH_ATTEMPTS = 3
# gather_sources() stops collecting once it has this many usable sources.
MIN_VALID_SOURCES = 3
# Hard cap on the number of sources gather_sources() returns.
MAX_RESULTS = 10
# Extracted page text is truncated to this many characters.
MAX_CONTENT_LENGTH = 40_000
|
|
| |
# Shared Mistral API client, created at import time. The API key is read from
# the MISTRAL_API_KEY environment variable (os.getenv returns None when the
# variable is unset).
mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
|
|
# System prompt (in Russian) for the answer-generation call in process_query().
# It casts the model as "PrintMaster", a service engineer for printing
# equipment, and fixes the reply layout: a bold "Problem:" line, a bold
# "Solution:" section made of numbered steps with dash sub-items, and an
# optional bold "Notes:" list. It forbids "###" sub-headings, "removed steps"
# style sections and any preamble, and requires the solution to be based only
# on the supplied sources.
SYSTEM_PROMPT = """
Ты PrintMaster, сервисный инженер по печатной технике. Критически важные правила:
1. Формат ответа СТРОГО:
**Проблема:** [только краткое описание проблемы]
**Решение:** [пошаговые действия]
2. Для шагов решения используй ТОЛЬКО формат:
[Цифра]. [Действие]
- Подпункт 1
- Подпункт 2
3. Примечания ТОЛЬКО если есть:
**Примечания:**
- Пункт 1
- Пункт 2
ЖЕСТКИЕ ЗАПРЕТЫ:
- Никогда не используй подзаголовки с ###
- Никогда не добавляй разделы "Удалены шаги" или подобные
- Начинай сразу с **Проблема:** без преамбул
- Всегда основывай решение ТОЛЬКО на предоставленных источниках
"""
|
|
# Domains whose search hits gather_sources() skips, grouped by kind.
_DISCUSSION_DOMAINS = ["reddit.com", "stackoverflow.com", "quora.com"]
_SOCIAL_MEDIA_DOMAINS = [
    "facebook.com",
    "youtube.com",
    "x.com",
    "twitter.com",
    "tiktok.com",
    "instagram.com",
]
BLACKLISTED_DOMAINS = _DISCUSSION_DOMAINS + _SOCIAL_MEDIA_DOMAINS
|
|
# Pool of desktop browser User-Agent strings; get_random_headers() picks one
# at random per request. The strings are assembled from a browser template
# and a platform token so the variations are easy to see.
_CHROME_UA_TEMPLATE = (
    "Mozilla/5.0 ({platform}) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/{version} Safari/537.36"
)
_SAFARI_UA_TEMPLATE = (
    "Mozilla/5.0 ({platform}) AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/16.1 Safari/605.1.15"
)
_PLATFORM_WINDOWS = "Windows NT 10.0; Win64; x64"
_PLATFORM_MAC_10_15 = "Macintosh; Intel Mac OS X 10_15_7"
_PLATFORM_MAC_13 = "Macintosh; Intel Mac OS X 13_1"
_PLATFORM_LINUX = "X11; Linux x86_64"

USER_AGENTS = [
    _CHROME_UA_TEMPLATE.format(platform=_PLATFORM_WINDOWS, version="109.0.0.0"),
    _CHROME_UA_TEMPLATE.format(platform=_PLATFORM_MAC_10_15, version="109.0.0.0"),
    _CHROME_UA_TEMPLATE.format(platform=_PLATFORM_WINDOWS, version="108.0.0.0"),
    _CHROME_UA_TEMPLATE.format(platform=_PLATFORM_MAC_10_15, version="108.0.0.0"),
    _CHROME_UA_TEMPLATE.format(platform=_PLATFORM_LINUX, version="108.0.0.0"),
    _SAFARI_UA_TEMPLATE.format(platform=_PLATFORM_MAC_10_15),
    _SAFARI_UA_TEMPLATE.format(platform=_PLATFORM_MAC_13),
]
|
|
# Root logger configuration: INFO and above, written both to a log file and to
# the console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Log messages in this file contain Cyrillic text. Without an explicit
        # encoding the handler uses the platform default, which cannot
        # represent them on non-UTF-8 locales.
        logging.FileHandler("/tmp/printer_assistant.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
|
|
def get_random_headers():
    """Return browser-like HTTP request headers with a random User-Agent.

    A fresh dict is built on every call; the User-Agent is drawn from
    USER_AGENTS and the remaining headers are fixed.
    """
    chosen_agent = random.choice(USER_AGENTS)
    fixed_headers = (
        ('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'),
        ('Accept-Language', 'en-US,en;q=0.5'),
        ('Referer', 'https://www.google.com/'),
        ('DNT', '1'),
        ('Connection', 'keep-alive'),
        ('Upgrade-Insecure-Requests', '1'),
    )
    return {'User-Agent': chosen_agent, **dict(fixed_headers)}
|
|
def extract_main_content(html, url):
    """Extract the main readable text of an HTML page.

    trafilatura is tried first and its result is used only when it is longer
    than 500 characters. Otherwise the markup is parsed with BeautifulSoup:
    scripts, styles and page chrome are removed and the text of the first
    <main>, <article> or content-like <div> is returned, falling back to the
    <body> (or to the whole document when there is no <body>).

    Args:
        html: Page markup.
        url: Page address; accepted for interface compatibility, not used.

    Returns:
        The extracted text truncated to MAX_CONTENT_LENGTH characters, or
        None when nothing could be extracted.
    """
    if not html:
        return None

    try:
        content = trafilatura.extract(html, include_links=False, include_tables=False)
        if content and len(content) > 500:
            return content[:MAX_CONTENT_LENGTH]
    except Exception as e:
        # Best effort: any extractor failure falls through to BeautifulSoup.
        logging.error("Trafilatura error: %s", e)

    try:
        soup = BeautifulSoup(html, 'html.parser')

        # Drop elements that never carry article text.
        for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'form']):
            element.decompose()

        container = (
            soup.find('main')
            or soup.find('article')
            or soup.find('div', class_=re.compile('content|main|article|post', re.I))
        )
        if container is None:
            # html.parser does not synthesise a <body>, so fragments and
            # malformed pages may have none; use the whole document then
            # instead of failing on soup.body being None.
            container = soup.body if soup.body is not None else soup

        text = container.get_text(separator='\n', strip=True)
        return text[:MAX_CONTENT_LENGTH] if text else None
    except Exception as e:
        logging.error("BeautifulSoup error: %s", e)
        return None
|
|
# Instructions for the extraction call made by generate_search_query().
_QUERY_EXTRACTION_PROMPT = """
You are a technical expert. Extract structured data from the user's query and generate an English search query.
Return data in strict JSON format with these fields:
- brand: English brand name (HP, Canon, Konica Minolta, etc.)
- model: equipment model (model name only)
- error_code: error code (if present)
- problem_description: brief English problem description (1-2 sentences)
- search_query: full English search query

Important rules:
1. All fields MUST be in English
2. For brands use official English names
3. Remove brand mentions and the word "error" from model name
4. If error code is specified - include it in search_query
5. Problem description should be concise technical terms (max 7 words)
"""

# Keys that callers of generate_search_query() rely on being present.
_QUERY_FIELDS = ('brand', 'model', 'error_code', 'problem_description', 'search_query')


def generate_search_query(prompt: str) -> dict:
    """Turn a free-form user request into structured search parameters.

    The request is sent to the Mistral model, which is asked to reply with a
    JSON object. The reply is normalised so that every key in _QUERY_FIELDS
    exists and holds a string, the brand name is removed from the model name,
    and a search query is assembled from the other fields when the model did
    not supply one.

    Args:
        prompt: The user's original request text.

    Returns:
        A dict with at least the keys 'brand', 'model', 'error_code',
        'problem_description' and 'search_query' (all strings). If the model
        call or parsing fails, or nothing usable was extracted, 'search_query'
        is the original prompt.
    """
    try:
        response = mistral_client.chat.complete(
            model=MISTRAL_MODEL,
            messages=[
                {"role": "system", "content": _QUERY_EXTRACTION_PROMPT},
                {"role": "user", "content": prompt}
            ],
            temperature=0.1,
            max_tokens=350,
            response_format={"type": "json_object"}
        )

        json_data = json.loads(response.choices[0].message.content)
        if not isinstance(json_data, dict):
            raise ValueError("model reply is not a JSON object")

        # Missing keys, JSON null and non-string values (e.g. a numeric error
        # code) are all coerced to strings so later string operations and
        # f-strings never see None.
        for field in _QUERY_FIELDS:
            value = json_data.get(field)
            json_data[field] = "" if value is None else str(value).strip()

        if json_data['brand'] and json_data['model']:
            model_without_brand = re.sub(
                re.escape(json_data['brand']),
                '',
                json_data['model'],
                flags=re.IGNORECASE
            )
            # Collapse the gap left where the brand was cut out.
            json_data['model'] = " ".join(model_without_brand.split())

        if not json_data['search_query']:
            search_parts = [json_data['brand'], json_data['model']]
            if json_data['error_code']:
                search_parts.append(f"error {json_data['error_code']}")
            if json_data['problem_description']:
                search_parts.append(json_data['problem_description'])
            json_data['search_query'] = " ".join(part for part in search_parts if part)

        # Never hand an empty query to the search step.
        if not json_data['search_query']:
            json_data['search_query'] = prompt

        return json_data

    except Exception as e:
        # Best effort: report the failure to the UI log and search for the raw
        # prompt instead.
        error_msg = f"❌ Ошибка извлечения данных: {str(e)}"
        message_queue.put(('log', error_msg))
        return {
            'brand': "",
            'model': "",
            'error_code': "",
            'problem_description': "",
            'search_query': prompt
        }
|
|
def _is_blacklisted_url(url: str) -> bool:
    """Return True when the host of *url* is a blacklisted domain or a subdomain of one."""
    try:
        host = (urlparse(url).hostname or "").lower()
    except ValueError:
        # Unparseable address: not a blacklist hit; the fetch will fail on it.
        return False
    # Compare host names, not substrings: a plain "x.com" substring test also
    # rejects unrelated sites such as support.xerox.com.
    return any(
        host == domain or host.endswith("." + domain)
        for domain in BLACKLISTED_DOMAINS
    )


def _fetch_page_text(url: str):
    """Download *url* and return its extracted main text, or None on any failure."""
    try:
        page_response = requests.get(url, headers=get_random_headers(), timeout=8)
        if page_response.status_code == 200:
            return extract_main_content(page_response.text, url)
    except Exception as e:
        # Best effort: a page that cannot be fetched is simply not used.
        logging.error("Ошибка загрузки %s: %s", url, e)
    return None


def gather_sources(search_query: str) -> list:
    """Search the web for *search_query* and collect usable sources.

    Up to MAX_SEARCH_ATTEMPTS SerpAPI (Google engine) searches are made. For
    each organic result that is not blacklisted and not already collected,
    the page is downloaded and its main text extracted; the search snippet is
    used when that fails. A result is kept when its content is longer than
    100 characters. Collection stops at MIN_VALID_SOURCES sources. After an
    attempt that did not find enough, the query is broadened with
    troubleshooting keywords. Progress is reported through message_queue.

    Args:
        search_query: The query string to search for.

    Returns:
        A list of dicts with the keys 'title', 'url' and 'content', at most
        MAX_RESULTS long; empty when nothing usable was found.
    """
    attempts = 0
    valid_sources = []
    collected_urls = set()
    current_query = search_query

    while attempts < MAX_SEARCH_ATTEMPTS and len(valid_sources) < MIN_VALID_SOURCES:
        attempts += 1
        message_queue.put(('log', f"🔍 Попытка {attempts} поиска по запросу: {current_query}"))

        try:
            params = {
                "api_key": os.getenv("SERPAPI_KEY"),
                "engine": "google",
                "q": current_query,
                "hl": "en",
                "gl": "us",
                "num": 20,
                "safe": "off",
            }

            response = requests.get("https://serpapi.com/search", params=params, timeout=15)
            response.raise_for_status()
            data = response.json()

            for res in data.get("organic_results") or []:
                if len(valid_sources) >= MIN_VALID_SOURCES:
                    break

                link = res.get("link")
                # A result without an address can be neither fetched nor cited.
                if not link or link in collected_urls or _is_blacklisted_url(link):
                    continue

                title = res.get("title") or "Без заголовка"
                snippet = res.get("snippet", "") or ""

                # Prefer the page text; fall back to the search snippet.
                content = _fetch_page_text(link) or snippet

                if content and len(content) > 100:
                    valid_sources.append({
                        "title": title,
                        "url": link,
                        "content": content[:MAX_CONTENT_LENGTH]
                    })
                    collected_urls.add(link)
                    message_queue.put(('log', f"✅ Найден источник: {title}"))

            message_queue.put(('log', f"ℹ️ На попытке {attempts} найдено {len(valid_sources)} валидных источников"))

            if len(valid_sources) < MIN_VALID_SOURCES:
                # Broaden from the original query so the keywords are added
                # once rather than piling up on every retry.
                current_query = search_query + " troubleshooting OR fix OR repair"

        except Exception as e:
            # A failed search round is reported and the loop retries.
            error_msg = f"❌ Ошибка поиска (попытка {attempts}): {str(e)}"
            message_queue.put(('log', error_msg))

    return valid_sources[:MAX_RESULTS]
|
|
def web_search(query: str) -> list:
    """Run gather_sources() for *query* and report the outcome to the UI log.

    Returns the list of sources exactly as gather_sources() produced it.
    """
    started_at = time.time()
    found = gather_sources(query)
    duration = time.time() - started_at

    if not found:
        summary = f"⚠️ Не удалось найти источники за {duration:.2f}с. Ответ будет основан на общих знаниях."
    else:
        summary = f"✅ Поиск завершен за {duration:.2f}с. Найдено {len(found)} источников."
    message_queue.put(('log', summary))

    return found
|
|
|
|
# Ordered (pattern, replacement) pairs applied by clean_response(). The order
# matters: later rules rely on what earlier ones removed.
_CLEANUP_RULES = (
    # Chat-template control tokens that leaked into the text.
    (re.compile(r'</?assistant>|<\|system\|>|</s>', re.IGNORECASE), ''),
    # Horizontal rule at the very start of the answer.
    (re.compile(r'^-{3,}\s*'), ''),
    # A repeated section header: keep the text up to the repeat, drop the
    # repeated header itself.
    (re.compile(r'(\*\*Проблема:\*\*.+?)(\*\*Проблема:\*\*)', re.DOTALL), r'\1'),
    (re.compile(r'(\*\*Решение:\*\*.+?)(\*\*Решение:\*\*)', re.DOTALL), r'\1'),
    # Runs of blank lines become a single blank line.
    (re.compile(r'\n\s*\n'), '\n\n'),
    # Runs of spaces/tabs become a single space.
    (re.compile(r'[ \t]{2,}'), ' '),
    # "Here is the corrected answer ...:" preamble at the start.
    (re.compile(r'^Вот исправленный ответ[^:]+:\s*'), ''),
    # "--- Notes:" on any line becomes the bold notes header. MULTILINE is
    # required: the notes follow the solution, so the marker is never at the
    # start of the whole text.
    (re.compile(r'^---\s*Примечания:\s*', re.MULTILINE), '**Примечания:**\n'),
    # Any remaining line-leading horizontal rule.
    (re.compile(r'^---\s*', re.MULTILINE), ''),
    # Trailing ellipsis.
    (re.compile(r'\s*\.{3,}\s*$'), ''),
)


def clean_response(response: str) -> str:
    """Strip generation artefacts from a model answer.

    Applies the substitutions in _CLEANUP_RULES in order and trims surrounding
    whitespace.

    Args:
        response: Raw text produced by the model; may be empty or None.

    Returns:
        The cleaned text, or an empty string for empty input.
    """
    if not response:
        return ""

    for pattern, replacement in _CLEANUP_RULES:
        response = pattern.sub(replacement, response)

    return response.strip()
|
|
def _build_source_context(sources: list) -> str:
    """Render the sources as numbered "[[Источник N]] title / content" blocks."""
    blocks = (
        f"[[Источник {number}]] {source['title']}\n{source['content']}"
        for number, source in enumerate(sources, start=1)
    )
    return "\n\n".join(blocks).strip()


def _format_sources_section(sources: list) -> str:
    """Return the markdown "sources" footer, or "" when no source has a URL."""
    # The fallback pseudo-source has an empty URL; listing it would produce a
    # broken "[title]()" link.
    cited = [source for source in sources if source.get('url')]
    if not cited:
        return ""
    links = "".join(f"- [{source['title']}]({source['url']})\n" for source in cited)
    return "\n\n**Источники информации:**\n" + links


def process_query(prompt: str):
    """Answer one user request and publish the progress to message_queue.

    Steps: extract search parameters from the prompt, search the web for
    sources (a placeholder source built from the extracted fields is used when
    none are found), ask the model for a one-sentence diagnosis, then stream
    the full answer.

    Events put on message_queue as (type, payload) tuples: 'log' for progress
    lines, 'response_start', one 'response_chunk' per streamed fragment,
    'response_end' with the cleaned answer plus the sources footer, 'sources'
    with the JSON-encoded source list, and 'done'. On any error a 'log', a
    'response' and a 'done' event are emitted instead.

    Args:
        prompt: The user's request text.
    """
    try:
        start_time = time.time()
        message_queue.put(('log', f"👤 Запрос: {prompt}"))
        message_queue.put(('log', "⚙️ Извлекаю параметры из входящего запроса"))

        norm_data = generate_search_query(prompt)
        message_queue.put(('log', f"⏏️ Извлечено: {json.dumps(norm_data, ensure_ascii=False)}"))

        sources = web_search(norm_data['search_query'])

        if not sources:
            message_queue.put(('log', "⚠️ Использую резервные данные для генерации ответа"))
            sources = [{
                "title": "Общие знания о принтерах",
                "url": "",
                "content": f"Проблема: {norm_data['problem_description']}. Бренд: {norm_data['brand']}, Модель: {norm_data['model']}"
            }]

        message_queue.put(('log', f"📚 Найдено {len(sources)} источников"))

        context_content = _build_source_context(sources)

        message_queue.put(('log', "⚙️ Определяю проблему"))
        problem_response = mistral_client.chat.complete(
            model=MISTRAL_MODEL,
            messages=[
                {"role": "system", "content": "Опиши СУТЬ проблемы в одном предложении. Только диагноз, без решений. Не более 12 слов. На русском."},
                {"role": "user", "content": f"Запрос пользователя: {prompt}\nДанные из источников:\n{context_content}"}
            ],
            max_tokens=150,
            temperature=0.2
        )
        # The reply content may be None; treat that like an empty diagnosis
        # instead of failing the whole request on .strip().
        extracted_problem = (problem_response.choices[0].message.content or "").strip()

        if len(extracted_problem) < 5:
            extracted_problem = f"Неисправность {norm_data['brand']} {norm_data['model']}"

        message_queue.put(('log', f"🧩 Определённая проблема: {extracted_problem}"))

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT + f"""
Контекст:
Бренд: {norm_data['brand']}
Модель: {norm_data['model']}
Ошибка: {norm_data['error_code']}
Суть проблемы (на основе источников): {extracted_problem}
Данные из источников:
{context_content}
"""},
            {"role": "user", "content": f"Проблема: {prompt}"}
        ]

        message_queue.put(('log', "🧠 Генерирую ответ на основе источников..."))
        message_queue.put(('response_start', ""))

        response_parts = []
        for chunk in mistral_client.chat.stream(
            model=MISTRAL_MODEL,
            messages=messages,
            max_tokens=2048,
            temperature=0.3
        ):
            choices = chunk.data.choices
            if not choices:
                continue
            chunk_text = choices[0].delta.content
            if chunk_text is not None:
                response_parts.append(chunk_text)
                message_queue.put(('response_chunk', chunk_text))

        final_response = clean_response("".join(response_parts))
        final_response += _format_sources_section(sources)

        message_queue.put(('response_end', final_response))
        message_queue.put(('sources', json.dumps(sources)))

        total_time = time.time() - start_time
        message_queue.put(('log', f"💡 Ответ сгенерирован за {total_time:.1f}с"))
        message_queue.put(('done', ''))

    except Exception as e:
        # Top-level boundary of the worker thread: report the failure to the
        # client and always finish with 'done'.
        error_msg = f"❌ Ошибка: {str(e)}"
        message_queue.put(('log', error_msg))
        message_queue.put(('response', "\n⚠️ Ошибка обработки запроса"))
        message_queue.put(('done', ''))
|
|
|
|
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    return render_template('index.html')
|
|
|
|
@app.route('/ask', methods=['POST'])
def ask():
    """Accept a user question and start processing it in the background.

    Reads the ``message`` form field. A missing or blank message is rejected
    with HTTP 400 so that no search or model calls are spent on it. Otherwise
    process_query() is started in a daemon thread and the handler returns
    immediately; results are delivered through the /stream endpoint.

    Returns:
        JSON ``{'status': 'processing'}`` on success, or a JSON error body
        with status 400.
    """
    user_input = request.form.get('message', '').strip()
    if not user_input:
        return jsonify({'status': 'error', 'error': 'empty message'}), 400

    worker = threading.Thread(target=process_query, args=(user_input,), daemon=True)
    worker.start()
    return jsonify({'status': 'processing'})
|
|
|
|
@app.route('/stream')
def stream():
    """Stream queued events to the client as Server-Sent Events.

    Each (type, content) tuple taken from message_queue is sent as one
    ``data: {"type": ..., "content": ...}`` SSE message. The stream never ends
    on its own.
    """
    def generate():
        while True:
            # A timed blocking get replaces the empty()/get() pair: with more
            # than one consumer another reader could take the item between the
            # two calls, leaving the untimed get() blocked indefinitely.
            try:
                msg_type, content = message_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            data = json.dumps({"type": msg_type, "content": content})
            yield f"data: {data}\n\n"

    # NOTE(review): nothing is sent while the queue is idle, so a client that
    # disconnected is only noticed on the next event - consider a periodic SSE
    # comment line as a heartbeat if the front end tolerates it.
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache'},
    )
|
|
|
|
# When the file is executed directly, start Flask's built-in server listening
# on every network interface, port 7860, with debug mode off.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=False)