Nerealnost / app.py
KoDer123's picture
Update app.py
5ee1df2 verified
raw
history blame
18.5 kB
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import os
import shutil
import time
import logging
import gc
import signal
from contextlib import contextmanager
import threading
import time
# Класс для обработки таймаута без использования signal
class TimeoutManager:
def __init__(self, seconds):
self.seconds = seconds
self.timeout_occurred = False
self.timer = None
def start(self):
self.timeout_occurred = False
self.timer = threading.Timer(self.seconds, self._timeout)
self.timer.daemon = True
self.timer.start()
def _timeout(self):
self.timeout_occurred = True
def stop(self):
if self.timer:
self.timer.cancel()
def check_timeout(self):
if self.timeout_occurred:
raise TimeoutException("Timeout occurred")
class TimeoutException(Exception):
pass
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Получаем API токен из переменных окружения
HF_TOKEN = os.environ.get("HF_TOKEN", None)
if HF_TOKEN:
logger.info("API токен найден")
else:
logger.warning("API токен не найден! Для доступа к закрытой модели необходимо добавить HF_TOKEN в секреты репозитория")
# Информация о системе и CUDA
logger.info("===== Запуск приложения =====")
logger.info(f"PyTorch: {torch.__version__}")
# Проверка CUDA и соответствующие логи
cuda_available = torch.cuda.is_available()
logger.info(f"CUDA доступен: {cuda_available}")
if cuda_available:
try:
cuda_device_count = torch.cuda.device_count()
logger.info(f"Количество CUDA устройств: {cuda_device_count}")
for i in range(cuda_device_count):
logger.info(f"CUDA устройство {i}: {torch.cuda.get_device_name(i)}")
logger.info(f"Текущее CUDA устройство: {torch.cuda.current_device()}")
# Проверка доступной памяти
for i in range(cuda_device_count):
free_mem = torch.cuda.get_device_properties(i).total_memory - torch.cuda.memory_allocated(i)
logger.info(f"Устройство {i}: свободно {free_mem / 1024**3:.2f} ГБ из {torch.cuda.get_device_properties(i).total_memory / 1024**3:.2f} ГБ")
except Exception as e:
logger.warning(f"Ошибка при получении информации о CUDA: {e}")
cuda_available = False
else:
logger.info("CUDA недоступен, будет использоваться CPU")
# Используем домашнюю директорию пользователя (она всегда должна быть доступна)
user_home = os.path.expanduser("~")
DISK_DIR = os.path.join(user_home, "app_data")
# Создаем директорию, если она не существует
os.makedirs(DISK_DIR, exist_ok=True)
logger.info(f"Используем директорию для хранения: {DISK_DIR}")
# Настраиваем пути для сохранения моделей
CACHE_DIR = os.path.join(DISK_DIR, "models_cache")
TORCH_HOME = os.path.join(DISK_DIR, "torch_home")
# Создаем директории
os.makedirs(CACHE_DIR, exist_ok=True)
os.makedirs(TORCH_HOME, exist_ok=True)
# Устанавливаем переменные окружения для управления кэшированием
os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR
os.environ["HF_HOME"] = CACHE_DIR
os.environ["TORCH_HOME"] = TORCH_HOME
# Функция для проверки свободного места на диске
def check_disk_space(path):
try:
total, used, free = shutil.disk_usage(path)
logger.info(f"Диск {path}: всего {total // (1024**3)} ГБ, свободно {free // (1024**3)} ГБ")
return free
except Exception as e:
logger.warning(f"Не удалось проверить диск {path}: {e}")
return None
# Выводим информацию о диске перед загрузкой
logger.info("Информация о дисках перед загрузкой:")
check_disk_space("/")
check_disk_space(DISK_DIR)
# Настройка модели - выбор в зависимости от доступных ресурсов
if cuda_available:
# Для режима GPU используем более крупную модель (если она есть)
model_name = "KoDer123/Nerealnost_8M" # Ваша основная модель
else:
# Для режима CPU можно выбрать более легкую модель
model_name = "KoDer123/Nerealnost_8M" # Можно заменить на более легкую, если нужно
logger.info(f"Выбрана модель: {model_name}")
# Глобальные переменные для модели
model = None
tokenizer = None
is_model_loaded = False
# Переопределяем EOS_TOKEN для случая, когда токенизатор не загружен
DEFAULT_EOS_TOKEN = "</s>"
# Класс таймаута для ограничения времени генерации
class TimeoutException(Exception):
pass
@contextmanager
def time_limit(seconds):
def signal_handler(signum, frame):
raise TimeoutException("Timeout")
signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
# Функция для очистки памяти
def clear_memory():
if cuda_available:
torch.cuda.empty_cache()
gc.collect()
# Функция для загрузки модели
def load_model():
global model, tokenizer, is_model_loaded
try:
# Очищаем память перед загрузкой
clear_memory()
logger.info("Загружаем токенизатор...")
tokenizer = AutoTokenizer.from_pretrained(
model_name,
token=HF_TOKEN,
cache_dir=CACHE_DIR,
local_files_only=False
)
# Устанавливаем pad_token, если его нет
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
logger.info("Загружаем модель...")
# Определяем оптимальный режим загрузки
model_kwargs = {
"cache_dir": CACHE_DIR,
"trust_remote_code": True,
"token": HF_TOKEN
}
# Проверяем доступность CUDA
if cuda_available:
logger.info("Загружаем модель в режиме GPU...")
model_kwargs.update({
"torch_dtype": torch.float16,
"device_map": "auto", # Автоматически распределить по устройствам
})
else:
logger.info("Загружаем модель в режиме CPU...")
model_kwargs.update({
"torch_dtype": torch.float32,
})
# Загружаем модель
model = AutoModelForCausalLM.from_pretrained(
model_name,
**model_kwargs
)
# Если GPU недоступен, явно переносим модель на CPU
if not cuda_available:
model = model.to("cpu")
device_info = next(model.parameters()).device
logger.info(f"Модель успешно загружена на устройство: {device_info}")
is_model_loaded = True
return f"Модель успешно загружена на {device_info}"
except Exception as e:
error_msg = str(e)
logger.error(f"Ошибка загрузки модели: {error_msg}")
is_model_loaded = False
return f"Ошибка загрузки модели: {error_msg}"
# Загружаем модель
start_time = time.time()
load_result = load_model()
end_time = time.time()
logger.info(f"Загрузка модели заняла {end_time - start_time:.2f} секунд. Результат: {load_result}")
# Выводим информацию о диске после загрузки
logger.info("Информация о дисках после загрузки:")
check_disk_space("/")
check_disk_space(DISK_DIR)
# Определяем шаблон Q&A, как при обучении
qa_prompt = "<s>Пользователь: {}\nАссистент: {}"
EOS_TOKEN = DEFAULT_EOS_TOKEN
if tokenizer is not None and hasattr(tokenizer, 'eos_token') and tokenizer.eos_token:
EOS_TOKEN = tokenizer.eos_token
def respond(
message,
history: list[tuple[str, str]],
system_message,
max_tokens,
temperature,
top_p,
generation_timeout,
):
global model, tokenizer, is_model_loaded
# Проверяем, загружена ли модель
if not is_model_loaded or model is None or tokenizer is None:
if not HF_TOKEN:
return "Модель не загружена. Для доступа к закрытой модели требуется добавить HF_TOKEN в секреты репозитория."
else:
return "Модель не загружена или произошла ошибка при загрузке. Проверьте логи для получения дополнительной информации."
# Очищаем память перед генерацией
clear_memory()
# Замеряем время
start_time = time.time()
# Формируем историю в текстовом формате
full_prompt = ""
if system_message:
full_prompt += qa_prompt.format(system_message, "") + "\n"
for user_msg, assistant_msg in history:
if user_msg and assistant_msg:
full_prompt += qa_prompt.format(user_msg, assistant_msg) + EOS_TOKEN + "\n"
full_prompt += qa_prompt.format(message, "")
logger.info(f"Генерируем ответ на запрос: '{message[:50]}...' (длина промпта: {len(full_prompt)})")
try:
# Настраиваем таймаут
timeout_mgr = TimeoutManager(generation_timeout)
timeout_mgr.start()
# Токенизация входных данных
inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
# Генерация ответа с периодической проверкой таймаута
gen_kwargs = {
"input_ids": inputs.input_ids,
"max_new_tokens": max_tokens,
"temperature": temperature,
"top_p": top_p,
"do_sample": True,
"pad_token_id": tokenizer.pad_token_id,
}
outputs = model.generate(**gen_kwargs)
# Останавливаем таймаут
timeout_mgr.stop()
# Декодирование полного вывода
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
# Извлекаем только часть после "Ассистент: "
response_start = generated_text.rfind("Ассистент: ") + len("Ассистент: ")
if response_start >= len("Ассистент: "): # Проверяем, что "Ассистент: " найден
response = generated_text[response_start:].strip()
else:
# Если не найдено, возвращаем весь текст
response = generated_text.strip()
end_time = time.time()
generation_time = end_time - start_time
logger.info(f"Генерация заняла {generation_time:.2f} секунд. Получен ответ длиной {len(response)} символов")
return response
except TimeoutException:
logger.warning(f"Генерация превысила лимит времени ({generation_timeout} секунд)")
return f"Генерация ответа превысила лимит времени ({generation_timeout} секунд). Попробуйте уменьшить количество токенов или задать более простой вопрос."
except Exception as e:
logger.error(f"Ошибка при генерации ответа: {str(e)}")
return f"Произошла ошибка при генерации ответа: {str(e)}"
finally:
# Гарантируем остановку таймера
if 'timeout_mgr' in locals():
timeout_mgr.stop()
# Настройка интерфейса Gradio
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# НереальностьQA - Чат с экспертом по эзотерике")
if not HF_TOKEN:
gr.Markdown("""
## ⚠️ Внимание: API токен не найден!
Для работы с закрытой моделью необходимо добавить HF_TOKEN в секреты репозитория:
1. Settings > Repository secrets > New secret
2. Name: HF_TOKEN
3. Value: ваш токен доступа с huggingface.co/settings/tokens
""", elem_id="warning-box")
with gr.Row():
with gr.Column(scale=4):
chatbot = gr.Chatbot(label="Диалог") # Удаляем type="messages"
user_input = gr.Textbox(
placeholder="Введите ваш вопрос здесь...",
label="Ваш вопрос",
lines=2
)
with gr.Row():
submit_btn = gr.Button("Отправить", variant="primary")
clear_btn = gr.Button("Очистить историю")
with gr.Column(scale=1):
with gr.Accordion("Настройки генерации", open=False):
system_msg = gr.Textbox(
value="Ты - эксперт по эзотерике, специализирующийся на энергетике человека, мире отшедших душ и метафизических знаниях. Отвечай подробно, опираясь на свои знания.",
label="Системное сообщение",
lines=4
)
max_tokens = gr.Slider(
minimum=1,
maximum=1024,
value=64 if not cuda_available else 256, # Меньше токенов для CPU
step=1,
label="Максимальное число токенов"
)
temperature = gr.Slider(
minimum=0.1,
maximum=1.2,
value=0.7,
step=0.1,
label="Температура"
)
top_p = gr.Slider(
minimum=0.1,
maximum=1.0,
value=0.9,
step=0.05,
label="Top-p"
)
generation_timeout = gr.Slider(
minimum=10,
maximum=300,
value=60 if cuda_available else 120, # Больше времени для CPU
step=10,
label="Таймаут генерации (секунды)"
)
with gr.Accordion("Информация о системе", open=True):
info_text = gr.Markdown(f"""
* **Модель**: {model_name}
* **Режим работы**: {"GPU" if cuda_available else "CPU"}
* **Директория для кэша**: {CACHE_DIR}
* **Статус загрузки**: {"Успешно" if is_model_loaded else "Ошибка"}
* **API токен**: {"Настроен" if HF_TOKEN else "Отсутствует"}
""")
# Примеры вопросов
with gr.Accordion("Примеры вопросов", open=True):
examples = gr.Examples(
examples=[
"Что известно о мире отшедших душ и их взаимодействии с нашим миром?",
"Что такое энергетическая ось человека и как она связана с его биополем?",
"Расскажи о роли энергии мысли и желания в мире отшедших."
],
inputs=user_input
)
# Функция обработки отправки сообщения
def chat(message, history):
if message == "":
return history, ""
# Генерируем ответ
bot_message = respond(
message,
history,
system_msg.value,
max_tokens.value,
temperature.value,
top_p.value,
generation_timeout.value
)
# Добавляем в историю и возвращаем
history = history + [(message, bot_message)]
return history, ""
# Обработчики событий
submit_btn.click(
chat,
inputs=[user_input, chatbot],
outputs=[chatbot, user_input]
)
user_input.submit(
chat,
inputs=[user_input, chatbot],
outputs=[chatbot, user_input]
)
clear_btn.click(
lambda: ([], ""),
outputs=[chatbot, user_input]
)
# Запуск приложения
if __name__ == "__main__":
demo.launch()