Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -22,7 +22,6 @@ def configure_gemini(api_key):
|
|
| 22 |
"""
|
| 23 |
st.info("Configuring Gemini API for transaction extraction...")
|
| 24 |
genai.configure(api_key=api_key)
|
| 25 |
-
# Using the model specified by the user for this task
|
| 26 |
return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
|
| 27 |
|
| 28 |
def configure_gemini1(api_key):
|
|
@@ -31,7 +30,6 @@ def configure_gemini1(api_key):
|
|
| 31 |
"""
|
| 32 |
st.info("Configuring Gemini API for report generation...")
|
| 33 |
genai.configure(api_key=api_key)
|
| 34 |
-
# Using the state-of-the-art model for high-quality report formatting
|
| 35 |
return genai.GenerativeModel('gemini-2.5-pro')
|
| 36 |
|
| 37 |
def read_pdf_pages(file_obj):
|
|
@@ -77,12 +75,10 @@ def process_with_gemini(model, text):
|
|
| 77 |
}"""
|
| 78 |
try:
|
| 79 |
response = model.generate_content([prompt, text])
|
| 80 |
-
time.sleep(6)
|
| 81 |
return response.text
|
| 82 |
except exceptions.GoogleAPICallError as e:
|
| 83 |
st.error(f"A Google API call error occurred during transaction extraction: {e}")
|
| 84 |
-
if "context length" in str(e):
|
| 85 |
-
st.warning("The text on a single PDF page may be too long for the extraction model.")
|
| 86 |
return None
|
| 87 |
except Exception as e:
|
| 88 |
st.error(f"An unexpected error occurred during Gemini transaction extraction: {e}")
|
|
@@ -90,134 +86,81 @@ def process_with_gemini(model, text):
|
|
| 90 |
|
| 91 |
def process_pdf_pages(model, pdf_reader, total_pages, progress_callback=None):
|
| 92 |
all_transactions = []
|
| 93 |
-
st.info(f"Starting page-by-page PDF processing for {total_pages} pages...")
|
| 94 |
-
|
| 95 |
for page_num in range(total_pages):
|
| 96 |
if progress_callback:
|
| 97 |
progress_callback(page_num / total_pages, f"Processing page {page_num + 1} of {total_pages}")
|
| 98 |
-
|
| 99 |
page_text = extract_page_text(pdf_reader, page_num)
|
| 100 |
if not page_text.strip():
|
| 101 |
continue
|
| 102 |
-
|
| 103 |
-
st.info(f"Sending page {page_num + 1} text to Gemini for transaction extraction...")
|
| 104 |
json_response = process_with_gemini(model, page_text)
|
| 105 |
-
|
| 106 |
if json_response:
|
| 107 |
-
# A more robust regex to find the JSON block
|
| 108 |
match = re.search(r'\{.*\}', json_response, re.DOTALL)
|
| 109 |
if not match:
|
| 110 |
-
st.warning(f"No valid JSON object found in Gemini response for page {page_num + 1}.")
|
| 111 |
continue
|
| 112 |
-
|
| 113 |
json_str = match.group(0)
|
| 114 |
try:
|
| 115 |
data = json.loads(json_str)
|
| 116 |
transactions = data.get('transactions', [])
|
| 117 |
if transactions:
|
| 118 |
-
st.info(f"Successfully extracted {len(transactions)} transactions from page {page_num + 1}.")
|
| 119 |
all_transactions.extend(transactions)
|
| 120 |
except json.JSONDecodeError:
|
| 121 |
-
st.error(f"Failed to decode JSON from Gemini response for page {page_num + 1}.")
|
| 122 |
continue
|
| 123 |
-
else:
|
| 124 |
-
st.warning(f"Gemini returned no response for page {page_num + 1}.")
|
| 125 |
-
|
| 126 |
-
st.info(f"Finished processing all pages. Total transactions extracted: {len(all_transactions)}.")
|
| 127 |
return all_transactions
|
| 128 |
|
| 129 |
-
|
| 130 |
def aggregate_financial_data(transactions: list, statement_type: str):
|
| 131 |
-
"""
|
| 132 |
-
Aggregates transaction data using Pandas for high performance and accuracy.
|
| 133 |
-
This function does the heavy lifting locally, preparing a small summary for the LLM.
|
| 134 |
-
This version includes robust cleaning of the 'Amount' column.
|
| 135 |
-
"""
|
| 136 |
st.info(f"Performing local financial aggregation for {len(transactions)} transactions...")
|
| 137 |
if not transactions:
|
| 138 |
-
st.warning("No transactions to aggregate.")
|
| 139 |
return None
|
| 140 |
-
|
| 141 |
df = pd.DataFrame(transactions)
|
| 142 |
-
|
| 143 |
-
# --- Robust Data Cleaning and Preparation ---
|
| 144 |
if 'Amount' not in df.columns:
|
| 145 |
-
st.error("'Amount' column not found in the transaction data. Cannot perform aggregation.")
|
| 146 |
return None
|
| 147 |
-
|
| 148 |
-
# 1. Ensure the 'Amount' column is treated as a string to use string operations.
|
| 149 |
-
df['Amount'] = df['Amount'].astype(str)
|
| 150 |
-
|
| 151 |
-
# 2. Use a regular expression to remove any character that is NOT a digit or a decimal point.
|
| 152 |
-
# This handles currency symbols, commas, spaces, etc.
|
| 153 |
-
df['Amount'] = df['Amount'].str.replace(r'[^\d.]', '', regex=True)
|
| 154 |
-
|
| 155 |
-
# 3. Now, it's safe to convert the cleaned string to a numeric type.
|
| 156 |
-
# Coerce errors will handle any empty strings that might result from the cleaning.
|
| 157 |
df['Amount'] = pd.to_numeric(df['Amount'], errors='coerce').fillna(0)
|
| 158 |
-
|
| 159 |
-
# 4. Ensure 'Type' column is standardized to lowercase for consistent filtering.
|
| 160 |
df['Type'] = df['Type'].str.lower()
|
| 161 |
-
|
| 162 |
-
# --- Core Financial Calculations ---
|
| 163 |
total_income = df[df['Type'] == 'income']['Amount'].sum()
|
| 164 |
total_expenses = df[df['Type'] == 'expense']['Amount'].sum()
|
| 165 |
net_position = total_income - total_expenses
|
| 166 |
-
|
| 167 |
-
# --- Build the Aggregated Data Structure ---
|
| 168 |
aggregated_data = {
|
| 169 |
"total_income": round(total_income, 2),
|
| 170 |
"total_expenses": round(total_expenses, 2),
|
| 171 |
"net_position": round(net_position, 2),
|
| 172 |
"transaction_count": len(df)
|
| 173 |
}
|
| 174 |
-
|
| 175 |
-
# --- Statement-Specific Aggregations ---
|
| 176 |
if statement_type == "Income Statement":
|
| 177 |
-
expense_breakdown = df[df['Type'] == 'expense'].groupby('Category_of_expense')['Amount'].sum().round(2).to_dict()
|
| 178 |
-
aggregated_data["
|
| 179 |
-
income_breakdown = df[df['Type'] == 'income'].groupby('Customer_name')['Amount'].sum().round(2).to_dict()
|
| 180 |
-
aggregated_data["income_breakdown"] = income_breakdown
|
| 181 |
-
elif statement_type == "Cashflow Statement":
|
| 182 |
-
aggregated_data["operating_cash_flow"] = round(net_position, 2)
|
| 183 |
-
aggregated_data["cash_inflows"] = round(total_income, 2)
|
| 184 |
-
aggregated_data["cash_outflows"] = round(total_expenses, 2)
|
| 185 |
-
elif statement_type == "Balance Sheet":
|
| 186 |
-
aggregated_data["notes"] = "Balance Sheets require asset and liability balances, not just transaction flows. This data can only show the net change in cash over the period."
|
| 187 |
-
|
| 188 |
st.success("Local financial aggregation complete.")
|
| 189 |
return aggregated_data
|
| 190 |
|
| 191 |
def generate_financial_report(model, aggregated_data, start_date, end_date, statement_type):
|
| 192 |
"""
|
| 193 |
-
Generates a financial report
|
| 194 |
-
|
|
|
|
| 195 |
"""
|
| 196 |
st.info(f"Preparing to generate {statement_type} with pre-aggregated data...")
|
| 197 |
-
prompt = f"""
|
| 198 |
-
Based on the following pre-aggregated financial summary JSON data:
|
| 199 |
-
{json.dumps(aggregated_data, indent=2)}
|
| 200 |
-
|
| 201 |
-
Generate a detailed {statement_type} report for the period from {start_date.strftime('%d/%m/%Y')} to {end_date.strftime('%d/%m/%Y')}. Present the report in a standard accounting format relevant to South Africa, but with improved readability and visual appeal.
|
| 202 |
|
| 203 |
-
|
|
|
|
|
|
|
| 204 |
|
| 205 |
-
|
| 206 |
-
|
| 207 |
-
Consistent Formatting: Maintain consistent formatting for monetary values (using "R" for South African Rand), dates, and alignment.
|
| 208 |
-
Totals and Subtotals: Clearly display totals for relevant categories and subtotals where appropriate.
|
| 209 |
-
Descriptive Line Items: Use the provided aggregated data to create clear line items.
|
| 210 |
-
Key Insights: Include a brief section (e.g., "Key Highlights" or "Summary") that identifies significant trends or key performance indicators derived from the provided summary data.
|
| 211 |
-
Concise Summary: Provide a concluding summary paragraph that encapsulates the overall financial picture.
|
| 212 |
-
Special Case for Balance Sheet: If the request is for a "Balance Sheet," explain professionally that a balance sheet cannot be generated from transaction data alone, as it requires a snapshot of assets, liabilities, and equity. Then, present the available cash flow information as a helpful alternative.
|
| 213 |
|
| 214 |
-
|
| 215 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 216 |
"""
|
| 217 |
try:
|
| 218 |
st.info("Sending request to Gemini for final report formatting...")
|
| 219 |
response = model.generate_content([prompt])
|
| 220 |
-
time.sleep(7) # Retaining original sleep time
|
| 221 |
st.success("Successfully received formatted financial report from Gemini.")
|
| 222 |
return response.text
|
| 223 |
except exceptions.GoogleAPICallError as e:
|
|
@@ -227,78 +170,11 @@ Do not name the company if a name is not there; refer to it as "The Business". R
|
|
| 227 |
st.error(f"An unexpected error occurred during Gemini report generation: {e}")
|
| 228 |
return None
|
| 229 |
|
| 230 |
-
# --- PDF Generation Logic (Unaltered as per your request) ---
|
| 231 |
-
class PDF_Generator(FPDF):
|
| 232 |
-
def add_html_element(self, tag, styles):
|
| 233 |
-
text = tag.get_text()
|
| 234 |
-
tag_name = tag.name.lower()
|
| 235 |
-
current_style = ''
|
| 236 |
-
if 'b' in styles or 'strong' in styles: current_style += 'B'
|
| 237 |
-
if 'i' in styles or 'em' in styles: current_style += 'I'
|
| 238 |
-
if not current_style: self.set_font('helvetica', '', self.font_size_pt)
|
| 239 |
-
if tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
|
| 240 |
-
level = int(tag_name[1])
|
| 241 |
-
font_size = {1: 18, 2: 16, 3: 14, 4: 12, 5: 11, 6: 10}.get(level, 10)
|
| 242 |
-
self.set_font('helvetica', 'B', font_size)
|
| 243 |
-
self.multi_cell(0, font_size * 0.5, text, align='L')
|
| 244 |
-
self.ln(font_size * 0.3)
|
| 245 |
-
self.set_font('helvetica', '', 10)
|
| 246 |
-
elif tag_name == 'p':
|
| 247 |
-
self.set_font('helvetica', current_style, 10)
|
| 248 |
-
self.multi_cell(0, 5, text, align='L')
|
| 249 |
-
self.ln(3)
|
| 250 |
-
elif tag_name == 'ul':
|
| 251 |
-
self.ln(2)
|
| 252 |
-
for item in tag.find_all('li', recursive=False):
|
| 253 |
-
self.set_font('helvetica', '', 10)
|
| 254 |
-
item_text = item.get_text()
|
| 255 |
-
self.cell(5, 5, chr(127))
|
| 256 |
-
self.multi_cell(0, 5, item_text, align='L')
|
| 257 |
-
self.ln(1)
|
| 258 |
-
self.ln(3)
|
| 259 |
-
elif tag_name == 'table':
|
| 260 |
-
self.ln(5)
|
| 261 |
-
self.process_table(tag)
|
| 262 |
-
self.ln(5)
|
| 263 |
-
elif tag_name == 'br': self.ln(5)
|
| 264 |
-
elif tag_name == 'hr':
|
| 265 |
-
self.ln(2)
|
| 266 |
-
self.line(self.get_x(), self.get_y(), self.w - self.r_margin, self.get_y())
|
| 267 |
-
self.ln(4)
|
| 268 |
-
else:
|
| 269 |
-
if text.strip():
|
| 270 |
-
self.set_font('helvetica', current_style, 10)
|
| 271 |
-
self.multi_cell(0, 5, text, align='L')
|
| 272 |
-
self.ln(1)
|
| 273 |
-
|
| 274 |
-
def process_table(self, table_tag):
|
| 275 |
-
rows = table_tag.find_all('tr')
|
| 276 |
-
if not rows: return
|
| 277 |
-
header_cells = rows[0].find_all(['th', 'td'])
|
| 278 |
-
num_cols = len(header_cells)
|
| 279 |
-
if num_cols == 0: return
|
| 280 |
-
effective_width = self.w - self.l_margin - self.r_margin
|
| 281 |
-
col_width = effective_width / num_cols
|
| 282 |
-
default_cell_height = 6
|
| 283 |
-
is_first_row = True
|
| 284 |
-
for row in rows:
|
| 285 |
-
cells = row.find_all(['th', 'td'])
|
| 286 |
-
if len(cells) != num_cols: continue
|
| 287 |
-
is_header_row = all(c.name == 'th' for c in cells) or (is_first_row and any(c.name == 'th' for c in cells))
|
| 288 |
-
for i, cell in enumerate(cells):
|
| 289 |
-
cell_text = cell.get_text().strip()
|
| 290 |
-
if is_header_row:
|
| 291 |
-
self.set_font('helvetica', 'B', 9)
|
| 292 |
-
self.set_fill_color(230, 230, 230)
|
| 293 |
-
fill = True
|
| 294 |
-
else:
|
| 295 |
-
self.set_font('helvetica', '', 9)
|
| 296 |
-
fill = False
|
| 297 |
-
self.multi_cell(col_width, default_cell_height, cell_text, border=1, align='L', fill=fill, new_x="RIGHT", new_y="TOP")
|
| 298 |
-
self.ln(default_cell_height)
|
| 299 |
-
is_first_row = False
|
| 300 |
-
|
| 301 |
def create_pdf_report(report_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 302 |
if not report_text:
|
| 303 |
st.warning("Report text is empty, skipping PDF generation.")
|
| 304 |
raise ValueError("Input report_text cannot be empty.")
|
|
@@ -307,16 +183,69 @@ def create_pdf_report(report_text):
|
|
| 307 |
cleaned_md = re.sub(r'```markdown|```', '', report_text, flags=re.MULTILINE).strip()
|
| 308 |
html_content = markdown.markdown(cleaned_md, extensions=['tables'])
|
| 309 |
soup = BeautifulSoup(html_content, 'html.parser')
|
| 310 |
-
|
|
|
|
| 311 |
pdf.set_auto_page_break(auto=True, margin=15)
|
| 312 |
pdf.set_left_margin(15)
|
| 313 |
pdf.set_right_margin(15)
|
| 314 |
pdf.add_page()
|
| 315 |
-
|
| 316 |
-
for element in soup.find_all(
|
| 317 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 318 |
st.info("Content added to PDF. Outputting PDF to buffer...")
|
| 319 |
-
|
|
|
|
|
|
|
|
|
|
| 320 |
st.success("PDF report generated successfully.")
|
| 321 |
return BytesIO(pdf_output)
|
| 322 |
except Exception as e:
|
|
@@ -340,7 +269,6 @@ def main():
|
|
| 340 |
if input_type == "Bulk Bank Statement Upload":
|
| 341 |
uploaded_files = st.file_uploader("Upload PDF bank statements", type="pdf", accept_multiple_files=True)
|
| 342 |
if uploaded_files:
|
| 343 |
-
st.info(f"User uploaded {len(uploaded_files)} PDF file(s).")
|
| 344 |
model = configure_gemini(api_key)
|
| 345 |
progress_bar = st.progress(0)
|
| 346 |
all_transactions = []
|
|
@@ -357,26 +285,20 @@ def main():
|
|
| 357 |
elif input_type == "CSV Upload":
|
| 358 |
uploaded_csv = st.file_uploader("Upload CSV of transactions", type="csv")
|
| 359 |
if uploaded_csv:
|
| 360 |
-
st.info(f"User uploaded CSV file: {uploaded_csv.name}.")
|
| 361 |
df = pd.read_csv(uploaded_csv)
|
| 362 |
df = df.loc[:, ~df.columns.str.startswith('Unnamed:')]
|
| 363 |
st.session_state['transactions'] = df.to_dict(orient='records')
|
| 364 |
st.success(f"Successfully loaded {len(st.session_state['transactions'])} transactions from CSV.")
|
| 365 |
|
| 366 |
if st.session_state['transactions']:
|
| 367 |
-
st.info("Consolidating and displaying all extracted transactions.")
|
| 368 |
df = pd.DataFrame(st.session_state['transactions'])
|
| 369 |
df['Date'] = pd.to_datetime(df['Date'], errors='coerce', dayfirst=True)
|
| 370 |
df.dropna(subset=['Date'], inplace=True)
|
| 371 |
if not df.empty:
|
| 372 |
-
min_date = df['Date'].min().date()
|
| 373 |
-
max_date = df['Date'].max().date()
|
| 374 |
-
st.session_state['min_date'] = min_date
|
| 375 |
-
st.session_state['max_date'] = max_date
|
| 376 |
st.write("### Extracted Transactions")
|
| 377 |
st.dataframe(df.astype(str))
|
| 378 |
-
else:
|
| 379 |
-
st.info("No transactions loaded yet. Upload files to begin.")
|
| 380 |
|
| 381 |
st.write("### Generate Financial Report")
|
| 382 |
col1, col2 = st.columns(2)
|
|
@@ -384,7 +306,7 @@ def main():
|
|
| 384 |
start_date = st.date_input("Start Date", st.session_state['min_date'])
|
| 385 |
with col2:
|
| 386 |
end_date = st.date_input("End Date", st.session_state['max_date'])
|
| 387 |
-
statement_type = st.selectbox("Select Financial Statement", ["Income Statement"
|
| 388 |
|
| 389 |
if st.button("Generate Financial Report"):
|
| 390 |
if not st.session_state['transactions']:
|
|
@@ -418,8 +340,6 @@ def main():
|
|
| 418 |
file_name=f"{statement_type.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d')}.pdf",
|
| 419 |
mime="application/pdf"
|
| 420 |
)
|
| 421 |
-
else:
|
| 422 |
-
st.error("Failed to generate the financial report from the aggregated data.")
|
| 423 |
except Exception as e:
|
| 424 |
st.error(f"An unexpected error occurred during the report generation process: {e}")
|
| 425 |
st.exception(e)
|
|
|
|
| 22 |
"""
|
| 23 |
st.info("Configuring Gemini API for transaction extraction...")
|
| 24 |
genai.configure(api_key=api_key)
|
|
|
|
| 25 |
return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
|
| 26 |
|
| 27 |
def configure_gemini1(api_key):
|
|
|
|
| 30 |
"""
|
| 31 |
st.info("Configuring Gemini API for report generation...")
|
| 32 |
genai.configure(api_key=api_key)
|
|
|
|
| 33 |
return genai.GenerativeModel('gemini-2.5-pro')
|
| 34 |
|
| 35 |
def read_pdf_pages(file_obj):
|
|
|
|
| 75 |
}"""
|
| 76 |
try:
|
| 77 |
response = model.generate_content([prompt, text])
|
| 78 |
+
time.sleep(6)
|
| 79 |
return response.text
|
| 80 |
except exceptions.GoogleAPICallError as e:
|
| 81 |
st.error(f"A Google API call error occurred during transaction extraction: {e}")
|
|
|
|
|
|
|
| 82 |
return None
|
| 83 |
except Exception as e:
|
| 84 |
st.error(f"An unexpected error occurred during Gemini transaction extraction: {e}")
|
|
|
|
| 86 |
|
| 87 |
def process_pdf_pages(model, pdf_reader, total_pages, progress_callback=None):
    """Extract transactions from each page of a PDF using the Gemini model.

    Each page's text is sent to ``process_with_gemini``; the first ``{...}``
    span in the reply is parsed as JSON and its ``"transactions"`` list is
    appended to the running result. Pages that are blank, produce no reply,
    or produce an unparseable reply are skipped rather than aborting the run.

    Args:
        model: Model object forwarded unchanged to ``process_with_gemini``.
        pdf_reader: Reader object forwarded unchanged to ``extract_page_text``.
        total_pages: Number of pages to iterate over (0-based page indices).
        progress_callback: Optional callable invoked before each page with
            ``(fraction_done, message)``.

    Returns:
        A list of transaction dicts collected from all pages (possibly empty).
    """
    all_transactions = []
    for page_num in range(total_pages):
        if progress_callback:
            progress_callback(page_num / total_pages, f"Processing page {page_num + 1} of {total_pages}")

        page_text = extract_page_text(pdf_reader, page_num)
        # Guard against a None result as well as blank text, so one
        # unreadable page cannot crash the whole run with AttributeError.
        if not page_text or not page_text.strip():
            continue

        json_response = process_with_gemini(model, page_text)
        if not json_response:
            continue

        # The model may wrap the JSON in prose or code fences; grab the
        # outermost brace-delimited span.
        match = re.search(r'\{.*\}', json_response, re.DOTALL)
        if not match:
            continue

        try:
            data = json.loads(match.group(0))
        except json.JSONDecodeError:
            continue

        transactions = data.get('transactions')
        # Only accept a real list: extending with a string or dict would
        # silently splice characters or keys into the result.
        if not isinstance(transactions, list):
            continue
        # Drop non-dict items so the later DataFrame construction sees
        # uniform records.
        all_transactions.extend(t for t in transactions if isinstance(t, dict))
    return all_transactions
