| import sqlite3 |
| import json |
| import uuid |
| import datetime |
| import logging |
| from flask import Flask, request, render_template_string, jsonify |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
| |
| try: |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| TRANSFORMERS_AVAILABLE = True |
| except ImportError: |
| logging.warning("Transformers library not found. Using fallback parser.") |
| TRANSFORMERS_AVAILABLE = False |
| AutoModelForCausalLM = None |
| AutoTokenizer = None |
|
|
| |
| app = Flask(__name__) |
|
|
| |
| model_name = "distilbert-base-uncased" |
| if TRANSFORMERS_AVAILABLE: |
| try: |
| tokenizer = AutoTokenizer.from_pretrained(model_name) |
| model = AutoModelForCausalLM.from_pretrained(model_name) |
| logging.info(f"Loaded model: {model_name}") |
| except Exception as e: |
| logging.error(f"Failed to load model {model_name}: {e}") |
| tokenizer = None |
| model = None |
| else: |
| tokenizer = None |
| model = None |
|
|
| |
| conn = sqlite3.connect("erp.db", check_same_thread=False) |
| cursor = conn.cursor() |
|
|
| |
| cursor.execute(""" |
| CREATE TABLE IF NOT EXISTS chart_of_accounts ( |
| account_id TEXT PRIMARY KEY, |
| account_name TEXT NOT NULL, |
| account_type TEXT NOT NULL, |
| parent_id TEXT, |
| allow_budgeting BOOLEAN, |
| allow_posting BOOLEAN, |
| FOREIGN KEY (parent_id) REFERENCES chart_of_accounts(account_id) |
| ) |
| """) |
| cursor.execute(""" |
| CREATE TABLE IF NOT EXISTS journal_entries ( |
| entry_id TEXT PRIMARY KEY, |
| date TEXT NOT NULL, |
| debit_account_id TEXT NOT NULL, |
| credit_account_id TEXT NOT NULL, |
| amount REAL NOT NULL, |
| description TEXT, |
| FOREIGN KEY (debit_account_id) REFERENCES chart_of_accounts(account_id), |
| FOREIGN KEY (credit_account_id) REFERENCES chart_of_accounts(account_id) |
| ) |
| """) |
| conn.commit() |
|
|
| |
| ACCOUNT_RULES = { |
| "Asset": {"increase": "Debit", "decrease": "Credit"}, |
| "Liability": {"increase": "Credit", "decrease": "Debit"}, |
| "Equity": {"increase": "Credit", "decrease": "Debit"}, |
| "Revenue": {"increase": "Credit", "decrease": "Debit"}, |
| "Expense": {"increase": "Debit", "decrease": "Credit"} |
| } |
|
|
| |
| def initialize_chart_of_accounts(): |
| accounts = [ |
| ("1", "Assets", "Asset", None, True, False), |
| ("1.1", "Fixed Assets", "Asset", "1", True, False), |
| ("1.1.1", "Plant", "Asset", "1.1", True, True), |
| ("1.1.2", "Machinery", "Asset", "1.1", True, True), |
| ("1.1.3", "Building", "Asset", "1.1", True, True), |
| ("1.2", "Current Assets", "Asset", "1", True, False), |
| ("1.2.1", "Cash", "Asset", "1.2", True, True), |
| ("1.2.2", "Laptop", "Asset", "1.2", True, True), |
| ("2", "Liabilities", "Liability", None, True, False), |
| ("2.1", "Accounts Payable", "Liability", "2", True, True), |
| ("3", "Equity", "Equity", None, True, False), |
| ("3.1", "Owner's Capital", "Equity", "3", True, True), |
| ("4", "Revenue", "Revenue", None, True, False), |
| ("4.1", "Sales", "Revenue", "4", True, True), |
| ("5", "Expenses", "Expense", None, True, False), |
| ("5.1", "Operating Expenses", "Expense", "5", True, True) |
| ] |
| cursor.executemany(""" |
| INSERT OR REPLACE INTO chart_of_accounts |
| (account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting) |
| VALUES (?, ?, ?, ?, ?, ?) |
| """, accounts) |
| conn.commit() |
| logging.info("Chart of accounts initialized.") |
|
|
| |
| def parse_prompt(prompt): |
| if model and tokenizer: |
| try: |
| input_text = f""" |
| Parse the following accounting prompt into a JSON object with: |
| - debit: {{account, type, amount}} |
| - credit: {{account, type, amount}} |
| - payment_method: 'cash' or 'credit' or null |
| Prompt: {prompt} |
| """ |
| inputs = tokenizer(input_text, return_tensors="pt") |
| outputs = model.generate(**inputs, max_length=300) |
| response = tokenizer.decode(outputs[0], skip_special_tokens=True) |
| return json.loads(response) |
| except Exception as e: |
| logging.warning(f"Model parsing failed: {e}. Using fallback parser.") |
|
|
| |
| prompt_lower = prompt.lower() |
| amount = None |
| for word in prompt_lower.split(): |
| if word.startswith("$"): |
| try: |
| amount = float(word[1:]) |
| break |
| except: |
| pass |
| |
| if not amount: |
| logging.error("No amount found in prompt.") |
| return None |
|
|
| if "laptop" in prompt_lower: |
| debit_account = "Laptop" |
| debit_type = "Asset" |
| if "cash" in prompt_lower: |
| credit_account = "Cash" |
| credit_type = "Asset" |
| payment_method = "cash" |
| elif "credit" in prompt_lower: |
| credit_account = "Accounts Payable" |
| credit_type = "Liability" |
| payment_method = "credit" |
| else: |
| return {"debit": {"account": "Laptop", "type": "Asset", "amount": amount}, "credit": None, "payment_method": None} |
| return { |
| "debit": {"account": debit_account, "type": debit_type, "amount": amount}, |
| "credit": {"account": credit_account, "type": credit_type, "amount": amount}, |
| "payment_method": payment_method |
| } |
| logging.error("Prompt not recognized.") |
| return None |
|
|
| |
| def generate_journal_entry(prompt, follow_up_response=None): |
| parsed = parse_prompt(prompt) |
| if not parsed: |
| return "Unable to parse prompt. Please provide more details." |
|
|
| debit_account = parsed["debit"]["account"] |
| amount = parsed["debit"]["amount"] |
| payment_method = parsed.get("payment_method") |
|
|
| |
| if not payment_method and not follow_up_response: |
| return {"status": "clarify", "message": "Wonderful, did you buy on credit? (Yes/No)", "original_prompt": prompt} |
|
|
| |
| credit_account = None |
| credit_type = None |
| if follow_up_response and follow_up_response.lower() == "yes": |
| credit_account = "Accounts Payable" |
| credit_type = "Liability" |
| elif payment_method == "cash": |
| credit_account = parsed["credit"]["account"] |
| credit_type = parsed["credit"]["type"] |
| elif payment_method == "credit": |
| credit_account = "Accounts Payable" |
| credit_type = "Liability" |
| else: |
| return "Invalid payment method specified." |
|
|
| |
| cursor.execute("SELECT account_id, account_type, allow_posting FROM chart_of_accounts WHERE account_name = ?", (debit_account,)) |
| debit_result = cursor.fetchone() |
| cursor.execute("SELECT account_id, account_type, allow_posting FROM chart_of_accounts WHERE account_name = ?", (credit_account,)) |
| credit_result = cursor.fetchone() |
|
|
| if not debit_result or not credit_result: |
| return "One or both accounts not found in chart of accounts." |
| if not debit_result[2] or not credit_result[2]: |
| return "Posting not allowed for one or both accounts." |
|
|
| |
| if debit_result[1] != parsed["debit"]["type"] or credit_result[1] != credit_type: |
| return "Account type mismatch." |
|
|
| |
| entry_id = str(uuid.uuid4()) |
| date = datetime.datetime.now().isoformat() |
| cursor.execute(""" |
| INSERT INTO journal_entries (entry_id, date, debit_account_id, credit_account_id, amount, description) |
| VALUES (?, ?, ?, ?, ?, ?) |
| """, (entry_id, date, debit_result[0], credit_result[0], amount, prompt)) |
| conn.commit() |
| logging.info(f"Journal entry created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}") |
|
|
| return f"Journal Entry Created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}" |
|
|
| |
| def generate_t_account(account_name): |
| cursor.execute("SELECT account_id FROM chart_of_accounts WHERE account_name = ?", (account_name,)) |
| account_id = cursor.fetchone() |
| if not account_id: |
| logging.error(f"Account {account_name} not found.") |
| return "Account not found." |
| |
| account_id = account_id[0] |
| try: |
| cursor.execute(""" |
| SELECT date, amount, description, 'Debit' as type FROM journal_entries WHERE debit_account_id = ? |
| UNION |
| SELECT date, amount, description, 'Credit' as type FROM journal_entries WHERE credit_account_id = ? |
| ORDER BY date |
| """, (account_id, account_id)) |
| entries = cursor.fetchall() |
| logging.info(f"Retrieved {len(entries)} entries for T-account: {account_name}") |
| except sqlite3.Error as e: |
| logging.error(f"SQL error in generate_t_account: {e}") |
| return "Error retrieving T-account data." |
| |
| t_account = f"T-Account for {account_name}\n{'='*50}\n{'Debit':<20} | {'Credit':<20} | Description\n{'-'*50}\n" |
| debit_total = 0 |
| credit_total = 0 |
| for date, amount, desc, entry_type in entries: |
| if entry_type == "Debit": |
| t_account += f"${amount:<19} | {'':<20} | {desc}\n" |
| debit_total += amount |
| else: |
| t_account += f"{'':<20} | ${amount:<19} | {desc}\n" |
| credit_total += amount |
| t_account += f"{'-'*50}\nTotal Debit: ${debit_total:<10} | Total Credit: ${credit_total}\n" |
| |
| return t_account |
|
|
| |
| HTML_TEMPLATE = """ |
| <!DOCTYPE html> |
| <html> |
| <head> |
| <title>AI ERP System</title> |
| <style> |
| body { font-family: Arial, sans-serif; margin: 20px; } |
| h1 { color: #333; } |
| textarea, input { width: 100%; margin: 10px 0; } |
| pre { background: #f4f4f4; padding: 10px; white-space: pre-wrap; } |
| .error { color: red; } |
| </style> |
| </head> |
| <body> |
| <h1>AI ERP System</h1> |
| <h2>Enter Transaction Prompt</h2> |
| <form method="POST" action="/process_prompt"> |
| <textarea name="prompt" rows="4" placeholder="e.g., Bought a laptop for $200 on cash"></textarea> |
| <input type="submit" value="Submit Prompt"> |
| </form> |
| {% if result %} |
| <h2>Result</h2> |
| {% if result.status == 'clarify' %} |
| <p>{{ result.message }}</p> |
| <form method="POST" action="/process_follow_up"> |
| <input type="hidden" name="original_prompt" value="{{ result.original_prompt }}"> |
| <input type="text" name="follow_up" placeholder="Yes/No"> |
| <input type="submit" value="Submit Response"> |
| </form> |
| {% else %} |
| <pre>{{ result }}</pre> |
| {% endif %} |
| {% endif %} |
| <h2>View T-Account</h2> |
| <form method="POST" action="/t_account"> |
| <input type="text" name="account_name" placeholder="e.g., Laptop"> |
| <input type="submit" value="Generate T-Account"> |
| </form> |
| {% if t_account %} |
| <h2>T-Account</h2> |
| <pre>{{ t_account }}</pre> |
| {% endif %} |
| </body> |
| </html> |
| """ |
|
|
| |
| @app.route("/", methods=["GET", "POST"]) |
| def index(): |
| initialize_chart_of_accounts() |
| return render_template_string(HTML_TEMPLATE, result=None, t_account=None) |
|
|
| @app.route("/process_prompt", methods=["POST"]) |
| def process_prompt(): |
| prompt = request.form.get("prompt") |
| if not prompt: |
| return render_template_string(HTML_TEMPLATE, result="No prompt provided.", t_account=None) |
| |
| result = generate_journal_entry(prompt) |
| return render_template_string(HTML_TEMPLATE, result=result, t_account=None) |
|
|
| @app.route("/process_follow_up", methods=["POST"]) |
| def process_follow_up(): |
| original_prompt = request.form.get("original_prompt") |
| follow_up = request.form.get("follow_up") |
| if not original_prompt or not follow_up: |
| return render_template_string(HTML_TEMPLATE, result="Missing prompt or response.", t_account=None) |
| |
| result = generate_journal_entry(original_prompt, follow_up) |
| return render_template_string(HTML_TEMPLATE, result=result, t_account=None) |
|
|
| @app.route("/t_account", methods=["POST"]) |
| def t_account(): |
| account_name = request.form.get("account_name") |
| if not account_name: |
| return render_template_string(HTML_TEMPLATE, result=None, t_account="No account name provided.") |
| |
| t_account = generate_t_account(account_name) |
| return render_template_string(HTML_TEMPLATE, result=None, t_account=t_account) |
|
|
| |
| if __name__ == "__main__": |
| |
| initialize_chart_of_accounts() |
| |
| |
| app.run(host="0.0.0.0", port=7860, debug=False) |