| import pandas as pd
|
| from datetime import datetime, timedelta
|
| from expense_tracker.utils import MongoDBClient
|
| from bson import ObjectId
|
| import pdfplumber
|
| import re
|
| import io
|
| import os
|
| from dotenv import load_dotenv
|
|
|
| load_dotenv()
|
|
|
|
|
| def process_recurring_incomes(user_id):
|
| """
|
| Checks all recurring incomes for a user and creates new transactions
|
| if the recurrence interval has passed since the last run.
|
| """
|
| db = MongoDBClient.get_client()
|
| user = db.users.find_one({'_id': ObjectId(user_id)}, {'financial_data.incomes': 1})
|
|
|
| if not user or 'financial_data' not in user or 'incomes' not in user['financial_data']:
|
| return 0
|
|
|
| incomes = user['financial_data']['incomes']
|
| generated_count = 0
|
| updated_incomes = []
|
|
|
| for income in incomes:
|
| if not income.get('is_recurring'):
|
| updated_incomes.append(income)
|
| continue
|
|
|
| last_date = income.get('last_run_date')
|
| if not last_date:
|
| last_date = income.get('date')
|
|
|
|
|
| if isinstance(last_date, str):
|
| try:
|
| last_date = datetime.strptime(last_date, '%Y-%m-%d')
|
| except:
|
| updated_incomes.append(income)
|
| continue
|
|
|
| interval = income.get('recurrence_interval')
|
| if not interval:
|
| updated_incomes.append(income)
|
| continue
|
|
|
| next_due_date = calculate_next_date(last_date, interval)
|
| current_date = datetime.now()
|
|
|
| new_instances = []
|
| last_inst_date = last_date
|
|
|
|
|
| while next_due_date <= current_date:
|
| new_tx = income.copy()
|
| new_tx['_id'] = ObjectId()
|
| new_tx['date'] = next_due_date
|
| new_tx['is_recurring'] = False
|
| new_tx['recurrence_interval'] = None
|
| new_tx['created_at'] = datetime.now()
|
| new_tx['title'] = f"{income.get('title')} (Recurring)"
|
| new_tx['parent_id'] = str(income.get('_id'))
|
|
|
| new_instances.append(new_tx)
|
| generated_count += 1
|
| last_inst_date = next_due_date
|
| next_due_date = calculate_next_date(last_inst_date, interval)
|
|
|
|
|
| income['last_run_date'] = last_inst_date
|
| updated_incomes.append(income)
|
|
|
|
|
| if new_instances:
|
| updated_incomes.extend(new_instances)
|
|
|
|
|
| if generated_count > 0:
|
| db.users.update_one(
|
| {'_id': ObjectId(user_id)},
|
| {'$set': {'financial_data.incomes': updated_incomes}}
|
| )
|
|
|
| return generated_count
|
|
|
| def calculate_next_date(start_date, interval):
|
| if interval == 'daily':
|
| return start_date + timedelta(days=1)
|
| elif interval == 'weekly':
|
| return start_date + timedelta(weeks=1)
|
| elif interval == 'monthly':
|
|
|
|
|
| return start_date + timedelta(days=30)
|
| elif interval == 'yearly':
|
| return start_date + timedelta(days=365)
|
| return start_date + timedelta(days=36500)
|
|
|
| def parse_and_import_transactions(file, user_id):
|
| """
|
| Parses a CSV or Excel file and imports transaction records (Income/Expense).
|
| Expected columns: Type, Title, Amount, Category, Date
|
| If Type is missing, positive amount = Income, negative amount = Expense (optional logic)
|
| """
|
| try:
|
| if file.name.endswith('.csv'):
|
| df = pd.read_csv(file)
|
| elif file.name.endswith(('.xls', '.xlsx')):
|
| df = pd.read_excel(file)
|
| elif file.name.endswith('.pdf'):
|
| df = parse_pdf_transactions(file)
|
| else:
|
| return 0, "Unsupported file format. Please upload CSV, Excel, or PDF."
|
|
|
| if df is None or df.empty:
|
| return 0, "Could not extract transactions from file."
|
|
|
|
|
| df.columns = [c.lower().strip() for c in df.columns]
|
|
|
|
|
| required_cols = ['title', 'amount', 'date']
|
| missing_cols = [col for col in required_cols if col not in df.columns]
|
| if missing_cols:
|
| return 0, f"Missing columns: {', '.join(missing_cols)}"
|
|
|
| db = MongoDBClient.get_client()
|
| count = 0
|
| income_docs = []
|
| expense_docs = []
|
|
|
| for _, row in df.iterrows():
|
| try:
|
| amount = float(row['amount'])
|
| title = row['title']
|
| category = row.get('category', 'Uncategorized')
|
| date_str = row.get('date')
|
|
|
| try:
|
| date = pd.to_datetime(date_str).to_pydatetime()
|
| except:
|
| date = datetime.now()
|
|
|
|
|
| tran_type = None
|
| if 'type' in df.columns:
|
| val = str(row['type']).lower()
|
| if 'income' in val: tran_type = 'income'
|
| elif 'expense' in val: tran_type = 'expense'
|
|
|
| if not tran_type:
|
| if amount >= 0: tran_type = 'income'
|
| else:
|
| tran_type = 'expense'
|
| amount = abs(amount)
|
|
|
| doc = {
|
| '_id': ObjectId(),
|
| 'title': title,
|
| 'amount': abs(amount),
|
| 'category': category,
|
| 'date': date,
|
| 'created_at': datetime.now(),
|
| 'source': 'import'
|
| }
|
|
|
| if tran_type == 'income':
|
| income_docs.append(doc)
|
| else:
|
| expense_docs.append(doc)
|
|
|
| count += 1
|
| except Exception:
|
| continue
|
|
|
|
|
| all_docs = income_docs + expense_docs
|
| to_categorize = [d for d in all_docs if d.get('category') in ['Uncategorized', '', None]]
|
|
|
| if to_categorize:
|
| try:
|
| from .category_classifier import batch_classify_transactions
|
| batch_input = []
|
| for d in to_categorize:
|
|
|
| d_type = 'Income' if any(id(d) == id(inc) for inc in income_docs) else 'Expense'
|
| batch_input.append({'title': d['title'], 'type': d_type})
|
|
|
| results = batch_classify_transactions(batch_input)
|
| for idx, res in enumerate(results):
|
| to_categorize[idx]['category'] = res['category']
|
| except Exception as e:
|
| print(f"Error during import batch categorization: {e}")
|
|
|
|
|
| if income_docs:
|
| db.users.update_one(
|
| {'_id': ObjectId(user_id)},
|
| {'$push': {'financial_data.incomes': {'$each': income_docs}}}
|
| )
|
|
|
| if expense_docs:
|
| db.users.update_one(
|
| {'_id': ObjectId(user_id)},
|
| {'$push': {'financial_data.expenses': {'$each': expense_docs}}}
|
| )
|
|
|
| return count, None
|
| except Exception as e:
|
| import traceback
|
| traceback.print_exc()
|
| return 0, str(e)
|
|
|
| def parse_pdf_transactions(file):
|
| """
|
| Extracts transactions from PDF using deterministic table extraction (pdfplumber).
|
| Fallback to text regex if tables are not found (TODO).
|
| """""
|
| try:
|
| transactions = []
|
|
|
|
|
| with pdfplumber.open(file) as pdf:
|
| for page in pdf.pages:
|
| tables = page.extract_tables()
|
|
|
| for table in tables:
|
| if not table: continue
|
|
|
|
|
|
|
| header_map = {}
|
| header_row_idx = -1
|
|
|
| for idx, row in enumerate(table):
|
|
|
| row_text = [str(cell).lower().strip() if cell else '' for cell in row]
|
|
|
|
|
| if any(k in row_text for k in ['date', 'txn date', 'transaction date']):
|
| header_row_idx = idx
|
|
|
|
|
| for col_idx, cell in enumerate(row_text):
|
| if 'date' in cell: header_map['date'] = col_idx
|
| elif any(k in cell for k in ['title', 'description', 'particulars', 'details', 'narrative', 'transaction']): header_map['title'] = col_idx
|
| elif any(k in cell for k in ['debit', 'withdrawal', 'dr']): header_map['debit'] = col_idx
|
| elif any(k in cell for k in ['credit', 'deposit', 'cr']): header_map['credit'] = col_idx
|
| elif 'amount' in cell: header_map['amount'] = col_idx
|
| elif 'type' in cell: header_map['type'] = col_idx
|
| elif 'category' in cell: header_map['category'] = col_idx
|
| elif 'balance' in cell: header_map['balance'] = col_idx
|
|
|
| break
|
|
|
| if header_row_idx == -1:
|
|
|
| continue
|
|
|
|
|
| for row in table[header_row_idx+1:]:
|
| if not row: continue
|
|
|
| try:
|
|
|
| date_str = None
|
| if 'date' in header_map and header_map['date'] < len(row):
|
| date_str = row[header_map['date']]
|
|
|
| if not date_str: continue
|
|
|
|
|
|
|
| date_str = str(date_str).replace('\n', ' ').strip()
|
| date_obj = None
|
| for fmt in ['%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d', '%d %b %Y', '%m/%d/%Y']:
|
| try:
|
| date_obj = datetime.strptime(date_str, fmt)
|
| break
|
| except: pass
|
|
|
| if not date_obj: continue
|
|
|
|
|
| title = "Transaction"
|
| if 'title' in header_map and header_map['title'] < len(row):
|
| t_val = row[header_map['title']]
|
| if t_val: title = str(t_val).replace('\n', ' ').strip()
|
|
|
|
|
| category = "Uncategorized"
|
| if 'category' in header_map and header_map['category'] < len(row):
|
| c_val = row[header_map['category']]
|
| if c_val: category = str(c_val).replace('\n', ' ').strip()
|
|
|
|
|
| amount = 0.0
|
| tran_type = 'expense'
|
|
|
|
|
| if 'debit' in header_map and 'credit' in header_map:
|
| debit_val = row[header_map['debit']] if header_map['debit'] < len(row) else None
|
| credit_val = row[header_map['credit']] if header_map['credit'] < len(row) else None
|
|
|
|
|
| def clean_amt(val):
|
| if not val: return 0.0
|
| v = str(val).replace(',', '').replace('$', '').replace('£', '').replace(' ', '')
|
| if not v: return 0.0
|
| try: return float(v)
|
| except: return 0.0
|
|
|
| d_amt = clean_amt(debit_val)
|
| c_amt = clean_amt(credit_val)
|
|
|
| if c_amt > 0:
|
| amount = c_amt
|
| tran_type = 'income'
|
| elif d_amt > 0:
|
| amount = d_amt
|
| tran_type = 'expense'
|
| else:
|
| continue
|
|
|
|
|
| elif 'amount' in header_map and header_map['amount'] < len(row):
|
| amt_val = row[header_map['amount']]
|
| if not amt_val: continue
|
|
|
|
|
| s_val = str(amt_val).replace(',', '').replace('$', '').replace(' ', '')
|
| is_neg = False
|
| if '(' in s_val and ')' in s_val:
|
| is_neg = True
|
| s_val = s_val.replace('(', '').replace(')', '')
|
| elif s_val.startswith('-'):
|
| is_neg = True
|
| s_val = s_val.replace('-', '')
|
|
|
| try:
|
| amount = float(s_val)
|
| except: continue
|
|
|
| if is_neg:
|
| tran_type = 'expense'
|
| else:
|
|
|
|
|
| if 'type' in header_map:
|
| t_col = str(row[header_map['type']]).lower()
|
| if any(k in t_col for k in ['cr', 'credit', 'deposit', 'income']):
|
| tran_type = 'income'
|
| elif any(k in t_col for k in ['dr', 'debit', 'withdrawal', 'expense']):
|
| tran_type = 'expense'
|
| else:
|
| tran_type = 'expense'
|
| else:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| if 'cr' in str(amt_val).lower():
|
| tran_type = 'income'
|
| elif 'dr' in str(amt_val).lower():
|
| tran_type = 'expense'
|
| else:
|
|
|
|
|
|
|
| tran_type = 'expense'
|
|
|
|
|
|
|
| transactions.append({
|
| 'date': date_obj.strftime('%Y-%m-%d'),
|
| 'title': title,
|
| 'amount': amount,
|
| 'type': tran_type,
|
| 'category': category
|
| })
|
|
|
|
|
| except Exception as e:
|
|
|
| continue
|
|
|
| if not transactions:
|
|
|
| file.seek(0)
|
| with pdfplumber.open(file) as pdf:
|
| for page in pdf.pages:
|
| text = page.extract_text()
|
| if not text: continue
|
|
|
| lines = text.split('\n')
|
| for line in lines:
|
| line = line.strip()
|
| if not line: continue
|
|
|
|
|
|
|
| match = re.search(r'^(\d{1,2}[/\.-]\d{1,2}[/\.-]\d{2,4})\s+(.+?)\s+(-?[\d,]+\.\d{2}[CRcr]*)', line)
|
|
|
|
|
|
|
| if not match:
|
|
|
|
|
|
|
|
|
|
|
| match_v2 = re.search(r'^(.+?)\s+([\d,]+\.\d+)\s+(.+?)\s+(\d{4}-\d{2}-\d{2})\s+([A-Za-z]+)$', line)
|
|
|
| if match_v2:
|
| try:
|
| title = match_v2.group(1).strip()
|
| amount = float(match_v2.group(2).replace(',', ''))
|
| category = match_v2.group(3).strip()
|
| date_str = match_v2.group(4)
|
| type_str = match_v2.group(5).lower()
|
|
|
| if title.lower() == 'title' and amount == 0: continue
|
| if 'amount' in match_v2.group(2).lower(): continue
|
|
|
| date_obj = datetime.strptime(date_str, '%Y-%m-%d')
|
|
|
| transactions.append({
|
| 'date': date_obj.strftime('%Y-%m-%d'),
|
| 'title': title,
|
| 'amount': abs(amount),
|
| 'type': 'income' if 'income' in type_str else 'expense',
|
| 'category': category
|
| })
|
| continue
|
| except: pass
|
|
|
| if not match:
|
|
|
| match = re.search(r'^(\d{1,2}\s+[A-Za-z]{3}\s+\d{2,4})\s+(.+?)\s+(-?[\d,]+\.\d{2}[CRcr]*)', line)
|
|
|
| if match:
|
| date_str = match.group(1)
|
| desc_str = match.group(2)
|
| amt_str = match.group(3)
|
|
|
|
|
| date_obj = None
|
| for fmt in ['%d/%m/%Y', '%d-%m-%Y', '%Y-%m-%d', '%d.%m.%Y', '%d %b %Y', '%d %b %y']:
|
| try:
|
| date_obj = datetime.strptime(date_str, fmt)
|
| break
|
| except: pass
|
|
|
| if not date_obj: continue
|
|
|
|
|
| try:
|
| is_credit = False
|
| s_val = amt_str.replace(',', '').replace('$', '').replace(' ', '')
|
| if 'cr' in s_val.lower():
|
| is_credit = True
|
| s_val = s_val.lower().replace('cr', '')
|
|
|
| amount = float(s_val)
|
|
|
| tran_type = 'income' if (amount > 0 and is_credit) else 'expense'
|
|
|
|
|
|
|
|
|
| transactions.append({
|
| 'date': date_obj.strftime('%Y-%m-%d'),
|
| 'title': desc_str.strip(),
|
| 'amount': abs(amount),
|
| 'type': tran_type,
|
| 'category': 'Uncategorized'
|
| })
|
| except: continue
|
|
|
|
|
| if not transactions:
|
| return None
|
|
|
|
|
| try:
|
| from .category_classifier import batch_classify_transactions
|
|
|
| batch_input = []
|
| for tx in transactions:
|
| batch_input.append({
|
| 'title': tx['title'],
|
| 'type': tx['type'].capitalize() if tx.get('type') else 'Expense'
|
| })
|
|
|
| results = batch_classify_transactions(batch_input)
|
| for idx, res in enumerate(results):
|
| transactions[idx]['category'] = res['category']
|
| except Exception as e:
|
| print(f"Error during PDF batch categorization: {e}")
|
|
|
| return pd.DataFrame(transactions)
|
|
|
| except Exception as e:
|
| import traceback
|
| traceback.print_exc()
|
| return None
|
|
|