Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import asyncio | |
| import requests | |
| import librosa | |
| import soundfile as sf | |
| from datetime import datetime | |
| from fastapi import FastAPI, Request, Response, BackgroundTasks | |
| from fastapi.staticfiles import StaticFiles | |
| from dotenv import load_dotenv | |
| from supabase import create_client, Client | |
| from huggingface_hub import InferenceClient | |
| from transformers import pipeline | |
| from twilio.rest import Client as TwilioClient | |
| from fpdf import FPDF | |
| import edge_tts | |
| import re | |
| load_dotenv() | |
| app = FastAPI() | |
| # --- 1. CONFIGURATION --- | |
| HF_API_KEY = os.getenv("HF_API_KEY") | |
| SUPABASE_URL = os.getenv("SUPABASE_URL") | |
| SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
| TW_SID = os.getenv("TWILIO_ACCOUNT_SID") | |
| TW_TOKEN = os.getenv("TWILIO_AUTH_TOKEN") | |
| TW_PHONE = os.getenv("TWILIO_PHONE_NUMBER") | |
| BASE_URL = os.getenv("BASE_URL", "").rstrip("/") | |
| OWNER_PHONE = os.getenv("OWNER_PHONE") | |
| if not os.path.exists("static"): os.makedirs("static") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| try: | |
| supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) | |
| hf_client = InferenceClient(token=HF_API_KEY) | |
| twilio_client = TwilioClient(TW_SID, TW_TOKEN) | |
| except Exception as e: print(f"β οΈ Init Error: {e}") | |
| # --- 2. LOCAL EARS --- | |
| print("β³ Loading Local ASR Model...") | |
| try: | |
| asr_pipeline = pipeline("automatic-speech-recognition", model="NCAIR1/NigerianAccentedEnglish", token=HF_API_KEY) | |
| print("β ASR Ready!") | |
| except: asr_pipeline = None | |
| BRAIN_MODEL_ID = "meta-llama/Meta-Llama-3-8B-Instruct" | |
| MARKET_PRICES = {"cement": 5500, "rice": 80000, "yam": 2500, "pepper": 15000} | |
| # --- 3. HELPER FUNCTIONS --- | |
| def convert_audio_to_16k(input_path): | |
| try: | |
| y, sr = librosa.load(input_path, sr=16000) | |
| sf.write("temp_16k.wav", y, 16000) | |
| return "temp_16k.wav" | |
| except: return None | |
| def transcribe_locally(file_path): | |
| if not asr_pipeline: return None | |
| try: return asr_pipeline(file_path).get("text", "") | |
| except: return None | |
| def update_credit_score(user_phone): | |
| try: | |
| res = supabase.table("transactions").select("id", count="exact").eq("user_phone", user_phone).execute() | |
| count = res.count or 0 | |
| new_score = min(800, 300 + (count * 10)) | |
| supabase.table("users").upsert({"phone": user_phone, "credit_score": new_score}).execute() | |
| return new_score | |
| except: return 300 | |
| def simulate_receipt_scan(): | |
| return "I bought 30 cartons of Indomie for 150,000 naira." | |
| def generate_pdf_report(user_phone): | |
| try: | |
| pdf = FPDF() | |
| pdf.add_page() | |
| pdf.set_font("Arial", size=12) | |
| pdf.cell(200, 10, txt="AkoweAje Financial Report", ln=1, align='C') | |
| pdf.cell(200, 10, txt=f"Generated for: {user_phone}", ln=1, align='C') | |
| pdf.cell(200, 10, txt=f"Date: {datetime.now().strftime('%Y-%m-%d')}", ln=1, align='C') | |
| pdf.ln(10) | |
| score = update_credit_score(user_phone) | |
| pdf.set_font("Arial", 'B', 14) | |
| pdf.cell(200, 10, txt=f"Akowe Credit Score: {score}/800", ln=1) | |
| pdf.ln(5) | |
| res = supabase.table("transactions").select("*").order("created_at", desc=True).limit(20).execute() | |
| pdf.set_font("Arial", 'B', 10) | |
| pdf.cell(30, 10, "Date", 1) | |
| pdf.cell(25, 10, "Type", 1) | |
| pdf.cell(75, 10, "Item", 1) | |
| pdf.cell(35, 10, "Amount", 1) | |
| pdf.cell(25, 10, "Profit", 1) | |
| pdf.ln() | |
| pdf.set_font("Arial", size=9) | |
| total_sales = 0 | |
| for row in res.data: | |
| date_str = row.get('created_at', '')[:10] | |
| intent = row.get('intent') or 'UNK' | |
| item = row.get('item') or 'Unknown' | |
| amount = row.get('amount') or 0 | |
| profit = row.get('profit') or 0 | |
| pdf.cell(30, 10, date_str, 1) | |
| pdf.cell(25, 10, intent[:10], 1) | |
| pdf.cell(75, 10, item[:35], 1) | |
| pdf.cell(35, 10, f"N{amount:,}", 1) | |
| pdf.cell(25, 10, f"N{profit:,}", 1) | |
| pdf.ln() | |
| if intent == 'SALE': total_sales += amount | |
| pdf.ln(10) | |
| pdf.set_font("Arial", 'B', 12) | |
| pdf.cell(200, 10, txt=f"Total Revenue: N{total_sales:,.2f}", ln=1) | |
| filename = f"static/report_{int(datetime.now().timestamp())}.pdf" | |
| pdf.output(filename) | |
| return filename | |
| except Exception as e: | |
| print(f"β PDF Error: {e}") | |
| return None | |
| # --- VOICE BRIEFING (DUAL-ENGINE FALLBACK) --- | |
| async def generate_voice_briefing(text): | |
| filename = f"static/briefing_{int(datetime.now().timestamp())}.mp3" | |
| # Engine 1: Nigerian Accent (Preferred) | |
| try: | |
| print("ποΈ Attempting Nigerian Voice...") | |
| communicate = edge_tts.Communicate(text, "en-NG-EzinneNeural") | |
| await communicate.save(filename) | |
| return filename | |
| except Exception as e: | |
| print(f"β οΈ Nigerian Voice Failed: {e}") | |
| # Engine 2: US Accent (Fallback) | |
| try: | |
| print("ποΈ Attempting US Fallback Voice...") | |
| communicate = edge_tts.Communicate(text, "en-US-AriaNeural") | |
| await communicate.save(filename) | |
| return filename | |
| except Exception as e: | |
| print(f"β All TTS Failed: {e}") | |
| return None | |
| # --- OMNI-BRAIN --- | |
| def extract_financial_data(text_input): | |
| system_prompt = """You are AkoweAje, a strict data extraction AI. | |
| Task: Convert the user's input into valid JSON. | |
| Do NOT speak. Do NOT apologize. OUTPUT JSON ONLY. | |
| INTENTS: | |
| - 'SALE': New sale (extract item, amount, cost_price). | |
| - 'DEBT': New debt (extract customer, amount). | |
| - 'GENERATE_REPORT': User wants a PDF. | |
| - 'GENERATE_REMINDER': User wants a text message for a debtor. | |
| - 'DAILY_BRIEF': User wants a voice summary. | |
| - 'GENERATE_AD': User wants marketing text. | |
| - 'QUERY_SCORE': User asks for credit score. | |
| EXAMPLES: | |
| Input: "I sell 50 bags of cement to Dangote for 200k. I buy am 150k." | |
| JSON: {"intent": "SALE", "item": "50 bags of cement", "amount": 200000, "cost_price": 150000, "customer": "Dangote"} | |
| Input: "Write a message to dangote to pay me." | |
| JSON: {"intent": "GENERATE_REMINDER", "customer": "Dangote"} | |
| Input: "Write advert for fresh pepper" | |
| JSON: {"intent": "GENERATE_AD", "item": "fresh pepper"} | |
| Input: "{text_input}" | |
| JSON:""" | |
| messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": text_input}] | |
| try: | |
| response = hf_client.chat_completion(model=BRAIN_MODEL_ID, messages=messages, max_tokens=200, temperature=0.1) | |
| raw_content = response.choices[0].message.content.strip() | |
| start = raw_content.find("{") | |
| end = raw_content.rfind("}") | |
| if start != -1 and end != -1: | |
| json_str = raw_content[start:end+1] | |
| return json.loads(json_str) | |
| except Exception as e: | |
| print(f"β Brain Error: {e}") | |
| return {"intent": "UNKNOWN"} | |
| # --- MAIN PROCESSOR --- | |
| async def process_message_task(sender_phone, body, media_url, media_type): | |
| final_text = body | |
| is_image_scan = False | |
| if media_url and media_type and media_type.startswith('image/'): | |
| print("π· Image received. Simulating scan...") | |
| final_text = simulate_receipt_scan() | |
| is_image_scan = True | |
| elif media_url and media_type and media_type.startswith('audio/'): | |
| try: | |
| r = requests.get(media_url, headers={'User-Agent': 'Mozilla/5.0'}) | |
| with open("temp_voice.ogg", "wb") as f: f.write(r.content) | |
| wav = convert_audio_to_16k("temp_voice.ogg") | |
| if wav: final_text = transcribe_locally(wav) or final_text | |
| except: pass | |
| if not final_text: return | |
| print(f"π Analyzing: {final_text}") | |
| data = extract_financial_data(final_text) | |
| intent = data.get("intent") | |
| reply_msg = "" | |
| if intent in ["SALE", "DEBT"] and OWNER_PHONE and sender_phone != OWNER_PHONE: | |
| alert_msg = f"π¨ SECURITY ALERT: Apprentice ({sender_phone}) just logged: {final_text}" | |
| try: twilio_client.messages.create(from_=TW_PHONE, to=OWNER_PHONE, body=alert_msg) | |
| except: pass | |
| if intent == "SALE": | |
| amount = int(data.get('amount') or 0) | |
| cost_price = int(data.get('cost_price') or 0) | |
| item = data.get('item') or "Unknown Item" | |
| profit_msg = "" | |
| profit = 0 | |
| if cost_price > 0 and amount > 0: | |
| profit = amount - cost_price | |
| margin = (profit / amount) * 100 | |
| profit_msg = f"\nπ Profit: β¦{profit:,.0f} ({margin:.1f}%)" | |
| watchdog_msg = "" | |
| if amount > 0: | |
| for key, mkt_price in MARKET_PRICES.items(): | |
| if key in item.lower(): | |
| if amount < mkt_price: | |
| watchdog_msg = f"\nβ οΈ Market Alert: You are selling {key} cheap! Market price is N{mkt_price}." | |
| try: | |
| supabase.table("transactions").insert({ | |
| "user_phone": sender_phone, "raw_text": final_text, "intent": "SALE", | |
| "item": item, "amount": amount, | |
| "cost_price": cost_price, "profit": profit | |
| }).execute() | |
| except Exception as e: print(f"DB Error: {e}") | |
| prefix = "π· Receipt Scanned &" if is_image_scan else "π°" | |
| reply_msg = f"{prefix} Sale Recorded!\nItem: {item}\nAmount: β¦{amount:,.0f}{profit_msg}{watchdog_msg}" | |
| update_credit_score(sender_phone) | |
| elif intent == "GENERATE_AD": | |
| reply_msg = f"π¨ FRESH STOCK ALERT! π¨\nJust landed: {data.get('item')}! Best quality, best price. Hurry before it finishes! ππΎββοΈπ¨" | |
| elif intent == "QUERY_SCORE": | |
| score = update_credit_score(sender_phone) | |
| reply_msg = f"π Your current Akowe Credit Score is: {score}/800.\nKeep recording daily to increase it!" | |
| elif intent == "GENERATE_REMINDER": | |
| customer = data.get('customer', 'Customer') | |
| reply_msg = f"π Draft Message:\n\n'Hello {customer}, gentle reminder about the outstanding payment. Please let me know when to expect it. Thanks!'" | |
| elif intent == "GENERATE_REPORT": | |
| pdf_path = generate_pdf_report(sender_phone) | |
| if pdf_path: reply_msg = f"π Official Financial Report & Credit Score:\n{BASE_URL}/{pdf_path}" | |
| else: reply_msg = "β οΈ PDF Generator is temporarily busy. Try again later." | |
| elif intent == "DAILY_BRIEF": | |
| summary = "Good evening. You had a busy day. Your credit score is rising. Check your report for details." | |
| mp3_path = await generate_voice_briefing(summary) | |
| if mp3_path: reply_msg = f"ποΈ Daily Voice Briefing:\n{BASE_URL}/{mp3_path}" | |
| else: reply_msg = f"ποΈ (Voice Unavailable) Daily Briefing:\n{summary}" | |
| else: | |
| reply_msg = f"I heard: '{final_text}'\nBut didn't understand. Try again." | |
| # --- BRANDING FOOTER FIX --- | |
| if reply_msg: | |
| reply_msg += "\n\n(Powered by Awarri)" | |
| # SEND REPLY | |
| try: | |
| twilio_client.messages.create(from_=TW_PHONE, to=sender_phone, body=reply_msg) | |
| print("β Message Sent!") | |
| except Exception as e: print(f"β Twilio Error: {e}") | |
| async def whatsapp_webhook(request: Request, background_tasks: BackgroundTasks): | |
| form = await request.form() | |
| media_type = form.get('MediaContentType0', '') | |
| background_tasks.add_task(process_message_task, form.get('From'), form.get('Body'), form.get('MediaUrl0'), media_type) | |
| return Response(content="<Response></Response>", media_type="application/xml") |