PrintAI / app.py
KennyOry's picture
Update app.py
68d3265 verified
from flask import Flask, render_template, request, Response, jsonify
from mistralai import Mistral
import logging
import time
import requests
import re
import threading
import queue
import json
import os
import trafilatura
from bs4 import BeautifulSoup
import random
app = Flask(__name__)
app.secret_key = 'super_secret_key'
message_queue = queue.Queue()
# Конфигурация Mistral
MISTRAL_MODEL = "mistral-large-latest"
N_CTX = 32768
MAX_RESULTS = 10
MIN_VALID_SOURCES = 3
MAX_SEARCH_ATTEMPTS = 3
MAX_CONTENT_LENGTH = 40000 # Максимальная длина контента на источник
# Новый клиент Mistral
mistral_client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
SYSTEM_PROMPT = """
Ты PrintMaster, сервисный инженер по печатной технике. Критически важные правила:
1. Формат ответа СТРОГО:
**Проблема:** [только краткое описание проблемы]
**Решение:** [пошаговые действия]
2. Для шагов решения используй ТОЛЬКО формат:
[Цифра]. [Действие]
- Подпункт 1
- Подпункт 2
3. Примечания ТОЛЬКО если есть:
**Примечания:**
- Пункт 1
- Пункт 2
ЖЕСТКИЕ ЗАПРЕТЫ:
- Никогда не используй подзаголовки с ###
- Никогда не добавляй разделы "Удалены шаги" или подобные
- Начинай сразу с **Проблема:** без преамбул
- Всегда основывай решение ТОЛЬКО на предоставленных источниках
"""
BLACKLISTED_DOMAINS = [
'reddit.com',
'stackoverflow.com',
'quora.com',
'facebook.com',
'youtube.com',
'x.com',
'twitter.com',
'tiktok.com',
'instagram.com'
]
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15"
]
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("/tmp/printer_assistant.log"),
logging.StreamHandler()
]
)
def get_random_headers():
return {
'User-Agent': random.choice(USER_AGENTS),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Referer': 'https://www.google.com/',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
def extract_main_content(html, url):
"""Извлекает основной контент страницы с помощью trafilatura или BeautifulSoup"""
try:
# Пробуем trafilatura
content = trafilatura.extract(html, include_links=False, include_tables=False)
if content and len(content) > 500:
return content[:MAX_CONTENT_LENGTH]
except Exception as e:
logging.error(f"Trafilatura error: {str(e)}")
# Fallback на BeautifulSoup
try:
soup = BeautifulSoup(html, 'html.parser')
# Удаляем ненужные элементы
for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'form']):
element.decompose()
# Пытаемся найти основной контент
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile('content|main|article|post', re.I))
if main_content:
text = main_content.get_text(separator='\n', strip=True)
return text[:MAX_CONTENT_LENGTH] if text else None
# Fallback: весь текст body
return soup.body.get_text(separator='\n', strip=True)[:MAX_CONTENT_LENGTH]
except Exception as e:
logging.error(f"BeautifulSoup error: {str(e)}")
return None
def generate_search_query(prompt: str) -> dict:
system_prompt = """
You are a technical expert. Extract structured data from the user's query and generate an English search query.
Return data in strict JSON format with these fields:
- brand: English brand name (HP, Canon, Konica Minolta, etc.)
- model: equipment model (model name only)
- error_code: error code (if present)
- problem_description: brief English problem description (1-2 sentences)
- search_query: full English search query
Important rules:
1. All fields MUST be in English
2. For brands use official English names
3. Remove brand mentions and the word "error" from model name
4. If error code is specified - include it in search_query
5. Problem description should be concise technical terms (max 7 words)
"""
try:
response = mistral_client.chat.complete(
model=MISTRAL_MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=350,
response_format={"type": "json_object"}
)
json_data = json.loads(response.choices[0].message.content)
required_fields = ['brand', 'model', 'error_code', 'problem_description', 'search_query']
for field in required_fields:
if field not in json_data:
json_data[field] = ""
if json_data['brand'] and json_data['model']:
json_data['model'] = re.sub(
re.escape(json_data['brand']),
'',
json_data['model'],
flags=re.IGNORECASE
).strip()
if not json_data['search_query']:
search_parts = [json_data['brand'], json_data['model']]
if json_data['error_code']:
search_parts.append(f"error {json_data['error_code']}")
if json_data['problem_description']:
search_parts.append(json_data['problem_description'])
json_data['search_query'] = " ".join(search_parts).strip()
return json_data
except Exception as e:
error_msg = f"❌ Ошибка извлечения данных: {str(e)}"
message_queue.put(('log', error_msg))
return {
'brand': "",
'model': "",
'error_code': "",
'problem_description': "",
'search_query': prompt
}
def gather_sources(search_query: str) -> list:
"""Поиск источников с несколькими попытками и фильтрацией"""
attempts = 0
all_sources = []
valid_sources = []
while attempts < MAX_SEARCH_ATTEMPTS and len(valid_sources) < MIN_VALID_SOURCES:
attempts += 1
message_queue.put(('log', f"🔍 Попытка {attempts} поиска по запросу: {search_query}"))
try:
params = {
"api_key": os.getenv("SERPAPI_KEY"),
"engine": "google",
"q": search_query,
"hl": "en",
"gl": "us",
"num": 20, # Запрашиваем больше результатов
"safe": "off",
}
response = requests.get("https://serpapi.com/search", params=params, timeout=15)
response.raise_for_status()
data = response.json()
# Обработка organic results
organic_results = data.get("organic_results", [])
for res in organic_results:
if len(valid_sources) >= MIN_VALID_SOURCES:
break
title = res.get("title", "Без заголовка")
link = res.get("link", "#")
snippet = res.get("snippet", "") or ""
# Пропускаем нежелательные домены
if any(domain in link for domain in BLACKLISTED_DOMAINS):
continue
# Пропускаем дубликаты
if any(src['url'] == link for src in all_sources):
continue
# Загрузка полного контента
content = None
try:
headers = get_random_headers()
page_response = requests.get(link, headers=headers, timeout=8)
if page_response.status_code == 200:
content = extract_main_content(page_response.text, link)
except Exception as e:
logging.error(f"Ошибка загрузки {link}: {str(e)}")
# Если контент не получен, используем сниппет
if not content:
content = snippet
# Проверяем релевантность контента
if content and len(content) > 100:
source_data = {
"title": title,
"url": link,
"content": content[:MAX_CONTENT_LENGTH]
}
all_sources.append(source_data)
valid_sources.append(source_data)
message_queue.put(('log', f"✅ Найден источник: {title}"))
message_queue.put(('log', f"ℹ️ На попытке {attempts} найдено {len(valid_sources)} валидных источников"))
# Изменяем запрос для следующей попытки
if len(valid_sources) < MIN_VALID_SOURCES:
search_query += " troubleshooting OR fix OR repair"
except Exception as e:
error_msg = f"❌ Ошибка поиска (попытка {attempts}): {str(e)}"
message_queue.put(('log', error_msg))
return valid_sources[:MAX_RESULTS] # Возвращаем не более MAX_RESULTS источников
def web_search(query: str) -> list:
"""Основная функция поиска с несколькими попытками"""
start_time = time.time()
sources = gather_sources(query)
elapsed = time.time() - start_time
if sources:
message_queue.put(('log', f"✅ Поиск завершен за {elapsed:.2f}с. Найдено {len(sources)} источников."))
else:
message_queue.put(('log', f"⚠️ Не удалось найти источники за {elapsed:.2f}с. Ответ будет основан на общих знаниях."))
return sources
def clean_response(response: str) -> str:
# Удаление служебных тегов
response = re.sub(r'</?assistant>|<\|system\|>|</s>', '', response, flags=re.IGNORECASE)
# Удаление лишних разделителей
response = re.sub(r'^-{3,}\s*', '', response)
# Удаление дублирования разделов
response = re.sub(r'(\*\*Проблема:\*\*.+?)(\*\*Проблема:\*\*)', r'\1', response, flags=re.DOTALL)
response = re.sub(r'(\*\*Решение:\*\*.+?)(\*\*Решение:\*\*)', r'\1', response, flags=re.DOTALL)
# Удаление лишних переносов
response = re.sub(r'\n\s*\n', '\n\n', response)
response = re.sub(r'[ \t]{2,}', ' ', response)
# Удаление начальных фраз
response = re.sub(r'^Вот исправленный ответ[^:]+:\s*', '', response)
# Форматирование примечаний
response = re.sub(r'^---\s*Примечания:\s*', '**Примечания:**\n', response)
# Удаление лишних маркеров
response = re.sub(r'^---\s*', '', response, flags=re.MULTILINE)
# Очистка завершающих символов
response = re.sub(r'\s*\.{3,}\s*$', '', response)
return response.strip()
def process_query(prompt: str):
try:
start_time = time.time()
message_queue.put(('log', f"👤 Запрос: {prompt}"))
message_queue.put(('log', f"⚙️ Извлекаю параметры из входящего запроса"))
norm_data = generate_search_query(prompt)
message_queue.put(('log', f"⏏️ Извлечено: {json.dumps(norm_data, ensure_ascii=False)}"))
search_query = norm_data['search_query']
sources = web_search(search_query)
# Если источников нет, используем fallback
if not sources:
message_queue.put(('log', "⚠️ Использую резервные данные для генерации ответа"))
sources = [{
"title": "Общие знания о принтерах",
"url": "",
"content": f"Проблема: {norm_data['problem_description']}. Бренд: {norm_data['brand']}, Модель: {norm_data['model']}"
}]
message_queue.put(('log', f"📚 Найдено {len(sources)} источников"))
# Формируем контекст для LLM
context_content = ""
for i, source in enumerate(sources):
context_content += f"[[Источник {i+1}]] {source['title']}\n{source['content']}\n\n"
context_content = context_content.strip()
message_queue.put(('log', f"⚙️ Определяю проблему"))
problem_response = mistral_client.chat.complete(
model=MISTRAL_MODEL,
messages=[
{"role": "system", "content": "Опиши СУТЬ проблемы в одном предложении. Только диагноз, без решений. Не более 12 слов. На русском."},
{"role": "user", "content": f"Запрос пользователя: {prompt}\nДанные из источников:\n{context_content}"}
],
max_tokens=150,
temperature=0.2
)
extracted_problem = problem_response.choices[0].message.content.strip()
if not extracted_problem or len(extracted_problem) < 5:
extracted_problem = f"Неисправность {norm_data['brand']} {norm_data['model']}"
message_queue.put(('log', f"🧩 Определённая проблема: {extracted_problem}"))
# Формируем промпт с источниками
messages = [
{"role": "system", "content": SYSTEM_PROMPT + f"""
Контекст:
Бренд: {norm_data['brand']}
Модель: {norm_data['model']}
Ошибка: {norm_data['error_code']}
Суть проблемы (на основе источников): {extracted_problem}
Данные из источников:
{context_content}
"""},
{"role": "user", "content": f"Проблема: {prompt}"}
]
message_queue.put(('log', "🧠 Генерирую ответ на основе источников..."))
message_queue.put(('response_start', ""))
full_response = ""
for chunk in mistral_client.chat.stream(
model=MISTRAL_MODEL,
messages=messages,
max_tokens=2048,
temperature=0.3
):
if chunk.data.choices[0].delta.content is not None:
chunk_text = chunk.data.choices[0].delta.content
full_response += chunk_text
message_queue.put(('response_chunk', chunk_text))
# Очистка и форматирование ответа
final_response = clean_response(full_response)
# Добавляем источники в ответ
if sources:
final_response += "\n\n**Источники информации:**\n"
for i, source in enumerate(sources):
final_response += f"- [{source['title']}]({source['url']})\n"
message_queue.put(('response_end', final_response))
message_queue.put(('sources', json.dumps(sources)))
total_time = time.time() - start_time
message_queue.put(('log', f"💡 Ответ сгенерирован за {total_time:.1f}с"))
message_queue.put(('done', ''))
except Exception as e:
error_msg = f"❌ Ошибка: {str(e)}"
message_queue.put(('log', error_msg))
message_queue.put(('response', "\n⚠️ Ошибка обработки запроса"))
message_queue.put(('done', ''))
@app.route('/')
def index():
return render_template('index.html')
@app.route('/ask', methods=['POST'])
def ask():
user_input = request.form['message']
thread = threading.Thread(target=process_query, args=(user_input,))
thread.daemon = True
thread.start()
return jsonify({'status': 'processing'})
@app.route('/stream')
def stream():
def generate():
while True:
if not message_queue.empty():
msg_type, content = message_queue.get()
data = json.dumps({"type": msg_type, "content": content})
yield f"data: {data}\n\n"
else:
time.sleep(0.1)
return Response(generate(), mimetype='text/event-stream')
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=False)