File size: 9,872 Bytes
af365fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import gradio as gr
import pandas as pd
import json
from datetime import datetime
from typing import Tuple, Dict, Any
import os
import logging

try:
    from langchain.llms import HuggingFaceHub
    from langchain.prompts import PromptTemplate
    from langchain.chains import LLMChain
except ImportError:
    # LangChain is optional: if the import fails, these names are simply left undefined.
    pass

from hf_storage import HFHubLedger

# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ExpenseManager:
    """In-memory expense ledger backed by a pandas DataFrame.

    The ledger lives in ``self.df`` with the columns ``Date``,
    ``Description``, ``Category`` and ``Amount`` and is re-sorted
    newest-first after every successful insert.
    """

    def __init__(self):
        """Create an empty ledger whose columns carry explicit dtypes."""
        self.df = pd.DataFrame(
            {
                "Date": pd.Series(dtype="datetime64[ns]"),
                "Description": pd.Series(dtype="object"),
                "Category": pd.Series(dtype="object"),
                "Amount": pd.Series(dtype="float64"),
            }
        )

    def add_entry(self, date: str, description: str, category: str, amount: float) -> bool:
        """Add one expense to the ledger.

        Args:
            date: Anything ``pd.to_datetime`` can parse.
            description: Free-text description of the purchase.
            category: Category label for the expense.
            amount: Amount spent; converted with ``float()``.

        Returns:
            True when the entry was stored, False when the date or amount
            could not be converted (the ledger is left untouched).
        """
        try:
            timestamp = pd.to_datetime(date)
            value = float(amount)
            # A missing date (NaT/None) or a NaN amount would poison sorting
            # and totals, so reject them instead of storing them.
            if pd.isna(timestamp) or pd.isna(value):
                raise ValueError(f"invalid date or amount: {date!r}, {amount!r}")

            new_entry = pd.DataFrame(
                {
                    "Date": [timestamp],
                    "Description": [description],
                    "Category": [category],
                    "Amount": [value],
                }
            )
            # Concatenating with an empty frame is deprecated in pandas 2.x,
            # so the first entry simply becomes the ledger.
            if self.df.empty:
                combined = new_entry
            else:
                combined = pd.concat([self.df, new_entry], ignore_index=True)

            # Assign only after sorting succeeded so a failure cannot leave a
            # half-applied insert behind.
            self.df = combined.sort_values("Date", ascending=False).reset_index(drop=True)
            return True
        except Exception:
            # Callers rely on the boolean result, so report and swallow here.
            logging.getLogger(__name__).exception("Error adding entry")
            return False

    def get_dataframe(self) -> pd.DataFrame:
        """Return a copy of the ledger so callers cannot mutate internal state."""
        return self.df.copy()

    def get_total_spending(self) -> float:
        """Return the sum of the ``Amount`` column as a plain float (0.0 if empty)."""
        if self.df.empty:
            return 0.0
        return float(self.df["Amount"].sum())

    def get_category_summary(self) -> Dict[str, float]:
        """Return a mapping of category to total amount (empty dict if no entries)."""
        if self.df.empty:
            return {}
        return self.df.groupby("Category")["Amount"].sum().to_dict()


def initialize_llm():
    """Create the LLM used for expense parsing, or return ``None``.

    Backends are tried in order and only when their credential is present
    in the environment:

    1. HuggingFace Hub, when ``HUGGINGFACEHUB_API_TOKEN`` is set.
    2. OpenAI, when ``OPENAI_API_KEY`` is set.

    Returns:
        The first backend that could be constructed, or ``None`` when no
        credential is configured or every attempt failed.
    """
    log = logging.getLogger(__name__)

    hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN")
    if hf_token:
        try:
            # Imported here so a missing LangChain install is reported as an
            # import failure rather than an undefined-name error.
            from langchain.llms import HuggingFaceHub

            return HuggingFaceHub(
                repo_id="mistralai/Mistral-7B-Instruct-v0.2",
                huggingfacehub_api_token=hf_token,
                model_kwargs={"temperature": 0.1, "max_length": 200},
            )
        except Exception as exc:
            log.warning("HuggingFace initialization failed: %s", exc)

    # Only touch the OpenAI backend when a key is actually configured.
    if os.getenv("OPENAI_API_KEY"):
        try:
            from langchain.llms import OpenAI

            return OpenAI(temperature=0.1, max_tokens=200)
        except Exception as exc:
            log.warning("OpenAI initialization failed: %s", exc)

    return None


