|
|
import os |
|
|
import json |
|
|
import time |
|
|
from datetime import datetime |
|
|
from io import BytesIO |
|
|
from google.cloud.firestore_v1.base_query import FieldFilter |
|
|
import pypdf |
|
|
import firebase_admin |
|
|
import numpy as np |
|
|
import faiss |
|
|
import pickle |
|
|
from flask import Flask, request, jsonify |
|
|
from flask_cors import CORS |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
from firebase_admin import credentials, firestore, storage |
|
|
from google import genai |
|
|
|
|
|
import os |
|
|
import json |
|
|
import pickle |
|
|
import numpy as np |
|
|
from flask import Flask, request, jsonify |
|
|
from flask_cors import CORS |
|
|
from dotenv import load_dotenv |
|
|
from firebase_admin import credentials, firestore, storage, initialize_app |
|
|
from google import genai |
|
|
import faiss |
|
|
|
|
|
# Load environment variables from a local .env file (no-op when absent).
load_dotenv()

app = Flask(__name__)
CORS(app)  # allow cross-origin calls from the frontend

# Firebase Admin bootstrap: the whole service-account JSON is passed in the
# FIREBASE env var; Firebase_Storage names the default Storage bucket.
cred_json = os.environ.get("FIREBASE")
if not cred_json:
    raise RuntimeError("Missing FIREBASE env var")
cred = credentials.Certificate(json.loads(cred_json))
initialize_app(cred, {"storageBucket": os.environ.get("Firebase_Storage")})

fs = firestore.client()  # Firestore client used throughout this module
bucket = storage.bucket()  # default Cloud Storage bucket

# Gemini client; the "Gemini" env var holds the API key.
client = genai.Client(api_key=os.getenv("Gemini"))
model_name = "gemini-2.0-flash"
|
|
|
|
|
import logging
import uuid
import time
from flask import g, request, jsonify

# Request/response logging configuration. LOG_LEVEL env var (default INFO)
# controls verbosity for the whole process.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("api")
|
|
|
|
|
|
|
|
@app.before_request
def _start_timer():
    """Attach a request id and start timestamp to ``g``, then log a REQ line.

    The request id is taken from an incoming X-Request-Id header when the
    client supplies one, otherwise a fresh UUID4 is generated.
    """
    g.request_id = request.headers.get("X-Request-Id", str(uuid.uuid4()))
    g.t0 = time.time()

    # Summarize the body shape without logging payload contents.
    try:
        if request.is_json:
            parsed = request.get_json(silent=True)
            if isinstance(parsed, dict):
                preview = {"keys": list(parsed.keys())}
            else:
                preview = {"type": str(type(parsed))}
        else:
            preview = {"content_type": request.content_type}
    except Exception:
        preview = {"parse": "failed"}

    logger.info(
        "REQ id=%s %s %s ip=%s ua=%s body=%s",
        g.request_id,
        request.method,
        request.path,
        request.headers.get("X-Forwarded-For", request.remote_addr),
        request.user_agent.string,
        preview,
    )
|
|
|
|
|
@app.after_request
def _log_response(resp):
    """Log status and latency for the finished request; echo X-Request-Id."""
    started = getattr(g, "t0", time.time())
    elapsed_ms = int((time.time() - started) * 1000)
    logger.info(
        "RES id=%s %s %s status=%s ms=%s",
        getattr(g, "request_id", "-"),
        request.method,
        request.path,
        resp.status_code,
        elapsed_ms,
    )
    resp.headers["X-Request-Id"] = getattr(g, "request_id", "-")
    return resp
|
|
|
|
|
|
|
|
from werkzeug.exceptions import HTTPException |
|
|
|
|
|
@app.errorhandler(HTTPException)
def handle_http_exception(err):
    """Render werkzeug HTTP errors as the API's uniform JSON envelope."""
    payload = {
        "status": "error",
        "message": err.description,
    }
    return jsonify(payload), err.code
|
|
|
|
|
@app.errorhandler(Exception)
def _unhandled_exception(err):
    """Log any uncaught exception with its request id; reply with a generic 500."""
    logger.exception(
        "UNHANDLED id=%s path=%s",
        getattr(g, "request_id", "-"),
        request.path,
    )
    payload = {
        "status": "error",
        "message": "Internal server error",
    }
    return jsonify(payload), 500
