Spaces:
Paused
Paused
| # Ultroid - UserBot | |
| # Copyright (C) 2021-2025 TeamUltroid | |
| # | |
| # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
| # Please read the GNU Affero General Public License in | |
| # <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>. | |
| """ | |
| ✘ Commands Available - | |
| • `{i}gemini <prompt>` | |
| Get response from Google Gemini. | |
| • `{i}antr <prompt>` | |
| Get response from Anthropic Claude. | |
| • `{i}gpt <prompt>` | |
| Get response from OpenAI GPT. | |
| • `{i}deepseek <prompt>` | |
| Get response from DeepSeek AI. | |
| Set custom models using: | |
| • OPENAI_MODEL: default: gpt-4o-mini | |
| • ANTHROPIC_MODEL: default: claude-3-opus-20240229 | |
| • GEMINI_MODEL: default: gemini-1.5-flash | |
| • DEEPSEEK_MODEL: default: deepseek-chat | |
| """ | |
| import json | |
| from . import LOGS, eor, get_string, udB, ultroid_cmd, async_searcher | |
| import aiohttp | |
| import asyncio | |
# Chat-completion URL for each provider, keyed by the provider's command name.
ENDPOINTS = dict(
    gpt="https://api.openai.com/v1/chat/completions",
    antr="https://api.anthropic.com/v1/messages",
    gemini="https://generativelanguage.googleapis.com/v1beta/chat/completions",
    deepseek="https://api.deepseek.com/chat/completions",
)
# Model used for each provider when no override is stored in the database.
DEFAULT_MODELS = dict(
    gpt="gpt-4o-mini",
    antr="claude-3-opus-20240229",
    gemini="gemini-1.5-flash",
    deepseek="deepseek-chat",
)
def get_model(provider):
    """Return the model name to use for *provider*.

    The provider's database key is looked up through ``udB.get_key``; when
    that yields a falsy value the entry from ``DEFAULT_MODELS`` is returned
    instead. An unknown *provider* raises ``KeyError``.
    """
    db_keys = dict(
        gpt="OPENAI_MODEL",
        antr="ANTHROPIC_MODEL",
        gemini="GEMINI_MODEL",
        deepseek="DEEPSEEK_MODEL",
    )
    configured = udB.get_key(db_keys[provider])
    if configured:
        return configured
    return DEFAULT_MODELS[provider]
async def stream_response(msg, text, chunk_size=100, delay=0.5):
    """Reveal *text* progressively by editing *msg* one chunk at a time.

    *text* is split on whitespace and regrouped into chunks whose joined
    length does not exceed *chunk_size* characters. A single word longer than
    *chunk_size* becomes a chunk of its own and is never split. After each
    chunk is appended, *msg* is edited to show everything accumulated so far.

    Args:
        msg: Object exposing an awaitable ``edit(text)`` method.
        text: The full response to reveal.
        chunk_size: Maximum joined length of one chunk, in characters.
        delay: Seconds to wait between two consecutive edits.

    Returns:
        The accumulated text: the chunks joined by single spaces, followed by
        one trailing space (an empty string when *text* has no words).
    """
    chunks = []
    pending = []
    pending_len = 0
    for word in text.split():
        # Joined length if ``word`` were appended (one extra char for the space).
        grown = pending_len + len(word) + (1 if pending else 0)
        if pending and grown > chunk_size:
            chunks.append(" ".join(pending))
            pending, pending_len = [word], len(word)
        else:
            # Also taken for an oversized first word, so no empty chunk is
            # ever flushed ahead of it.
            pending.append(word)
            pending_len = grown
    if pending:
        chunks.append(" ".join(pending))

    current = ""
    last_index = len(chunks) - 1
    for index, chunk in enumerate(chunks):
        current += chunk + " "
        try:
            await msg.edit(current)
        except Exception as exc:
            # Best effort: one failed edit must not abort the remaining ones.
            LOGS.debug("stream_response: edit failed: %s", exc)
        # Nothing follows the last chunk, so there is nothing to pace.
        if delay > 0 and index < last_index:
            await asyncio.sleep(delay)
    return current