def parse_expense_with_llm(user_input: str, llm) -> Dict[str, Any]:
    """Parse a natural-language expense into structured data using an LLM.

    Args:
        user_input: Free-text description of the expense.
        llm: LangChain LLM instance; when falsy the fallback parser is used.

    Returns:
        A dictionary decoded from the model's JSON reply (the prompt asks for
        the keys ``date``, ``description``, ``category`` and ``amount``), or
        the result of ``parse_expense_fallback`` when no LLM is available,
        the LLM call fails, or the reply is not a JSON object.
    """
    if not llm:
        return parse_expense_fallback(user_input)

    log = logging.getLogger(__name__)

    # The model cannot know the current date, so it is supplied explicitly.
    prompt_template = PromptTemplate(
        input_variables=["user_input", "today"],
        template="""Parse the following expense entry and extract the information into a JSON object.
        
User input: {user_input}

Return ONLY a valid JSON object with these fields (use today's date, {today}, if not specified):
- date (YYYY-MM-DD format)
- description (what was purchased)
- category (e.g., Food, Transportation, Utilities, Entertainment, Other)
- amount (numeric value without currency symbol)

JSON:"""
    )

    try:
        chain = LLMChain(llm=llm, prompt=prompt_template)
        response = chain.run(
            user_input=user_input,
            today=datetime.now().strftime("%Y-%m-%d"),
        )
    except Exception:
        # A failed remote call should degrade to the heuristic parser just
        # like an unparsable reply does.
        log.exception("LLM request failed; using fallback parser")
        return parse_expense_fallback(user_input)

    # The model may wrap the JSON in extra text: keep the outermost {...} span.
    json_str = str(response).strip()
    start_idx = json_str.find("{")
    end_idx = json_str.rfind("}")
    if start_idx != -1 and end_idx > start_idx:
        json_str = json_str[start_idx:end_idx + 1]

    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError as exc:
        log.warning("JSON parsing error: %s", exc)
        return parse_expense_fallback(user_input)

    # Valid JSON that is not an object (list, number, ...) is unusable here.
    if not isinstance(parsed, dict):
        log.warning("LLM reply was not a JSON object: %r", parsed)
        return parse_expense_fallback(user_input)

    return parsed


def parse_expense_fallback(user_input: str) -> Dict[str, Any]:
    """Parse an expense with regex heuristics when no LLM is available.

    Args:
        user_input: Free-text description of the expense.

    Returns:
        A dictionary with the keys ``date`` (always today, ``YYYY-MM-DD``),
        ``description`` (the input text), ``category`` (first matching
        category, else ``"Other"``) and ``amount`` (``0.0`` when no number
        is found).
    """
    import re

    text = user_input or ""
    result = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "description": text,
        "category": "Other",
        "amount": 0.0,
    }

    # A number is either comma-grouped thousands ("1,200.50") or plain digits
    # with an optional decimal part of any length ("12.5").
    number = r"(\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)"
    # Prefer a value marked with "$" so "2 coffees for $9" yields 9, not 2;
    # otherwise fall back to the first number in the text.
    amount_match = re.search(r"\$\s*" + number, text) or re.search(number, text)
    if amount_match:
        result["amount"] = float(amount_match.group(1).replace(",", ""))

    # Simple category detection; the first category with a hit wins.
    categories = {
        "Food": ["food", "lunch", "dinner", "breakfast", "coffee", "restaurant", "burrito", "pizza", "eat"],
        "Transportation": ["gas", "gasoline", "uber", "lyft", "taxi", "bus", "train", "parking", "car"],
        "Utilities": ["electric", "electricity", "water", "gas", "internet", "phone", "utility"],
        "Entertainment": ["movie", "concert", "game", "book", "music"],
        "Rent": ["rent", "apartment", "mortgage"],
    }

    user_lower = text.lower()
    for category, keywords in categories.items():
        # Match whole words (allowing common inflections) so that "heater"
        # is not read as "eat" and "parents" is not read as "rent".
        pattern = (
            r"\b(?:" + "|".join(re.escape(k) for k in keywords) + r")(?:s|es|ing|ed)?\b"
        )
        if re.search(pattern, user_lower):
            result["category"] = category
            break

    return result


