import asyncio
import os

import aiosqlite
import gradio as gr
from openai import AsyncOpenAI
|
|
| |
# Login credentials, configured through the AUTHORIZED_USERS environment
# variable as "user:password,user:password".
authorized_users_str = os.getenv("AUTHORIZED_USERS")
if not authorized_users_str:
    raise ValueError("Переменная окружения AUTHORIZED_USERS не установлена")
# Split on the first ":" only, so a password that itself contains ":" stays
# intact instead of making dict() fail on a three-element entry.
AUTHORIZED_USERS = dict(
    user.split(":", 1) for user in authorized_users_str.split(",")
)
|
|
# Per-user proctoring links, read from the PROCTOR_LINKS environment variable
# in the form "user:link,user:link".
proctor_links = os.getenv("PROCTOR_LINKS")
if not proctor_links:
    raise ValueError("Переменная окружения PROCTOR_LINKS не установлена")
# Only the first ":" separates the user from the link; any further ":" stays
# part of the link.
PROCTOR_LINKS = {
    name: link
    for name, link in (entry.split(":", 1) for entry in proctor_links.split(","))
}
|
|
|
|
| |
# Names of the selectable chats: "chat1" through "chat5".
CHAT_IDS = [f"chat{number}" for number in range(1, 6)]
|
|
| |
async def init_db():
    """Create the message table of every chat in /data/chats.db if missing.

    One table is created per name in CHAT_IDS, with the columns id,
    username, role, content and timestamp (defaulting to the insertion
    time). The changes are committed before the connection is closed.
    """
    async with aiosqlite.connect('/data/chats.db') as db:
        for table_name in CHAT_IDS:
            # The table name is formatted into the statement; it comes from
            # the fixed CHAT_IDS list.
            create_statement = f'''
                CREATE TABLE IF NOT EXISTS {table_name} (
                    id INTEGER PRIMARY KEY,
                    username TEXT,
                    role TEXT,
                    content TEXT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            '''
            await db.execute(create_statement)
        await db.commit()
|
|
| |
async def save_message(chat_id, username, role, content):
    """Store one message in the table of the given chat.

    Args:
        chat_id: Name of the chat table; must be one of CHAT_IDS.
        username: User the message belongs to.
        role: Role label stored with the message (e.g. "user", "assistant").
        content: Message text.

    Raises:
        ValueError: If chat_id is not one of CHAT_IDS.
    """
    # The table name is formatted into the SQL text, so only the known chat
    # names may reach the statement.
    if chat_id not in CHAT_IDS:
        raise ValueError(f"Unknown chat id: {chat_id!r}")
    insert_statement = (
        f'INSERT INTO {chat_id} (username, role, content) VALUES (?, ?, ?)'
    )
    async with aiosqlite.connect('/data/chats.db') as db:
        await db.execute(insert_statement, (username, role, content))
        await db.commit()
|
|
| |
async def load_history(chat_id, username):
    """Return the stored messages of one user in one chat, oldest first.

    Args:
        chat_id: Name of the chat table; must be one of CHAT_IDS.
        username: User whose messages are loaded.

    Returns:
        A list of {"role": ..., "content": ...} dicts in insertion order.

    Raises:
        ValueError: If chat_id is not one of CHAT_IDS.
    """
    # The table name is formatted into the SQL text, so only the known chat
    # names may reach the statement.
    if chat_id not in CHAT_IDS:
        raise ValueError(f"Unknown chat id: {chat_id!r}")
    # CURRENT_TIMESTAMP has one-second resolution, so a question and its
    # answer can share a timestamp; the id column breaks the tie.
    select_statement = f'''
        SELECT role, content FROM {chat_id}
        WHERE username = ?
        ORDER BY timestamp, id
    '''
    async with aiosqlite.connect('/data/chats.db') as db:
        async with db.execute(select_statement, (username,)) as cursor:
            return [
                {"role": row[0], "content": row[1]}
                async for row in cursor
            ]
|
|
| |
async def clear_history(chat_id, username):
    """Delete every stored message of one user in one chat.

    Args:
        chat_id: Name of the chat table; must be one of CHAT_IDS.
        username: User whose messages are removed.

    Returns:
        An empty list, usable as the new (empty) chat history.

    Raises:
        ValueError: If chat_id is not one of CHAT_IDS.
    """
    # The table name is formatted into a DELETE statement, so only the known
    # chat names may reach it.
    if chat_id not in CHAT_IDS:
        raise ValueError(f"Unknown chat id: {chat_id!r}")
    async with aiosqlite.connect('/data/chats.db') as db:
        await db.execute(f'DELETE FROM {chat_id} WHERE username = ?', (username,))
        await db.commit()
    return []
