# customer_service / main.py
# (Hugging Face file-viewer metadata preserved below as comments:
#  uploaded by codeBOKER; commit "Rename app.py to main.py", da79526 verified; 2.65 kB)
import os
from fastapi import FastAPI, Request
from pinecone import Pinecone
from groq import Groq
import httpx # For sending messages back to Telegram
# 1. Configuration & Clients
# Use Hugging Face Secrets for these!
# NOTE(review): os.environ.get returns None when a secret is missing, so a
# misconfigured deployment fails later (client calls / "botNone" URL) rather
# than at import time — consider failing fast here.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
# Telegram Bot API endpoint used to push replies back to the chat.
TELEGRAM_URL = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
# Vector database client and the index holding the bank's knowledge base.
pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index("customerserviceindex")
# LLM client used for answer generation.
groq_client = Groq(api_key=GROQ_API_KEY)
app = FastAPI()
# 2. The Core AI Logic
async def get_ai_response(user_query: str) -> str:
    """Answer a customer question using retrieval-augmented generation.

    Embeds the query with Pinecone Inference, retrieves the top-3 matching
    knowledge-base chunks from the ``customerserviceindex`` index, and asks
    the Groq-hosted Llama 3 model to answer strictly from that context.

    Args:
        user_query: The customer's raw question text.

    Returns:
        The model-generated answer string.

    Raises:
        Propagates any Pinecone / Groq API errors to the caller.

    NOTE(review): declared async but the Pinecone and Groq SDK calls here are
    blocking — they will stall the event loop under load; verify and consider
    running them in a thread pool.
    """
    # Vectorize the query using Pinecone Inference.
    # NOTE(review): current Pinecone SDK documents this kwarg as ``model=``,
    # not ``model_id=`` — confirm against the installed pinecone version.
    query_embedding = pc.inference.embed(
        model_id="multilingual-e5-large",
        inputs=[user_query],
        parameters={"input_type": "query"}
    )

    # Search Pinecone for bank context.
    search_results = index.query(
        vector=query_embedding[0].values,
        top_k=3,
        include_metadata=True
    )

    # Fix: guard against matches with missing metadata or a missing
    # 'original_text' key — the original indexed access raised
    # KeyError/TypeError and took the whole webhook down.
    context_text = "\n".join(
        (res.metadata or {}).get("original_text", "")
        for res in search_results.matches
    )

    # Construct the system prompt.
    # We use facts from the profile: Islamic banking, based in Mukalla [cite: 15, 6]
    prompt = f"""
You are the official AI assistant for Hadhramout Bank (بنك حضرموت).
Your tone is professional, helpful, and culturally respectful to the Yemeni community.
Use ONLY the provided context to answer. If the information isn't there,
kindly ask the customer to visit the main branch in Al Mukalla.
Context:
{context_text}
Customer Question: {user_query}
"""
    completion = groq_client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model="llama3-8b-8192",
    )
    return completion.choices[0].message.content
# 3. The Webhook Endpoint
@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Telegram webhook: answer incoming text messages via the AI pipeline.

    Parses the Telegram update, generates an answer for any non-empty text
    message, posts the reply back through the Bot API, and always
    acknowledges the update with ``{"status": "ok"}``.
    """
    update = await request.json()
    if "message" not in update:
        return {"status": "ok"}

    msg = update["message"]
    chat_ref = msg["chat"]["id"]
    user_message = msg.get("text", "")

    if user_message:
        # Produce the intelligent response for this question.
        answer = await get_ai_response(user_message)
        # Push the reply back to the originating chat.
        async with httpx.AsyncClient() as session:
            await session.post(TELEGRAM_URL, json={
                "chat_id": chat_ref,
                "text": answer,
            })

    return {"status": "ok"}
@app.get("/")
async def root():
    """Health-check endpoint confirming the backend is reachable."""
    status_message = "Hadhramout Bank AI Backend is Live"
    return {"message": status_message}