"""Retrieval-augmented customer-service backend: Pinecone search + Groq chat."""

import os
import re

import httpx
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from groq import Groq
from pinecone import Pinecone

# 1. Configuration & Clients
# Use Hugging Face Secrets for these!
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
# The Bot API is addressed by IP address rather than by hostname, so callers
# posting to this URL have to supply the "Host: api.telegram.org" header
# themselves. NOTE: if TELEGRAM_TOKEN is unset the URL contains "botNone".
TELEGRAM_URL = f"https://149.154.167.220/bot{TELEGRAM_TOKEN}/sendMessage"
EMBED_MODEL = os.environ.get("EMBED_MODEL")
GROQ_MODEL = os.environ.get("GROQ_MODEL")
PROMPT = os.environ.get("PROMPT")

pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index("customerserviceindex")
groq_client = Groq(api_key=GROQ_API_KEY)

app = FastAPI()

# NOTE(review): the original pattern was r'.*?', which only ever matches the
# empty string and therefore removed nothing; the tag names had evidently been
# stripped from both the pattern and its comment. "<think>" is assumed here --
# confirm against the output format of the configured GROQ_MODEL.
_REASONING_BLOCK_RE = re.compile(r"<think>.*?</think>", flags=re.DOTALL)


def clean_ai_response(text: str) -> str:
    """Strip model reasoning blocks from *text*.

    Removes everything between the <think> and </think> tags, including the
    tags themselves, then trims surrounding whitespace.
    """
    return _REASONING_BLOCK_RE.sub("", text).strip()


# 2. The Core AI Logic
def _generate_answer(user_query: str) -> str:
    """Run the blocking embed -> search -> chat pipeline for one query."""
    # Vectorize query using Pinecone Inference
    query_embedding = pc.inference.embed(
        model=EMBED_MODEL,
        inputs=[user_query],
        parameters={"input_type": "query"},
    )

    # Search Pinecone for Bank Context
    search_results = index.query(
        vector=query_embedding[0].values,
        top_k=3,
        include_metadata=True,
    )

    # Skip matches without stored text instead of failing the whole request.
    context_chunks = []
    for match in search_results.matches:
        chunk = (match.metadata or {}).get("original_text")
        if chunk:
            context_chunks.append(chunk)
    retrieved_context = "\n".join(context_chunks)

    prompt = f"""
{PROMPT}
Message:{user_query}
Retrieved Context:{retrieved_context}
Final Answer:
"""

    completion = groq_client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model=GROQ_MODEL,
        temperature=0.1,
        max_completion_tokens=800,
        top_p=0.9,
    )

    # The message content may be None; normalise it before regex cleaning.
    ai_response = completion.choices[0].message.content or ""
    return clean_ai_response(ai_response)


async def get_ai_response(user_query: str) -> str:
    """Return the cleaned model answer for *user_query*.

    The Pinecone and Groq clients used here are synchronous, so the pipeline
    runs in a worker thread to keep the event loop free for other requests.
    """
    return await run_in_threadpool(_generate_answer, user_query)
# 3. The Webhook Endpoint
import logging
import socket

logger = logging.getLogger(__name__)

# Hostname that TELEGRAM_URL's IP address stands in for. It is sent as the
# HTTP Host header and as the TLS server name used for certificate checks.
TELEGRAM_HOST = "api.telegram.org"


async def _send_telegram_message(chat_id, text: str) -> None:
    """Post *text* to *chat_id* through the Bot API sendMessage method."""
    if not text:
        # An empty message would only be rejected by the API.
        logger.warning("Empty AI answer for chat %s; nothing sent", chat_id)
        return

    payload = {"chat_id": chat_id, "text": text, "parse_mode": "Markdown"}
    request_kwargs = {
        "headers": {"Host": TELEGRAM_HOST},
        # Connect to the IP in TELEGRAM_URL but validate the certificate
        # against the real hostname, so TLS verification can stay enabled
        # (the URL carries the bot token).
        "extensions": {"sni_hostname": TELEGRAM_HOST},
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(TELEGRAM_URL, json=payload, **request_kwargs)
        if response.status_code == 400:
            # Model output is not guaranteed to be valid Markdown; resend the
            # same text without a parse mode rather than dropping the reply.
            payload.pop("parse_mode")
            response = await client.post(
                TELEGRAM_URL, json=payload, **request_kwargs
            )

    if response.status_code >= 400:
        # Log the response body only: the request URL contains the bot token.
        logger.error(
            "Telegram sendMessage failed (%s): %s",
            response.status_code,
            response.text,
        )


@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Answer an incoming Telegram update that carries a text message.

    Updates without a "message" or without text are acknowledged and ignored.
    Always returns {"status": "ok"}.
    """
    data = await request.json()
    message = data.get("message") if isinstance(data, dict) else None
    if not message:
        return {"status": "ok"}

    user_text = message.get("text", "")
    if not user_text:
        return {"status": "ok"}

    try:
        chat_id = message["chat"]["id"]
        # Get the intelligent response
        ai_answer = await get_ai_response(user_text)
        # Send back to Telegram
        await _send_telegram_message(chat_id, ai_answer)
    except Exception:
        # Handler boundary: Telegram re-delivers updates that are not
        # acknowledged with a 2xx, so log the failure and acknowledge anyway.
        logger.exception("Failed to handle Telegram update")

    return {"status": "ok"}


@app.get("/")
async def root():
    """Liveness endpoint returning a fixed status message."""
    return {"message": "Hadhramout Bank AI Backend is Live"}


@app.post("/test")
async def test_webhook(request: Request):
    """Return the AI answer for data["message"]["text"] without using Telegram.

    A body lacking those keys raises KeyError.
    """
    data = await request.json()
    response = await get_ai_response(data["message"]["text"])
    return {"response": response}


@app.get("/dns-test")
async def dns_test():
    """Report the IP that api.telegram.org resolves to, or the lookup error."""
    try:
        ip = socket.gethostbyname("api.telegram.org")
    except OSError as e:  # socket.gaierror is a subclass of OSError
        return {"error": str(e)}
    return {"resolved_ip": ip}