Spaces:
Sleeping
Sleeping
| from pyrogram import Client, filters | |
| from config import API_ID, API_HASH, BOT_TOKEN | |
| import requests | |
| import os | |
| import asyncio | |
| import json | |
| from datetime import datetime | |
| import edge_tts | |
| import io | |
| from telegraph.aio import Telegraph | |
| import aiohttp | |
# Remote endpoints: API_URL is the Whisper speech-to-text model that
# transcribe_audio() posts to; AI_URL is the text-generation service that
# get_ai_response() posts to.
API_URL = (
    "https://api-inference.huggingface.co/models/"
    "openai/whisper-large-v3-turbo"
)
AI_URL = "https://charan5775-fastest.hf.space/t2t"

# JSON content-negotiation headers.
# NOTE(review): no use of AI_HEADERS is visible in this file — confirm whether
# it was meant to be passed along with requests to AI_URL.
AI_HEADERS = dict.fromkeys(("Content-Type", "Accept"), "application/json")
# Lazily created Telegraph client and the aiohttp session it runs on.
telegraph = None
session = None


async def init_telegraph():
    """Create the shared Telegraph client on first use.

    The client and its aiohttp session are published to the module globals
    only after the Telegraph account has been created. A failed attempt
    therefore leaves both globals as ``None`` (and closes the session it
    opened), so the next call retries from scratch instead of reporting a
    half-initialised client as ready.

    Returns:
        bool: True when a ready client is available, False if setup failed.
    """
    global telegraph, session
    if telegraph is not None:
        return True

    new_session = None
    try:
        new_session = aiohttp.ClientSession()
        client = Telegraph(session=new_session)
        await client.create_account(short_name='TelegramAIBot')
    except Exception as e:
        print(f"Telegraph initialization error: {e}")
        # Do not leak the session opened for the failed attempt.
        if new_session is not None:
            await new_session.close()
        return False

    session = new_session
    telegraph = client
    return True
async def cleanup():
    """Close the shared aiohttp session and forget the Telegraph client.

    Both globals are reset so that a later ``init_telegraph()`` builds a fresh
    client instead of reusing one bound to a closed session. Does nothing when
    no session was created.
    """
    global session, telegraph
    if session:
        # Clear the globals first so they never point at a closed session,
        # even if close() raises.
        closing, session, telegraph = session, None, None
        await closing.close()
# Default edge-tts voice and speaking rate used by text_to_speech().
DEFAULT_VOICE, DEFAULT_RATE = "en-IN-NeerjaNeural", "+25%"

# Conversation log, kept in process memory only.
message_history = list()
async def text_to_speech(text, voice=DEFAULT_VOICE, rate=DEFAULT_RATE):
    """Synthesize *text* with edge-tts and return the audio as bytes.

    Args:
        text: Text to speak.
        voice: edge-tts voice name.
        rate: Speaking-rate adjustment passed to edge-tts.

    Returns:
        The concatenated audio payload (empty ``bytes`` if the stream carried
        no audio chunks), or ``None`` if synthesis raised.
    """
    try:
        communicate = edge_tts.Communicate(text, voice, rate=rate)
        # Collect the chunks and join once: repeated ``bytes +=`` re-copies
        # everything accumulated so far on every iteration.
        audio_chunks = [
            chunk["data"]
            async for chunk in communicate.stream()
            if chunk["type"] == "audio"
        ]
        return b"".join(audio_chunks)
    except Exception as e:
        print(f"Error in text-to-speech conversion: {str(e)}")
        return None
# The bot client, using the session name "my_bot".
# in_memory=True: per the original author's note, this prevents SQLite
# database locks.
# NOTE(review): no handler registration (decorators or add_handler calls) is
# visible in this file — confirm the handlers below are attached to this
# client somewhere.
myaibot = Client(
    "my_bot",
    in_memory=True,
    bot_token=BOT_TOKEN,
    api_hash=API_HASH,
    api_id=API_ID,
)
def save_message_to_history(user_id, username, message_type, content, bot_response=None):
    """Append one exchange to the in-memory history and log it to stdout.

    Only ``content`` and ``bot_response`` are stored (under the keys
    ``"content"`` and ``"response"``); ``user_id``, ``username`` and
    ``message_type`` are accepted but not recorded.
    """
    entry = dict(content=content, response=bot_response)
    message_history.append(entry)
    print("Message History Update:", json.dumps(entry, indent=2))
