| import time |
| import gradio as gr |
|
|
# Optional dependency: the REST server helper lives in the `neo_rest` module.
# When that module cannot be imported for any reason, `start_server` is set to
# None and the `__main__` block skips it. The broad `except Exception` is a
# deliberate best-effort so a faulty optional module never blocks the chat UI.
try:
    from neo_rest import start_server
except Exception:
    start_server = None
|
|
| from logica import ( |
| find_custom_response, generate_game_explanation, |
| detect_roblox, calculator_mode_active, |
| ) |
| from buscador import search as web_search, ERROR_NETWORK, ERROR_RATE_LIMIT, ERROR_LICENSE |
| from resumidor import summarize |
| from roblox_api import search_player, search_game, format_player, format_game |
| from matematicas import ( |
| is_calculator_request, is_math_operation, |
| solve_operation, format_result, extract_username, |
| ) |
|
|
def add_turn(history, user_msg, bot_msg=""):
    """Return a new history with one user/assistant exchange appended.

    The input ``history`` is never mutated; a fresh list is returned so the
    caller can rebind its own variable.

    Args:
        history: Existing list of ``{"role": ..., "content": ...}`` message
            dicts. ``None`` is treated as an empty history.
        user_msg: Content stored under the ``"user"`` role.
        bot_msg: Initial content of the ``"assistant"`` message; defaults to
            an empty string so it can be filled in later.

    Returns:
        A new list: the previous messages followed by the two new ones.
    """
    return [
        *(history or []),
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": bot_msg},
    ]
|
|
def stream_tokens(text, history, *, short_delay=0.030, long_delay=0.055):
    """Reveal ``text`` word by word inside the last message of ``history``.

    ``text`` is split on single spaces. After each token is appended, the
    accumulated string is written to ``history[-1]["content"]``, the generator
    sleeps briefly, and then yields ``history``. Every token except the last
    is followed by a single space, so intermediate states end with a space.

    Args:
        text: Full reply to reveal.
        history: Non-empty list of message dicts; its last element is mutated
            in place.
        short_delay: Seconds to sleep after a token of 4 characters or fewer.
        long_delay: Seconds to sleep after a token longer than 4 characters.

    Yields:
        The same ``history`` list object after each token (not a copy).
    """
    tokens = text.split(" ")
    last_index = len(tokens) - 1
    accumulated = ""
    for index, token in enumerate(tokens):
        # Re-insert the separator consumed by split(), except after the end.
        accumulated += token if index == last_index else token + " "
        history[-1]["content"] = accumulated
        time.sleep(long_delay if len(token) > 4 else short_delay)
        yield history
|
|
# Phrases (compared against the stripped, lower-cased message) that leave
# calculator mode.
_CALCULATOR_EXIT_PHRASES = (
    "exit calculator", "close calculator", "quit calculator", "back to chat",
)


def _stream_new_turn(history, message, reply):
    """Append a new exchange for ``message`` and stream ``reply`` into it."""
    history = add_turn(history, message)
    for snapshot in stream_tokens(reply, history):
        yield snapshot, ""


def _web_error_message(error_type):
    """Return the user-facing text for a web search that produced no answer."""
    if error_type == ERROR_NETWORK:
        return "โ ๏ธ **No internet connection**\n\nCouldn't reach the search engine. Try again in a few seconds."
    if error_type == ERROR_RATE_LIMIT:
        return "โณ **Too many searches**\n\nPlease wait a few seconds and try again. ๐"
    if error_type == ERROR_LICENSE:
        return "๐ **Sources unavailable**\n\nAll results use restrictive licenses. Try rephrasing your question."
    return "๐ค **No results found**\n\nNothing found in my knowledge base or on the web. Try rephrasing."


