| import os |
| from typing import Dict |
| from typing import Generator |
| from langchain_huggingface import HuggingFaceEmbeddings, ChatHuggingFace, HuggingFaceEndpoint |
| from langchain_community.vectorstores import Chroma |
| from langchain_community.retrievers import BM25Retriever |
| from langchain_core.prompts import PromptTemplate |
| from langchain_core.documents import Document |
| from langchain_community.chat_models import ChatOllama |
|
|
| try: |
| from langchain.retrievers import EnsembleRetriever |
| except ImportError: |
| from langchain_community.retrievers import EnsembleRetriever |
|
|
| try: |
| from langchain.chains import ConversationalRetrievalChain |
| except ImportError: |
| from langchain_community.chains import ConversationalRetrievalChain |
|
|
| try: |
| from langchain.memory import ConversationBufferMemory |
| except ImportError: |
| from langchain_community.memory import ConversationBufferMemory |
|
|
| |
| embeddings = HuggingFaceEmbeddings( |
| model_name="nomic-ai/nomic-embed-text-v1", |
| model_kwargs={"trust_remote_code": True}, |
| ) |
|
|
| |
| vectorstore = Chroma( |
| persist_directory="./chroma_db", |
| embedding_function=embeddings, |
| ) |
|
|
| |
| vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 8}) |
|
|
| all_docs = vectorstore.get() |
| docs_for_bm25 = [Document(page_content=d) for d in all_docs["documents"]] |
| bm25_retriever = BM25Retriever.from_documents(docs_for_bm25) |
| bm25_retriever.k = 8 |
|
|
| retriever = EnsembleRetriever( |
| retrievers=[bm25_retriever, vector_retriever], |
| weights=[0.6, 0.4], |
| ) |
|
|
| |
| endpoint = HuggingFaceEndpoint( |
| repo_id="Qwen/Qwen2.5-14B-Instruct", |
| huggingfacehub_api_token=os.environ["HF_API_KEY"], |
| provider="featherless-ai", |
| temperature=0.3, |
| max_new_tokens=1024, |
| ) |
| llm = ChatHuggingFace(llm=endpoint) |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| condense_prompt = PromptTemplate.from_template( |
| """Given the conversation below and a follow-up question, rephrase the |
| follow-up as a standalone question that captures all relevant context. |
| |
| Chat History: |
| {chat_history} |
| |
| Follow-up Question: {question} |
| |
| Standalone Question:""" |
| ) |
|
|
| |
| QA_TEMPLATE = """You are FISCAL, a friendly and knowledgeable AI companion specializing in Canadian personal finance, lifestyle budgeting, and cash-flow tracking. |
| |
| {financial_context} |
| |
| STRICT COMPLIANCE GUARDRAILS: |
| - You provide automated budgeting analysis and lifestyle financial coaching for educational and informational purposes only. |
| - You DO NOT provide certified financial, legal, tax, or investment advice. You do not operate under a fiduciary duty. |
| - NEVER recommend specific stocks, ETFs, mutual funds, crypto assets, or financial institutions. |
| - If the user asks for investment recommendations or wealth management advice, you MUST politely refuse and guide them back to lifestyle budgeting. |
| - Do not mention Plaid, access tokens, sandbox, or any technical infrastructure details. |
| - Do not mention any underlying AI models (like Qwen). |
| |
| GUIDELINES: |
| - Do NOT re-introduce yourself in subsequent turns. |
| - If the question is a greeting, respond warmly and briefly introduce yourself. |
| - If the question is about your capabilities, highlight that you analyze live transaction patterns to catch hidden spending leaks, track bill anomalies, and offer automated lifestyle budgeting guidance. |
| COMMON SPENDING TERMS (understand these as the user means them): |
| - "take out" / "takeout" / "take-out" = Food & Drink category (restaurants, DoorDash, UberEats, SkipTheDishes, etc.) |
| - "eating out" / "dining out" = Food & Drink category |
| - "coffees" / "coffee runs" = Food & Drink (Tim Hortons, Starbucks, etc.) |
| - "groceries" = Food & Drink or General Merchandise (grocery stores) |
| - "gas" / "fuel" = Transportation category |
| - "bills" = recurring payments (utilities, subscriptions, insurance) |
| - "subscriptions" / "subs" = recurring digital services (Netflix, Spotify, etc.) |
| - "shopping" = General Merchandise category |
| - "Uber" / "rides" = Transportation (rideshare) |
| - "rent" / "mortgage" = Housing/Shelter |
| - "fun money" / "spending money" = discretionary/wants categories |
| - "delivery" = Food & Drink (food delivery apps) |
| When the user uses casual language, map it to the relevant transaction categories in their data. Do NOT say you cannot find a category — instead, look at the matching transactions and summarize them. |
| |
| CHART GENERATION RULES: |
| - When your answer involves numerical data, trends, comparisons, or breakdowns that would benefit from a visual chart, ALWAYS include the chart data automatically. Do NOT ask the user if they want a visual — just generate it. Include a JSON block at the END of your response using this exact format: |
| |
| [CHART_DATA] |
| {{"type": "pie|bar|area|line", "title": "Chart Title", "subtitle": "Brief description", "data": [...]}} |
| [/CHART_DATA] |
| |
| Chart data formats by type: |
| - pie: [{{"name": "Category", "value": 30, "detail": "optional description"}}, ...] |
| - bar: [{{"name": "Label", "value": 123}}, ...] or with multiple series [{{"name": "Label", "series1": 10, "series2": 20}}, ...] |
| - area/line: [{{"label": "X Axis Label", "value": 123}}, ...] or with multiple series [{{"label": "X", "series1": 10, "series2": 20}}, ...] |
| |
| Only include chart data when the answer genuinely benefits from visualization. Do NOT generate charts for simple text answers, greetings, or yes/no questions. |
| |
| Examples of when to generate charts: |
| - "What's happening with inflation?" → area chart showing CPI trend |
| - "Break down the 50/30/20 rule" → pie chart |
| - "How much rent in different cities?" → bar chart |
| - "Show my spending breakdown" → pie chart from their financial data |
| - "Snowball vs avalanche?" → line chart comparison |
| |
| GOLD STANDARD ANALYSIS EXAMPLES (FOLLOW THIS STYLE): |
| |
| Example 1 (General Analysis): |
| User: "How are my finances looking this month?" |
| AI Answer: "Hey there! Looking at your recent transaction trends, your fixed bills are in great shape, but your variable discretionary spending is climbing a bit faster than usual this month. Here is a quick breakdown of where your cash flow is going: |
| - Rideshare Spikes: You spent a total of $240 on Uber and Lyft over the last three weeks. This is your largest non-essential expense category right now. |
| - Subscription Leaks: I also noticed three separate $14.99 charges for overlapping streaming platforms. |
| My Fiscal Insight: If you consider switching to public transit or carpooling just two days a week, and cancel the streaming apps you aren't actively using, you could easily reclaim roughly $150 in extra room in your chequing account before the month ends! What category would you like to deep dive into next?" |
| |
| Example 2 (Specific Category Leak): |
| User: "Where am I losing money?" |
| AI Answer: "Based on your recent transaction ledger, your most significant spending spike is happening in the Food & Drink category. Specifically, you spent $170 on fast food and takeout this week alone! My Fiscal Insight: Shifting to home-cooked meals just two days a week could instantly keep an extra $100 in your pocket this month. Small adjustments add up quickly!" |
| |
| Context: |
| {context} |
| |
| Question: {question} |
| |
| Answer:""" |
|
|
|
|
|
|
| |
| _memory_store: Dict[str, ConversationBufferMemory] = {} |
|
|
|
|
| def _get_memory(user_id: str) -> ConversationBufferMemory: |
| if user_id not in _memory_store: |
| _memory_store[user_id] = ConversationBufferMemory( |
| memory_key="chat_history", |
| return_messages=True, |
| output_key="answer", |
| ) |
| return _memory_store[user_id] |
|
|
|
|
| def get_answer(message: str, user_id: str, financial_context: str = "") -> str: |
|
|
| |
| if not financial_context or financial_context == "NO_BANK_DATA" or "Unable to fetch" in financial_context or "Bad Request" in financial_context or "access token" in financial_context.lower(): |
| context_block = "The user has not connected their bank account yet. If relevant, gently mention they can connect their bank account in the Options menu to get personalized advice based on their real transactions and balances." |
| else: |
| context_block = f"The user has connected their bank account. Here is their live financial data:\n{financial_context}" |
|
|
| filled_template = QA_TEMPLATE.replace( |
| "{financial_context}", |
| context_block |
| ) |
|
|
| prompt = PromptTemplate( |
| template=filled_template, |
| input_variables=["context", "question"], |
| ) |
|
|
| chain = ConversationalRetrievalChain.from_llm( |
| llm=llm, |
| retriever=retriever, |
| memory=_get_memory(user_id), |
| condense_question_prompt=condense_prompt, |
| combine_docs_chain_kwargs={"prompt": prompt}, |
| return_source_documents=False, |
| ) |
|
|
| result = chain.invoke({"question": message}) |
| return result["answer"] |
|
|
| def clear_memory(user_id: str) -> None: |
| if user_id in _memory_store: |
| _memory_store[user_id].clear() |
|
|
|
|
| def stream_answer(message: str, user_id: str, financial_context: str = "") -> Generator: |
| """Stream answer token by token with conversation memory.""" |
| |
| if not financial_context or financial_context == "NO_BANK_DATA" or "Unable to fetch" in financial_context or "Bad Request" in financial_context or "access token" in financial_context.lower(): |
| context_block = "The user has not connected their bank account yet. If relevant, gently mention they can connect their bank account in the Options menu to get personalized advice based on their real transactions and balances." |
| else: |
| context_block = f"The user has connected their bank account. Here is their live financial data:\n{financial_context}" |
|
|
| filled_template = QA_TEMPLATE.replace( |
| "{financial_context}", |
| context_block |
| ) |
|
|
| prompt = PromptTemplate( |
| template=filled_template, |
| input_variables=["context", "question"], |
| ) |
|
|
| memory = _get_memory(user_id) |
| chat_history = memory.chat_memory.messages |
| |
| history_str = "" |
| for msg in chat_history[-4:]: |
| if msg.type == "human": |
| history_str += f"User: {msg.content}\n" |
| else: |
| history_str += f"FISCAL: {msg.content}\n" |
|
|
| if history_str: |
| condense_input = f"""Given the conversation below and a follow-up question, rephrase the follow-up as a standalone question that captures all relevant context. |
| |
| Chat History: |
| {history_str} |
| |
| Follow-up Question: {message} |
| |
| Standalone Question:""" |
| |
| condensed = llm.invoke(condense_input) |
| standalone_question = condensed.content if hasattr(condensed, 'content') else str(condensed) |
| else: |
| standalone_question = message |
|
|
| docs = retriever.invoke(standalone_question) |
| context = "\n\n".join([doc.page_content for doc in docs]) |
| filled_prompt = prompt.format(context=context, question=standalone_question) |
|
|
| full_response = "" |
| for chunk in llm.stream(filled_prompt): |
| content = chunk.content if hasattr(chunk, 'content') else str(chunk) |
| full_response += content |
| yield content |
|
|
| |
| clean_response = full_response |
| chart_idx = clean_response.find("[CHART_DATA]") |
| if chart_idx > -1: |
| clean_response = clean_response[:chart_idx].strip() |
|
|
| memory.chat_memory.add_user_message(message) |
| memory.chat_memory.add_ai_message(clean_response) |
|
|