def process_expense_entry(
    user_input: str,
    manager: ExpenseManager,
    llm,
    hf_ledger: HFHubLedger = None
) -> Tuple[pd.DataFrame, str, str]:
    """Parse one user message, store it in the ledger and report the outcome.

    Args:
        user_input: Free-text expense description typed by the user.
        manager: Ledger that receives the new entry.
        llm: LLM handed to ``parse_expense_with_llm`` (may be ``None``).
        hf_ledger: Optional persistent store; when truthy its ``save`` is
            called with the updated ledger after a successful insert.

    Returns:
        A tuple ``(ledger_copy, "", status_message)``. The empty string is
        the new value for the input box.
    """
    if not user_input or not user_input.strip():
        return manager.get_dataframe(), "", "Please enter an expense description."

    try:
        parsed = parse_expense_with_llm(user_input, llm)

        # The LLM may return the amount as a string ("15.00", "$1,200");
        # normalise it to a float before validating.
        raw_amount = parsed.get("amount")
        try:
            if isinstance(raw_amount, str):
                raw_amount = raw_amount.replace("$", "").replace(",", "").strip()
            amount = float(raw_amount) if raw_amount not in (None, "") else 0.0
        except (TypeError, ValueError):
            amount = 0.0

        # "not amount > 0" also rejects NaN, which "amount <= 0" would let through.
        if not amount > 0:
            return manager.get_dataframe(), "", "❌ Error: Could not extract valid amount. Try again."

        # Use "or" so missing AND null/empty fields both fall back to defaults.
        date = parsed.get("date") or datetime.now().strftime("%Y-%m-%d")
        description = parsed.get("description") or user_input
        category = parsed.get("category") or "Other"

        added = manager.add_entry(
            date=date,
            description=description,
            category=category,
            amount=amount,
        )
        if not added:
            return manager.get_dataframe(), "", "❌ Error adding entry. Please try again."

        message = f"✅ Logged: ${amount:.2f} - {description}"

        # Sync to HF Hub if enabled. The entry is already in the ledger, so a
        # sync failure must not be reported as if nothing had been logged.
        if hf_ledger:
            try:
                hf_ledger.save(manager.df)
            except Exception as sync_error:
                logging.getLogger(__name__).exception("Failed to sync ledger")
                message += f" (not synced to storage: {sync_error})"

        return manager.get_dataframe(), "", message

    except Exception as e:
        return manager.get_dataframe(), "", f"❌ Error: {str(e)}"


def build_interface(manager, llm, hf_ledger: HFHubLedger):
    """Build the Gradio interface.

    Args:
        manager: Ledger whose entries and running total are displayed.
        llm: LLM passed through to ``process_expense_entry`` (may be ``None``).
        hf_ledger: Persistent store; its ``get_status()`` text is shown in
            the header and it is passed through to ``process_expense_entry``.

    Returns:
        The assembled ``gr.Blocks`` app, not yet launched.
    """

    def format_total() -> str:
        """Render the current ledger total as the Markdown headline."""
        return f"### 💰 Total Spending: ${manager.get_total_spending():.2f}"

    def log_expense_callback(user_input: str) -> Tuple[pd.DataFrame, str, str, str]:
        """Handle a submit: log the expense and refresh table, status and total."""
        df, cleared_input, message = process_expense_entry(user_input, manager, llm, hf_ledger)
        return df, cleared_input, message, format_total()

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 💸 Personal Finance Manager")
        gr.Markdown("Log your expenses using natural language. The AI will parse and categorize them for you.")
        gr.Markdown(f"**Storage Status:** {hf_ledger.get_status()}")

        with gr.Row():
            with gr.Column(scale=3):
                expense_box = gr.Textbox(
                    label="Describe your expense",
                    placeholder="e.g., 'Spent $15 on a burrito at Chipotle' or 'Paid $1200 for rent'",
                    lines=2
                )
            with gr.Column(scale=1):
                log_button = gr.Button("Log Expense", variant="primary", scale=1)

        status_output = gr.Textbox(
            label="Status",
            interactive=False,
            max_lines=1
        )

        # Start from the real total so entries already present in the manager
        # are reflected before the first new expense is logged.
        total_display = gr.Markdown(format_total())

        gr.Markdown("## 📊 Ledger")
        ledger_table = gr.Dataframe(
            value=manager.get_dataframe(),
            interactive=False,
            label="Expense Entries",
            datatype=["str", "str", "str", "number"],
        )

        # The button click and the Enter key trigger the same handler with
        # the same inputs and outputs.
        outputs = [ledger_table, expense_box, status_output, total_display]
        for register in (log_button.click, expense_box.submit):
            register(
                fn=log_expense_callback,
                inputs=[expense_box],
                outputs=outputs
            )

    return demo


def main():
    """Main entry point: load stored data, set up the parser and launch the UI."""
    # Initialize HuggingFace Hub ledger
    hf_ledger = HFHubLedger()

    # Initialize components
    manager = ExpenseManager()

    # Seed the in-memory ledger with previously stored entries, if any.
    if hf_ledger.df is not None and not hf_ledger.df.empty:
        manager.df = hf_ledger.df.copy()
        logger.info("Loaded %d entries from persistent storage", len(manager.df))

    llm = initialize_llm()

    if not llm:
        logger.warning("⚠️  Warning: LLM not available. Using fallback parser.")

    # Build and launch interface
    demo = build_interface(manager, llm, hf_ledger)
    demo.launch(share=False)


# Launch the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()