def condense_text(text, max_length=100):
    """Shorten *text* to at most *max_length* characters.

    Text that already fits is returned unchanged. Longer text is cut and
    suffixed with ``"..."`` so the result is exactly *max_length* long. When
    *max_length* is below 3 there is no room for the ellipsis, so the text is
    hard-truncated instead (a non-positive limit yields ``""``).
    """
    if len(text) <= max_length:
        return text
    if max_length < 3:
        # ``text[:max_length - 3]`` would be a negative slice here and return
        # far more than max_length characters.
        return text[:max(max_length, 0)]
    return text[:max_length - 3] + "..."
async def history_command(client, message):
    """Reply with a summary of the stored conversation history.

    The last ten exchanges are listed as numbered Q/A pairs (each side
    condensed to 150 characters); anything older is listed first as one-line
    bullets condensed to 50 characters. If building or sending that summary
    raises, a short list of the last five messages is sent instead.
    """
    if not message_history:
        await message.reply_text("No message history available.")
        return

    try:
        count = len(message_history)
        parts = [f"📜 Chat History (Total: {count} messages)\n\n"]
        detailed = message_history

        # Summarize everything older than the ten most recent exchanges.
        if count > 10:
            parts.append("Earlier messages:\n")
            parts.extend(
                f"• {condense_text(old['content'], 50)}\n"
                for old in message_history[:-10]
            )
            parts.append("\nRecent messages:\n")
            detailed = message_history[-10:]

        for number, item in enumerate(detailed, start=1):
            parts.append(f"{number}. Q: {condense_text(item['content'], 150)}\n")
            if item['response']:
                parts.append(f" A: {condense_text(item['response'], 150)}\n")
            parts.append("\n")

        await message.reply_text("".join(parts))
    except Exception as e:
        print(f"Error in history command: {str(e)}")
        # Fall back to a much shorter listing.
        fallback = "📜 Last 5 Messages:\n\n" + "".join(
            f"• {condense_text(item['content'], 30)}\n"
            for item in message_history[-5:]
        )
        await message.reply_text(fallback)
async def start_command(client, message):
    """Handler for /start: reply with a greeting."""
    greeting = "Hello! I'm your Telegram bot. Nice to meet you!"
    await message.reply_text(greeting)
async def help_command(client, message):
    """Handler for /help: reply with the list of available commands."""
    command_lines = (
        "Available commands:",
        "/start - Start the bot",
        "/help - Show this help message",
        "/history - Show chat history",
        "/clear - Clear chat history",
        "/info - Show user information",
    )
    # The message starts and ends with a newline, as in the original literal.
    await message.reply_text("\n" + "\n".join(command_lines) + "\n")
async def clear_command(client, message):
    """Empty the in-memory history and confirm it to the user.

    Any exception raised while clearing or confirming is reported back in a
    reply.
    """
    try:
        del message_history[:]
        confirmation = "✨ Chat history has been cleared!"
        await message.reply_text(confirmation)
    except Exception as e:
        await message.reply_text(f"Error clearing history: {str(e)}")
async def echo(client, message):
    """Answer a plain text message with the AI's reply and record the exchange.

    A temporary "thinking" message is shown while the AI is queried and is
    deleted before the answer is sent. Long answers are split across several
    messages. Any failure is reported to the user in a reply.
    """
    # Telegram caps a text message at 4096 characters; stay a little below.
    max_chars = 4000
    try:
        thinking_msg = await message.reply_text("🤔 Thinking about your message...")
        ai_response = await get_ai_response(message.text)
        await thinking_msg.delete()

        # Send the answer in consecutive pieces. An empty answer is passed
        # through unchanged so it is handled exactly as before.
        pieces = [
            ai_response[start:start + max_chars]
            for start in range(0, len(ai_response), max_chars)
        ] or [ai_response]
        for piece in pieces:
            await message.reply_text(piece)

        # from_user can be missing (pyrogram documents it as empty for
        # messages sent to channels); don't fail after the answer went out.
        sender = message.from_user
        save_message_to_history(
            sender.id if sender else None,
            sender.username if sender else None,
            "text",
            message.text,
            ai_response
        )
    except Exception as e:
        await message.reply_text(f"Sorry, I couldn't process your message: {str(e)}")
