Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| import os | |
| import shutil | |
| import time | |
| import logging | |
| import gc | |
| import threading | |
| import json | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig | |
| # Настройка логирования | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Проверка наличия ZERO GPU | |
| ZERO_GPU_ENABLED = os.environ.get("HF_ZERO_GPU", "0") == "1" | |
| logger.info(f"Zero GPU активирован: {ZERO_GPU_ENABLED}") | |
| # Получаем API токен из переменных окружения | |
| HF_TOKEN = os.environ.get("HF_TOKEN", None) | |
| logger.info("API токен найден" if HF_TOKEN else "API токен не найден! Добавьте HF_TOKEN в секреты репозитория") | |
| # Информация о системе и CUDA | |
| logger.info("===== Запуск приложения =====") | |
| logger.info(f"PyTorch: {torch.__version__}") | |
| # Проверка CUDA | |
| cuda_available = torch.cuda.is_available() | |
| logger.info(f"CUDA доступен: {cuda_available}") | |
| if cuda_available: | |
| logger.info(f"Количество CUDA устройств: {torch.cuda.device_count()}") | |
| for i in range(torch.cuda.device_count()): | |
| logger.info(f"CUDA устройство {i}: {torch.cuda.get_device_name(i)}") | |
| free_mem = torch.cuda.get_device_properties(i).total_memory - torch.cuda.memory_allocated(i) | |
| logger.info(f"Устройство {i}: свободно {free_mem / 1024**3:.2f} ГБ") | |
| else: | |
| logger.info("CUDA недоступен, используется CPU") | |
| # Настройка директорий | |
| user_home = os.path.expanduser("~") | |
| DISK_DIR = os.path.join(user_home, "app_data") | |
| CACHE_DIR = os.path.join(DISK_DIR, "models_cache") | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR | |
| os.environ["HF_HOME"] = CACHE_DIR | |
| logger.info(f"Используем директорию для кэша: {CACHE_DIR}") | |
| # Модель | |
| model_name = "unsloth/Phi-3.5-mini-instruct" | |
| logger.info(f"Выбрана модель: {model_name}") | |
| # Глобальные переменные | |
| model = None | |
| tokenizer = None | |
| is_model_loaded = False | |
| DEFAULT_EOS_TOKEN = "</s>" | |
| # Класс для таймаута | |
| class TimeoutManager: | |
| def __init__(self, seconds): | |
| self.seconds = seconds | |
| self.timeout_occurred = False | |
| self.timer = None | |
| def start(self): | |
| self.timeout_occurred = False | |
| self.timer = threading.Timer(self.seconds, self._timeout) | |
| self.timer.daemon = True | |
| self.timer.start() | |
| def _timeout(self): | |
| self.timeout_occurred = True | |
| def stop(self): | |
| if self.timer: | |
| self.timer.cancel() | |
| def check_timeout(self): | |
| if self.timeout_occurred: | |
| raise TimeoutException("Timeout occurred") | |
| class TimeoutException(Exception): | |
| pass | |
| # Очистка памяти | |
| def clear_memory(): | |
| if cuda_available: | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| # Загрузка модели | |
| def load_model(): | |
| global model, tokenizer, is_model_loaded | |
| try: | |
| clear_memory() | |
| logger.info("Загружаем токенизатор...") | |
| tokenizer = AutoTokenizer.from_pretrained( | |
| model_name, | |
| token=HF_TOKEN, | |
| cache_dir=CACHE_DIR, | |
| local_files_only=False, | |
| revision="main" | |
| ) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| logger.info(f"Токенизатор загружен: vocab_size={tokenizer.vocab_size}") | |
| logger.info("Загружаем конфигурацию модели...") | |
| config = AutoConfig.from_pretrained(model_name, token=HF_TOKEN, cache_dir=CACHE_DIR) | |
| logger.info(f"Конфигурация модели: {config}") | |
| logger.info("Загружаем модель...") | |
| model_kwargs = { | |
| "cache_dir": CACHE_DIR, | |
| "trust_remote_code": True, | |
| "token": HF_TOKEN, | |
| "config": config | |
| } | |
| if cuda_available: | |
| logger.info("Загружаем модель в режиме GPU...") | |
| model_kwargs.update({ | |
| "torch_dtype": torch.float16, | |
| "device_map": "auto", | |
| "load_in_4bit": True # Оптимизация от unsloth | |
| }) | |
| else: | |
| logger.info("Загружаем модель в режиме CPU...") | |
| model_kwargs.update({ | |
| "torch_dtype": torch.float32, | |
| "load_in_4bit": False | |
| }) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_name, | |
| **model_kwargs | |
| ) | |
| if not cuda_available: | |
| model = model.to("cpu") | |
| device_info = next(model.parameters()).device | |
| logger.info(f"Модель загружена на устройство: {device_info}") | |
| is_model_loaded = True | |
| return f"Модель загружена на {device_info}" | |
| except Exception as e: | |
| logger.error(f"Ошибка загрузки модели: {str(e)}") | |
| is_model_loaded = False | |
| return f"Ошибка загрузки модели: {str(e)}" | |
| # Загружаем модель при запуске | |
| start_time = time.time() | |
| load_result = load_model() | |
| logger.info(f"Загрузка заняла {time.time() - start_time:.2f} секунд. Результат: {load_result}") | |
| # Шаблон для генерации | |
| EOS_TOKEN = tokenizer.eos_token if tokenizer and tokenizer.eos_token else DEFAULT_EOS_TOKEN | |
| qa_prompt = "<|user|>{}\n<|assistant|> {}" # Формат для Phi-3.5-mini-instruct | |
| # Функция генерации ответа | |
| def respond(message, history, system_message, max_tokens, temperature, top_p, generation_timeout): | |
| global model, tokenizer, is_model_loaded | |
| if not is_model_loaded or model is None or tokenizer is None: | |
| return "Модель не загружена. Проверьте логи или добавьте HF_TOKEN." | |
| clear_memory() | |
| start_time = time.time() | |
| # Форматирование истории | |
| full_prompt = "" | |
| if system_message: | |
| full_prompt += qa_prompt.format(system_message, "") + "\n" | |
| for user_msg, assistant_msg in history: | |
| if user_msg and assistant_msg: | |
| full_prompt += qa_prompt.format(user_msg, assistant_msg) + EOS_TOKEN + "\n" | |
| full_prompt += qa_prompt.format(message, "") | |
| logger.info(f"Генерируем ответ на: '{message[:50]}...'") | |
| try: | |
| timeout_mgr = TimeoutManager(generation_timeout) | |
| timeout_mgr.start() | |
| inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device) | |
| timeout_mgr.check_timeout() | |
| gen_kwargs = { | |
| "input_ids": inputs.input_ids, | |
| "max_new_tokens": max_tokens, | |
| "temperature": temperature, | |
| "top_p": top_p, | |
| "do_sample": True, | |
| "pad_token_id": tokenizer.pad_token_id, | |
| } | |
| outputs = model.generate(**gen_kwargs) | |
| timeout_mgr.stop() | |
| response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| response_start = response.rfind("<|assistant|> ") + len("<|assistant|> ") | |
| response = response[response_start:].strip() if response_start >= len("<|assistant|> ") else response.strip() | |
| logger.info(f"Генерация заняла {time.time() - start_time:.2f} секунд") | |
| return response | |
| except TimeoutException: | |
| return f"Таймаут генерации ({generation_timeout} секунд)." | |
| except Exception as e: | |
| logger.error(f"Ошибка генерации: {str(e)}") | |
| return f"Ошибка: {str(e)}" | |
| finally: | |
| if 'timeout_mgr' in locals(): | |
| timeout_mgr.stop() | |
| # Интерфейс Gradio | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# НереальностьQA - Чат с экспертом по эзотерике") | |
| if not HF_TOKEN: | |
| gr.Markdown("⚠️ Добавьте HF_TOKEN в секреты репозитория!") | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| chatbot = gr.Chatbot(label="Диалог") | |
| user_input = gr.Textbox(placeholder="Введите вопрос...", label="Ваш вопрос", lines=2) | |
| with gr.Row(): | |
| submit_btn = gr.Button("Отправить", variant="primary") | |
| clear_btn = gr.Button("Очистить") | |
| with gr.Column(scale=1): | |
| with gr.Accordion("Настройки", open=False): | |
| system_msg = gr.Textbox( | |
| value="Твоя задача — дать точный ответ на вопрос пользователя.", | |
| label="Системное сообщение", | |
| lines=4 | |
| ) | |
| max_tokens = gr.Slider(1, 1024, value=256, step=1, label="Макс. токенов") | |
| temperature = gr.Slider(0.1, 1.2, value=0.7, step=0.1, label="Температура") | |
| top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top-p") | |
| generation_timeout = gr.Slider(10, 300, value=60, step=10, label="Таймаут (с)") | |
| with gr.Accordion("Информация", open=True): | |
| system_info = { | |
| "Модель": model_name, | |
| "Режим": "GPU" if cuda_available else "CPU", | |
| "Статус": "Успешно" if is_model_loaded else "Ошибка", | |
| "API токен": "Настроен" if HF_TOKEN else "Отсутствует" | |
| } | |
| gr.Markdown("\n".join([f"* **{k}**: {v}" for k, v in system_info.items()])) | |
| with gr.Accordion("Примеры вопросов", open=True): | |
| gr.Examples( | |
| examples=[ | |
| "Что известно о мире отшедших душ?", | |
| "Что такое энергетическая ось человека?", | |
| "Роль энергии мысли в мире отшедших?" | |
| ], | |
| inputs=user_input | |
| ) | |
| def chat(message, history): | |
| if not message: | |
| return history, "" | |
| bot_message = respond(message, history, system_msg.value, max_tokens.value, temperature.value, top_p.value, generation_timeout.value) | |
| history.append((message, bot_message)) | |
| return history, "" | |
| submit_btn.click(chat, [user_input, chatbot], [chatbot, user_input]) | |
| user_input.submit(chat, [user_input, chatbot], [chatbot, user_input]) | |
| clear_btn.click(lambda: ([], ""), None, [chatbot, user_input]) | |
| if __name__ == "__main__": | |
| demo.launch() |