import os import json import time from datetime import datetime from io import BytesIO from google.cloud.firestore_v1.base_query import FieldFilter import pypdf import firebase_admin import numpy as np import faiss import pickle from flask import Flask, request, jsonify from flask_cors import CORS from dotenv import load_dotenv from firebase_admin import credentials, firestore, storage from google import genai import os import json import pickle import numpy as np from flask import Flask, request, jsonify from flask_cors import CORS from dotenv import load_dotenv from firebase_admin import credentials, firestore, storage, initialize_app from google import genai import faiss load_dotenv() # --- Flask Setup --- app = Flask(__name__) CORS(app) # --- Firebase Initialization --- cred_json = os.environ.get("FIREBASE") if not cred_json: raise RuntimeError("Missing FIREBASE env var") cred = credentials.Certificate(json.loads(cred_json)) initialize_app(cred, {"storageBucket": os.environ.get("Firebase_Storage")}) fs = firestore.client() bucket = storage.bucket() # --- Gemini Client --- client = genai.Client(api_key=os.getenv("Gemini")) model_name = "gemini-2.0-flash" import logging import uuid import time from flask import g, request, jsonify # ---------- Logging setup ---------- LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logging.basicConfig( level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) logger = logging.getLogger("api") # ---------- Request/Response hooks ---------- @app.before_request def _start_timer(): g.request_id = request.headers.get("X-Request-Id", str(uuid.uuid4())) g.t0 = time.time() # Minimal safe logging (avoid dumping full participantInfo / bank statements) body_preview = None try: if request.is_json: j = request.get_json(silent=True) if isinstance(j, dict): body_preview = {"keys": list(j.keys())} else: body_preview = {"type": str(type(j))} else: body_preview = {"content_type": request.content_type} except Exception: body_preview = {"parse": "failed"} logger.info( "REQ id=%s %s %s ip=%s ua=%s body=%s", g.request_id, request.method, request.path, request.headers.get("X-Forwarded-For", request.remote_addr), request.user_agent.string, body_preview, ) @app.after_request def _log_response(resp): dt_ms = int((time.time() - getattr(g, "t0", time.time())) * 1000) logger.info( "RES id=%s %s %s status=%s ms=%s", getattr(g, "request_id", "-"), request.method, request.path, resp.status_code, dt_ms, ) resp.headers["X-Request-Id"] = getattr(g, "request_id", "-") return resp from werkzeug.exceptions import HTTPException @app.errorhandler(HTTPException) def handle_http_exception(err): return jsonify({ "status": "error", "message": err.description, }), err.code @app.errorhandler(Exception) def _unhandled_exception(err): logger.exception( "UNHANDLED id=%s path=%s", getattr(g, "request_id", "-"), request.path, ) return jsonify({ "status": "error", "message": "Internal server error", }), 500 interventions_offered = { "Marketing Support": [ "Domain & Email Registration", "Website Development & Hosting", "Logo", "Social Media Setup & Page", "Industry Memberships", "Company Profile", "Email Signature", "Business Cards", "Branded Banner", "Pamphlets/Brochures", "Market Linkage", "Marketing Plan", "Digital Marketing Support", "Marketing Mentoring" ], "Financial Management": [ "Management Accounts", "Financial Management Templates", "Record Keeping", "Business Plan/Proposal", "Funding Linkages", "Financial Literacy Training", "Tax Compliance Support", "Access to Financial Software", "Financial Management Mentorship", "Grant Application Support", "Cost Management Strategies", "Financial Reporting Standards", "Product Costing" ], "Compliance": [ "Insurance", "CIPC and Annual Returns Registration", "UIF Registration", "VAT Registration", "Risk Management Plan", "HRM Support (i.e., Templates)", "Guidance - Food Compliance (Webinar)", "PAYE Compliance", "COIDA Compliance", "Certificate of Acceptability" ], "Business Strategy & Leadership": [ "Executive Mentoring", "Business Ops Plan", "Strategic Plan", "Business Communication (How to Pitch)", "Digital Transformation", "Leadership and Personal Development", "Design Thinking", "Productivity Training" ], "Skills Development & Training": [ "Excel Skills Training", "Industry Seminars", "Fireside Chat", "Industry Courses/Training", "AI Tools Training", "PowerPoint Presentation Training" ], "Operations & Tools": [ "Tools and Equipment", "Data Support", "Technology Application Support", "CRM Solutions" ], "Health & Safety": [ "OHS Audit", "Health & Safety Training" ], "Customer Experience & Sales": [ "Customer Service – Enhancing service quality to improve client satisfaction and retention", "Technology Readiness and Systems Integration", "Sales and Marketing (including Export Readiness)" ] } class GenericEvaluator: def __init__(self, available_interventions=None): self.available_interventions = available_interventions or interventions_offered def generate_prompt(self, participant_info: dict) -> str: # Create a simplified version of interventions for the prompt interventions_json = json.dumps(self.available_interventions, indent=2) prompt = f""" You are an expert evaluator for a small business incubator in South Africa, reviewing candidate applications. Use your expertise, critical thinking, and judgment to assess the following applicant. There are no predefined criteria or weights — your evaluation should be holistic and based on the information provided. Participant Info: {json.dumps(participant_info, indent=2)} Based on your assessment, provide: 1. "AI Recommendation": either "Accept" or "Reject" 2. "AI Score": a score out of 100 reflecting overall business quality or readiness 3. "Justification": a brief explanation for your decision (3-5 sentences) 4. "Recommended Interventions": Select 3-5 appropriate intervention categories and specific interventions that would most benefit this business. Available interventions: {interventions_json} Return your output strictly as a JSON dictionary with these keys: - "AI Recommendation" (string: "Accept" or "Reject") - "AI Score" (integer between 0-100) - "Justification" (string) - "Recommended Interventions" (object with category names as keys and arrays of specific interventions as values) Example format for "Recommended Interventions": {{ "Branding & Digital Presence": [ "Website Development & Hosting", "Digital Marketing Support" ], "Financial Management & Compliance": [ "Business Plan/Proposal", "Financial Literacy Training" ] }} """ return prompt def parse_gemini_response(self, response_text: str) -> dict: try: # Try to find and extract JSON from the response response_text = response_text.strip() # Look for JSON content between curly braces start_idx = response_text.find('{') end_idx = response_text.rfind('}') if start_idx >= 0 and end_idx > start_idx: json_str = response_text[start_idx:end_idx+1] result = json.loads(json_str) # Validate required fields required_fields = ["AI Recommendation", "AI Score", "Justification", "Recommended Interventions"] missing_fields = [field for field in required_fields if field not in result] if missing_fields: return { "error": f"Missing required fields: {', '.join(missing_fields)}", "parsed_data": result } # Validate AI Recommendation format if result["AI Recommendation"] not in ["Accept", "Reject"]: return { "error": "AI Recommendation must be either 'Accept' or 'Reject'", "parsed_data": result } # Validate AI Score format try: score = int(result["AI Score"]) if not 0 <= score <= 100: return { "error": "AI Score must be between 0 and 100", "parsed_data": result } except (ValueError, TypeError): return { "error": "AI Score must be a valid integer", "parsed_data": result } # Validate Recommended Interventions format interventions = result.get("Recommended Interventions", {}) if not isinstance(interventions, dict): return { "error": "Recommended Interventions must be an object/dictionary", "parsed_data": result } # All validations passed return result else: return {"error": "No valid JSON found in response", "raw_response": response_text} except json.JSONDecodeError as e: return {"error": f"JSON parsing error: {str(e)}", "raw_response": response_text} except Exception as e: return {"error": f"Unexpected error: {str(e)}", "raw_response": response_text} # Lepharo interventions structure lepharo_interventions_offered = { "ROM (Recruitment, Onboarding, and Maintenance)": [ "Gap Analysis", "SMME Onboarding Induction", "Compliance Document Verification", "Developmental Plan" ], "HSE (Health, Safety & Environment) and Labour Compliance": [ "UIF Compliance Training", "UIF Registration", "COID Compliance Training", "COID Registration", "COID Annual Renewal", "Employment Contract Collection", "ID Copy Collection", "Health & Safety File", "HSE & Labour Newsletter", "Risk Management Information Session", "HSE/Labour Compliance Workshop" ], "Financial Compliance": [ "Business Planning", "Budgeting & Financial Planning", "Bookkeeping & Accounting", "Taxation & Compliance Advisory", "Financial Analysis & Reporting", "Funding Linkage" ], "PDS (Personal Development Services)": [ "Personal Insight Assessment", "Psychometric Assessment", "Personal Recommendation Report", "Leadership Fundamentals Module", "Communication Skills Module", "Emotional Intelligence Module", "Leadership Project", "Mentorship Session" ], "Market Linkages": [ "Stakeholder Company Sourcing", "RFP/RFQ Response Support", "Procurement Opportunity Identification", "SMME Engagement Support", "Open Day/Exhibition Participation", "Aftercare Support" ], "Legal Advisory Services": [ "Commercial Law Advisory", "Labour Law Advisory", "Business Law Advisory", "Intellectual Property Advisory", "BBBEE Compliance Support", "Debt Collection Advisory", "Company Tax Compliance Advisory", "Digital Economy Legal Advisory", "Cross-Border Transaction Advisory" ], "Wellness Services": [ "Soft Skills Training", "Counselling Session", "Grief Support", "Health Risk Assessment", "Employee Wellness Newsletter" ], "Training Academy – NVC (New Venture Creation)": [ "Maths in Business Module", "Business Communication Module (1st Language)", "Business Communication Module (2nd Language)", "New Venture Creation Module", "Leadership Skills Module", "Business Ethics Module", "Business Finance Management Module", "Marketing Skills Module" ], "Training Academy – QMS": [ "QMS Certification Training", "ISO Standards Workshop" ], "Marketing and Communication": [ "Logo Design", "Website Development", "Domain Hosting", "Company Profile Design", "Business Cards", "Branded Golf Shirts", "Pull-Up Banner", "Marketing Collateral", "Event Planning" ] } class LepharoEvaluator: def __init__(self, available_interventions=None): self.available_interventions = available_interventions or lepharo_interventions_offered def generate_prompt(self, participant_info: dict) -> str: # Create a simplified version of interventions for the prompt interventions_json = json.dumps(self.available_interventions, indent=2) prompt = f""" You are an expert evaluator for Lepharo, a business development and compliance support organization in South Africa, reviewing candidate applications. Use your expertise, critical thinking, and judgment to assess the following applicant based on their business needs and development stage. There are no predefined criteria or weights — your evaluation should be holistic and based on the information provided. Participant Info: {json.dumps(participant_info, indent=2)} Based on your assessment, provide: 1. "AI Recommendation": either "Accept" or "Reject" 2. "AI Score": a score out of 100 reflecting overall business quality or readiness 3. "Justification": a brief explanation for your decision (3-5 sentences) 4. "Recommended Interventions": Select 3-5 appropriate intervention categories and specific areas of support that would most benefit this business. Available interventions: {interventions_json} Return your output strictly as a JSON dictionary with these keys: - "AI Recommendation" (string: "Accept" or "Reject") - "AI Score" (integer between 0-100) - "Justification" (string) - "Recommended Interventions" (object with intervention names as keys and arrays of specific areas of support as values) - "intervention" (string: the primary intervention category recommended) - "areaOfSupport" (string: the primary area of support recommended) Example format for "Recommended Interventions": {{ "HSE (Health, Safety & Environment) and Labour Compliance": [ "UIF Registration", "Health & Safety File" ], "Financial Compliance": [ "Business Planning", "Taxation & Compliance Advisory" ] }} For "intervention" and "areaOfSupport", select the single most important intervention category and area of support for this participant. """ return prompt def parse_gemini_response(self, response_text: str) -> dict: try: # Try to find and extract JSON from the response response_text = response_text.strip() # Look for JSON content between curly braces start_idx = response_text.find('{') end_idx = response_text.rfind('}') if start_idx >= 0 and end_idx > start_idx: json_str = response_text[start_idx:end_idx+1] result = json.loads(json_str) # Validate required fields required_fields = ["AI Recommendation", "AI Score", "Justification", "Recommended Interventions", "intervention", "areaOfSupport"] missing_fields = [field for field in required_fields if field not in result] if missing_fields: return { "error": f"Missing required fields: {', '.join(missing_fields)}", "parsed_data": result } # Validate AI Recommendation format if result["AI Recommendation"] not in ["Accept", "Reject"]: return { "error": "AI Recommendation must be either 'Accept' or 'Reject'", "parsed_data": result } # Validate AI Score format try: score = int(result["AI Score"]) if not 0 <= score <= 100: return { "error": "AI Score must be between 0 and 100", "parsed_data": result } except (ValueError, TypeError): return { "error": "AI Score must be a valid integer", "parsed_data": result } # Validate Recommended Interventions format interventions = result.get("Recommended Interventions", {}) if not isinstance(interventions, dict): return { "error": "Recommended Interventions must be an object/dictionary", "parsed_data": result } # All validations passed return result else: return {"error": "No valid JSON found in response", "raw_response": response_text} except json.JSONDecodeError as e: return {"error": f"JSON parsing error: {str(e)}", "raw_response": response_text} except Exception as e: return {"error": f"Unexpected error: {str(e)}", "raw_response": response_text} # --- FAISS Setup --- INDEX_PATH = "vector.index" DOCS_PATH = "documents.pkl" # --- Role-Aware Firestore Fetch --- def fetch_documents(role: str, user_id: str) -> list[str]: docs = [] # 1) participants for snap in fs.collection("participants").stream(): d = snap.to_dict() owner_id = snap.id if role == "incubatee" and owner_id != user_id: continue if role == "consultant" and user_id not in d.get("assignedConsultants", []): continue name = d.get('beneficiaryName', 'Unknown') ent = d.get('enterpriseName', 'Unknown') sector = d.get('sector', 'Unknown') stage = d.get('stage', 'Unknown') devtype = d.get('developmentType', 'Unknown') docs.append(f"{name} ({ent}), sector: {sector}, stage: {stage}, type: {devtype}.") # 2) consultants for snap in fs.collection("consultants").stream(): d = snap.to_dict() if role == "consultant" and snap.id != user_id: continue name = d.get("name", "Unknown") expertise = ", ".join(d.get("expertise", [])) or "no listed expertise" rating = d.get("rating", "no rating") docs.append(f"Consultant {name} with expertise in {expertise} and rating {rating}.") # 3) programs if role in ["admin", "operations", "funder", "incubatee"]: for snap in fs.collection("programs").stream(): d = snap.to_dict() docs.append(f"Program {d.get('name')} ({d.get('status')}): {d.get('type')} - Budget {d.get('budget')}") # 4) interventions if role in ["admin", "operations", "incubatee"]: for snap in fs.collection("interventions").stream(): d = snap.to_dict() for item in d.get('interventions', []): title = item.get("title") area = d.get("areaOfSupport", "General") if title: docs.append(f"Intervention: {title} under {area}.") # 5) assignedInterventions for snap in fs.collection("assignedInterventions").stream(): d = snap.to_dict() if role == "consultant" and user_id not in d.get("consultantId", []): continue if role == "incubatee" and d.get("participantId") != user_id: continue title = d.get("interventionTitle", "Unknown") sme = d.get("smeName", "Unknown") status = d.get("status", "Unknown") docs.append(f"Assigned intervention '{title}' for {sme} ({status})") # 6) feedbacks for snap in fs.collection("feedbacks").stream(): d = snap.to_dict() if role == "consultant" and d.get("consultantId") != user_id: continue intervention = d.get("interventionTitle", "Unknown") comment = d.get("comment") if comment: docs.append(f"Feedback on {intervention}: {comment}") # 7) complianceDocuments for snap in fs.collection("complianceDocuments").stream(): d = snap.to_dict() if role == "incubatee" and d.get("participantId") != user_id: continue docs.append(f"Compliance document '{d.get('documentType')}' for {d.get('participantName')} is {d.get('status')} (expires {d.get('expiryDate')})") # 8) interventionDatabase if role in ["admin", "operations", "director", "funder"]: for snap in fs.collection("interventionDatabase").stream(): d = snap.to_dict() title = d.get("interventionTitle", "Unknown") status = d.get("status", "Unknown") feedback = d.get("feedback", "") docs.append(f"Finalized intervention '{title}' ({status}): {feedback}") return docs # --- Embedding --- def get_embeddings(texts: list[str]) -> list[list[float]]: resp = client.models.embed_content(model="text-embedding-004", contents=texts) return [emb.values for emb in resp.embeddings] # --- Dynamic Index --- def build_faiss_index(docs: list[str]): embs = np.array(get_embeddings(docs), dtype="float32") dim = embs.shape[1] index = faiss.IndexFlatIP(dim) index.add(embs) return index # --- Retrieval Helper --- def retrieve_and_respond(user_query: str, role: str, user_id: str) -> str: docs = fetch_documents(role, user_id) if not docs: return "No relevant data found for your role or access level." index = build_faiss_index(docs) q_emb = np.array(get_embeddings([user_query]), dtype="float32") _, idxs = index.search(q_emb, 3) ctx = "\n\n".join(docs[i] for i in idxs[0]) prompt = f"Use the context below to answer:\n\n{ctx}\n\nQuestion: {user_query}\nAnswer:" chat = client.chats.create(model="gemini-2.0-flash-thinking-exp") resp = chat.send_message(prompt) return resp.text # --------- Helpers for Bank-Statement Processing --------- def read_pdf_pages(file_obj): file_obj.seek(0) reader = pypdf.PdfReader(file_obj) return reader, len(reader.pages) def extract_page_text(reader, page_num): if page_num < len(reader.pages): return reader.pages[page_num].extract_text() or "" return "" def process_with_gemini(text: str) -> str: prompt = """Analyze this bank statement and extract transactions in JSON format with these fields: - Date (format DD/MM/YYYY) - Description - Amount (just the integer value) - Type (is 'income' if 'credit amount', else 'expense') - Customer Name (Only If Type is 'income' and if no name is extracted write 'general income' and if type is not 'income' write 'expense') - City (In address of bank statement) - Category_of_expense (a string, if transaction 'Type' is 'expense' categorize it based on description into: Water and electricity, Salaries and wages, Repairs & Maintenance, Motor vehicle expenses, Projects Expenses, Hardware expenses, Refunds, Accounting fees, Loan interest, Bank charges, Insurance, SARS PAYE UIF, Advertising & Marketing, Logistics and distribution, Fuel, Website hosting fees, Rentals, Subscriptions, Computer internet and Telephone, Staff training, Travel and accommodation, Depreciation, Other expenses. If no category matches, default to 'Other expenses'. If 'Type' is 'income' set Destination_of_funds to 'income'.) - ignore opening or closing balances, charts and analysis. Return ONLY valid JSON with this structure: { "transactions": [ { "Date": "string", "Description": "string", "Customer_name": "string", "City": "string", "Amount": number, "Type": "string", "Category_of_expense": "string" } ] }""" try: resp = client.models.generate_content(model='gemini-2.0-flash-thinking-exp', contents=[prompt, text]) time.sleep(6) # match your Streamlit rate-limit workaround return resp.text except Exception as e: # retry once on 504 if hasattr(e, "response") and getattr(e.response, "status_code", None) == 504: time.sleep(6) resp = client.models.generate_content(model='gemini-2.0-flash-thinking-exp', contents=[prompt, text]) return resp.text raise def process_pdf_pages(pdf_file): """ Reads each page of the given PDF file, sends it through Gemini, extracts the JSON “transactions” array, and returns the full list. """ reader, total_pages = read_pdf_pages(pdf_file) all_txns = [] for pg in range(total_pages): txt = extract_page_text(reader, pg).strip() if not txt: continue # 1) Call Gemini try: raw = process_with_gemini(txt) except Exception: # Skip this page on any error (including retries inside process_with_gemini) continue # 2) Locate the JSON payload start = raw.find("{") end = raw.rfind("}") + 1 if start < 0 or end <= 0: continue # 3) Clean up any markdown fences and parse js = raw[start:end].replace("```json", "").replace("```", "") try: data = json.loads(js) except json.JSONDecodeError: continue # 4) Append all found transactions txns = data.get("transactions", []) if isinstance(txns, list): all_txns.extend(txns) return all_txns # --------- Chat Endpoint --------- @app.route("/chat", methods=["POST"]) def chat_endpoint(): data = request.get_json(force=True) q = data.get("user_query") role = data.get("role") user_id = data.get("user_id") if not q or not role or not user_id: return jsonify({"error": "Missing user_query, role, or user_id"}), 400 try: reply = retrieve_and_respond(q, role.lower(), user_id) return jsonify({"reply": reply}) except Exception as e: return jsonify({"error": str(e)}), 500 # --------- Endpoint: Upload & Store Bank Statements --------- @app.route("/upload_statements", methods=["POST"]) def upload_statements(): """ Expects multipart/form-data: - 'business_id': string - 'files': one or more PDFs Stores each PDF in Storage, extracts transactions, and writes them to Firestore (collection 'transactions') with a 'business_id' tag. """ business_id = request.form.get("business_id", "").strip() if not business_id: return jsonify({"error": "Missing business_id"}), 400 if "files" not in request.files: return jsonify({"error": "No files part; upload under key 'files'"}), 400 files = request.files.getlist("files") if not files: return jsonify({"error": "No files uploaded"}), 400 stored_count = 0 for f in files: filename = f.filename or "statement.pdf" # upload raw PDF to storage dest_path = f"{business_id}/bank_statements/{datetime.utcnow().isoformat()}_{filename}" blob = bucket.blob(dest_path) f.seek(0) blob.upload_from_file(f, content_type=f.content_type) # rewind for processing f.seek(0) # extract + store transactions txns= process_pdf_pages(f) for txn in txns: try: dt = datetime.strptime(txn["Date"], "%d/%m/%Y") except Exception: dt = datetime.utcnow() record = { "business_id": business_id, "Date": datetime.strptime(txn["Date"], "%d/%m/%Y"), "Description": txn.get("Description", ""), "Amount": txn.get("Amount", 0), "Type": txn.get("Type", "expense"), "Customer_name": txn.get("Customer_name", "general income" if txn.get("Type")=="income" else "expense"), "City": txn.get("City", ""), "Category_of_expense": txn.get("Category_of_expense", "") } fs.collection("transactions").add(record) stored_count += 1 return jsonify({"message": f"Stored {stored_count} transactions"}), 200 # --------- Endpoint: Retrieve or Generate Financial Statement --------- @app.route("/financial_statement", methods=["POST"]) def financial_statement(): """ Expects JSON: { "business_id": "...", "start_date": "YYYY-MM-DD", "end_date": "YYYY-MM-DD", "statement_type": "Income Statement"|"Cashflow Statement"|"Balance Sheet" } If a cached report exists for that exact (business_id, start,end), returns it. Otherwise generates via Gemini, returns it, and caches it in Firestore. """ data = request.get_json(force=True) or {} biz = data.get("business_id", "").strip() sd = data.get("start_date", "") ed = data.get("end_date", "") stype = data.get("statement_type", "Income Statement") if not (biz and sd and ed): return jsonify({"error": "Missing one of business_id, start_date, end_date"}), 400 # parse iso dates try: dt_start = datetime.fromisoformat(sd) dt_end = datetime.fromisoformat(ed) except ValueError: return jsonify({"error": "Dates must be YYYY-MM-DD"}), 400 # check cache doc_id = f"{biz}__{sd}__{ed}__{stype.replace(' ','_')}" doc_ref = fs.collection("financial_statements").document(doc_id) cached = doc_ref.get() if cached.exists: return jsonify({"report": cached.to_dict()["report"], "cached": True}), 200 # fetch transactions snaps = ( fs.collection("transactions") .where(filter=FieldFilter("business_id", "==", biz)) .where(filter=FieldFilter("Date", ">=", dt_start)) .where(filter=FieldFilter("Date", "<=", dt_end)) .stream() ) txns = [] for s in snaps: d = s.to_dict() ts = d.get("Date") date_str = ts.strftime("%d/%m/%Y") if hasattr(ts, "strftime") else str(ts) txns.append({ "Date": date_str, "Description": d.get("Description",""), "Amount": d.get("Amount",0), "Type": d.get("Type",""), "Customer_name": d.get("Customer_name",""), "City": d.get("City",""), "Category_of_expense": d.get("Category_of_expense","") }) if not txns: return jsonify({"error": "No transactions found for that period"}), 404 # generate with Gemini prompt = ( f"Based on the following transactions JSON data:\n" f"{json.dumps({'transactions': txns})}\n" f"Generate a detailed {stype} for the period from " f"{dt_start.strftime('%d/%m/%Y')} to {dt_end.strftime('%d/%m/%Y')} " f"Specific Formatting and Content Requirements:" f"Standard Accounting Structure (South Africa Focus): Organize the {stype} according to typical accounting practices followed in South Africa (e.g., for an Income Statement, clearly separate Revenue, Cost of Goods Sold, Gross Profit, Operating Expenses, and Net Income, in nice tables considering local terminology where applicable). If unsure of specific local variations, adhere to widely accepted international accounting structures." f"Clear Headings and Subheadings: Use distinct and informative headings and subheadings in English to delineate different sections of the report. Ensure these are visually prominent." f"Consistent Formatting: Maintain consistent formatting for monetary values (e.g., using 'R'for South African Rand if applicable and discernible from the data, comma separators for thousands), dates, and alignment." f"Totals and Subtotals: Clearly display totals for relevant categories and subtotals where appropriate to provide a clear understanding of the financial performance or position." f"Descriptive Line Items: Use clear and concise descriptions for each transaction or aggregated account based on the provided JSON data." f"Key Insights: Include a brief section (e.g., 'Key Highlights' or 'Summary') that identifies significant trends, notable figures, or key performance indicators derived from the data within the statement. This should be written in plain, understandable English, potentially highlighting aspects particularly relevant to the economic context of Zimbabwe if discernible from the data." f"Concise Summary: Provide a concluding summary paragraph that encapsulates the overall financial picture presented in the {stype}." f"Format the report in Markdown for better visual structure." f"Do not name the company if name is not there and return just the report and nothing else." f"subtotals, totals, key highlights, and a concise summary." ) chat = client.chats.create(model="gemini-2.0-flash") resp = chat.send_message(prompt) time.sleep(7) report = resp.text # cache it doc_ref.set({ "business_id": biz, "start_date": sd, "end_date": ed, "statement_type": stype, "report": report, "created_at": firestore.SERVER_TIMESTAMP }) return jsonify({"report": report, "cached": False}), 200 @app.route('/api/batch-evaluate', methods=['POST']) def batch_evaluate(): try: participants = request.json.get('participants', []) results = [] evaluator = GenericEvaluator() for item in participants: participant_id = item.get("participantId") participant_info = item.get("participantInfo", {}) prompt = evaluator.generate_prompt(participant_info) response = client.models.generate_content( model=model_name, contents=prompt ) evaluation = evaluator.parse_gemini_response(response.text) results.append({ "participantId": participant_id, "evaluation": evaluation }) return jsonify({ "status": "success", "evaluations": results }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 @app.route('/api/shortlist', methods=['GET']) def get_shortlist(): try: # Placeholder logic return jsonify({ "status": "success", "shortlist": [] }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 # Lepharo AI Screening endpoint from google.api_core import exceptions as gexc # optional, if installed @app.route("/api/lepharo_evaluate", methods=["POST"]) def evaluate_participant(): # Validate JSON early (bad JSON should be 400, not 500) if not request.is_json: return jsonify({ "status": "error", "message": "Content-Type must be application/json", "requestId": getattr(g, "request_id", "-"), }), 400 data = request.get_json(silent=True) if not isinstance(data, dict): return jsonify({ "status": "error", "message": "Invalid JSON body", "requestId": getattr(g, "request_id", "-"), }), 400 participant_id = data.get("participantId") participant_info = data.get("participantInfo") or {} if not participant_id: return jsonify({"status": "error", "message": "Missing participantId"}), 400 if not isinstance(participant_info, dict): return jsonify({"status": "error", "message": "participantInfo must be an object"}), 400 try: evaluator = GenericEvaluator() prompt = evaluator.generate_prompt(participant_info) logger.info("EVAL id=%s participantId=%s prompt_chars=%s", getattr(g, "request_id", "-"), participant_id, len(prompt)) response = client.models.generate_content( model=model_name, contents=prompt ) txt = getattr(response, "text", "") or "" logger.info("EVAL id=%s participantId=%s gemini_text_chars=%s", getattr(g, "request_id", "-"), participant_id, len(txt)) evaluation = evaluator.parse_gemini_response(txt) # If Gemini returned something you couldn't parse, don’t hide it. # Return 502 so you can see it's an upstream/model-output problem. if isinstance(evaluation, dict) and evaluation.get("error"): logger.warning("EVAL_PARSE_FAIL id=%s participantId=%s err=%s", getattr(g, "request_id", "-"), participant_id, evaluation.get("error")) return jsonify({ "status": "error", "participantId": participant_id, "message": "Model output could not be parsed/validated", "details": evaluation, # contains error + raw_response/parsed_data "requestId": getattr(g, "request_id", "-"), }), 502 return jsonify({ "status": "success", "participantId": participant_id, "evaluation": evaluation, "requestId": getattr(g, "request_id", "-"), }), 200 except Exception as e: # Logs full traceback, but response stays safe logger.exception("EVAL_FAIL id=%s participantId=%s", getattr(g, "request_id", "-"), participant_id) return jsonify({ "status": "error", "participantId": participant_id, "message": "Evaluation failed", "requestId": getattr(g, "request_id", "-"), }), 500 @app.route('/api/lepharo_batch-evaluate', methods=['POST']) def lepharo_batch_evaluate(): try: participants = request.json.get('participants', []) results = [] evaluator = LepharoEvaluator() for item in participants: participant_id = item.get("participantId") participant_info = item.get("participantInfo", {}) prompt = evaluator.generate_prompt(participant_info) response = client.models.generate_content( model=model_name, contents=prompt ) evaluation = evaluator.parse_gemini_response(response.text) results.append({ "participantId": participant_id, "evaluation": evaluation }) return jsonify({ "status": "success", "evaluations": results }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 @app.route('/api/lepharo_shortlist', methods=['GET']) def lepharo_get_shortlist(): try: # Placeholder logic - you can implement your shortlisting logic here return jsonify({ "status": "success", "shortlist": [] }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 # --------- Run the App --------- if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True)