|
|
|
|
|
# Catalogue of support interventions for the generic incubator programme,
# grouped by category. Serialized verbatim into the evaluation prompt
# (GenericEvaluator.generate_prompt) so the model only recommends items that
# actually exist.
interventions_offered = {
    "Marketing Support": [
        "Domain & Email Registration",
        "Website Development & Hosting",
        "Logo",
        "Social Media Setup & Page",
        "Industry Memberships",
        "Company Profile",
        "Email Signature",
        "Business Cards",
        "Branded Banner",
        "Pamphlets/Brochures",
        "Market Linkage",
        "Marketing Plan",
        "Digital Marketing Support",
        "Marketing Mentoring"
    ],
    "Financial Management": [
        "Management Accounts",
        "Financial Management Templates",
        "Record Keeping",
        "Business Plan/Proposal",
        "Funding Linkages",
        "Financial Literacy Training",
        "Tax Compliance Support",
        "Access to Financial Software",
        "Financial Management Mentorship",
        "Grant Application Support",
        "Cost Management Strategies",
        "Financial Reporting Standards",
        "Product Costing"
    ],
    "Compliance": [
        "Insurance",
        "CIPC and Annual Returns Registration",
        "UIF Registration",
        "VAT Registration",
        "Risk Management Plan",
        "HRM Support (i.e., Templates)",
        "Guidance - Food Compliance (Webinar)",
        "PAYE Compliance",
        "COIDA Compliance",
        "Certificate of Acceptability"
    ],
    "Business Strategy & Leadership": [
        "Executive Mentoring",
        "Business Ops Plan",
        "Strategic Plan",
        "Business Communication (How to Pitch)",
        "Digital Transformation",
        "Leadership and Personal Development",
        "Design Thinking",
        "Productivity Training"
    ],
    "Skills Development & Training": [
        "Excel Skills Training",
        "Industry Seminars",
        "Fireside Chat",
        "Industry Courses/Training",
        "AI Tools Training",
        "PowerPoint Presentation Training"
    ],
    "Operations & Tools": [
        "Tools and Equipment",
        "Data Support",
        "Technology Application Support",
        "CRM Solutions"
    ],
    "Health & Safety": [
        "OHS Audit",
        "Health & Safety Training"
    ],
    "Customer Experience & Sales": [
        "Customer Service – Enhancing service quality to improve client satisfaction and retention",
        "Technology Readiness and Systems Integration",
        "Sales and Marketing (including Export Readiness)"
    ]
}
|
|
|
|
|
class GenericEvaluator:
    """Builds evaluation prompts for the generic incubator application flow
    and validates Gemini's JSON replies.

    The model is asked for a strict JSON object with four keys:
    "AI Recommendation", "AI Score", "Justification" and
    "Recommended Interventions".
    """

    def __init__(self, available_interventions=None):
        # Fall back to the module-level catalogue when no custom one is given.
        self.available_interventions = available_interventions or interventions_offered

    def generate_prompt(self, participant_info: dict) -> str:
        """Return the full evaluation prompt for one applicant.

        Args:
            participant_info: Arbitrary applicant fields; serialized verbatim
                into the prompt.

        Returns:
            The prompt string, including the available-interventions catalogue
            and the required JSON output schema.
        """
        interventions_json = json.dumps(self.available_interventions, indent=2)

        prompt = f"""
You are an expert evaluator for a small business incubator in South Africa, reviewing candidate applications. Use your expertise, critical thinking, and judgment to assess the following applicant. There are no predefined criteria or weights — your evaluation should be holistic and based on the information provided.

Participant Info:
{json.dumps(participant_info, indent=2)}

Based on your assessment, provide:
1. "AI Recommendation": either "Accept" or "Reject"
2. "AI Score": a score out of 100 reflecting overall business quality or readiness
3. "Justification": a brief explanation for your decision (3-5 sentences)
4. "Recommended Interventions": Select 3-5 appropriate intervention categories and specific interventions that would most benefit this business.

Available interventions:
{interventions_json}

Return your output strictly as a JSON dictionary with these keys:
- "AI Recommendation" (string: "Accept" or "Reject")
- "AI Score" (integer between 0-100)
- "Justification" (string)
- "Recommended Interventions" (object with category names as keys and arrays of specific interventions as values)

Example format for "Recommended Interventions":
{{
  "Branding & Digital Presence": [
    "Website Development & Hosting",
    "Digital Marketing Support"
  ],
  "Financial Management & Compliance": [
    "Business Plan/Proposal",
    "Financial Literacy Training"
  ]
}}
"""
        return prompt

    def parse_gemini_response(self, response_text: str) -> dict:
        """Extract and validate the JSON object embedded in a model reply.

        Returns the parsed dict on success. On any failure returns a dict
        with an "error" key plus either "parsed_data" (JSON parsed but
        invalid) or "raw_response" (no JSON could be extracted).
        """
        try:
            response_text = response_text.strip()

            # The model may wrap the JSON in prose or markdown fences; take
            # the outermost {...} span.
            start_idx = response_text.find('{')
            end_idx = response_text.rfind('}')

            if start_idx >= 0 and end_idx > start_idx:
                json_str = response_text[start_idx:end_idx + 1]
                result = json.loads(json_str)

                required_fields = ["AI Recommendation", "AI Score", "Justification", "Recommended Interventions"]
                missing_fields = [field for field in required_fields if field not in result]

                if missing_fields:
                    return {
                        "error": f"Missing required fields: {', '.join(missing_fields)}",
                        "parsed_data": result
                    }

                if result["AI Recommendation"] not in ["Accept", "Reject"]:
                    return {
                        "error": "AI Recommendation must be either 'Accept' or 'Reject'",
                        "parsed_data": result
                    }

                try:
                    score = int(result["AI Score"])
                    if not 0 <= score <= 100:
                        return {
                            "error": "AI Score must be between 0 and 100",
                            "parsed_data": result
                        }
                    # Fix: persist the coerced integer. Previously a score the
                    # model returned as a string (e.g. "88") passed validation
                    # but stayed a string in the result handed to callers.
                    result["AI Score"] = score
                except (ValueError, TypeError):
                    return {
                        "error": "AI Score must be a valid integer",
                        "parsed_data": result
                    }

                interventions = result.get("Recommended Interventions", {})
                if not isinstance(interventions, dict):
                    return {
                        "error": "Recommended Interventions must be an object/dictionary",
                        "parsed_data": result
                    }

                return result
            else:
                return {"error": "No valid JSON found in response", "raw_response": response_text}
        except json.JSONDecodeError as e:
            return {"error": f"JSON parsing error: {str(e)}", "raw_response": response_text}
        except Exception as e:
            return {"error": f"Unexpected error: {str(e)}", "raw_response": response_text}