# Output cap sent with every Anthropic request; the Messages API rejects
# requests that omit ``max_tokens``.
_ANTHROPIC_MAX_TOKENS = 1024

# ``@type`` of the error detail from which the suggested retry delay is read.
_GOOGLE_RETRY_INFO = "type.googleapis.com/google.rpc.RetryInfo"


def _build_request(provider, prompt, api_key, stream):
    """Return the ``(headers, payload)`` pair for one chat request."""
    headers = {"Content-Type": "application/json"}
    messages = [{"role": "user", "content": prompt}]
    if provider == "gemini":
        messages.insert(
            0, {"role": "system", "content": "You are a helpful assistant."}
        )
    payload = {
        "model": get_model(provider),
        "messages": messages,
        "stream": stream,
    }
    if provider == "antr":
        headers["x-api-key"] = api_key
        headers["anthropic-version"] = "2023-06-01"
        payload["max_tokens"] = _ANTHROPIC_MAX_TOKENS
    else:
        headers["Authorization"] = f"Bearer {api_key}"
    return headers, payload


def _describe_error(provider, body, status=None):
    """Turn a decoded error body (and optional HTTP status) into a message."""
    if isinstance(body, list) and body:
        body = body[0]
    error = body.get("error") if isinstance(body, dict) else None
    if not isinstance(error, dict):
        error = {"message": str(error)} if error else {}
    if provider == "gemini" and 429 in (status, error.get("code")):
        retry_delay = "60"
        for detail in error.get("details") or []:
            if isinstance(detail, dict) and detail.get("@type") == _GOOGLE_RETRY_INFO:
                retry_delay = detail.get("retryDelay", "60s").rstrip("s")
        message = f"β οΈ Rate limit exceeded. Please try again in {retry_delay} seconds."
        if "free_tier" in str(error):
            message += "\nConsider upgrading to a paid tier for higher quotas."
        return message
    return f"Error: {error.get('message', 'Unknown error occurred')}"


def _completion_text(provider, body):
    """Extract the reply text from a complete (non-streamed) response body."""
    if provider == "antr":
        return body["content"][0]["text"]
    return body["choices"][0]["message"]["content"]


def _delta_text(provider, event):
    """Extract the text fragment carried by one streamed event, or ``""``."""
    if provider == "antr":
        if event.get("type") != "content_block_delta":
            return ""
        return (event.get("delta") or {}).get("text") or ""
    choices = event.get("choices")
    if not choices:
        return ""
    return (choices[0].get("delta") or {}).get("content") or ""


async def _iter_sse_events(resp):
    """Yield the JSON objects carried by the ``data:`` lines of *resp*."""
    async for raw_line in resp.content:
        line = raw_line.decode("utf-8", errors="replace").strip()
        # Only ``data:`` lines are parsed; every other line is skipped.
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        if not payload:
            continue
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict):
            yield event


async def get_ai_response(provider, prompt, api_key, stream=False):
    """Yield the reply of an AI provider to *prompt*.

    This is an async generator. With ``stream=False`` a single request is made
    through ``async_searcher`` and exactly one string is yielded. With
    ``stream=True`` the endpoint is read as a server-sent-event stream and
    each text fragment is yielded as it arrives.

    Failures never propagate to the caller: API error bodies, non-200
    statuses and unexpected exceptions are all reported by yielding a
    human-readable message (exceptions are also logged through ``LOGS``).

    Args:
        provider: One of ``"gpt"``, ``"antr"``, ``"gemini"`` or ``"deepseek"``.
        prompt: The user's prompt, sent as a single user message.
        api_key: API key for the chosen provider.
        stream: Whether to request and yield a streamed reply.

    Yields:
        Reply text (whole or in fragments), or an error message.
    """
    try:
        headers, payload = _build_request(provider, prompt, api_key, stream)
        url = ENDPOINTS[provider]

        if not stream:
            body = await async_searcher(
                url,
                headers=headers,
                post=True,
                json=payload,
                re_json=True,
            )
            if isinstance(body, dict) and "error" not in body:
                yield _completion_text(provider, body)
            else:
                yield _describe_error(provider, body)
            return

        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, json=payload) as resp:
                if resp.status != 200:
                    try:
                        # content_type=None: decode JSON whatever the MIME type.
                        body = await resp.json(content_type=None)
                    except ValueError:
                        body = {}
                    yield _describe_error(provider, body, resp.status)
                    return
                async for event in _iter_sse_events(resp):
                    if event.get("error"):
                        yield _describe_error(provider, event)
                        return
                    fragment = _delta_text(provider, event)
                    if fragment:
                        yield fragment
    except Exception as e:
        LOGS.exception(e)
        yield f"Error: {str(e)}"
