Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| import torch | |
| import os | |
| import shutil | |
| import time | |
| import logging | |
| import gc | |
| import signal | |
| from contextlib import contextmanager | |
| import threading | |
| import time | |
| # Класс для обработки таймаута без использования signal | |
| class TimeoutManager: | |
| def __init__(self, seconds): | |
| self.seconds = seconds | |
| self.timeout_occurred = False | |
| self.timer = None | |
| def start(self): | |
| self.timeout_occurred = False | |
| self.timer = threading.Timer(self.seconds, self._timeout) | |
| self.timer.daemon = True | |
| self.timer.start() | |
| def _timeout(self): | |
| self.timeout_occurred = True | |
| def stop(self): | |
| if self.timer: | |
| self.timer.cancel() | |
| def check_timeout(self): | |
| if self.timeout_occurred: | |
| raise TimeoutException("Timeout occurred") | |
| class TimeoutException(Exception): | |
| pass | |
| # Настройка логирования | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Получаем API токен из переменных окружения | |
| HF_TOKEN = os.environ.get("HF_TOKEN", None) | |
| if HF_TOKEN: | |
| logger.info("API токен найден") | |
| else: | |
| logger.warning("API токен не найден! Для доступа к закрытой модели необходимо добавить HF_TOKEN в секреты репозитория") | |
| # Информация о системе и CUDA | |
| logger.info("===== Запуск приложения =====") | |
| logger.info(f"PyTorch: {torch.__version__}") | |
| # Проверка CUDA и соответствующие логи | |
| cuda_available = torch.cuda.is_available() | |
| logger.info(f"CUDA доступен: {cuda_available}") | |
| if cuda_available: | |
| try: | |
| cuda_device_count = torch.cuda.device_count() | |
| logger.info(f"Количество CUDA устройств: {cuda_device_count}") | |
| for i in range(cuda_device_count): | |
| logger.info(f"CUDA устройство {i}: {torch.cuda.get_device_name(i)}") | |
| logger.info(f"Текущее CUDA устройство: {torch.cuda.current_device()}") | |
| # Проверка доступной памяти | |
| for i in range(cuda_device_count): | |
| free_mem = torch.cuda.get_device_properties(i).total_memory - torch.cuda.memory_allocated(i) | |
| logger.info(f"Устройство {i}: свободно {free_mem / 1024**3:.2f} ГБ из {torch.cuda.get_device_properties(i).total_memory / 1024**3:.2f} ГБ") | |
| except Exception as e: | |
| logger.warning(f"Ошибка при получении информации о CUDA: {e}") | |
| cuda_available = False | |
| else: | |
| logger.info("CUDA недоступен, будет использоваться CPU") | |
| # Используем домашнюю директорию пользователя (она всегда должна быть доступна) | |
| user_home = os.path.expanduser("~") | |
| DISK_DIR = os.path.join(user_home, "app_data") | |
| # Создаем директорию, если она не существует | |
| os.makedirs(DISK_DIR, exist_ok=True) | |
| logger.info(f"Используем директорию для хранения: {DISK_DIR}") | |
| # Настраиваем пути для сохранения моделей | |
| CACHE_DIR = os.path.join(DISK_DIR, "models_cache") | |
| TORCH_HOME = os.path.join(DISK_DIR, "torch_home") | |
| # Создаем директории | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| os.makedirs(TORCH_HOME, exist_ok=True) | |
| # Устанавливаем переменные окружения для управления кэшированием | |
| os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR | |
| os.environ["HF_HOME"] = CACHE_DIR | |
| os.environ["TORCH_HOME"] = TORCH_HOME | |
| # Функция для проверки свободного места на диске | |
| def check_disk_space(path): | |
| try: | |
| total, used, free = shutil.disk_usage(path) | |
| logger.info(f"Диск {path}: всего {total // (1024**3)} ГБ, свободно {free // (1024**3)} ГБ") | |
| return free | |
| except Exception as e: | |
| logger.warning(f"Не удалось проверить диск {path}: {e}") | |
| return None | |
| # Выводим информацию о диске перед загрузкой | |
| logger.info("Информация о дисках перед загрузкой:") | |
| check_disk_space("/") | |
| check_disk_space(DISK_DIR) | |
| # Настройка модели - выбор в зависимости от доступных ресурсов | |
| if cuda_available: | |
| # Для режима GPU используем более крупную модель (если она есть) | |
| model_name = "KoDer123/Nerealnost_8M" # Ваша основная модель | |
| else: | |
| # Для режима CPU можно выбрать более легкую модель | |
| model_name = "KoDer123/Nerealnost_8M" # Можно заменить на более легкую, если нужно | |
| logger.info(f"Выбрана модель: {model_name}") | |
| # Глобальные переменные для модели | |
| model = None | |
| tokenizer = None | |
| is_model_loaded = False | |
| # Переопределяем EOS_TOKEN для случая, когда токенизатор не загружен | |
| DEFAULT_EOS_TOKEN = "</s>" | |
| # Класс таймаута для ограничения времени генерации | |
| class TimeoutException(Exception): | |
| pass | |
| def time_limit(seconds): | |
| def signal_handler(signum, frame): | |
| raise TimeoutException("Timeout") | |
| signal.signal(signal.SIGALRM, signal_handler) | |
| signal.alarm(seconds) | |
| try: | |
| yield | |
| finally: | |
| signal.alarm(0) | |
| # Функция для очистки памяти | |
| def clear_memory(): | |
| if cuda_available: | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| # Функция для загрузки модели | |
| def load_model(): | |
| global model, tokenizer, is_model_loaded | |
| try: | |
| # Очищаем память перед загрузкой | |
| clear_memory() | |
| logger.info("Загружаем токенизатор...") | |
| tokenizer = AutoTokenizer.from_pretrained( | |
| model_name, | |
| token=HF_TOKEN, | |
| cache_dir=CACHE_DIR, | |
| local_files_only=False | |
| ) | |
| # Устанавливаем pad_token, если его нет | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| logger.info("Загружаем модель...") | |
| # Определяем оптимальный режим загрузки | |
| model_kwargs = { | |
| "cache_dir": CACHE_DIR, | |
| "trust_remote_code": True, | |
| "token": HF_TOKEN | |
| } | |
| # Проверяем доступность CUDA | |
| if cuda_available: | |
| logger.info("Загружаем модель в режиме GPU...") | |
| model_kwargs.update({ | |
| "torch_dtype": torch.float16, | |
| "device_map": "auto", # Автоматически распределить по устройствам | |
| }) | |
| else: | |
| logger.info("Загружаем модель в режиме CPU...") | |
| model_kwargs.update({ | |
| "torch_dtype": torch.float32, | |
| }) | |
| # Загружаем модель | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_name, | |
| **model_kwargs | |
| ) | |
| # Если GPU недоступен, явно переносим модель на CPU | |
| if not cuda_available: | |
| model = model.to("cpu") | |
| device_info = next(model.parameters()).device | |
| logger.info(f"Модель успешно загружена на устройство: {device_info}") | |
| is_model_loaded = True | |
| return f"Модель успешно загружена на {device_info}" | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"Ошибка загрузки модели: {error_msg}") | |
| is_model_loaded = False | |
| return f"Ошибка загрузки модели: {error_msg}" | |
| # Загружаем модель | |
| start_time = time.time() | |
| load_result = load_model() | |
| end_time = time.time() | |
| logger.info(f"Загрузка модели заняла {end_time - start_time:.2f} секунд. Результат: {load_result}") | |
| # Выводим информацию о диске после загрузки | |
| logger.info("Информация о дисках после загрузки:") | |
| check_disk_space("/") | |
| check_disk_space(DISK_DIR) | |
| # Определяем шаблон Q&A, как при обучении | |
| qa_prompt = "<s>Пользователь: {}\nАссистент: {}" | |
| EOS_TOKEN = DEFAULT_EOS_TOKEN | |
| if tokenizer is not None and hasattr(tokenizer, 'eos_token') and tokenizer.eos_token: | |
| EOS_TOKEN = tokenizer.eos_token | |
| def respond( | |
| message, | |
| history: list[tuple[str, str]], | |
| system_message, | |
| max_tokens, | |
| temperature, | |
| top_p, | |
| generation_timeout, | |
| ): | |
| global model, tokenizer, is_model_loaded | |
| # Проверяем, загружена ли модель | |
| if not is_model_loaded or model is None or tokenizer is None: | |
| if not HF_TOKEN: | |
| return "Модель не загружена. Для доступа к закрытой модели требуется добавить HF_TOKEN в секреты репозитория." | |
| else: | |
| return "Модель не загружена или произошла ошибка при загрузке. Проверьте логи для получения дополнительной информации." | |
| # Очищаем память перед генерацией | |
| clear_memory() | |
| # Замеряем время | |
| start_time = time.time() | |
| # Формируем историю в текстовом формате | |
| full_prompt = "" | |
| if system_message: | |
| full_prompt += qa_prompt.format(system_message, "") + "\n" | |
| for user_msg, assistant_msg in history: | |
| if user_msg and assistant_msg: | |
| full_prompt += qa_prompt.format(user_msg, assistant_msg) + EOS_TOKEN + "\n" | |
| full_prompt += qa_prompt.format(message, "") | |
| logger.info(f"Генерируем ответ на запрос: '{message[:50]}...' (длина промпта: {len(full_prompt)})") | |
| try: | |
| # Настраиваем таймаут | |
| timeout_mgr = TimeoutManager(generation_timeout) | |
| timeout_mgr.start() | |
| # Токенизация входных данных | |
| inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device) | |
| # Генерация ответа с периодической проверкой таймаута | |
| gen_kwargs = { | |
| "input_ids": inputs.input_ids, | |
| "max_new_tokens": max_tokens, | |
| "temperature": temperature, | |
| "top_p": top_p, | |
| "do_sample": True, | |
| "pad_token_id": tokenizer.pad_token_id, | |
| } | |
| outputs = model.generate(**gen_kwargs) | |
| # Останавливаем таймаут | |
| timeout_mgr.stop() | |
| # Декодирование полного вывода | |
| generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| # Извлекаем только часть после "Ассистент: " | |
| response_start = generated_text.rfind("Ассистент: ") + len("Ассистент: ") | |
| if response_start >= len("Ассистент: "): # Проверяем, что "Ассистент: " найден | |
| response = generated_text[response_start:].strip() | |
| else: | |
| # Если не найдено, возвращаем весь текст | |
| response = generated_text.strip() | |
| end_time = time.time() | |
| generation_time = end_time - start_time | |
| logger.info(f"Генерация заняла {generation_time:.2f} секунд. Получен ответ длиной {len(response)} символов") | |
| return response | |
| except TimeoutException: | |
| logger.warning(f"Генерация превысила лимит времени ({generation_timeout} секунд)") | |
| return f"Генерация ответа превысила лимит времени ({generation_timeout} секунд). Попробуйте уменьшить количество токенов или задать более простой вопрос." | |
| except Exception as e: | |
| logger.error(f"Ошибка при генерации ответа: {str(e)}") | |
| return f"Произошла ошибка при генерации ответа: {str(e)}" | |
| finally: | |
| # Гарантируем остановку таймера | |
| if 'timeout_mgr' in locals(): | |
| timeout_mgr.stop() | |
| # Настройка интерфейса Gradio | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# НереальностьQA - Чат с экспертом по эзотерике") | |
| if not HF_TOKEN: | |
| gr.Markdown(""" | |
| ## ⚠️ Внимание: API токен не найден! | |
| Для работы с закрытой моделью необходимо добавить HF_TOKEN в секреты репозитория: | |
| 1. Settings > Repository secrets > New secret | |
| 2. Name: HF_TOKEN | |
| 3. Value: ваш токен доступа с huggingface.co/settings/tokens | |
| """, elem_id="warning-box") | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| chatbot = gr.Chatbot(label="Диалог") # Удаляем type="messages" | |
| user_input = gr.Textbox( | |
| placeholder="Введите ваш вопрос здесь...", | |
| label="Ваш вопрос", | |
| lines=2 | |
| ) | |
| with gr.Row(): | |
| submit_btn = gr.Button("Отправить", variant="primary") | |
| clear_btn = gr.Button("Очистить историю") | |
| with gr.Column(scale=1): | |
| with gr.Accordion("Настройки генерации", open=False): | |
| system_msg = gr.Textbox( | |
| value="Ты - эксперт по эзотерике, специализирующийся на энергетике человека, мире отшедших душ и метафизических знаниях. Отвечай подробно, опираясь на свои знания.", | |
| label="Системное сообщение", | |
| lines=4 | |
| ) | |
| max_tokens = gr.Slider( | |
| minimum=1, | |
| maximum=1024, | |
| value=64 if not cuda_available else 256, # Меньше токенов для CPU | |
| step=1, | |
| label="Максимальное число токенов" | |
| ) | |
| temperature = gr.Slider( | |
| minimum=0.1, | |
| maximum=1.2, | |
| value=0.7, | |
| step=0.1, | |
| label="Температура" | |
| ) | |
| top_p = gr.Slider( | |
| minimum=0.1, | |
| maximum=1.0, | |
| value=0.9, | |
| step=0.05, | |
| label="Top-p" | |
| ) | |
| generation_timeout = gr.Slider( | |
| minimum=10, | |
| maximum=300, | |
| value=60 if cuda_available else 120, # Больше времени для CPU | |
| step=10, | |
| label="Таймаут генерации (секунды)" | |
| ) | |
| with gr.Accordion("Информация о системе", open=True): | |
| info_text = gr.Markdown(f""" | |
| * **Модель**: {model_name} | |
| * **Режим работы**: {"GPU" if cuda_available else "CPU"} | |
| * **Директория для кэша**: {CACHE_DIR} | |
| * **Статус загрузки**: {"Успешно" if is_model_loaded else "Ошибка"} | |
| * **API токен**: {"Настроен" if HF_TOKEN else "Отсутствует"} | |
| """) | |
| # Примеры вопросов | |
| with gr.Accordion("Примеры вопросов", open=True): | |
| examples = gr.Examples( | |
| examples=[ | |
| "Что известно о мире отшедших душ и их взаимодействии с нашим миром?", | |
| "Что такое энергетическая ось человека и как она связана с его биополем?", | |
| "Расскажи о роли энергии мысли и желания в мире отшедших." | |
| ], | |
| inputs=user_input | |
| ) | |
| # Функция обработки отправки сообщения | |
| def chat(message, history): | |
| if message == "": | |
| return history, "" | |
| # Генерируем ответ | |
| bot_message = respond( | |
| message, | |
| history, | |
| system_msg.value, | |
| max_tokens.value, | |
| temperature.value, | |
| top_p.value, | |
| generation_timeout.value | |
| ) | |
| # Добавляем в историю и возвращаем | |
| history = history + [(message, bot_message)] | |
| return history, "" | |
| # Обработчики событий | |
| submit_btn.click( | |
| chat, | |
| inputs=[user_input, chatbot], | |
| outputs=[chatbot, user_input] | |
| ) | |
| user_input.submit( | |
| chat, | |
| inputs=[user_input, chatbot], | |
| outputs=[chatbot, user_input] | |
| ) | |
| clear_btn.click( | |
| lambda: ([], ""), | |
| outputs=[chatbot, user_input] | |
| ) | |
| # Запуск приложения | |
| if __name__ == "__main__": | |
| demo.launch() |