async def handle_photo(client, message):
    """Acknowledge a photo message and record the exchange in the history."""
    sender = message.from_user
    reply = "Nice photo!"
    await message.reply_text(reply)
    save_message_to_history(
        user_id=sender.id,
        username=sender.username,
        message_type="photo",
        content="Photo message",
        bot_response=reply,
    )
async def handle_sticker(client, message):
    """Acknowledge a sticker message and record the exchange in the history."""
    sender = message.from_user
    reply = "Cool sticker!"
    await message.reply_text(reply)
    save_message_to_history(
        user_id=sender.id,
        username=sender.username,
        message_type="sticker",
        content="Sticker message",
        bot_response=reply,
    )
async def info_command(client, message):
    """Reply with the sender's ID, first name and username.

    The username is shown as ``@None`` when the sender has none set.
    """
    sender = message.from_user
    handle = sender.username or 'None'
    details = (
        "\nUser Information:\n"
        f"ID: {sender.id}\n"
        f"Name: {sender.first_name}\n"
        f"Username: @{handle}\n"
    )
    await message.reply_text(details)
async def transcribe_audio(file_path):
    """Transcribe an audio file by posting its bytes to ``API_URL``.

    The file read and the HTTP request are blocking calls, so they run in the
    event loop's default executor; awaiting them directly in the coroutine
    would stall every other handler until the request finished.

    Args:
        file_path: Path of the audio file to upload.

    Returns:
        str: The ``text`` field of the JSON reply,
        ``'Could not transcribe audio'`` if that field is absent, or
        ``"Error transcribing audio"`` if anything raised.
    """
    def _post_audio():
        with open(file_path, "rb") as f:
            data = f.read()
        # Bounded wait so a stalled endpoint cannot hang the handler forever.
        response = requests.post(API_URL, data=data, timeout=60)
        return response.json()

    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, _post_audio)
        return result.get('text', 'Could not transcribe audio')
    except Exception as e:
        print(f"Error in transcription: {e}")
        return "Error transcribing audio"
# Text appended to the prompt to seed the assistant's turn; it is stripped
# from the reply if the service echoes it back.
_ASSISTANT_PRIMER = "Assistant: Let me provide a relevant response based on our conversation..."


def _build_ai_prompt(text, history):
    """Compose the prompt: context from the last five exchanges, then *text*."""
    parts = []
    if history:
        parts.append(
            "You are a helpful AI assistant. Below is the conversation history. "
            "Use this context to provide a relevant response to the user's latest message. "
            "If the current message is related to previous ones, make sure to reference and build upon that information.\n\n"
            "Previous conversation:\n"
        )
        # Conversation history with clear markers.
        for i, msg in enumerate(history[-5:], 1):
            parts.append(f"Message {i}:\n")
            parts.append(f"User: {msg['content']}\n")
            if msg['response']:
                parts.append(f"Assistant: {msg['response']}\n")
            parts.append("\n")
        parts.append(
            "Instructions:\n"
            "1. Consider the conversation history above\n"
            "2. If the new message relates to previous ones, reference that information\n"
            "3. Maintain consistency with previous responses\n"
            "4. Provide a direct and relevant answer\n\n"
            "New message to respond to:\n"
        )
    parts.append(f"User: {text}\n{_ASSISTANT_PRIMER}")
    return "".join(parts)


