Spaces:
Sleeping
Sleeping
| from pyrogram import Client, filters | |
| from config import API_ID, API_HASH, BOT_TOKEN, HUGGINGFACE_TOKEN | |
| import requests | |
| import os | |
| import asyncio | |
| import json | |
| from datetime import datetime | |
| # Add these constants at the top | |
| API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large-v3-turbo" | |
| AI_URL = "https://charan5775-fastest.hf.space/t2t" | |
| AI_HEADERS = { | |
| "Content-Type": "application/json", | |
| "Accept": "application/json" | |
| } | |
| # Initialize message history in memory only | |
| message_history = [] | |
| # Create a new Client instance with a custom session name and no local storage | |
| app = Client( | |
| "my_bot", | |
| api_id=API_ID, | |
| api_hash=API_HASH, | |
| bot_token=BOT_TOKEN, | |
| in_memory=True # This prevents SQLite database locks | |
| ) | |
| def save_message_to_history(user_id, username, message_type, content, bot_response=None): | |
| message_data = { | |
| "content": content, | |
| "response": bot_response | |
| } | |
| message_history.append(message_data) | |
| print("Message History Update:", json.dumps(message_data, indent=2)) | |
| # Add command to print history | |
| async def history_command(client, message): | |
| if not message_history: | |
| await message.reply_text("No message history available.") | |
| return | |
| try: | |
| # Create a formatted message | |
| history_text = "📜 Message History:\n\n" | |
| for idx, msg in enumerate(message_history[-10:], 1): # Show last 10 messages | |
| history_text += f"{idx}. Message: {msg['content']}\n" | |
| if msg['response']: | |
| history_text += f" Response: {msg['response']}\n" | |
| history_text += "\n" | |
| await message.reply_text(history_text) | |
| except Exception as e: | |
| await message.reply_text(f"Error retrieving history: {str(e)}") | |
| # Command handler for /start | |
| async def start_command(client, message): | |
| await message.reply_text("Hello! I'm your Telegram bot. Nice to meet you!") | |
| # Command handler for /help | |
| async def help_command(client, message): | |
| help_text = """ | |
| Available commands: | |
| /start - Start the bot | |
| /help - Show this help message | |
| """ | |
| await message.reply_text(help_text) | |
| # Message handler for regular text messages | |
| async def echo(client, message): | |
| try: | |
| thinking_msg = await message.reply_text("🤔 Thinking about your message...") | |
| ai_response = await get_ai_response(message.text) | |
| await thinking_msg.delete() | |
| await message.reply_text(ai_response) | |
| # Save message to history | |
| save_message_to_history( | |
| message.from_user.id, | |
| message.from_user.username, | |
| "text", | |
| message.text, | |
| ai_response | |
| ) | |
| except Exception as e: | |
| await message.reply_text(f"Sorry, I couldn't process your message: {str(e)}") | |
| # Handle photo messages | |
| async def handle_photo(client, message): | |
| response = "Nice photo!" | |
| await message.reply_text(response) | |
| save_message_to_history( | |
| message.from_user.id, | |
| message.from_user.username, | |
| "photo", | |
| "Photo message", | |
| response | |
| ) | |
| # Handle sticker messages | |
| async def handle_sticker(client, message): | |
| response = "Cool sticker!" | |
| await message.reply_text(response) | |
| save_message_to_history( | |
| message.from_user.id, | |
| message.from_user.username, | |
| "sticker", | |
| "Sticker message", | |
| response | |
| ) | |
| # Custom command example | |
| async def info_command(client, message): | |
| user = message.from_user | |
| info_text = f""" | |
| User Information: | |
| ID: {user.id} | |
| Name: {user.first_name} | |
| Username: @{user.username if user.username else 'None'} | |
| """ | |
| await message.reply_text(info_text) | |
| # Add this function after your existing imports | |
| async def transcribe_audio(file_path): | |
| try: | |
| with open(file_path, "rb") as f: | |
| data = f.read() | |
| response = requests.post(API_URL, data=data) | |
| return response.json().get('text', 'Could not transcribe audio') | |
| except Exception as e: | |
| print(f"Error in transcription: {e}") | |
| return "Error transcribing audio" | |
| # Add this new function after transcribe_audio function | |
| async def get_ai_response(text): | |
| try: | |
| # Create condensed history from all messages | |
| context = "" | |
| if message_history: | |
| # If we have more than 10 messages, summarize older ones | |
| if len(message_history) > 10: | |
| older_messages = message_history[:-10] | |
| recent_messages = message_history[-10:] | |
| # Summarize older messages in pairs to save space | |
| context = "Earlier conversation summary:\n" | |
| for i in range(0, len(older_messages), 2): | |
| pair = older_messages[i:i+2] | |
| combined_content = " | ".join(msg['content'] for msg in pair) | |
| if len(combined_content) > 100: | |
| combined_content = combined_content[:100] + "..." | |
| context += f"Chat: {combined_content}\n" | |
| # Add a separator | |
| context += "\nRecent conversation:\n" | |
| # Add recent messages in full detail | |
| for msg in recent_messages: | |
| if msg['content'] and msg['response']: | |
| context += f"Human: {msg['content']}\nAssistant: {msg['response']}\n" | |
| else: | |
| # If less than 10 messages, include all in detail | |
| context = "Conversation history:\n" | |
| for msg in message_history: | |
| if msg['content'] and msg['response']: | |
| context += f"Human: {msg['content']}\nAssistant: {msg['response']}\n" | |
| # Combine context with current query | |
| full_query = f"{context}Human: {text}" | |
| payload = { | |
| "query": full_query, | |
| "stream": False | |
| } | |
| response = requests.post(AI_URL, json=payload) | |
| print(f"Raw API Response: {response.text}") # Debug print | |
| if response.status_code != 200: | |
| print(f"API Error: Status {response.status_code}") | |
| return f"Sorry, the AI service returned an error (Status {response.status_code})" | |
| response_data = response.json() | |
| print(f"Parsed Response Data: {response_data}") # Debug print | |
| # The API returns the response directly | |
| if isinstance(response_data, dict) and 'response' in response_data: | |
| return response_data['response'] | |
| else: | |
| return str(response_data) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Network error: {e}") | |
| return "Sorry, I'm having trouble connecting to the AI service." | |
| except json.JSONDecodeError as e: | |
| print(f"JSON parsing error: {e}\nResponse text: {response.text}") | |
| return "Sorry, I received an invalid response from the AI service." | |
| except Exception as e: | |
| print(f"Error getting AI response: {str(e)}\nFull error: {repr(e)}") | |
| return "Sorry, I couldn't process your message." | |
| # Update the voice message handler with retry logic | |
| async def handle_voice(client, message): | |
| try: | |
| # Send a processing message | |
| processing_msg = await message.reply_text("🎵 Processing your voice message...") | |
| # Download the voice message with retry logic | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| message = await app.get_messages( | |
| message.chat.id, | |
| message.id | |
| ) | |
| voice_file = await message.download() | |
| break | |
| except Exception as e: | |
| if attempt == max_retries - 1: | |
| raise e | |
| await asyncio.sleep(1) | |
| transcription = await transcribe_audio(voice_file) | |
| await message.reply_text(f"🗣️ Transcription:\n\n{transcription}") | |
| thinking_msg = await message.reply_text("🤔 Thinking about your message...") | |
| ai_response = await get_ai_response(transcription) | |
| await thinking_msg.delete() | |
| await message.reply_text(ai_response) | |
| # Save voice message to history | |
| save_message_to_history( | |
| message.from_user.id, | |
| message.from_user.username, | |
| "voice", | |
| transcription, | |
| ai_response | |
| ) | |
| # Clean up | |
| try: | |
| os.remove(voice_file) | |
| await processing_msg.delete() | |
| except: | |
| pass | |
| except Exception as e: | |
| error_message = f"Sorry, there was an error processing your message: {str(e)}" | |
| print(error_message) | |
| await message.reply_text(error_message) | |
| # Run the bot | |
| if __name__ == "__main__": | |
| print("Bot is running...") | |
| app.run() |