|
|
|
|
|
|
|
|
# Catalogue of Lepharo-specific interventions, grouped by programme pillar.
# Serialized verbatim into LepharoEvaluator.generate_prompt so the model only
# recommends support areas Lepharo actually offers.
lepharo_interventions_offered = {
    "ROM (Recruitment, Onboarding, and Maintenance)": [
        "Gap Analysis",
        "SMME Onboarding Induction",
        "Compliance Document Verification",
        "Developmental Plan"
    ],
    "HSE (Health, Safety & Environment) and Labour Compliance": [
        "UIF Compliance Training",
        "UIF Registration",
        "COID Compliance Training",
        "COID Registration",
        "COID Annual Renewal",
        "Employment Contract Collection",
        "ID Copy Collection",
        "Health & Safety File",
        "HSE & Labour Newsletter",
        "Risk Management Information Session",
        "HSE/Labour Compliance Workshop"
    ],
    "Financial Compliance": [
        "Business Planning",
        "Budgeting & Financial Planning",
        "Bookkeeping & Accounting",
        "Taxation & Compliance Advisory",
        "Financial Analysis & Reporting",
        "Funding Linkage"
    ],
    "PDS (Personal Development Services)": [
        "Personal Insight Assessment",
        "Psychometric Assessment",
        "Personal Recommendation Report",
        "Leadership Fundamentals Module",
        "Communication Skills Module",
        "Emotional Intelligence Module",
        "Leadership Project",
        "Mentorship Session"
    ],
    "Market Linkages": [
        "Stakeholder Company Sourcing",
        "RFP/RFQ Response Support",
        "Procurement Opportunity Identification",
        "SMME Engagement Support",
        "Open Day/Exhibition Participation",
        "Aftercare Support"
    ],
    "Legal Advisory Services": [
        "Commercial Law Advisory",
        "Labour Law Advisory",
        "Business Law Advisory",
        "Intellectual Property Advisory",
        "BBBEE Compliance Support",
        "Debt Collection Advisory",
        "Company Tax Compliance Advisory",
        "Digital Economy Legal Advisory",
        "Cross-Border Transaction Advisory"
    ],
    "Wellness Services": [
        "Soft Skills Training",
        "Counselling Session",
        "Grief Support",
        "Health Risk Assessment",
        "Employee Wellness Newsletter"
    ],
    "Training Academy – NVC (New Venture Creation)": [
        "Maths in Business Module",
        "Business Communication Module (1st Language)",
        "Business Communication Module (2nd Language)",
        "New Venture Creation Module",
        "Leadership Skills Module",
        "Business Ethics Module",
        "Business Finance Management Module",
        "Marketing Skills Module"
    ],
    "Training Academy – QMS": [
        "QMS Certification Training",
        "ISO Standards Workshop"
    ],
    "Marketing and Communication": [
        "Logo Design",
        "Website Development",
        "Domain Hosting",
        "Company Profile Design",
        "Business Cards",
        "Branded Golf Shirts",
        "Pull-Up Banner",
        "Marketing Collateral",
        "Event Planning"
    ]
}
|
|
|
|
|
class LepharoEvaluator:
    """Builds evaluation prompts for Lepharo applications and validates
    Gemini's JSON replies.

    Extends the generic schema with two extra required keys:
    "intervention" (primary category) and "areaOfSupport" (primary item).
    """

    def __init__(self, available_interventions=None):
        # Fall back to the module-level Lepharo catalogue when none is given.
        self.available_interventions = available_interventions or lepharo_interventions_offered

    def generate_prompt(self, participant_info: dict) -> str:
        """Return the full Lepharo evaluation prompt for one applicant.

        Args:
            participant_info: Arbitrary applicant fields; serialized verbatim
                into the prompt.

        Returns:
            The prompt string, including the Lepharo interventions catalogue
            and the required JSON output schema.
        """
        interventions_json = json.dumps(self.available_interventions, indent=2)

        prompt = f"""
You are an expert evaluator for Lepharo, a business development and compliance support organization in South Africa, reviewing candidate applications. Use your expertise, critical thinking, and judgment to assess the following applicant based on their business needs and development stage. There are no predefined criteria or weights — your evaluation should be holistic and based on the information provided.

Participant Info:
{json.dumps(participant_info, indent=2)}

Based on your assessment, provide:
1. "AI Recommendation": either "Accept" or "Reject"
2. "AI Score": a score out of 100 reflecting overall business quality or readiness
3. "Justification": a brief explanation for your decision (3-5 sentences)
4. "Recommended Interventions": Select 3-5 appropriate intervention categories and specific areas of support that would most benefit this business.

Available interventions:
{interventions_json}

Return your output strictly as a JSON dictionary with these keys:
- "AI Recommendation" (string: "Accept" or "Reject")
- "AI Score" (integer between 0-100)
- "Justification" (string)
- "Recommended Interventions" (object with intervention names as keys and arrays of specific areas of support as values)
- "intervention" (string: the primary intervention category recommended)
- "areaOfSupport" (string: the primary area of support recommended)

Example format for "Recommended Interventions":
{{
  "HSE (Health, Safety & Environment) and Labour Compliance": [
    "UIF Registration",
    "Health & Safety File"
  ],
  "Financial Compliance": [
    "Business Planning",
    "Taxation & Compliance Advisory"
  ]
}}

For "intervention" and "areaOfSupport", select the single most important intervention category and area of support for this participant.
"""
        return prompt

    def parse_gemini_response(self, response_text: str) -> dict:
        """Extract and validate the JSON object embedded in a model reply.

        Returns the parsed dict on success. On any failure returns a dict
        with an "error" key plus either "parsed_data" (JSON parsed but
        invalid) or "raw_response" (no JSON could be extracted).
        """
        try:
            response_text = response_text.strip()

            # The model may wrap the JSON in prose or markdown fences; take
            # the outermost {...} span.
            start_idx = response_text.find('{')
            end_idx = response_text.rfind('}')

            if start_idx >= 0 and end_idx > start_idx:
                json_str = response_text[start_idx:end_idx + 1]
                result = json.loads(json_str)

                required_fields = ["AI Recommendation", "AI Score", "Justification", "Recommended Interventions", "intervention", "areaOfSupport"]
                missing_fields = [field for field in required_fields if field not in result]

                if missing_fields:
                    return {
                        "error": f"Missing required fields: {', '.join(missing_fields)}",
                        "parsed_data": result
                    }

                if result["AI Recommendation"] not in ["Accept", "Reject"]:
                    return {
                        "error": "AI Recommendation must be either 'Accept' or 'Reject'",
                        "parsed_data": result
                    }

                try:
                    score = int(result["AI Score"])
                    if not 0 <= score <= 100:
                        return {
                            "error": "AI Score must be between 0 and 100",
                            "parsed_data": result
                        }
                    # Fix: persist the coerced integer. Previously a score the
                    # model returned as a string (e.g. "70") passed validation
                    # but stayed a string in the result handed to callers.
                    result["AI Score"] = score
                except (ValueError, TypeError):
                    return {
                        "error": "AI Score must be a valid integer",
                        "parsed_data": result
                    }

                interventions = result.get("Recommended Interventions", {})
                if not isinstance(interventions, dict):
                    return {
                        "error": "Recommended Interventions must be an object/dictionary",
                        "parsed_data": result
                    }

                return result
            else:
                return {"error": "No valid JSON found in response", "raw_response": response_text}
        except json.JSONDecodeError as e:
            return {"error": f"JSON parsing error: {str(e)}", "raw_response": response_text}
        except Exception as e:
            return {"error": f"Unexpected error: {str(e)}", "raw_response": response_text}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# On-disk artifact paths for a persisted FAISS index and its document list.