async def get_ai_response(text):
    """Ask the AI service at ``AI_URL`` for a reply to *text*.

    The prompt includes up to five recent exchanges from ``message_history``.
    The blocking HTTP request runs in the event loop's default executor, with
    a timeout, so it does not stall other handlers.

    Args:
        text: The user's latest message.

    Returns:
        str: The service's ``response`` field with the seeded primer removed,
        the stringified payload when that field is missing, or an apology
        message when the request, status code or JSON parsing fails. This
        function does not raise.
    """
    try:
        payload = {
            "query": _build_ai_prompt(text, message_history),
            "stream": False
        }
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: requests.post(AI_URL, json=payload, timeout=60)
        )
        print(f"Raw API Response: {response.text}")  # Debug print

        if response.status_code != 200:
            print(f"API Error: Status {response.status_code}")
            return f"Sorry, the AI service returned an error (Status {response.status_code})"

        try:
            response_data = response.json()
        except ValueError as e:
            # Handled here rather than in the outer clauses: the JSON error
            # raised by recent requests versions is also a RequestException,
            # so it would otherwise be reported as a connection problem.
            print(f"JSON parsing error: {e}\nResponse text: {response.text}")
            return "Sorry, I received an invalid response from the AI service."
        print(f"Parsed Response Data: {response_data}")  # Debug print

        # The API returns the response directly.
        if isinstance(response_data, dict) and 'response' in response_data:
            return response_data['response'].replace(_ASSISTANT_PRIMER, "").strip()
        return str(response_data)
    except requests.exceptions.RequestException as e:
        print(f"Network error: {e}")
        return "Sorry, I'm having trouble connecting to the AI service."
    except Exception as e:
        print(f"Error getting AI response: {str(e)}\nFull error: {repr(e)}")
        return "Sorry, I couldn't process your message."
async def handle_voice(client, message):
    """Transcribe a voice message, answer it with the AI, and reply in text and speech.

    Steps: download the voice note, transcribe it, send the transcription,
    send the AI answer (split across messages when long), then send the answer
    synthesized as a voice message when synthesis produced audio. The exchange
    is stored in the history. The downloaded file and the progress messages
    are removed in ``finally`` so they are cleaned up on failures as well.
    Any failure is reported to the user in a reply.
    """
    # Telegram caps a text message at 4096 characters; stay a little below.
    max_chars = 4000
    voice_file = None
    progress_messages = []
    try:
        progress_messages.append(
            await message.reply_text("🎵 Processing your voice message...")
        )

        # Download and transcribe the voice message.
        voice_file = await message.download()
        transcription = await transcribe_audio(voice_file)
        await message.reply_text(f"🗣️ Transcription:\n\n{transcription}")

        # Get the AI response.
        thinking_msg = await message.reply_text("🤔 Thinking about your message...")
        ai_response = await get_ai_response(transcription)
        await thinking_msg.delete()

        # Send the answer in consecutive pieces. An empty answer is passed
        # through unchanged so it is handled exactly as before.
        pieces = [
            ai_response[start:start + max_chars]
            for start in range(0, len(ai_response), max_chars)
        ] or [ai_response]
        for piece in pieces:
            await message.reply_text(piece)

        # Convert the AI response to speech.
        progress_messages.append(
            await message.reply_text("🔊 Converting response to speech...")
        )
        audio_data = await text_to_speech(ai_response)
        if audio_data:
            # Send the audio straight from memory; the name gives the upload
            # a filename.
            audio_file = io.BytesIO(audio_data)
            audio_file.name = "response.mp3"
            await message.reply_voice(
                audio_file,
                caption="🎵 Voice response"
            )

        # from_user can be missing (pyrogram documents it as empty for
        # messages sent to channels); don't fail after the answer went out.
        sender = message.from_user
        save_message_to_history(
            sender.id if sender else None,
            sender.username if sender else None,
            "voice",
            transcription,
            ai_response
        )
    except Exception as e:
        error_message = f"Sorry, there was an error processing your message: {str(e)}"
        print(error_message)
        await message.reply_text(error_message)
    finally:
        # Best-effort cleanup; each step is independent so one failure does
        # not skip the others.
        if voice_file:
            try:
                os.remove(voice_file)
            except OSError as e:
                print(f"Could not remove downloaded voice file: {e}")
        for progress in progress_messages:
            try:
                await progress.delete()
            except Exception as e:
                print(f"Could not delete progress message: {e}")
def _run_bot():
    """Run the bot until it stops, then close the shared aiohttp session if one was created."""
    print("Bot is running...")
    try:
        myaibot.run()
    finally:
        if session:
            asyncio.get_event_loop().run_until_complete(cleanup())


# Script entry point.
if __name__ == "__main__":
    _run_bot()