# Minimum number of seconds between two streaming edits of the same reply.
_EDIT_INTERVAL = 1.0


async def _edit_quietly(msg, text):
    """Edit *msg* to *text*; log instead of raising. Return True on success."""
    try:
        await msg.edit(text)
    except Exception as exc:
        LOGS.debug("AI reply edit failed: %s", exc)
        return False
    return True


async def _run_ai_command(event, provider, key_name, missing_key_text, title):
    """Shared body of the AI commands: validate, query *provider*, stream reply.

    Args:
        event: The command event; must expose ``pattern_match``, ``eor`` and
            ``client.me.bot``.
        provider: Provider name passed to ``get_model``/``get_ai_response``.
        key_name: Database key holding the provider's API key.
        missing_key_text: Message shown when that key is not set.
        title: First line of the reply header.
    """
    # group(1) may be None when the pattern matched without a prompt.
    prompt = (event.pattern_match.group(1) or "").strip()
    if not prompt:
        return await event.eor("β Please provide a prompt!")
    api_key = udB.get_key(key_name)
    if not api_key:
        return await event.eor(missing_key_text)
    msg = await event.eor("π€ Thinking...")
    model = get_model(provider)
    header = (
        f"{title}\n"
        f"**Model:** `{model}`\n"
        "ββββββββββ\n\n"
        f"**π Prompt:**\n{prompt}\n\n"
        "**π‘ Response:**\n"
    )
    if event.client.me.bot:
        await msg.edit(header)

    loop = asyncio.get_running_loop()
    parts = []
    shown = None  # response text of the last successful edit
    last_attempt = None
    async for chunk in get_ai_response(provider, prompt, api_key, stream=True):
        parts.append(chunk)
        now = loop.time()
        # Throttle: at most one edit attempt per _EDIT_INTERVAL seconds.
        if last_attempt is not None and now - last_attempt < _EDIT_INTERVAL:
            continue
        last_attempt = now
        so_far = "".join(parts)
        if await _edit_quietly(msg, header + so_far):
            shown = so_far

    # Always finish with the complete text, even if the last throttled or
    # failed edit left the message behind.
    final_text = "".join(parts) or "No response received."
    if final_text != shown:
        await _edit_quietly(msg, header + final_text)


async def gemini_ai(event):
    """Use Google Gemini"""
    return await _run_ai_command(
        event,
        "gemini",
        "GEMINI_API_KEY",
        "β οΈ Please set Gemini API key using `setdb GEMINI_API_KEY your_api_key`",
        "π€ **Google Gemini**",
    )


async def anthropic_ai(event):
    """Use Anthropic Claude"""
    return await _run_ai_command(
        event,
        "antr",
        "ANTHROPIC_KEY",
        "β οΈ Please set Anthropic API key using `setdb ANTHROPIC_KEY your_api_key`",
        "π§ **Anthropic Claude**",
    )


async def openai_ai(event):
    """Use OpenAI GPT"""
    return await _run_ai_command(
        event,
        "gpt",
        "OPENAI_API_KEY",
        "β οΈ Please set GPT API key using `setdb OPENAI_API_KEY your_api_key`",
        "π **OpenAI GPT**",
    )


async def deepseek_ai(event):
    """Use DeepSeek AI"""
    return await _run_ai_command(
        event,
        "deepseek",
        "DEEPSEEK_API_KEY",
        "β οΈ Please set DeepSeek API key using `setdb DEEPSEEK_API_KEY your_api_key`",
        "π€ **DeepSeek AI**",
    )