# NOTE(review): neither constant is referenced elsewhere in this chunk —
# confirm they are used (or prune them) before relying on these files.
INDEX_PATH = "vector.index"
DOCS_PATH = "documents.pkl"
|
|
|
|
|
|
|
|
def fetch_documents(role: str, user_id: str) -> list[str]:
    """Collect role-scoped text snippets from Firestore for RAG retrieval.

    Streams several collections and turns each matching record into a short
    English sentence. What a caller sees depends on ``role``:
      - "incubatee": only their own participant/assigned/compliance records
      - "consultant": records linked to their consultant id
      - "admin"/"operations"/"funder"/"director": broader, per-collection gates
    Returns the list of snippet strings (possibly empty).

    NOTE(review): this streams every document of every collection and filters
    in Python — fine for small datasets, but consider server-side queries as
    collections grow.
    """
    docs = []

    # Participants: incubatees see only themselves; consultants only SMEs
    # they are assigned to.
    for snap in fs.collection("participants").stream():
        d = snap.to_dict()
        owner_id = snap.id
        if role == "incubatee" and owner_id != user_id:
            continue
        if role == "consultant" and user_id not in d.get("assignedConsultants", []):
            continue
        name = d.get('beneficiaryName', 'Unknown')
        ent = d.get('enterpriseName', 'Unknown')
        sector = d.get('sector', 'Unknown')
        stage = d.get('stage', 'Unknown')
        devtype = d.get('developmentType', 'Unknown')
        docs.append(f"{name} ({ent}), sector: {sector}, stage: {stage}, type: {devtype}.")

    # Consultants: a consultant sees only their own profile.
    for snap in fs.collection("consultants").stream():
        d = snap.to_dict()
        if role == "consultant" and snap.id != user_id:
            continue
        name = d.get("name", "Unknown")
        expertise = ", ".join(d.get("expertise", [])) or "no listed expertise"
        rating = d.get("rating", "no rating")
        docs.append(f"Consultant {name} with expertise in {expertise} and rating {rating}.")

    # Programs: visible to admin/operations/funder/incubatee roles.
    if role in ["admin", "operations", "funder", "incubatee"]:
        for snap in fs.collection("programs").stream():
            d = snap.to_dict()
            docs.append(f"Program {d.get('name')} ({d.get('status')}): {d.get('type')} - Budget {d.get('budget')}")

    # Intervention catalogue entries.
    if role in ["admin", "operations", "incubatee"]:
        for snap in fs.collection("interventions").stream():
            d = snap.to_dict()
            for item in d.get('interventions', []):
                title = item.get("title")
                area = d.get("areaOfSupport", "General")
                if title:
                    docs.append(f"Intervention: {title} under {area}.")

    # Assigned interventions, filtered per role.
    # NOTE(review): here consultant filtering uses membership
    # (`user_id not in d.get("consultantId", [])`) while the feedbacks loop
    # below compares with `!=` — confirm whether consultantId is a list or a
    # scalar; if it is a scalar string, `in` does substring matching.
    for snap in fs.collection("assignedInterventions").stream():
        d = snap.to_dict()
        if role == "consultant" and user_id not in d.get("consultantId", []):
            continue
        if role == "incubatee" and d.get("participantId") != user_id:
            continue
        title = d.get("interventionTitle", "Unknown")
        sme = d.get("smeName", "Unknown")
        status = d.get("status", "Unknown")
        docs.append(f"Assigned intervention '{title}' for {sme} ({status})")

    # Feedback comments; consultants see only their own.
    for snap in fs.collection("feedbacks").stream():
        d = snap.to_dict()
        if role == "consultant" and d.get("consultantId") != user_id:
            continue
        intervention = d.get("interventionTitle", "Unknown")
        comment = d.get("comment")
        if comment:
            docs.append(f"Feedback on {intervention}: {comment}")

    # Compliance documents; incubatees see only their own.
    for snap in fs.collection("complianceDocuments").stream():
        d = snap.to_dict()
        if role == "incubatee" and d.get("participantId") != user_id:
            continue
        docs.append(f"Compliance document '{d.get('documentType')}' for {d.get('participantName')} is {d.get('status')} (expires {d.get('expiryDate')})")

    # Finalized interventions: management/funder roles only.
    if role in ["admin", "operations", "director", "funder"]:
        for snap in fs.collection("interventionDatabase").stream():
            d = snap.to_dict()
            title = d.get("interventionTitle", "Unknown")
            status = d.get("status", "Unknown")
            feedback = d.get("feedback", "")
            docs.append(f"Finalized intervention '{title}' ({status}): {feedback}")

    return docs
