| import os |
| import time |
| import gradio as gr |
| from neo_rest import start_server |
| from logica import ( |
| find_custom_response, generate_game_explanation, |
| detect_roblox, calculator_mode_active, extract_text_content, |
| ) |
| from buscador import search as web_search, ERROR_NETWORK, ERROR_RATE_LIMIT, ERROR_LICENSE |
| from resumidor import summarize |
| from roblox_api import search_player, search_game, format_player, format_game |
| from matematicas import ( |
| is_calculator_request, is_math_operation, |
| solve_operation, format_result, extract_username, |
| ) |
|
|
| def add_turn(history, user_msg, bot_msg=""): |
| return history + [ |
| {"role": "user", "content": user_msg}, |
| {"role": "assistant", "content": bot_msg}, |
| ] |
|
|
| def stream_tokens(text, history): |
| """ |
| Emits the response token by token (word by word), |
| simulating the generation process of a real language model. |
| Longer tokens = slightly longer pauses (more semantic weight). |
| """ |
| tokens = text.split(" ") |
| accumulated = "" |
| for i, token in enumerate(tokens): |
| accumulated += token |
| if i < len(tokens) - 1: |
| accumulated += " " |
| history[-1]["content"] = accumulated |
| delay = 0.055 if len(token) > 4 else 0.030 |
| time.sleep(delay) |
| yield history |
|
|
| def respond(message, history): |
| text = message.strip().lower() |
|
|
| if is_calculator_request(message): |
| name = extract_username(history) |
| greeting = ( |
| f"Sure! 😀 {name}, here's our calculator:\n\n" |
| "🧮 **NEO-1 Virtual Calculator**\n" |
| "Type any math operation and I'll solve it instantly.\n\n" |
| "**Examples:**\n" |
| "- `5 + 3`\n- `12 * 7`\n- `100 / 4`\n" |
| "- `2 ** 8` (power)\n- `144 ** 0.5` (square root)\n\n" |
| "_Type your operation or say 'exit calculator' to go back._" |
| ) |
| history = add_turn(history, message) |
| for h in stream_tokens(greeting, history): |
| yield h, "" |
| return |
|
|
| if text in ("exit calculator", "close calculator", "quit calculator", "back to chat"): |
| history = add_turn(history, message, "Alright, back to normal chat. Ask me anything! 😊") |
| yield history, "" |
| return |
|
|
| if calculator_mode_active(history) or is_math_operation(message): |
| result = solve_operation(message) |
| if result is not None: |
| response = format_result(message, result) |
| history = add_turn(history, message) |
| for h in stream_tokens(response, history): |
| yield h, "" |
| return |
|
|
| roblox_type, roblox_name = detect_roblox(message) |
|
|
| if roblox_type == "player": |
| history = add_turn(history, message, "🔍 Searching for player on Roblox...") |
| yield history, "" |
| data = search_player(roblox_name) |
| result = format_player(data) |
| history[-1]["content"] = result |
| yield history, "" |
| return |
|
|
| if roblox_type == "game": |
| history = add_turn(history, message, "🔍 Searching for game on Roblox...") |
| yield history, "" |
| data = search_game(roblox_name) |
| result = format_game(data) |
| if data and "error" not in data: |
| explanation = generate_game_explanation(data) |
| result = result + "\n\n💡 **What is this game about?**\n" + explanation |
| history[-1]["content"] = result |
| yield history, "" |
| return |
|
|
| custom_response = find_custom_response(message) |
| if custom_response: |
| history = add_turn(history, message) |
| for h in stream_tokens(custom_response, history): |
| yield h, "" |
| return |
|
|
| |
| history = add_turn(history, message, "🌐 Searching the web...") |
| yield history, "" |
|
|
| web_result = web_search(message) |
|
|
| if web_result.get("found"): |
| summary = summarize(message, web_result) |
| if summary: |
| history[-1]["content"] = "" |
| for h in stream_tokens(summary, history): |
| yield h, "" |
| return |
|
|
| |
| error_type = web_result.get("error_type", "") |
|
|
| if error_type == ERROR_NETWORK: |
| msg = ( |
| "⚠️ **No internet connection**\n\n" |
| "I couldn't reach the web search engine right now. " |
| "I also checked my knowledge base but found nothing on that topic.\n\n" |
| "_Please try again in a few seconds or rephrase your question._" |
| ) |
| elif error_type == ERROR_RATE_LIMIT: |
| msg = ( |
| "⏳ **Too many searches in a short time**\n\n" |
| "The search engine asked me to slow down for a moment. " |
| "Please wait a few seconds and try again. 😊" |
| ) |
| elif error_type == ERROR_LICENSE: |
| msg = ( |
| "🔒 **Sources unavailable**\n\n" |
| "I found results online, but all sources use restrictive licenses " |
| "(All Rights Reserved, CC BY-ND, etc.) that don't allow me to use their content.\n\n" |
| "_Try rephrasing your question to find open-licensed sources._" |
| ) |
| else: |
| msg = ( |
| "🤖 **No results found**\n\n" |
| "I couldn't find information on that topic in my knowledge base " |
| "or on the web. Try rephrasing your question." |
| ) |
|
|
| history[-1]["content"] = msg |
| yield history, "" |
|
|
| with gr.Blocks(title="mdfjbots-neo-1") as demo: |
| gr.Markdown( |
| """ |
| # 🤖 mdfjbots-neo-1 |
| ### Conversational AI assistant |
| """ |
| ) |
|
|
| chatbot = gr.Chatbot( |
| show_label=False, |
| height=500, |
| avatar_images=(None, "https://api.dicebear.com/7.x/bottts/svg?seed=mdfjbots"), |
| ) |
|
|
| with gr.Row(): |
| input_box = gr.Textbox( |
| placeholder="Type your message here... (e.g. 'search player Builderman')", |
| show_label=False, |
| scale=9, |
| container=False, |
| autofocus=True, |
| ) |
| btn_send = gr.Button("Send", scale=1, variant="primary") |
|
|
| with gr.Row(): |
| btn_clear = gr.Button("🗑️ Clear conversation", size="sm") |
|
|
| gr.Examples( |
| examples=[ |
| "Hello, who are you?", |
| "search player Builderman", |
| "search game Adopt Me", |
| "definition of history", |
| "calculator", |
| "what is linux", |
| "korean war", |
| ], |
| inputs=input_box, |
| label="Example questions", |
| ) |
|
|
| input_box.submit(fn=respond, inputs=[input_box, chatbot], outputs=[chatbot, input_box]) |
| btn_send.click(fn=respond, inputs=[input_box, chatbot], outputs=[chatbot, input_box]) |
| btn_clear.click(fn=lambda: ([], ""), outputs=[chatbot, input_box]) |
|
|
| if __name__ == "__main__": |
| start_server() |
|
|
| port = int(os.environ.get("PORT", 5000)) |
| dev_domain = os.environ.get("REPLIT_DEV_DOMAIN", "") |
| root_path = f"https://{dev_domain}/__neo1" if dev_domain else "" |
| demo.queue() |
| demo.launch( |
| server_name="0.0.0.0", |
| server_port=port, |
| share=False, |
| theme=gr.themes.Soft(primary_hue="blue"), |
| root_path=root_path, |
| ) |
|
|