|
| 109 |
|
|
|
|
| 110 |
def aggregate_financial_data(transactions: list, statement_type: str):
    """Aggregate raw transaction records into a small JSON-friendly summary.

    The heavy lifting is done locally with pandas so that only a compact
    summary has to be sent to the report-generation model.

    Args:
        transactions: List of transaction dicts. Each record is expected to
            carry an ``'Amount'`` and a ``'Type'`` field (``'income'`` or
            ``'expense'``, case-insensitive).
        statement_type: Name of the statement being produced. For
            ``"Income Statement"`` per-category expense and per-customer
            income breakdowns are added to the summary.

    Returns:
        A dict with ``total_income``, ``total_expenses``, ``net_position``
        and ``transaction_count`` (plus the breakdowns when applicable), or
        ``None`` when there are no transactions or a required column is
        missing.
    """
    st.info(f"Performing local financial aggregation for {len(transactions)} transactions...")
    if not transactions:
        return None

    df = pd.DataFrame(transactions)
    # A missing 'Type' column is handled the same way as a missing 'Amount'
    # column instead of surfacing as a KeyError further down.
    if 'Amount' not in df.columns or 'Type' not in df.columns:
        return None

    # Strip currency symbols, thousands separators, spaces, etc. Note that
    # this also removes any sign: direction is taken from 'Type', not from
    # the sign of the amount.
    cleaned_amount = df['Amount'].astype(str).str.replace(r'[^\d.]', '', regex=True)
    # Force float dtype so whole-number amounts do not yield numpy integer
    # totals, which json.dumps cannot serialise.
    df['Amount'] = pd.to_numeric(cleaned_amount, errors='coerce').fillna(0).astype(float)

    # astype(str) keeps the .str accessor usable when the column holds
    # non-string values (e.g. all NaN); strip tolerates stray whitespace.
    df['Type'] = df['Type'].astype(str).str.strip().str.lower()

    is_income = df['Type'] == 'income'
    is_expense = df['Type'] == 'expense'
    total_income = float(df.loc[is_income, 'Amount'].sum())
    total_expenses = float(df.loc[is_expense, 'Amount'].sum())
    net_position = total_income - total_expenses

    aggregated_data = {
        "total_income": round(total_income, 2),
        "total_expenses": round(total_expenses, 2),
        "net_position": round(net_position, 2),
        "transaction_count": len(df)
    }

    if statement_type == "Income Statement":
        breakdown_specs = (
            ("expense_breakdown", is_expense, 'Category_of_expense'),
            ("income_breakdown", is_income, 'Customer_name'),
        )
        for key, mask, column in breakdown_specs:
            if column not in df.columns:
                # Grouping column absent from the input: report an empty
                # breakdown rather than raising KeyError.
                aggregated_data[key] = {}
                continue
            grouped = df.loc[mask].groupby(column)['Amount'].sum().round(2)
            aggregated_data[key] = {name: float(value) for name, value in grouped.items()}

    st.success("Local financial aggregation complete.")
    return aggregated_data