|
|
|
|
|
|
|
|
def get_embeddings(texts: list[str]) -> list[list[float]]:
    """Embed each text with Gemini's text-embedding-004 model.

    Returns one embedding vector (list of floats) per input string, in the
    same order as ``texts``.
    """
    resp = client.models.embed_content(model="text-embedding-004", contents=texts)
    return [emb.values for emb in resp.embeddings]
|
|
|
|
|
|
|
|
def build_faiss_index(docs: list[str]):
    """Embed *docs* and load the vectors into an inner-product FAISS index."""
    vectors = np.array(get_embeddings(docs), dtype="float32")
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)
    return index
|
|
|
|
|
|
|
|
def retrieve_and_respond(user_query: str, role: str, user_id: str) -> str:
    """Answer *user_query* with RAG over the caller's role-scoped documents.

    Fetches the documents the role/user may see, builds an in-memory FAISS
    index over them, retrieves the closest matches, and asks Gemini to answer
    using only that context. Returns the model's reply text, or a fixed
    message when no documents are visible to this caller.
    """
    docs = fetch_documents(role, user_id)
    if not docs:
        return "No relevant data found for your role or access level."

    index = build_faiss_index(docs)
    q_emb = np.array(get_embeddings([user_query]), dtype="float32")

    # Fix: never request more neighbours than documents exist. FAISS pads the
    # result with -1 when k > ntotal, and docs[-1] silently aliased the last
    # document; clamp k and drop any negative indices defensively.
    k = min(3, len(docs))
    _, idxs = index.search(q_emb, k)
    ctx = "\n\n".join(docs[i] for i in idxs[0] if i >= 0)

    prompt = f"Use the context below to answer:\n\n{ctx}\n\nQuestion: {user_query}\nAnswer:"
    chat = client.chats.create(model="gemini-2.0-flash-thinking-exp")
    resp = chat.send_message(prompt)
    return resp.text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_pdf_pages(file_obj):
    """Rewind *file_obj*, open it with pypdf, and return (reader, page_count)."""
    file_obj.seek(0)
    reader = pypdf.PdfReader(file_obj)
    page_count = len(reader.pages)
    return reader, page_count
|
|
|
|
|
def extract_page_text(reader, page_num):
    """Return the text of page *page_num*, or "" when out of range or empty.

    Fix: also guards against negative indices — previously a negative
    ``page_num`` passed the upper-bound check and Python's negative indexing
    silently returned text from the wrong end of the document.
    """
    if 0 <= page_num < len(reader.pages):
        return reader.pages[page_num].extract_text() or ""
    return ""
|
|
|
|
|
def process_with_gemini(text: str) -> str:
    """Ask Gemini to extract bank-statement transactions from *text* as JSON.

    Returns the raw model reply (which may still include markdown fences —
    the caller is responsible for isolating and parsing the JSON). Sleeps
    6s after each call, apparently as crude rate limiting — TODO confirm
    against the API quota. Retries exactly once on an HTTP 504, then
    re-raises anything else.
    """
    prompt = """Analyze this bank statement and extract transactions in JSON format with these fields:
- Date (format DD/MM/YYYY)
- Description
- Amount (just the integer value)
- Type (is 'income' if 'credit amount', else 'expense')
- Customer Name (Only If Type is 'income' and if no name is extracted write 'general income' and if type is not 'income' write 'expense')
- City (In address of bank statement)
- Category_of_expense (a string, if transaction 'Type' is 'expense' categorize it based on description into: Water and electricity, Salaries and wages, Repairs & Maintenance, Motor vehicle expenses, Projects Expenses, Hardware expenses, Refunds, Accounting fees, Loan interest, Bank charges, Insurance, SARS PAYE UIF, Advertising & Marketing, Logistics and distribution, Fuel, Website hosting fees, Rentals, Subscriptions, Computer internet and Telephone, Staff training, Travel and accommodation, Depreciation, Other expenses. If no category matches, default to 'Other expenses'. If 'Type' is 'income' set Destination_of_funds to 'income'.)
- ignore opening or closing balances, charts and analysis.

Return ONLY valid JSON with this structure:
{
  "transactions": [
    {
      "Date": "string",
      "Description": "string",
      "Customer_name": "string",
      "City": "string",
      "Amount": number,
      "Type": "string",
      "Category_of_expense": "string"
    }
  ]
}"""
    try:
        resp = client.models.generate_content(model='gemini-2.0-flash-thinking-exp', contents=[prompt, text])
        # Pace successive calls (one page per call in process_pdf_pages).
        time.sleep(6)
        return resp.text
    except Exception as e:
        # Single retry for gateway timeouts only; everything else propagates.
        if hasattr(e, "response") and getattr(e.response, "status_code", None) == 504:
            time.sleep(6)
            resp = client.models.generate_content(model='gemini-2.0-flash-thinking-exp', contents=[prompt, text])
            return resp.text
        raise
