Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, Request | |
| from pinecone import Pinecone | |
| from groq import Groq | |
| import httpx # For sending messages back to Telegram | |
# 1. Configuration & Clients
# Secrets are read from environment variables (use Hugging Face Secrets for these!).
def _require_env(name):
    """Return environment variable *name*, failing fast if it is unset or empty.

    Raises:
        RuntimeError: if the variable is missing or empty. Without this check a
            missing TELEGRAM_TOKEN would silently yield a ".../botNone/sendMessage"
            URL that only fails when the first reply is sent.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


PINECONE_API_KEY = _require_env("PINECONE_API_KEY")
GROQ_API_KEY = _require_env("GROQ_API_KEY")
TELEGRAM_TOKEN = _require_env("TELEGRAM_TOKEN")

# Bot API endpoint used to send replies back to the chat.
TELEGRAM_URL = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"

# Shared, module-level clients reused by every request.
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index("customerserviceindex")
groq_client = Groq(api_key=GROQ_API_KEY)

app = FastAPI()
| # 2. The Core AI Logic | |
async def get_ai_response(user_query: str):
    """Answer a customer question from retrieved bank context using a Groq LLM.

    Steps: embed the query with Pinecone Inference, fetch the top 3 matching
    records from the index, join their ``original_text`` metadata into a
    context block, and ask the chat model to answer using only that context.

    Args:
        user_query: The raw text the customer sent.

    Returns:
        The model's reply text; an empty string if the model returned no content.
    """
    # Local import keeps this block self-contained; FastAPI is already a
    # dependency of this file.
    from fastapi.concurrency import run_in_threadpool

    # The Pinecone and Groq clients used here are synchronous. Calling them
    # directly inside a coroutine would block the event loop for the whole
    # round-trip, so each call is pushed to the threadpool.
    query_embedding = await run_in_threadpool(
        pc.inference.embed,
        model="multilingual-e5-large",  # the SDK keyword is `model`, not `model_id`
        inputs=[user_query],
        parameters={"input_type": "query"},
    )

    # Search Pinecone for bank context.
    search_results = await run_in_threadpool(
        index.query,
        vector=query_embedding[0].values,
        top_k=3,
        include_metadata=True,
    )

    # Skip matches without usable text rather than raising KeyError/TypeError
    # on records that lack metadata or the 'original_text' field.
    snippets = [
        (match.metadata or {}).get("original_text")
        for match in search_results.matches
    ]
    context_text = "\n".join(text for text in snippets if text)

    # Instructions and retrieved context go in the system message; the
    # untrusted customer text is sent separately as the user message so it
    # cannot masquerade as part of the instructions.
    system_prompt = (
        "You are the official AI assistant for Hadhramout Bank (بنك حضرموت).\n"
        "Your tone is professional, helpful, and culturally respectful to the Yemeni community.\n"
        "Use ONLY the provided context to answer. If the information isn't there,\n"
        "kindly ask the customer to visit the main branch in Al Mukalla.\n"
        "Context:\n"
        f"{context_text}"
    )

    # NOTE(review): confirm "llama3-8b-8192" is still served by Groq before deploying.
    completion = await run_in_threadpool(
        groq_client.chat.completions.create,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_query},
        ],
        model="llama3-8b-8192",
    )

    # `content` may be None; normalise so callers always receive a str.
    return completion.choices[0].message.content or ""
| # 3. The Webhook Endpoint | |
async def telegram_webhook(request: Request):
    """Handle a Telegram update: answer a text message and reply in the same chat.

    Updates without a ``message`` or without text are acknowledged and ignored.
    The AI answer is sent back through ``TELEGRAM_URL``; answers longer than
    Telegram's per-message limit are split into several messages.

    Args:
        request: The incoming webhook request whose JSON body is the update.

    Returns:
        ``{"status": "ok"}`` in all non-raising cases.
    """
    # NOTE(review): no route decorator is visible in this view; confirm this
    # handler is registered on `app` for the path given to Telegram's setWebhook.
    # NOTE(review): the request is not authenticated; consider checking the
    # X-Telegram-Bot-Api-Secret-Token header if a secret_token is configured.
    import logging

    log = logging.getLogger(__name__)
    ack = {"status": "ok"}
    max_len = 4096  # sendMessage rejects text longer than this

    data = await request.json()
    if "message" not in data:
        return ack

    message = data["message"]
    chat_id = message["chat"]["id"]
    user_text = message.get("text", "")
    if not user_text:
        return ack

    # Get the intelligent response.
    ai_answer = await get_ai_response(user_text)
    if not ai_answer:
        # Telegram rejects empty text, so there is nothing useful to send.
        log.warning("Empty AI answer for chat %s; no reply sent", chat_id)
        return ack

    chunks = [ai_answer[i:i + max_len] for i in range(0, len(ai_answer), max_len)]

    # Send back to Telegram, one message per chunk, in order.
    async with httpx.AsyncClient() as client:
        for chunk in chunks:
            response = await client.post(
                TELEGRAM_URL,
                json={"chat_id": chat_id, "text": chunk},
            )
            if response.status_code >= 400:
                # Surface delivery failures instead of silently dropping them.
                log.error(
                    "Telegram sendMessage failed (%s): %s",
                    response.status_code,
                    response.text,
                )
                break

    return ack
async def root():
    """Return a static liveness message for the backend."""
    # NOTE(review): no route decorator is visible in this view; confirm this
    # handler is registered on `app`.
    liveness = {"message": "Hadhramout Bank AI Backend is Live"}
    return liveness