def respond(message, history):
    """Route one chat message to the right handler and stream the reply.

    This is a generator. Every yielded item is a ``(history, "")`` pair: the
    updated message list and an empty string. In this file it is wired to the
    outputs ``[chatbot, input_box]``, so the empty string clears the textbox.

    Handlers are tried in this order, and the first one that produces a reply
    ends the turn:

    1. Blank message: yield the history unchanged.
    2. Exit-calculator phrases.
    3. Calculator request: stream the calculator greeting.
    4. Math operation (or calculator mode active): stream the result, unless
       ``solve_operation`` returns ``None``, in which case routing continues.
    5. Roblox player / game lookup.
    6. Custom canned response.
    7. Web search plus summary, with a typed error message as fallback.

    Args:
        message: Raw text typed by the user.
        history: List of ``{"role", "content"}`` message dicts; ``None`` is
            treated as an empty list.

    Yields:
        ``(history, "")`` tuples as described above.
    """
    # NOTE(review): several reply strings below contain glyph runs such as
    # "โณ" that look like mis-decoded UTF-8 emoji. They are kept verbatim
    # here; confirm the file's encoding before changing them.
    message = message or ""
    history = history or []
    text = message.strip().lower()

    # Nothing to answer: avoid running a web search on an empty query.
    if not text:
        yield history, ""
        return

    # Checked before the calculator-open test: three of these exact phrases
    # contain the word "calculator", so this guarantees they cannot be taken
    # for an open request (is_calculator_request is defined elsewhere, so its
    # matching rules are not visible from here).
    if text in _CALCULATOR_EXIT_PHRASES:
        history = add_turn(history, message, "Alright, back to normal chat. Ask me anything! ๐")
        yield history, ""
        return

    if is_calculator_request(message):
        name = extract_username(history)
        greeting = (
            f"Sure! ๐ {name}, here's our calculator:\n\n"
            "๐งฎ **NEO-2 Virtual Calculator**\n"
            "Type any math operation and I'll solve it instantly.\n\n"
            "**Examples:** `5 + 3` `12 * 7` `100 / 4` `2 ** 8` (power)\n\n"
            "_Say 'exit calculator' to go back._"
        )
        yield from _stream_new_turn(history, message, greeting)
        return

    if calculator_mode_active(history) or is_math_operation(message):
        result = solve_operation(message)
        # A None result means the message could not be solved; keep routing.
        if result is not None:
            yield from _stream_new_turn(history, message, format_result(message, result))
            return

    roblox_type, roblox_name = detect_roblox(message)
    if roblox_type == "player":
        # Show a placeholder first, then replace it with the lookup result.
        history = add_turn(history, message, "๐ Searching for player on Roblox...")
        yield history, ""
        history[-1]["content"] = format_player(search_player(roblox_name))
        yield history, ""
        return
    if roblox_type == "game":
        history = add_turn(history, message, "๐ Searching for game on Roblox...")
        yield history, ""
        data = search_game(roblox_name)
        result = format_game(data)
        # Only add the explanation when the lookup returned usable data.
        if data and "error" not in data:
            result += "\n\n๐ก **What is this game about?**\n" + generate_game_explanation(data)
        history[-1]["content"] = result
        yield history, ""
        return

    response = find_custom_response(message)
    if response:
        yield from _stream_new_turn(history, message, response)
        return

    history = add_turn(history, message, "๐ Searching the web...")
    yield history, ""
    # Defensive: treat a None result the same as "nothing found".
    web_result = web_search(message) or {}
    if web_result.get("found"):
        summary = summarize(message, web_result)
        if summary:
            # Clear the placeholder before streaming the summary over it.
            history[-1]["content"] = ""
            for snapshot in stream_tokens(summary, history):
                yield snapshot, ""
            return

    # Reached when nothing was found or the summary came back empty.
    history[-1]["content"] = _web_error_message(web_result.get("error_type", ""))
    yield history, ""
|
|
# UI layout and event wiring for the chat page.
# NOTE(review): the source's indentation was lost, so the nesting below is
# reconstructed. The textbox (scale=9) and Send button (scale=1) are placed
# together in the Row; the Clear button, which has no scale, is assumed to sit
# outside it — confirm against the intended layout.
with gr.Blocks(title="NEO-2 โ mdfjbots") as demo:
    gr.Markdown("# ๐ NEO-2\n### Conversational AI with web search ยท mdfjbots")
    chatbot = gr.Chatbot(
        show_label=False, height=480, type="messages",
        avatar_images=(None, "https://api.dicebear.com/7.x/bottts/svg?seed=neo2"),
    )
    with gr.Row():
        input_box = gr.Textbox(
            placeholder="Type your message... (e.g. 'what is a black hole?')",
            show_label=False, scale=9, container=False, autofocus=True,
        )
        btn_send = gr.Button("Send", scale=1, variant="primary")
    btn_clear = gr.Button("๐๏ธ Clear", size="sm")
    gr.Examples(
        ["Hello, who are you?", "search player Builderman", "search game Adopt Me",
         "what is a supernova?", "calculator", "what is linux"],
        inputs=input_box,
    )
    # Pressing Enter and clicking Send share one handler and identical wiring.
    for trigger in (input_box.submit, btn_send.click):
        trigger(fn=respond, inputs=[input_box, chatbot], outputs=[chatbot, input_box])
    # Clear resets both the conversation and the textbox.
    btn_clear.click(fn=lambda: ([], ""), outputs=[chatbot, input_box])
|
|
if __name__ == "__main__":
    import logging

    # `start_server` is None when the optional `neo_rest` import failed.
    if start_server:
        try:
            start_server()
        except Exception:
            # Best-effort: a failure here must not stop the chat UI, but it
            # is logged rather than swallowed silently.
            logging.getLogger(__name__).exception(
                "start_server() failed; continuing without it"
            )
    demo.queue().launch()
|
|