|
|
|
|
|
def process_pdf_pages(pdf_file):
    """Run every page of *pdf_file* through Gemini and return the combined
    list of extracted "transactions" entries.

    Pages that are empty, fail the model call, or yield unparseable JSON are
    skipped silently — extraction is best-effort per page.
    """
    reader, total_pages = read_pdf_pages(pdf_file)
    all_txns = []

    for page_no in range(total_pages):
        page_text = extract_page_text(reader, page_no).strip()
        if not page_text:
            continue

        try:
            raw = process_with_gemini(page_text)
        except Exception:
            continue

        # Isolate the outermost {...} span, stripping any markdown fences.
        brace_start = raw.find("{")
        brace_end = raw.rfind("}") + 1
        if brace_start < 0 or brace_end <= 0:
            continue
        candidate = raw[brace_start:brace_end].replace("```json", "").replace("```", "")

        try:
            payload = json.loads(candidate)
        except json.JSONDecodeError:
            continue

        page_txns = payload.get("transactions", [])
        if isinstance(page_txns, list):
            all_txns.extend(page_txns)

    return all_txns
|
|
|
|
|
|
|
|
@app.route("/chat", methods=["POST"])
def chat_endpoint():
    """RAG chat endpoint.

    Expects JSON: {"user_query": str, "role": str, "user_id": str}.
    Returns {"reply": ...} on success, 400 on missing/malformed input,
    500 on any downstream failure.
    """
    # Fix: silent=True + isinstance guard. Previously get_json(force=True)
    # raised on a malformed body (surfacing as a 500) and a non-object JSON
    # body (e.g. a list) crashed on .get — both now yield a clean 400.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    q = data.get("user_query")
    role = data.get("role")
    user_id = data.get("user_id")

    if not q or not role or not user_id:
        return jsonify({"error": "Missing user_query, role, or user_id"}), 400

    try:
        reply = retrieve_and_respond(q, role.lower(), user_id)
        return jsonify({"reply": reply})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
@app.route("/upload_statements", methods=["POST"])
def upload_statements():
    """
    Expects multipart/form-data:
      - 'business_id': string
      - 'files': one or more PDFs
    Stores each PDF in Storage, extracts transactions, and writes them
    to Firestore (collection 'transactions') with a 'business_id' tag.
    """
    business_id = request.form.get("business_id", "").strip()
    if not business_id:
        return jsonify({"error": "Missing business_id"}), 400

    if "files" not in request.files:
        return jsonify({"error": "No files part; upload under key 'files'"}), 400

    files = request.files.getlist("files")
    if not files:
        return jsonify({"error": "No files uploaded"}), 400

    stored_count = 0
    for f in files:
        # basename() strips any client-supplied directory components so the
        # upload cannot escape this business's Storage folder.
        filename = os.path.basename(f.filename or "statement.pdf")

        # Fix: the blob path previously embedded the literal "(unknown)"
        # instead of the uploaded file's name.
        dest_path = f"{business_id}/bank_statements/{datetime.utcnow().isoformat()}_{filename}"
        blob = bucket.blob(dest_path)
        f.seek(0)
        blob.upload_from_file(f, content_type=f.content_type)

        # Rewind so PDF parsing starts from the top after the upload read.
        f.seek(0)
        txns = process_pdf_pages(f)
        for txn in txns:
            # Fall back to "now" when Gemini produced an unparseable date.
            try:
                dt = datetime.strptime(txn["Date"], "%d/%m/%Y")
            except Exception:
                dt = datetime.utcnow()
            record = {
                "business_id": business_id,
                # Fix: use the guarded `dt`. The record previously re-parsed
                # txn["Date"] unguarded, so the fallback above was dead code
                # and a bad date crashed the whole upload.
                "Date": dt,
                "Description": txn.get("Description", ""),
                "Amount": txn.get("Amount", 0),
                "Type": txn.get("Type", "expense"),
                "Customer_name": txn.get(
                    "Customer_name",
                    "general income" if txn.get("Type") == "income" else "expense",
                ),
                "City": txn.get("City", ""),
                "Category_of_expense": txn.get("Category_of_expense", ""),
            }
            fs.collection("transactions").add(record)
            stored_count += 1

    return jsonify({"message": f"Stored {stored_count} transactions"}), 200