|
| 134 |
|
| 135 |
def generate_financial_report(model, aggregated_data, start_date, end_date, statement_type):
|
| 136 |
"""
|
| 137 |
+
Generates a financial report using a simplified, high-level prompt that
|
| 138 |
+
trusts the model to create the correct structure and avoids using any
|
| 139 |
+
Markdown characters that could break rendering.
|
| 140 |
"""
|
| 141 |
st.info(f"Preparing to generate {statement_type} with pre-aggregated data...")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 142 |
|
| 143 |
+
# This is the final, simplified, high-level prompt with no special characters.
|
| 144 |
+
prompt = f"""
|
| 145 |
+
You are an expert financial analyst. Your task is to generate a professional Income Statement in Markdown format using the pre-aggregated JSON data provided below.
|
| 146 |
|
| 147 |
+
JSON Data:
|
| 148 |
+
{json.dumps(aggregated_data, indent=2)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 149 |
|
| 150 |
+
Instructions:
|
| 151 |
+
Your response must be a complete financial report in Markdown.
|
| 152 |
+
The main title of the report is "Income Statement".
|
| 153 |
+
The reporting period is from {start_date.strftime('%d %B %Y')} to {end_date.strftime('%d %B %Y')}.
|
| 154 |
+
The currency is South African Rand (ZAR).
|
| 155 |
+
The report must contain sections for Revenue, Operating Expenses, and Net Income or Loss. Each of these sections must be a clear table.
|
| 156 |
+
The report must also include a "Key Highlights" section with bullet points and a final "Summary" paragraph.
|
| 157 |
+
Use the provided JSON data for all financial figures.
|
| 158 |
+
For the Net Income or Loss table, if the net position is negative, display the amount in parentheses.
|
| 159 |
+
Separate the major sections with a horizontal rule.
|
| 160 |
"""
|
| 161 |
try:
|
| 162 |
st.info("Sending request to Gemini for final report formatting...")
|
| 163 |
response = model.generate_content([prompt])
|
|
|
|
| 164 |
st.success("Successfully received formatted financial report from Gemini.")
|
| 165 |
return response.text
|
| 166 |
except exceptions.GoogleAPICallError as e:
|
|
|
|
| 170 |
st.error(f"An unexpected error occurred during Gemini report generation: {e}")
|
| 171 |
return None
|
| 172 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 173 |
def create_pdf_report(report_text):
|
| 174 |
+
"""
|
| 175 |
+
Creates a PDF from markdown text. Includes the critical fix for the
|
| 176 |
+
'bytearray' object has no attribute 'encode' error.
|
| 177 |
+
"""
|
| 178 |
if not report_text:
|
| 179 |
st.warning("Report text is empty, skipping PDF generation.")
|
| 180 |
raise ValueError("Input report_text cannot be empty.")
|
|
|
|
| 183 |
cleaned_md = re.sub(r'```markdown|```', '', report_text, flags=re.MULTILINE).strip()
|
| 184 |
html_content = markdown.markdown(cleaned_md, extensions=['tables'])
|
| 185 |
soup = BeautifulSoup(html_content, 'html.parser')
|
| 186 |
+
|
| 187 |
+
pdf = FPDF()
|
| 188 |
pdf.set_auto_page_break(auto=True, margin=15)
|
| 189 |
pdf.set_left_margin(15)
|
| 190 |
pdf.set_right_margin(15)
|
| 191 |
pdf.add_page()
|
| 192 |
+
|
| 193 |
+
for element in soup.find_all(True):
|
| 194 |
+
if element.name in ['h1', 'h2', 'h3']:
|
| 195 |
+
level = int(element.name[1])
|
| 196 |
+
font_size = {1: 16, 2: 14, 3: 12}.get(level)
|
| 197 |
+
pdf.set_font('helvetica', 'B', font_size)
|
| 198 |
+
pdf.multi_cell(0, 10, element.get_text().strip())
|
| 199 |
+
pdf.ln(level * 2)
|
| 200 |
+
elif element.name == 'p':
|
| 201 |
+
pdf.set_font('helvetica', '', 11)
|
| 202 |
+
pdf.multi_cell(0, 6, element.get_text().strip())
|
| 203 |
+
pdf.ln(4)
|
| 204 |
+
elif element.name == 'i':
|
| 205 |
+
pdf.set_font('helvetica', 'I', 11)
|
| 206 |
+
pdf.multi_cell(0, 6, element.get_text().strip())
|
| 207 |
+
pdf.ln(4)
|
| 208 |
+
elif element.name == 'hr':
|
| 209 |
+
pdf.line(pdf.get_x(), pdf.get_y(), pdf.w - pdf.r_margin, pdf.get_y())
|
| 210 |
+
pdf.ln(5)
|
| 211 |
+
elif element.name == 'ul':
|
| 212 |
+
pdf.ln(2)
|
| 213 |
+
for li in element.find_all('li'):
|
| 214 |
+
pdf.set_font('helvetica', '', 11)
|
| 215 |
+
pdf.multi_cell(0, 5, f" • {li.get_text().strip()}")
|
| 216 |
+
pdf.ln(1)
|
| 217 |
+
pdf.ln(4)
|
| 218 |
+
elif element.name == 'table':
|
| 219 |
+
header = [th.get_text().strip() for th in element.find_all('th')]
|
| 220 |
+
rows = [[td.get_text().strip() for td in tr.find_all('td')] for tr in element.find_all('tr')[1:]]
|
| 221 |
+
|
| 222 |
+
if header:
|
| 223 |
+
pdf.set_font('helvetica', 'B', 10)
|
| 224 |
+
pdf.set_fill_color(230, 230, 230)
|
| 225 |
+
col_widths = [ (pdf.w - pdf.l_margin - pdf.r_margin) * 0.6, (pdf.w - pdf.l_margin - pdf.r_margin) * 0.4 ]
|
| 226 |
+
for i, header_text in enumerate(header):
|
| 227 |
+
pdf.cell(col_widths[i], 8, header_text, border=1, fill=True, align='C')
|
| 228 |
+
pdf.ln()
|
| 229 |
+
|
| 230 |
+
pdf.set_font('helvetica', '', 10)
|
| 231 |
+
for row in rows:
|
| 232 |
+
is_total_row = any('Total' in cell for cell in row)
|
| 233 |
+
if is_total_row:
|
| 234 |
+
pdf.set_font('helvetica', 'B', 10)
|
| 235 |
+
|
| 236 |
+
pdf.cell(col_widths[0], 7, row[0], border=1)
|
| 237 |
+
pdf.cell(col_widths[1], 7, row[1], border=1, align='R')
|
| 238 |
+
pdf.ln()
|
| 239 |
+
|
| 240 |
+
if is_total_row:
|
| 241 |
+
pdf.set_font('helvetica', '', 10)
|
| 242 |
+
pdf.ln(6)
|
| 243 |
+
|
| 244 |
st.info("Content added to PDF. Outputting PDF to buffer...")
|
| 245 |
+
|
| 246 |
+
# --- CRITICAL FIX FOR PDF GENERATION ---
|
| 247 |
+
pdf_output = pdf.output()
|
| 248 |
+
|
| 249 |
st.success("PDF report generated successfully.")
|
| 250 |
return BytesIO(pdf_output)
|
| 251 |
except Exception as e:
|
|
|
|
| 269 |
if input_type == "Bulk Bank Statement Upload":
|
| 270 |
uploaded_files = st.file_uploader("Upload PDF bank statements", type="pdf", accept_multiple_files=True)
|
| 271 |
if uploaded_files:
|
|
|
|
| 272 |
model = configure_gemini(api_key)
|
| 273 |
progress_bar = st.progress(0)
|
| 274 |
all_transactions = []
|
|
|
|
| 285 |
elif input_type == "CSV Upload":
|
| 286 |
uploaded_csv = st.file_uploader("Upload CSV of transactions", type="csv")
|
| 287 |
if uploaded_csv:
|
|
|
|
| 288 |
df = pd.read_csv(uploaded_csv)
|
| 289 |
df = df.loc[:, ~df.columns.str.startswith('Unnamed:')]
|
| 290 |
st.session_state['transactions'] = df.to_dict(orient='records')
|
| 291 |
st.success(f"Successfully loaded {len(st.session_state['transactions'])} transactions from CSV.")
|
| 292 |
|
| 293 |
if st.session_state['transactions']:
|
|
|
|
| 294 |
df = pd.DataFrame(st.session_state['transactions'])
|
| 295 |
df['Date'] = pd.to_datetime(df['Date'], errors='coerce', dayfirst=True)
|
| 296 |
df.dropna(subset=['Date'], inplace=True)
|
| 297 |
if not df.empty:
|
| 298 |
+
st.session_state['min_date'] = df['Date'].min().date()
|
| 299 |
+
st.session_state['max_date'] = df['Date'].max().date()
|
|
|
|
|
|
|
| 300 |
st.write("### Extracted Transactions")
|
| 301 |
st.dataframe(df.astype(str))
|
|
|
|
|
|
|
| 302 |
|
| 303 |
st.write("### Generate Financial Report")
|
| 304 |
col1, col2 = st.columns(2)
|
|
|
|
| 306 |
start_date = st.date_input("Start Date", st.session_state['min_date'])
|
| 307 |
with col2:
|
| 308 |
end_date = st.date_input("End Date", st.session_state['max_date'])
|
| 309 |
+
statement_type = st.selectbox("Select Financial Statement", ["Income Statement"])
|
| 310 |
|
| 311 |
if st.button("Generate Financial Report"):
|
| 312 |
if not st.session_state['transactions']:
|
|
|
|
| 340 |
file_name=f"{statement_type.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d')}.pdf",
|
| 341 |
mime="application/pdf"
|
| 342 |
)
|
|
|
|
|
|
|
| 343 |
except Exception as e:
|
| 344 |
st.error(f"An unexpected error occurred during the report generation process: {e}")
|
| 345 |
st.exception(e)
|