|
|
| |
# Shared asynchronous OpenAI client; the key is taken from OPENAI_API_KEY.
client = AsyncOpenAI(api_key=os.environ.get('OPENAI_API_KEY'))
|
|
| |
async def send_message(message, history, username, chat_id):
    """Handle one submitted chat message and return the updated UI values.

    The user's message is stored first. The command "proctorlink" (compared
    case-insensitively, surrounding whitespace ignored) is answered with the
    user's entry in PROCTOR_LINKS, or with a refusal when there is none. Any
    other text is sent to the OpenAI chat completions API. The reply is
    stored as well.

    Args:
        message: Text entered by the user.
        history: Messages currently shown, as {"role", "content"} dicts.
        username: User the messages are stored under.
        chat_id: Chat the messages are stored in.

    Returns:
        A tuple ("", new_history), where new_history is history followed by
        the user message and the reply. For an empty message nothing is
        stored and history is returned unchanged.
    """
    if not message:
        return "", history

    await save_message(chat_id, username, "user", message)
    user_turn = {"role": "user", "content": message}

    if message.strip().lower() == "proctorlink":
        reply = PROCTOR_LINKS.get(
            username,
            "Вас нет в списке пользователей с доступом к прокторингу.",
        )
        updated_history = history + [
            user_turn,
            {"role": "assistant", "content": reply},
        ]
        await save_message(chat_id, username, "assistant", reply)
        return "", updated_history

    # Only the system prompt and the current message are sent; earlier turns
    # from history are not part of the request.
    request_messages = [
        {"role": "system", "content": os.getenv("SYSTEM_PROMPT")},
        {"role": "user", "content": message},
    ]
    try:
        completion = await client.chat.completions.create(
            model="gpt-4o",
            messages=request_messages,
            temperature=0.7,
            max_tokens=16000,
            timeout=120,
        )
        reply = completion.choices[0].message.content
    except Exception as e:
        # Any failure of the API call is logged and replaced by a fixed reply.
        print(f"OpenAI API error: {e}")
        reply = "Error: Could not connect to the AI service."

    await save_message(chat_id, username, "assistant", reply)
    return "", history + [
        user_turn,
        {"role": "assistant", "content": reply},
    ]
|
|
| |
# Stylesheet handed to gr.Blocks; the selectors match the elem_id values of
# the components (chatbot, message box, buttons, chat selector).
custom_css = """
#chatbot {
    width: 100% !important;
    max-width: 1200px !important;
    margin: 0 auto !important;
    height: 65vh !important;
    overflow-y: auto;
    border: 1px solid #ccc;
    border-radius: 8px;
}
#chatbot .message {
    font-size: 16px !important;
    padding: 10px;
}
#submit_btn, #load_history_btn, #clear_history_btn {
    background-color: #4CAF50 !important;
    color: white !important;
    margin: 5px !important;
    border-radius: 5px;
}
#clear_history_btn {
    background-color: #ff4444 !important;
}
#msg {
    width: 100% !important;
    max-width: 1200px !important;
    margin: 10px auto !important;
    border-radius: 5px;
}
#chat_selector {
    margin-bottom: 20px !important;
}
"""
|
|
| |
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo:
    # State holders for the logged-in user's name and the selected chat.
    username_state = gr.State()
    chat_id_state = gr.State(value="chat1")

    chat_selector = gr.Dropdown(
        choices=CHAT_IDS,
        label="Выберите чат",
        value="chat1",
        elem_id="chat_selector",
    )

    with gr.Column(elem_id="chatbot_container"):
        chatbot = gr.Chatbot(type="messages", elem_id="chatbot")
        msg = gr.Textbox(
            label="Ваше сообщение",
            placeholder="Введите ваше сообщение...",
            elem_id="msg",
        )
        with gr.Row():
            submit_btn = gr.Button("Отправить", elem_id="submit_btn")
            load_history_btn = gr.Button("Загрузить историю", elem_id="load_history_btn")
            clear_history_btn = gr.Button("Очистить историю", elem_id="clear_history_btn")

    # Make sure the chat tables exist whenever the page is loaded.
    demo.load(init_db, inputs=None, outputs=None)

    def set_username(request: gr.Request):
        """Return the username carried by the request, or None without one."""
        return request.username if request else None

    demo.load(set_username, inputs=None, outputs=username_state)

    def update_chat_id(selected_chat):
        """Pass the dropdown selection through into chat_id_state."""
        return selected_chat

    chat_selector.change(update_chat_id, inputs=chat_selector, outputs=chat_id_state)

    async def load_chat_history(username, chat_id):
        """Return the stored history, or [] when user or chat is unset."""
        if username and chat_id:
            return await load_history(chat_id, username)
        return []

    # Refresh the chatbot when the selected chat or the user changes.
    chat_id_state.change(load_chat_history, inputs=[username_state, chat_id_state], outputs=chatbot)
    username_state.change(load_chat_history, inputs=[username_state, chat_id_state], outputs=chatbot)

    # The "load history" button was created but never connected to a handler;
    # it now reloads the stored history on demand.
    load_history_btn.click(
        load_chat_history,
        inputs=[username_state, chat_id_state],
        outputs=chatbot,
    )

    async def handle_clear_history(username, chat_id):
        """Delete the stored history (if user and chat are set); return []."""
        if username and chat_id:
            await clear_history(chat_id, username)
        return []

    clear_history_btn.click(
        handle_clear_history,
        inputs=[username_state, chat_id_state],
        outputs=chatbot,
    )

    # The send button and pressing Enter in the textbox do the same thing.
    submit_btn.click(
        send_message,
        inputs=[msg, chatbot, username_state, chat_id_state],
        outputs=[msg, chatbot],
    )

    msg.submit(
        send_message,
        inputs=[msg, chatbot, username_state, chat_id_state],
        outputs=[msg, chatbot],
    )
|
|
| |
if __name__ == "__main__":
    # Enable request queuing, then start the app with a public share link.
    queued_app = demo.queue(default_concurrency_limit=40, max_size=50)
    # Every (user, password) pair from AUTHORIZED_USERS may log in.
    credentials = list(AUTHORIZED_USERS.items())
    queued_app.launch(auth=credentials, share=True, ssr_mode=False)