|
|
|
|
|
|
|
|
|
|
|
@app.route("/financial_statement", methods=["POST"])
def financial_statement():
    """
    Expects JSON:
      {
        "business_id": "...",
        "start_date": "YYYY-MM-DD",
        "end_date": "YYYY-MM-DD",
        "statement_type": "Income Statement"|"Cashflow Statement"|"Balance Sheet"
      }
    If a cached report exists for that exact (business_id, start_date,
    end_date, statement_type) combination, returns it. Otherwise generates
    the report via Gemini, caches it in Firestore, and returns it.
    """
    data = request.get_json(force=True) or {}
    biz = data.get("business_id", "").strip()
    sd = data.get("start_date", "")
    ed = data.get("end_date", "")
    stype = data.get("statement_type", "Income Statement")

    if not (biz and sd and ed):
        return jsonify({"error": "Missing one of business_id, start_date, end_date"}), 400

    try:
        dt_start = datetime.fromisoformat(sd)
        dt_end = datetime.fromisoformat(ed)
    except ValueError:
        return jsonify({"error": "Dates must be YYYY-MM-DD"}), 400

    # The cache key includes the statement type, so e.g. an Income Statement
    # and a Balance Sheet over the same period are cached independently.
    doc_id = f"{biz}__{sd}__{ed}__{stype.replace(' ','_')}"
    doc_ref = fs.collection("financial_statements").document(doc_id)
    cached = doc_ref.get()
    if cached.exists:
        return jsonify({"report": cached.to_dict()["report"], "cached": True}), 200

    # Stored transaction Dates are midnight timestamps (parsed from
    # DD/MM/YYYY in upload_statements), so "<= dt_end" includes the end day.
    snaps = (
        fs.collection("transactions")
        .where(filter=FieldFilter("business_id", "==", biz))
        .where(filter=FieldFilter("Date", ">=", dt_start))
        .where(filter=FieldFilter("Date", "<=", dt_end))
        .stream()
    )
    txns = []
    for s in snaps:
        d = s.to_dict()
        ts = d.get("Date")
        # Firestore returns timestamps; fall back to str() for legacy values.
        date_str = ts.strftime("%d/%m/%Y") if hasattr(ts, "strftime") else str(ts)
        txns.append({
            "Date": date_str,
            "Description": d.get("Description",""),
            "Amount": d.get("Amount",0),
            "Type": d.get("Type",""),
            "Customer_name": d.get("Customer_name",""),
            "City": d.get("City",""),
            "Category_of_expense": d.get("Category_of_expense","")
        })

    if not txns:
        return jsonify({"error": "No transactions found for that period"}), 404

    prompt = (
        f"Based on the following transactions JSON data:\n"
        f"{json.dumps({'transactions': txns})}\n"
        f"Generate a detailed {stype} for the period from "
        f"{dt_start.strftime('%d/%m/%Y')} to {dt_end.strftime('%d/%m/%Y')} "
        f"Specific Formatting and Content Requirements:"
        f"Standard Accounting Structure (South Africa Focus): Organize the {stype} according to typical accounting practices followed in South Africa (e.g., for an Income Statement, clearly separate Revenue, Cost of Goods Sold, Gross Profit, Operating Expenses, and Net Income, in nice tables considering local terminology where applicable). If unsure of specific local variations, adhere to widely accepted international accounting structures."
        f"Clear Headings and Subheadings: Use distinct and informative headings and subheadings in English to delineate different sections of the report. Ensure these are visually prominent."
        f"Consistent Formatting: Maintain consistent formatting for monetary values (e.g., using 'R'for South African Rand if applicable and discernible from the data, comma separators for thousands), dates, and alignment."
        f"Totals and Subtotals: Clearly display totals for relevant categories and subtotals where appropriate to provide a clear understanding of the financial performance or position."
        f"Descriptive Line Items: Use clear and concise descriptions for each transaction or aggregated account based on the provided JSON data."
        # Fix: this sentence previously said "the economic context of
        # Zimbabwe", contradicting the South Africa focus stated above.
        f"Key Insights: Include a brief section (e.g., 'Key Highlights' or 'Summary') that identifies significant trends, notable figures, or key performance indicators derived from the data within the statement. This should be written in plain, understandable English, potentially highlighting aspects particularly relevant to the economic context of South Africa if discernible from the data."
        f"Concise Summary: Provide a concluding summary paragraph that encapsulates the overall financial picture presented in the {stype}."
        f"Format the report in Markdown for better visual structure."
        f"Do not name the company if name is not there and return just the report and nothing else."
        f"subtotals, totals, key highlights, and a concise summary."
    )
    chat = client.chats.create(model="gemini-2.0-flash")
    resp = chat.send_message(prompt)
    # Crude pacing between successive Gemini calls — TODO confirm quota need.
    time.sleep(7)
    report = resp.text

    # Cache for subsequent identical requests.
    doc_ref.set({
        "business_id": biz,
        "start_date": sd,
        "end_date": ed,
        "statement_type": stype,
        "report": report,
        "created_at": firestore.SERVER_TIMESTAMP
    })

    return jsonify({"report": report, "cached": False}), 200
|
|
|
|
|
|
|
|
@app.route('/api/batch-evaluate', methods=['POST'])
def batch_evaluate():
    """Evaluate a batch of applicants with the generic evaluator.

    Expects JSON: {"participants": [{"participantId": ..., "participantInfo": {...}}, ...]}.
    Returns {"status": "success", "evaluations": [...]} with one entry per
    participant, or a 500 envelope on failure.
    """
    try:
        # Fix: request.json raised on a missing/malformed body, turning a bad
        # request into a 500; parse silently and treat non-objects as empty.
        data = request.get_json(silent=True)
        participants = data.get('participants', []) if isinstance(data, dict) else []
        results = []

        evaluator = GenericEvaluator()

        for item in participants:
            participant_id = item.get("participantId")
            participant_info = item.get("participantInfo", {})
            prompt = evaluator.generate_prompt(participant_info)

            response = client.models.generate_content(
                model=model_name,
                contents=prompt
            )

            evaluation = evaluator.parse_gemini_response(response.text)

            results.append({
                "participantId": participant_id,
                "evaluation": evaluation
            })

        return jsonify({
            "status": "success",
            "evaluations": results
        })

    except Exception as e:
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500
|
|
|
|
|
|
|
|
@app.route('/api/shortlist', methods=['GET'])
def get_shortlist():
    """Return the shortlist of participants (currently always empty)."""
    try:
        # Placeholder payload until shortlist persistence is wired up.
        payload = {
            "status": "success",
            "shortlist": []
        }
        return jsonify(payload)
    except Exception as e:
        error_payload = {
            "status": "error",
            "message": str(e)
        }
        return jsonify(error_payload), 500
|
|
|
|
|
|
|
|
|
|
|
from google.api_core import exceptions as gexc |
|
|
|
|
|
@app.route("/api/lepharo_evaluate", methods=["POST"]) |
|
|
def evaluate_participant(): |
|
|
|
|
|
if not request.is_json: |
|
|
return jsonify({ |
|
|
"status": "error", |
|
|
"message": "Content-Type must be application/json", |
|
|
"requestId": getattr(g, "request_id", "-"), |
|
|
}), 400 |
|
|
|
|
|
data = request.get_json(silent=True) |
|
|
if not isinstance(data, dict): |
|
|
return jsonify({ |
|
|
"status": "error", |
|
|
"message": "Invalid JSON body", |
|
|
"requestId": getattr(g, "request_id", "-"), |
|
|
}), 400 |
|
|
|
|
|
participant_id = data.get("participantId") |
|
|
participant_info = data.get("participantInfo") or {} |
|
|
|
|
|
if not participant_id: |
|
|
return jsonify({"status": "error", "message": "Missing participantId"}), 400 |
|
|
if not isinstance(participant_info, dict): |
|
|
return jsonify({"status": "error", "message": "participantInfo must be an object"}), 400 |
|
|
|
|
|
try: |
|
|
evaluator = GenericEvaluator() |
|
|
prompt = evaluator.generate_prompt(participant_info) |
|
|
|
|
|
logger.info("EVAL id=%s participantId=%s prompt_chars=%s", |
|
|
getattr(g, "request_id", "-"), |
|
|
participant_id, |
|
|
len(prompt)) |
|
|
|
|
|
response = client.models.generate_content( |
|
|
model=model_name, |
|
|
contents=prompt |
|
|
) |
|
|
|
|
|
txt = getattr(response, "text", "") or "" |
|
|
logger.info("EVAL id=%s participantId=%s gemini_text_chars=%s", |
|
|
getattr(g, "request_id", "-"), |
|
|
participant_id, |
|
|
len(txt)) |
|
|
|
|
|
evaluation = evaluator.parse_gemini_response(txt) |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(evaluation, dict) and evaluation.get("error"): |
|
|
logger.warning("EVAL_PARSE_FAIL id=%s participantId=%s err=%s", |
|
|
getattr(g, "request_id", "-"), |
|
|
participant_id, |
|
|
evaluation.get("error")) |
|
|
return jsonify({ |
|
|
"status": "error", |
|
|
"participantId": participant_id, |
|
|
"message": "Model output could not be parsed/validated", |
|
|
"details": evaluation, |
|
|
"requestId": getattr(g, "request_id", "-"), |
|
|
}), 502 |
|
|
|
|
|
return jsonify({ |
|
|
"status": "success", |
|
|
"participantId": participant_id, |
|
|
"evaluation": evaluation, |
|
|
"requestId": getattr(g, "request_id", "-"), |
|
|
}), 200 |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
logger.exception("EVAL_FAIL id=%s participantId=%s", |
|
|
getattr(g, "request_id", "-"), |
|
|
participant_id) |
|
|
return jsonify({ |
|
|
"status": "error", |
|
|
"participantId": participant_id, |
|
|
"message": "Evaluation failed", |
|
|
"requestId": getattr(g, "request_id", "-"), |
|
|
}), 500 |
|
|
|
|
|
@app.route('/api/lepharo_batch-evaluate', methods=['POST'])
def lepharo_batch_evaluate():
    """Evaluate a batch of Lepharo participants with the Gemini model.

    Expects a JSON body of the form::

        {"participants": [{"participantId": ..., "participantInfo": {...}}, ...]}

    Returns ``{"status": "success", "evaluations": [...]}`` on success,
    a JSON 400 for a malformed body, or a JSON 500 if evaluation fails.
    """
    try:
        # request.json raises (and Flask answers with an HTML error page)
        # when the body is not JSON; get_json(silent=True) lets us respond
        # with a clean JSON 400 instead, matching the other endpoints.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                "status": "error",
                "message": "Request body must be a JSON object"
            }), 400

        participants = data.get('participants', [])
        if not isinstance(participants, list):
            return jsonify({
                "status": "error",
                "message": "'participants' must be a list"
            }), 400

        results = []
        evaluator = LepharoEvaluator()

        for item in participants:
            participant_id = item.get("participantId")
            participant_info = item.get("participantInfo", {})
            prompt = evaluator.generate_prompt(participant_info)

            response = client.models.generate_content(
                model=model_name,
                contents=prompt
            )

            evaluation = evaluator.parse_gemini_response(response.text)

            results.append({
                "participantId": participant_id,
                "evaluation": evaluation
            })

        return jsonify({
            "status": "success",
            "evaluations": results
        })

    except Exception as e:
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500
|
|
|
|
|
|
|
|
@app.route('/api/lepharo_shortlist', methods=['GET'])
def lepharo_get_shortlist():
    """Return the Lepharo shortlist of participants (currently always empty)."""
    try:
        # Placeholder payload until shortlist persistence is wired up.
        payload = {
            "status": "success",
            "shortlist": []
        }
        return jsonify(payload)
    except Exception as e:
        error_payload = {
            "status": "error",
            "message": str(e)
        }
        return jsonify(error_payload), 500
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
app.run(host="0.0.0.0", port=7860, debug=True) |