Spaces:
Sleeping
Sleeping
| """ | |
| International Student Finance Portal | |
| A comprehensive financial advisory system for international students | |
| Implements 5 agent design patterns: RAG, Role-based Cooperation, Voting-based Cooperation, | |
| Self-reflection, and Multi-path Plan Generator | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import threading | |
| from typing import List, Dict, Any, Optional | |
| from functools import lru_cache | |
| try: | |
| # Import required libraries | |
| import gradio as gr | |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| except ImportError as e: | |
| print(f"Error importing required libraries: {e}") | |
| print("Please install required packages: pip install -r requirements.txt") | |
| sys.exit(1) | |
| # ======================================= | |
| # API Key & Workflow Logging | |
| # ======================================= | |
# Resolve the OpenAI API key: prefer the environment, otherwise ask on stdin.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    try:
        api_key = input("Please enter your OpenAI API key: ")
    except (EOFError, OSError):
        # No usable stdin (hosted / non-interactive run): carry on with an
        # empty key instead of crashing while the module is being imported.
        api_key = ""
    os.environ["OPENAI_API_KEY"] = api_key

# Process-wide list of workflow step records appended to by log_workflow().
WORKFLOW_LOG: List[Dict[str, Any]] = []
def log_workflow(step: str, details: Any = None):
    """Record one workflow step in WORKFLOW_LOG and echo it to stdout.

    Args:
        step: Short description of the step.
        details: Optional payload; stored under ``"details"`` whenever it is
            not ``None``, and echoed to stdout only when it is truthy.
    """
    stamp = time.strftime("%H:%M:%S")
    record = {"time": stamp, "step": step}
    if details is not None:
        record["details"] = details
    WORKFLOW_LOG.append(record)

    suffix = (": " + str(details)) if details else ""
    print(f"[{stamp}] {step}{suffix}")
def clear_workflow_log():
    """Empty WORKFLOW_LOG in place (the list object itself is kept)."""
    del WORKFLOW_LOG[:]
def get_workflow_log() -> str:
    """Render WORKFLOW_LOG as Markdown text.

    Each entry becomes a ``**[time]** step`` line. Truthy details follow on
    their own line: dicts are shown inside a code fence with string values
    longer than 100 characters shortened, anything else is shown as-is. A blank
    line separates entries.

    Returns:
        The rendered log, or a fixed placeholder sentence when nothing has
        been recorded.
    """
    if not WORKFLOW_LOG:
        return "No workflow steps recorded yet."

    parts = ["## Workflow Execution Log:\n\n"]
    for entry in WORKFLOW_LOG:
        parts.append(f"**[{entry['time']}]** {entry['step']}\n")
        details = entry.get("details")
        if details:
            if isinstance(details, dict):
                # Shorten on a copy so the recorded entry is left untouched.
                shown = {
                    key: (value[:100] + "..."
                          if isinstance(value, str) and len(value) > 100
                          else value)
                    for key, value in details.items()
                }
                parts.append(f"```{shown}```\n")
            else:
                parts.append(f"{details}\n")
        parts.append("\n")
    return "".join(parts)
| # ======================================= | |
| # Tax Regulation Database | |
| # ======================================= | |
class TaxRegulationDatabase:
    """LLM-generated tax facts for students, cached in memory per country.

    Two caches are kept, both keyed by country name: ``tax_regulations`` and
    ``tax_treaties``. Writes to them are guarded by ``lock`` because preloading
    fills them from background threads.
    """

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0.1)
        self.tax_regulations: Dict[str, List[str]] = {}
        self.tax_treaties: Dict[str, List[str]] = {}
        self.lock = threading.Lock()

    def preload_common_countries(self):
        """Start one daemon thread per common country to fill both caches."""
        countries = ["India", "China", "South Korea", "Brazil", "Canada", "Mexico", "Taiwan", "Japan", "Vietnam"]
        log_workflow("Preloading tax regulations for common countries")
        for country in countries:
            threading.Thread(target=self._load_all, args=(country,), daemon=True).start()

    def _load_all(self, country: str):
        """Load regulations and treaty facts for ``country`` (thread target)."""
        self._get_tax_regulations(country)
        self._get_tax_treaty(country)

    def _get_tax_regulations(self, country: str) -> List[str]:
        """Return tax-regulation statements for ``country``.

        A cached answer is returned when present; otherwise the model is asked
        and the non-empty lines of its reply are cached. On failure a
        single-item ``["Error: ..."]`` list is returned and nothing is cached.
        """
        with self.lock:
            cached = self.tax_regulations.get(country)
        if cached is not None:
            # Served from cache: this is what makes preloading worthwhile.
            return cached

        log_workflow(f"Loading tax regulations for {country}")
        prompt = f"Provide 5 factual statements about tax regs for {country} students in the US, incl. forms, thresholds."
        try:
            resp = self.llm.invoke(prompt)
            regs = [line.strip() for line in resp.content.split("\n") if line.strip()]
            with self.lock:
                self.tax_regulations[country] = regs
            return regs
        except Exception as e:
            log_workflow(f"Error loading tax regs for {country}", str(e))
            return [f"Error: {e}"]

    def _get_tax_treaty(self, country: str) -> List[str]:
        """Return tax-treaty statements for ``country``.

        Same caching and error contract as ``_get_tax_regulations``.
        """
        with self.lock:
            cached = self.tax_treaties.get(country)
        if cached is not None:
            return cached

        log_workflow(f"Loading tax treaty for {country}")
        prompt = f"Provide 5 statements about US-{country} tax treaty for students, incl. articles, exemptions."
        try:
            resp = self.llm.invoke(prompt)
            treaty = [line.strip() for line in resp.content.split("\n") if line.strip()]
            with self.lock:
                self.tax_treaties[country] = treaty
            return treaty
        except Exception as e:
            log_workflow(f"Error loading treaty for {country}", str(e))
            return [f"Error: {e}"]

    def get_tax_information(self, country: str) -> Dict[str, List[str]]:
        """Return ``{"regulations": [...], "treaty": [...]}`` for ``country``."""
        return {
            "regulations": self._get_tax_regulations(country),
            "treaty": self._get_tax_treaty(country),
        }
| # ======================================= | |
| # Data Collector | |
| # ======================================= | |
class InternationalStudentDataCollector:
    """Asks the chat model for banking/credit facts and memoises the answers.

    Answers are stored in ``cache`` under keys such as ``banking_<country>``.
    Tax data is delegated to the ``tax_db`` attribute.
    """

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0.1)
        self.cache: Dict[str, List[str]] = {}
        self.tax_db = TaxRegulationDatabase()

    def preload_common(self):
        """Kick off background loading of tax, banking and credit data."""
        log_workflow("Preloading data for common countries")
        self.tax_db.preload_common_countries()
        loaders = (self.get_banking_data, self.get_credit_data)
        for country in ("India", "China"):
            for loader in loaders:
                worker = threading.Thread(target=loader, args=(country,), daemon=True)
                worker.start()

    def _cached(self, key: str, prompt: str) -> List[str]:
        """Return the cached answer for ``key``, asking the model on a miss.

        The reply is split into stripped, non-empty lines. Failures are logged
        and returned as ``["Error: ..."]`` without being cached.
        """
        log_workflow(f"Collecting data for {key}")
        try:
            return self.cache[key]
        except KeyError:
            pass  # cache miss: fall through to the model call

        try:
            reply = self.llm.invoke(prompt)
            stripped = (raw.strip() for raw in reply.content.split("\n"))
            facts = [text for text in stripped if text]
            self.cache[key] = facts
            return facts
        except Exception as e:
            log_workflow(f"Error collecting {key}", str(e))
            return [f"Error: {e}"]

    def get_banking_data(self, country: str) -> List[str]:
        """Banking facts for students from ``country``."""
        cache_key = f"banking_{country}"
        question = f"5 facts on banking for {country} students in the US, incl. banks, fees, docs."
        return self._cached(cache_key, question)

    def get_credit_data(self, country: str) -> List[str]:
        """Credit-building facts for students from ``country``."""
        cache_key = f"credit_{country}"
        question = f"5 facts on credit building for {country} students: cards, history, pitfalls."
        return self._cached(cache_key, question)
| # ======================================= | |
| # Shared RAG Knowledge Base Instances | |
| # ======================================= | |
# Domain codes that each get one shared knowledge base.
DOMAINS = ["banking", "credit", "tax"]
# Countries treated as "common" by the knowledge-base setup.
COMMON_COUNTRIES = ["India", "China"]
# Registry of shared knowledge bases, keyed by domain code.
KB_INSTANCES: Dict[str, 'KnowledgeBase'] = dict()
| # ======================================= | |
| # RAG Knowledge Base | |
| # ======================================= | |
class KnowledgeBase:
    """Retrieval store for one advice domain, with one vector store per country.

    ``vstores`` and ``retrievers`` are keyed by country; a ``None`` value means
    "initialised, but no retriever available", in which case ``retrieve`` falls
    back to the collector's raw data.
    """

    def __init__(self, domain: str):
        self.domain = domain
        self.collector = InternationalStudentDataCollector()
        self.embeddings = OpenAIEmbeddings()
        self.splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        self.vstores: Dict[str, Optional[Chroma]] = {}
        self.retrievers: Dict[str, Any] = {}
        self.lock = threading.Lock()

    @staticmethod
    def _collection_name(domain: str, country: str) -> str:
        """Build a vector-store collection name from ``domain`` and ``country``.

        Characters other than ASCII letters, digits, ``_`` and ``-`` (for
        example the space in "South Korea") are replaced by ``_``; the result
        is capped at 63 characters and never starts or ends with ``_``/``-``.
        """
        raw = f"{domain}_{country}"
        cleaned = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in "_-" else "_"
            for ch in raw
        )
        return cleaned[:63].strip("_-")

    def _collect_texts(self, country: str) -> List[str]:
        """Fetch this domain's raw statements for ``country`` from the collector."""
        if self.domain == "banking":
            return self.collector.get_banking_data(country)
        if self.domain == "credit":
            return self.collector.get_credit_data(country)
        if self.domain == "tax":
            ti = self.collector.tax_db.get_tax_information(country)
            return ti.get("regulations", []) + ti.get("treaty", [])
        return []

    def _register(self, country: str, store: Any, retriever: Any) -> None:
        """Record the store/retriever pair for ``country`` under the lock."""
        with self.lock:
            self.vstores[country] = store
            self.retrievers[country] = retriever

    def _init_country(self, country: str):
        """Build the vector store and retriever for ``country`` if missing.

        Does nothing when the country is already registered. When there is no
        text to index, ``None`` is registered. When building the store raises,
        the error is logged and nothing is registered, so a later call retries.
        """
        with self.lock:
            if country in self.vstores:
                return

        texts = self._collect_texts(country)
        if not texts:
            log_workflow(f"No texts for {self.domain}/{country}")
            self._register(country, None, None)
            return

        splits = self.splitter.split_text("\n\n".join(texts))
        if not splits:
            log_workflow(f"No splits for {self.domain}/{country}")
            self._register(country, None, None)
            return

        try:
            store = Chroma.from_texts(
                splits,
                self.embeddings,
                collection_name=self._collection_name(self.domain, country),
            )
            retr = store.as_retriever(search_kwargs={"k": 3})
        except Exception as e:
            # retrieve() has a direct-data fallback for a missing retriever, so
            # a vector-store failure should not abort the whole query.
            log_workflow(f"Vector store unavailable for {self.domain}/{country}", str(e))
            return

        self._register(country, store, retr)
        log_workflow(f"Vector store ready for {self.domain}/{country}")

    def retrieve(self, query: str, country: str) -> List[str]:
        """Return text snippets relevant to ``query`` for ``country``.

        Uses the country's retriever when one exists; otherwise returns the
        collector's raw statements for this domain.
        """
        log_workflow(f"Retrieving {self.domain} for {country}")
        self._init_country(country)
        retr = self.retrievers.get(country)
        if not retr:
            log_workflow(f"Fallback direct for {self.domain}/{country}")
            return self._collect_texts(country)
        docs = retr.get_relevant_documents(query)
        return [d.page_content for d in docs]
# Create one shared knowledge base per domain, register it, and start a daemon
# thread per common country to build that country's store in the background.
for dom in DOMAINS:
    kb = KB_INSTANCES[dom] = KnowledgeBase(dom)
    for c in COMMON_COUNTRIES:
        threading.Thread(
            target=kb._init_country,
            args=(c,),
            daemon=True,
        ).start()
| # ======================================= | |
| # Specialist Agents | |
| # ======================================= | |
class SpecialistAgent:
    """A named advisor that answers from one domain's shared knowledge base."""

    def __init__(self, name: str, domain: str):
        self.name = name
        # Reuse the registered knowledge base rather than building a new one.
        self.kb = KB_INSTANCES[domain]
        self.llm = ChatOpenAI(temperature=0.2)

    def run(self, query: str, country: str) -> str:
        """Retrieve context for ``query`` and return the model's advice text."""
        log_workflow(f"{self.name} analyzing")
        snippets = self.kb.retrieve(query, country)
        context = "\n".join("- {}".format(snippet) for snippet in snippets)
        prompt = "\n".join([
            f"As {self.name} for {country}, context:",
            context,
            f"Question: {query}",
            "Provide detailed advice.",
        ])
        answer = self.llm.invoke(prompt)
        log_workflow(f"{self.name} done")
        return answer.content
# Factories for the three specialists; each call builds a new agent that is
# bound to the shared knowledge base of its domain.
def BankingAdvisor():
    """Create the banking specialist."""
    return SpecialistAgent("Banking Advisor", "banking")


def CreditBuilder():
    """Create the credit specialist."""
    return SpecialistAgent("Credit Builder", "credit")


def TaxSpecialist():
    """Create the tax specialist."""
    return SpecialistAgent("Tax Specialist", "tax")
| # ======================================= | |
| # Coordinator Agent | |
| # ======================================= | |
class CoordinatorAgent:
    """Routes a student query to specialist agents and assembles the answer.

    Holds one specialist per domain code (``banking``, ``credit``, ``tax``) and
    its own chat model for routing, synthesis and plan generation.
    """

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0.3)
        self.specialists = {
            "banking": BankingAdvisor(),
            "credit": CreditBuilder(),
            "tax": TaxSpecialist(),
        }

    @staticmethod
    def _parse_domain_codes(text: str, known: Dict[str, Any]) -> List[str]:
        """Extract known domain codes from a comma-separated model reply.

        Surrounding whitespace, quotes, backticks and periods are ignored, the
        comparison is case-insensitive, and duplicates are dropped while the
        original order is kept.
        """
        found: List[str] = []
        for token in text.split(","):
            code = token.strip().strip("\"'`.").strip().lower()
            if code in known and code not in found:
                found.append(code)
        return found

    @staticmethod
    def _parse_plans(text: str) -> Any:
        """Decode the JSON plans from a model reply, with or without a code fence.

        Accepts bare JSON as well as JSON wrapped in a Markdown fence, with or
        without the ``json`` language tag after the opening fence.

        Raises:
            json.JSONDecodeError: if what remains is not valid JSON.
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned[3:]
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
            cleaned = cleaned.strip()
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        return json.loads(cleaned)

    def _identify_relevant_specialists(self, query: str) -> List[str]:
        """Identify which specialists are relevant to the query.

        Asks the model for domain codes and keeps the ones that have a
        specialist. ``tax`` is added when the query mentions it. If the model
        call fails, a keyword-based default starting with ``banking`` is used.
        """
        log_workflow("Analyzing query to identify relevant specialists")
        relevance_prompt = f"""
        Based on this financial query from an international student:
        "{query}"
        Which of the following specialist advisors should be consulted? Choose only the relevant ones.
        - banking (Banking Advisor: bank accounts, account types, transfers, documentation)
        - credit (Credit Builder: credit cards, credit scores, credit history)
        - tax (Tax Specialist: income taxes, tax treaties, FBAR, tax forms)
        Return a comma-separated list of ONLY the relevant domain codes (e.g., "banking,credit").
        """
        try:
            response = self.llm.invoke(relevance_prompt)
            valid_domains = self._parse_domain_codes(response.content, self.specialists)
            # Add tax domain if query mentions tax
            if "tax" not in valid_domains and "tax" in query.lower():
                valid_domains.append("tax")
            if not valid_domains:
                # Nothing usable came back: use the same default as the error path.
                valid_domains = ["banking"]
            log_workflow("Identified relevant specialists", {"domains": valid_domains})
            return valid_domains
        except Exception as e:
            log_workflow("Error identifying specialists", str(e))
            # Default to essential domains if there's an error
            default_domains = ["banking"]
            if "tax" in query.lower():
                default_domains.append("tax")
            if "credit" in query.lower():
                default_domains.append("credit")
            return default_domains

    def process_query(self, query: str, country: str) -> str:
        """Process a query: pick specialists, collect their advice, synthesize it."""
        log_workflow("Processing query", {"query": query, "country": country})
        relevant_domains = self._identify_relevant_specialists(query)
        specialist_advice = {
            domain: self.specialists[domain].run(query, country)
            for domain in relevant_domains
        }
        return self._synthesize_advice(query, country, specialist_advice)

    def _synthesize_advice(self, query: str, country: str, specialist_advice: Dict[str, str]) -> str:
        """Merge per-domain advice into one response via the model.

        If the model call fails, the per-domain sections are returned
        concatenated under a summary heading.
        """
        log_workflow("Synthesizing advice from specialists")
        consolidated_advice = "\n\n".join(
            f"## {domain.capitalize()} Advice\n\n{advice}"
            for domain, advice in specialist_advice.items()
        )
        synthesis_prompt = f"""
        As a financial advisor for international students, synthesize this specialist advice into a coherent response.
        STUDENT QUERY:
        {query}
        COUNTRY:
        {country}
        SPECIALIST ADVICE:
        {consolidated_advice}
        Create a comprehensive, well-organized response that integrates all relevant advice.
        Begin with a summary of key recommendations, then provide detailed sections for each area.
        """
        try:
            response = self.llm.invoke(synthesis_prompt)
            final_advice = response.content
            log_workflow("Synthesized final advice", {"length": len(final_advice)})
            return final_advice
        except Exception as e:
            log_workflow("Error synthesizing advice", str(e))
            # Fallback to concatenated advice
            return "# Financial Advice Summary\n\n" + consolidated_advice

    def run(self, query: str, profile: Dict[str, Any]) -> str:
        """Answer ``query`` for the student described by ``profile``.

        Clears the workflow log, consults specialists chosen by keyword
        (banking is the default when nothing matches), asks the model for three
        strategy plans as JSON, and returns Markdown made of the advice
        sections, the plans, and the workflow log after a ``---`` separator.
        """
        clear_workflow_log()
        country = profile.get("home_country", "unknown")
        q = query.lower()

        # 1. Collect domain-specific advice
        advice_map: Dict[str, str] = {}
        if "bank" in q or "account" in q:
            advice_map["banking"] = self.specialists["banking"].run(query, country)
        if "credit" in q:
            advice_map["credit"] = self.specialists["credit"].run(query, country)
        if "tax" in q or "treaty" in q:
            advice_map["tax"] = self.specialists["tax"].run(query, country)
        if not advice_map:
            advice_map["banking"] = self.specialists["banking"].run(query, country)

        # 2. Generate multi-path plans as JSON. The fragments are separated by
        # newlines so the goal and the format instruction do not run together.
        plans_prompt = (
            f"As a financial advisor for international students from {country}, create three financial strategies for:\n"
            f"Goal: {query}\n"
            "Return your response as a JSON object with keys \"conservative\", \"balanced\", and \"growth\"."
        )
        try:
            plans_resp = self.llm.invoke(plans_prompt)
            plans = self._parse_plans(plans_resp.content)
        except Exception as e:
            log_workflow("Error generating multi-path plans", str(e))
            plans = {
                "conservative": "Conservative investment strategy focusing on safety",
                "balanced": "Balanced approach with moderate risk and return",
                "growth": "Growth-oriented strategy with higher risk and potential return",
            }

        # 3. Build the formatted output line by line
        lines: List[str] = ["# Your Personalized Financial Advice"]
        for domain, text in advice_map.items():
            lines.append(f"## {domain.capitalize()}")
            for paragraph in text.split("\n\n"):
                lines.append(paragraph)
                lines.append("")  # blank line between paragraphs
        lines.append("## Multi-Path Plans")
        lines.append("```")
        lines.append(json.dumps(plans, indent=2))
        lines.append("```")
        formatted = "\n".join(lines)
        log_workflow("Synthesis complete")
        return f"{formatted}\n\n---\n\n{get_workflow_log()}"
| # ======================================= | |
| # Main Portal Interface | |
| # ======================================= | |
class FinancePortal:
    """Main interface for the International Student Finance Portal.

    Keeps student profiles in memory (keyed by student id) and forwards
    questions to a CoordinatorAgent.
    """

    def __init__(self):
        """Initialize the finance portal with a coordinator agent.

        The profile store and a ``None`` coordinator are set up first, so the
        object stays usable (and reports errors cleanly) even when creating the
        coordinator fails.
        """
        self.student_profiles: Dict[str, Dict[str, Any]] = {}
        self.coordinator = None
        try:
            self.coordinator = CoordinatorAgent()
            # Preload data for common countries
            self._preload_data()
        except Exception as e:
            log_workflow(f"Error initializing Finance Portal: {str(e)}")
            print(f"Error initializing Finance Portal: {str(e)}")

    def _preload_data(self):
        """Preload data for common countries; failures are logged, not raised."""
        log_workflow("Preloading data for common countries at startup")
        try:
            # NOTE(review): the collector class defined earlier in this file
            # names its preload method `preload_common`; if that class is the
            # one in effect, the call below raises AttributeError and lands in
            # the handler. The collector is also a fresh instance - confirm
            # whether anything reads its cache before changing this.
            data_collector = InternationalStudentDataCollector()
            data_collector.preload_common_countries()
        except Exception as e:
            log_workflow(f"Error preloading data: {str(e)}")
            print(f"Error preloading data: {str(e)}")
            # Continue without preloaded data - it will be loaded on demand

    def register_student(self, student_id: str, profile: Dict[str, Any]):
        """Store (or replace) the profile for ``student_id``."""
        self.student_profiles[student_id] = profile

    def get_student_profile(self, student_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored profile for ``student_id``, or ``None``."""
        return self.student_profiles.get(student_id)

    def handle_query(self, student_id: str, query: str) -> str:
        """Answer ``query`` for a registered student.

        Returns a short prompt message when the profile is missing or the
        question is blank. Otherwise returns the coordinator's response, which
        already ends with the workflow log. If processing raises, an error
        message followed by the workflow log is returned instead.
        """
        profile = self.get_student_profile(student_id)
        if not profile:
            return "Please provide your profile information first."
        if not query or query.strip() == "":
            return "Please enter a specific financial question."

        # Start from a clean log, then record the request.
        clear_workflow_log()
        log_workflow(f"Processing query for student {student_id}", {"query": query[:50]})
        try:
            if self.coordinator is None:
                raise RuntimeError("Finance Portal coordinator is not initialized")
            # CoordinatorAgent.run appends the workflow log to its return
            # value, so it is not appended a second time here.
            return self.coordinator.run(query, profile)
        except Exception as e:
            log_workflow(f"Error handling query", str(e))
            # Return the error with the workflow log
            workflow_log = get_workflow_log()
            return f"I encountered an error while processing your request: {str(e)}\n\n---\n\n{workflow_log}"
def create_interface():
    """Create the Gradio interface for the finance portal.

    Builds a FinancePortal, lays out the profile/question form and the
    response panel, wires the submit and reset buttons, and returns the
    ``gr.Blocks`` app without launching it.
    """
    log_workflow("Initializing Finance Portal and preloading data")
    portal = FinancePortal()
    log_workflow("Finance Portal initialized successfully")

    def handle_query(query, country, visa_type, university, funding, additional_info):
        """Validate the form, register a one-off profile and answer the query."""
        import uuid

        if not query or query.strip() == "":
            return "Please enter a financial question."
        if not country:
            return "Please select your home country."
        if not visa_type:
            return "Please select your visa type."

        # Use a per-request id: a single shared id would let two simultaneous
        # submissions overwrite each other's profile before it is read back.
        student_id = f"user-{uuid.uuid4().hex}"
        profile = {
            "home_country": country,
            "visa_type": visa_type,
            "university": university,
            "funding": funding,
            "additional_info": additional_info
        }
        try:
            portal.register_student(student_id, profile)
            return portal.handle_query(student_id, query)
        finally:
            # Drop the one-off profile so the store does not grow per request.
            profiles = getattr(portal, "student_profiles", None)
            if profiles is not None:
                profiles.pop(student_id, None)

    # Create Gradio interface
    with gr.Blocks(title="International Student Finance Portal") as demo:
        gr.Markdown("# International Student Finance Portal")
        gr.Markdown("Get personalized financial advice tailored for international graduate students with visible workflow.")
        with gr.Row():
            with gr.Column(scale=2):
                country = gr.Dropdown(
                    label="Home Country",
                    choices=["", "India", "China", "Brazil", "South Korea", "Saudi Arabia",
                             "Canada", "Mexico", "Taiwan", "Japan", "Vietnam", "Other"],
                    value=""
                )
                visa_type = gr.Dropdown(
                    label="Visa Type",
                    choices=["", "F-1", "J-1", "M-1", "Other"],
                    value=""
                )
                university = gr.Textbox(
                    label="University",
                    placeholder="e.g., Stanford University"
                )
                funding = gr.Dropdown(
                    label="Primary Funding Source",
                    choices=["", "Self/Family", "Scholarship", "TA/RA Position", "Education Loan", "Other"],
                    value=""
                )
                additional_info = gr.Textbox(
                    label="Additional Information (Optional)",
                    placeholder="Program, expected duration, family situation, etc."
                )
                # Predefined query templates
                query_templates = gr.Dropdown(
                    label="Common Questions (Select or type your own below)",
                    choices=[
                        "",
                        "How do I open a bank account as an international student?",
                        "What's the best way to build credit in the US?",
                        "How should I manage my TA/RA stipend?",
                        "What are my options for sending/receiving money from home?",
                        "How do CPT/OPT affect my financial situation?",
                        "What student loan options are available to me?",
                        "How should I budget for living expenses in the US?",
                        "What tax forms do I need to file as an international student?",
                        "How do tax treaties affect my stipend as an international student?",
                        "I just arrived in the US from India on an F-1 visa to start my PhD program at MIT with a teaching assistantship. I need advice on opening a bank account with minimal fees, building credit from scratch since I have no US history, sending money between India and the US at the best rates, managing my $2,500 monthly TA stipend while saving for emergencies, and understanding tax implications under the US-India tax treaty. Also, how should I financially prepare for a potential CPT internship next summer?"
                    ],
                    value=""
                )
                query = gr.Textbox(
                    label="Your Financial Question",
                    placeholder="Type your financial question here...",
                    lines=4
                )
                # Copy the selected template into the question box
                query_templates.change(
                    fn=lambda x: x if x else "",
                    inputs=query_templates,
                    outputs=query
                )
                submit_btn = gr.Button("Get Financial Advice", variant="primary")
                clear_btn = gr.Button("Reset")
            with gr.Column(scale=3):
                # The answer is rendered as Markdown
                with gr.Group():
                    gr.Markdown("### Your Personalized Financial Advice")
                    response = gr.Markdown()

        # Show a loading message immediately (not queued) while the answer is computed
        submit_btn.click(
            fn=lambda: "## Processing Your Query\n\nConsulting specialist advisors and generating multiple financial approaches...\n\nPlease wait a moment as this may take up to a minute.",
            inputs=None,
            outputs=response,
            queue=False
        )
        # Handle main query submission
        submit_btn.click(
            fn=handle_query,
            inputs=[query, country, visa_type, university, funding, additional_info],
            outputs=response,
            queue=True
        )
        # Handle reset button: one empty value per output component
        clear_btn.click(
            fn=lambda: (
                "",
                "",
                "",
                "",
                "",
                "",
                ""
            ),
            inputs=None,
            outputs=[query, country, visa_type, university, funding, additional_info, response]
        )

        # Feature explanation section
        with gr.Accordion("How This System Works", open=False):
            gr.Markdown("""
            ### Financial Advisory Features
            This portal uses advanced AI with multiple agent design patterns to provide personalized financial guidance:
            1. **Retrieval Augmented Generation (RAG)**: Uses vector embeddings to retrieve country-specific financial knowledge
               - Preloads common data at startup for faster responses
               - Dynamically retrieves relevant information based on your query
            2. **Role-based Cooperation**: Specialized agents collaborate based on their domain expertise
               - Banking Advisor: Account setup, transfers, banking documentation
               - Credit Builder: Credit cards, credit history building, credit scores
               - Budget Manager: Expense tracking, savings goals, stipend management
               - Currency Exchange Specialist: International transfers, exchange rates
               - Student Loan Advisor: Loan options, repayment strategies
               - Career Finance Planner: CPT/OPT financial planning, internships
               - Legal Finance Advisor: Visa compliance, reporting requirements
               - Tax Specialist: Income taxes, tax treaties, tax forms, FBAR filing
            3. **Voting-based Cooperation**: Specialists vote on recommendations when multiple options exist
            4. **Self-reflection**: Legal/visa compliance check on all financial advice
            5. **Multi-path Plan Generator**: Different financial strategies based on risk tolerance
            The workflow log at the bottom of each response shows you exactly which components ran and in what order.
            """)

        # API key notice
        if not api_key:
            gr.Markdown("""
            > **Note**: This application may be running without an OpenAI API key. For full functionality,
            > please set the OPENAI_API_KEY environment variable in your Hugging Face Space secrets.
            """)
    return demo
# Script entry point: launch the full UI, or a minimal notice page if that fails.
if __name__ == "__main__":
    print("Starting International Student Finance Portal with Visible Workflow...")
    try:
        interface = create_interface()
        # Bind to 0.0.0.0 so the app is reachable from outside the container
        interface.launch(server_name="0.0.0.0")
    except Exception as e:
        print(f"Error launching the interface: {e}")
        # The main interface failed: try a static page listing what to check
        try:
            print("Attempting to launch with minimal interface...")
            with gr.Blocks() as fallback_demo:
                gr.Markdown("# International Student Finance Portal")
                gr.Markdown("""
                There was an error initializing the full application.
                Please check that:
                1. You have set the OPENAI_API_KEY environment variable
                2. All dependencies are installed correctly
                3. The application has sufficient memory to run
                """)
            fallback_demo.launch(server_name="0.0.0.0")
        except Exception as fallback_error:
            print(f"Error launching fallback interface: {fallback_error}")
            # Nothing else to try: report and exit with a failure status
            print("Please install required packages: pip install -r requirements.txt")
            sys.exit(1)
# Set up API Key - Modified for Hugging Face Spaces
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    print("WARNING: OPENAI_API_KEY environment variable not set. Please set it in Hugging Face Spaces secrets.")
    # For development environment only: prompt when /.dockerenv is absent,
    # i.e. when this does not look like a docker container.
    if not os.path.exists("/.dockerenv"):
        try:
            api_key = input("Please enter your OpenAI API key: ")
        except (EOFError, OSError):
            # No usable stdin: continue with an empty key instead of crashing.
            api_key = ""
        os.environ["OPENAI_API_KEY"] = api_key

# Configure ChromaDB path for Hugging Face
os.environ["CHROMADB_DEFAULT_DATABASE_DIR"] = "/tmp/chromadb"

# Global workflow log to track the execution flow
WORKFLOW_LOG = []
def log_workflow(step, details=None):
    """Append a timestamped step to WORKFLOW_LOG and echo it to stdout.

    ``details`` is stored and printed only when it is truthy.
    """
    stamp = time.strftime("%H:%M:%S")
    record = {"time": stamp, "step": step}
    suffix = ""
    if details:
        record["details"] = details
        suffix = ": " + str(details)
    WORKFLOW_LOG.append(record)
    print(f"[{stamp}] {step}{suffix}")
def get_workflow_log():
    """Get the workflow log as formatted Markdown text.

    Each entry is rendered as ``**[time]** step``. Truthy details follow on
    the next line: dicts inside a code fence with string values longer than
    100 characters shortened, anything else as-is. Returns a fixed placeholder
    sentence when the log is empty.
    """
    if not WORKFLOW_LOG:
        return "No workflow steps recorded yet."

    parts = ["## Workflow Execution Log:\n\n"]
    for entry in WORKFLOW_LOG:
        parts.append(f"**[{entry['time']}]** {entry['step']}\n")
        details = entry.get("details")
        if not details:
            continue
        if isinstance(details, dict):
            # Shorten on a copy so the recorded entry is left untouched.
            shown = {
                key: (value[:100] + "..."
                      if isinstance(value, str) and len(value) > 100
                      else value)
                for key, value in details.items()
            }
            parts.append(f"```{shown}```\n")
        else:
            parts.append(f"{details}\n")
    return "".join(parts)
def clear_workflow_log():
    """Clear the workflow log in place.

    The existing list object is emptied rather than replaced, so any other
    reference to WORKFLOW_LOG keeps pointing at the live log.
    """
    WORKFLOW_LOG.clear()
| # ======================================= | |
| # Tax Regulation Database | |
| # ======================================= | |
| class TaxRegulationDatabase: | |
| """Database of tax regulations for international students""" | |
| def __init__(self): | |
| """Initialize the tax regulation database""" | |
| self.llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo") | |
| self.tax_regulations = {} | |
| self.tax_treaties = {} | |
| self.lock = threading.Lock() | |
| def preload_common_countries(self): | |
| """Preload tax regulations for common countries""" | |
| common_countries = ["India", "China", "South Korea", "Brazil", "Saudi Arabia", | |
| "Canada", "Mexico", "Taiwan", "Japan", "Vietnam"] | |
| log_workflow("Preloading tax regulations for common countries") | |
| for country in common_countries: | |
| # Start loading in background threads to avoid blocking startup | |
| thread = threading.Thread(target=self._load_country_tax_info, args=(country,)) | |
| thread.daemon = True | |
| thread.start() | |
| def _load_country_tax_info(self, country): | |
| """Load tax information for a specific country""" | |
| self._get_tax_regulations(country) | |
| self._get_tax_treaty(country) | |
| def _get_tax_regulations(self, country): | |
| """Get tax regulations for a specific country""" | |
| if country in self.tax_regulations: | |
| return self.tax_regulations[country] | |
| log_workflow(f"Loading tax regulations for {country}") | |
| prompt = f""" | |
| Provide 5 specific, factual statements about tax regulations that directly affect international students from {country} studying in the United States. | |
| Focus on: | |
| 1. FICA tax exemption status for F-1/J-1 students from {country} | |
| 2. Federal income tax filing requirements for {country} students | |
| 3. State tax considerations specifically relevant to {country} students | |
| 4. Any special tax forms required for {country} citizens (beyond standard 1040NR, 8843, etc.) | |
| 5. Tax implications for various types of income (scholarships, stipends, OPT income, passive income) | |
| Format as a list of factual, specific statements, one per line. | |
| Include exact form numbers, specific dollar thresholds, and deadlines where applicable. | |
| """ | |
| try: | |
| response = self.llm.invoke(prompt) | |
| regulations = [line.strip() for line in response.content.split('\n') if line.strip()] | |
| with self.lock: | |
| self.tax_regulations[country] = regulations | |
| log_workflow(f"Loaded {len(regulations)} tax regulations for {country}") | |
| return regulations | |
| except Exception as e: | |
| log_workflow(f"Error loading tax regulations for {country}", str(e)) | |
| return [f"Error retrieving tax regulations for {country}: {str(e)}"] | |
| def _get_tax_treaty(self, country): | |
| """Get tax treaty information for a specific country""" | |
| if country in self.tax_treaties: | |
| return self.tax_treaties[country] | |
| log_workflow(f"Loading tax treaty information for {country}") | |
| prompt = f""" | |
| Provide 5 specific, factual statements about the tax treaty between the United States and {country} that are especially relevant to students. | |
| Focus on: | |
| 1. Specific treaty articles that apply to students/scholars | |
| 2. Income exemption limits with exact dollar amounts and time limits | |
| 3. Special provisions for research assistants or teaching assistants from {country} | |
| 4. Documentation required to claim treaty benefits as a {country} student | |
| 5. Step-by-step process for claiming treaty benefits on tax returns | |
| Format as a list of factual, specific statements, one per line. | |
| Include exact article numbers, specific dollar thresholds, and time periods where applicable. | |
| If there is no tax treaty with {country}, state this fact and provide alternative information relevant to {country} nationals. | |
| """ | |
| try: | |
| response = self.llm.invoke(prompt) | |
| treaty_info = [line.strip() for line in response.content.split('\n') if line.strip()] | |
| with self.lock: | |
| self.tax_treaties[country] = treaty_info | |
| log_workflow(f"Loaded {len(treaty_info)} tax treaty facts for {country}") | |
| return treaty_info | |
| except Exception as e: | |
| log_workflow(f"Error loading tax treaty for {country}", str(e)) | |
| return [f"Error retrieving tax treaty information for {country}: {str(e)}"] | |
def get_tax_information(self, country):
    """Bundle the regulation and treaty statements for ``country``.

    Returns a dict with two keys: ``"regulations"`` (the result of
    ``_get_tax_regulations``) and ``"treaty"`` (the result of
    ``_get_tax_treaty``). Regulations are looked up first.
    """
    return {
        "regulations": self._get_tax_regulations(country),
        "treaty": self._get_tax_treaty(country),
    }
| # ======================================= | |
| # Data Collector | |
| # ======================================= | |
class InternationalStudentDataCollector:
    """LLM-backed source of per-country financial facts for international students.

    Each ``get_*_data`` method builds a domain-specific prompt and hands it to
    ``_get_data_with_caching``, which keeps results in ``self.cache`` keyed by
    ``"<domain>_<country lowercased>"``. Tax data is delegated to a
    ``TaxRegulationDatabase``.
    """

    def __init__(self):
        """Create the LLM client, the empty fact cache, and the tax database."""
        self.llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo")
        self.cache = {}
        self.tax_database = TaxRegulationDatabase()

    def preload_common_countries(self):
        """Warm the caches for India and China in the background.

        Asks the tax database to preload as well, then starts one daemon
        thread per (country, loader) pair for the banking and credit loaders.
        The threads are not joined.
        """
        log_workflow("Preloading data for common source countries")
        self.tax_database.preload_common_countries()

        loaders = (self.get_banking_data, self.get_credit_data)
        for country in ("India", "China"):
            for loader in loaders:
                threading.Thread(target=loader, args=(country,), daemon=True).start()

    def _get_data_with_caching(self, prompt_key, prompt):
        """Return the facts for ``prompt_key``, asking the LLM only on a cache miss.

        The LLM reply is split into stripped, non-empty lines. On any
        exception the error is logged and a one-element error list is
        returned without being cached.
        """
        log_workflow(f"Collecting data for {prompt_key}")
        if prompt_key in self.cache:
            log_workflow("Using cached data")
            return self.cache[prompt_key]

        try:
            reply = self.llm.invoke(prompt)
            stripped_lines = (raw.strip() for raw in reply.content.split('\n'))
            collected = [text for text in stripped_lines if text]
            self.cache[prompt_key] = collected
            log_workflow(f"Collected {len(collected)} facts")
            return collected
        except Exception as err:
            log_workflow("Error collecting data", str(err))
            return [f"Error retrieving information: {str(err)}"]

    def get_banking_data(self, country):
        """Return banking facts for students from ``country`` (cached per country)."""
        cache_key = f"banking_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about banking options for international students from {country} in the United States.
        Focus on:
        1. The best US banks that offer accounts for {country} students with minimal fees
        2. Exact documentation requirements for {country} students to open an account
        3. Special features available to international students from {country}
        4. Precise fee structures and minimum balances for recommended accounts
        5. Best options for international money transfers between {country} and US
        Format as a list of factual, specific statements, one per line.
        Be extremely specific and include bank names, exact documentation needed, and fee amounts where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_credit_data(self, country):
        """Return credit-building facts for students from ``country`` (cached per country)."""
        cache_key = f"credit_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about credit building options for international students from {country} in the United States.
        Focus on:
        1. Exact credit card options available to {country} students without US credit history (with specific bank names)
        2. Precisely how {country} credit history can or cannot be used in the US (e.g., Nova Credit)
        3. Detailed secured credit card requirements and deposit amounts for specific cards
        4. Step-by-step strategies for building credit scores for {country} nationals
        5. Specific credit-building pitfalls that {country} students should avoid
        Format as a list of factual, specific statements, one per line.
        Include exact credit card names, specific dollar amounts for deposits, and precise steps where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_budget_data(self, country):
        """Return budget-management facts for students from ``country`` (cached per country)."""
        cache_key = f"budget_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about budget management for international students from {country} in the United States.
        Focus on:
        1. Exact breakdown of typical monthly expenses for {country} students in the US (with dollar amounts)
        2. Specific money transfer services popular with {country} students (with fee structures)
        3. Detailed tax implications for {country} students with TA/RA stipends (including tax treaty benefits)
        4. Names of specific budget apps or tools popular with {country} students
        5. Step-by-step plan for managing a $2,500 monthly TA stipend, including saving for emergencies
        Format as a list of factual, specific statements, one per line.
        Include exact dollar amounts, percentages, and specific service names where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_currency_data(self, country):
        """Return currency-exchange facts for students from ``country`` (cached per country)."""
        cache_key = f"currency_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about currency exchange and international money transfers for {country} students in the US.
        Focus on:
        1. Current exchange rate trends between {country} currency and USD (with specific ranges)
        2. Exact fee structures of money transfer services for {country}-US transfers (Wise, Remitly, etc.)
        3. Specific regulatory considerations for moving money from {country} to US (limits, documentation)
        4. Precise breakdown of hidden fees and exchange rate markups typical in {country}-US transfers
        5. Step-by-step strategies for optimizing currency exchange for {country} students
        Format as a list of factual, specific statements, one per line.
        Include exact service names, fee percentages, and dollar amounts where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_loan_data(self, country):
        """Return student-loan facts for students from ``country`` (cached per country)."""
        cache_key = f"loan_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about student loan options for international students from {country} studying in the US.
        Focus on:
        1. Names of specific education loan providers in {country} for international study (with interest rates)
        2. Exact US-based lenders that serve {country} students without US cosigners (Prodigy, MPOWER, etc.)
        3. Precise interest rates and terms for various {country} student loan options
        4. Specific collateral requirements for loans to {country} students (with dollar amounts)
        5. Names of loan forgiveness or assistance programs available to {country} students
        Format as a list of factual, specific statements, one per line.
        Include exact lender names, interest rate percentages, and dollar amounts where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_career_data(self, country):
        """Return career-finance facts for students from ``country`` (cached per country)."""
        cache_key = f"career_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about career financial planning for international students from {country} in the US.
        Focus on:
        1. Exact F-1 visa work restrictions and opportunities (with hour limits and eligible positions)
        2. Detailed CPT/OPT regulations affecting {country} students (application timeline, costs)
        3. Step-by-step financial planning for summer internships specifically for {country} students
        4. Specific post-graduation work authorization financial considerations (with costs and timeline)
        5. Precise salary negotiation strategies and benefits evaluation for {country} nationals
        Format as a list of factual, specific statements, one per line.
        Include exact hour limits, application fees, timeline durations, and dollar amounts where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_legal_data(self, country):
        """Return legal/financial-compliance facts for students from ``country`` (cached per country)."""
        cache_key = f"legal_{country.lower()}"
        prompt_text = f"""
        Provide 5 specific, actionable facts about legal financial considerations for international students from {country} in the US.
        Focus on:
        1. Exact visa maintenance financial requirements for {country} students (with dollar amounts)
        2. Specific tax treaty benefits between US and {country} (with article numbers and percentage rates)
        3. Detailed FBAR and foreign account reporting requirements for {country} nationals ($10,000 threshold, etc.)
        4. Precise financial documentation needed for visa renewals/applications (with dollar amounts)
        5. Specific legal implications of different types of income for {country} students on F-1 visas
        Format as a list of factual, specific statements, one per line.
        Include exact dollar thresholds, tax treaty article numbers, and specific form names where possible.
        """
        return self._get_data_with_caching(cache_key, prompt_text)

    def get_tax_data(self, country):
        """Return the tax database's combined information for ``country``.

        The value is whatever ``self.tax_database.get_tax_information``
        returns; this method adds no caching of its own.
        """
        tax_info = self.tax_database.get_tax_information(country)
        return tax_info
| # ======================================= | |
| # Knowledge Base (RAG Implementation) | |
| # ======================================= | |
class KnowledgeBase:
    """RAG store for one advisory domain, with one vector store per country.

    Facts come from an ``InternationalStudentDataCollector``. On first use for
    a country they are chunked, embedded and stored in a Chroma collection;
    later queries run a similarity search against that collection. If the
    vector path fails for any reason, ``retrieve`` falls back to returning the
    collector's facts directly.
    """

    # Domain code -> name of the data-collector method that supplies its facts.
    # "tax" is handled separately because its collector returns a dict.
    _DOMAIN_GETTERS = {
        "banking": "get_banking_data",
        "credit": "get_credit_data",
        "budget": "get_budget_data",
        "currency": "get_currency_data",
        "loans": "get_loan_data",
        "career": "get_career_data",
        "legal": "get_legal_data",
    }

    def __init__(self, domain: str):
        """Set up empty per-country stores, the data collector and the embedder.

        Args:
            domain: Domain code such as ``"banking"`` or ``"tax"``.
        """
        self.domain = domain
        self.vector_stores = {}  # country (lowercased) -> Chroma vector store
        self.retrievers = {}  # country (lowercased) -> retriever over that store
        self.data_collector = InternationalStudentDataCollector()
        self.embeddings = OpenAIEmbeddings()
        self.lock = threading.Lock()

    @staticmethod
    def _collection_name(domain: str, country: str) -> str:
        """Build a Chroma-safe collection name from ``domain`` and ``country``.

        Chroma rejects names containing spaces or punctuation such as
        apostrophes, so every character other than an ASCII letter, an ASCII
        digit, ``_`` or ``-`` is replaced with ``_``. The result is cut to 63
        characters, stripped of leading/trailing ``_``/``-``, and padded with
        ``0`` up to 3 characters.
        """
        raw = f"{domain}_{country.lower()}"
        slug = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in ("_", "-") else "_"
            for ch in raw
        )
        # Truncate before stripping so the name still ends on an alphanumeric.
        slug = slug[:63].strip("_-")
        return slug.ljust(3, "0")

    def _collect_domain_texts(self, country: str, fallback: str) -> List[str]:
        """Fetch this domain's facts for ``country`` from the data collector.

        Args:
            country: Country name passed through to the collector.
            fallback: Text returned (as a one-element list) when the domain
                has no collector method.

        Returns:
            The collector's list of facts; for ``"tax"`` the regulations
            followed by the treaty statements.
        """
        if self.domain == "tax":
            tax_info = self.data_collector.get_tax_data(country)
            return tax_info["regulations"] + tax_info["treaty"]
        getter_name = self._DOMAIN_GETTERS.get(self.domain)
        if getter_name is None:
            return [fallback]
        return getattr(self.data_collector, getter_name)(country)

    def _initialize_for_country(self, country: str):
        """Build and register the vector store for ``country`` if it is missing.

        Failures while creating the store are logged and swallowed; in that
        case no retriever is registered and ``retrieve`` uses its fallback.
        """
        country_key = country.lower()
        with self.lock:
            if country_key in self.vector_stores:
                log_workflow("Using existing vector store")
                return

        log_workflow("Initializing knowledge base", {"domain": self.domain, "country": country})
        domain_texts = self._collect_domain_texts(
            country,
            f"General information for {self.domain} domain for {country} international students.",
        )
        log_workflow(f"Creating vector store with {len(domain_texts)} documents")

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        splits = text_splitter.split_text("\n\n".join(domain_texts))

        try:
            vector_store = Chroma.from_texts(
                splits,
                self.embeddings,
                # Sanitized: a raw country such as "South Korea" is not a
                # valid collection name and would make creation fail.
                collection_name=self._collection_name(self.domain, country),
            )
            retriever = vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 3},
            )
            with self.lock:
                self.vector_stores[country_key] = vector_store
                self.retrievers[country_key] = retriever
            log_workflow("Vector store created successfully")
        except Exception as e:
            # Deliberate best effort: retrieve() falls back to direct data.
            log_workflow("Error creating vector store", str(e))

    def retrieve(self, query: str, country: str) -> List[str]:
        """Return text relevant to ``query`` for students from ``country``.

        Uses similarity search over the country's vector store. If
        initialization or the search raises, the error is logged and the data
        collector's facts for this domain are returned instead.
        """
        log_workflow(f"RAG Pattern: Retrieving {self.domain} knowledge", {"query": query[:50], "country": country})
        try:
            self._initialize_for_country(country)
            with self.lock:
                retriever = self.retrievers.get(country.lower())
            if retriever is None:
                raise ValueError(f"Retriever not initialized for {country}")
            # ``invoke`` is the supported retriever entry point;
            # ``get_relevant_documents`` is deprecated.
            documents = retriever.invoke(query)
            results = [doc.page_content for doc in documents]
            log_workflow(f"Retrieved {len(results)} relevant documents")
            return results
        except Exception as e:
            log_workflow("Error in RAG retrieval, falling back to direct retrieval", str(e))
            return self._collect_domain_texts(
                country,
                f"Information about {self.domain} for {country} international students.",
            )
| # ======================================= | |
| # Domain Specialist Agents | |
| # ======================================= | |
class SpecialistAgent:
    """Common behaviour for the domain advisors.

    A specialist owns a ``KnowledgeBase`` for its domain and an LLM client.
    ``run`` retrieves country-specific knowledge and asks the LLM to turn it
    into advice for the student's query.
    """

    def __init__(self, name: str, domain: str, llm=None):
        """Store the identity and build the knowledge base.

        Args:
            name: Display name used in prompts and log entries.
            domain: Domain code handed to ``KnowledgeBase``.
            llm: LLM client to use; when falsy a ``ChatOpenAI`` with
                temperature 0.2 is created.
        """
        self.name = name
        self.domain = domain
        self.knowledge_base = KnowledgeBase(domain)
        self.llm = llm or ChatOpenAI(temperature=0.2)

    def run(self, query: str, country: str) -> str:
        """Produce domain advice for ``query`` from a student of ``country``.

        Returns the LLM's reply text, or a message describing the failure if
        the LLM call raises.
        """
        log_workflow(f"Role-based Cooperation: {self.name} analyzing query", {"query": query[:50]})

        # RAG step: pull the facts, then render them as a bulleted list.
        facts = self.knowledge_base.retrieve(query, country)
        knowledge_text = "\n".join(['- ' + fact for fact in facts])

        prompt = f"""
        As a specialist {self.name} for international students, provide detailed, specific financial advice for a student from {country}.
        STUDENT QUERY:
        {query}
        RELEVANT KNOWLEDGE FROM RAG:
        {knowledge_text}
        Provide extremely detailed, actionable advice addressing the query with these requirements:
        1. Include specific bank/service/product names with exact fees or rates where applicable
        2. Provide step-by-step instructions for any processes (account opening, credit building, etc.)
        3. Include specific dollar amounts, percentages, and time frames
        4. List exact documentation requirements where relevant
        5. Address all aspects of the query related to your domain of {self.domain}
        Format your response with clear sections, bullet points, and numbered steps.
        """
        try:
            log_workflow(f"{self.name} generating advice")
            advice = self.llm.invoke(prompt).content
            log_workflow(f"{self.name} generated advice", {"length": len(advice)})
            return advice
        except Exception as err:
            log_workflow(f"Error in {self.name}", str(err))
            return f"The {self.name} encountered an issue: {str(err)}"
| # Specialized agent implementations | |
class BankingAdvisor(SpecialistAgent):
    """Specialist named "Banking Advisor" bound to the ``banking`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Banking Advisor", "banking", llm)
class CreditBuilder(SpecialistAgent):
    """Specialist named "Credit Builder" bound to the ``credit`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Credit Builder", "credit", llm)
class BudgetManager(SpecialistAgent):
    """Specialist named "Budget Manager" bound to the ``budget`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Budget Manager", "budget", llm)
class CurrencyExchangeSpecialist(SpecialistAgent):
    """Specialist named "Currency Exchange Specialist" bound to the ``currency`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Currency Exchange Specialist", "currency", llm)
class StudentLoanAdvisor(SpecialistAgent):
    """Specialist named "Student Loan Advisor" bound to the ``loans`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Student Loan Advisor", "loans", llm)
class CareerFinancePlanner(SpecialistAgent):
    """Specialist named "Career Finance Planner" bound to the ``career`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Career Finance Planner", "career", llm)
class LegalFinanceAdvisor(SpecialistAgent):
    """Specialist named "Legal Finance Advisor" bound to the ``legal`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Legal Finance Advisor", "legal", llm)
class TaxSpecialist(SpecialistAgent):
    """Specialist named "Tax Specialist" bound to the ``tax`` domain."""

    def __init__(self, llm=None):
        """Forward the optional ``llm`` to ``SpecialistAgent`` with this agent's name and domain."""
        super().__init__("Tax Specialist", "tax", llm)
| # ======================================= | |
| # Coordinator Agent (Central Agent) | |
| # ======================================= | |
| class CoordinatorAgent: | |
| """Central coordinator agent that orchestrates specialist agents""" | |
def __init__(self, llm=None):
    """Create the coordinator's LLM and one specialist per domain.

    Args:
        llm: LLM client shared with every specialist; when falsy a
            ``ChatOpenAI`` with temperature 0.3 is created.

    Each specialist is kept both as a named attribute and in
    ``self.specialists``, keyed by its domain code.
    """
    self.llm = llm or ChatOpenAI(temperature=0.3)
    shared_llm = self.llm

    # All specialists talk to the coordinator's LLM instance.
    self.banking_advisor = BankingAdvisor(shared_llm)
    self.credit_builder = CreditBuilder(shared_llm)
    self.budget_manager = BudgetManager(shared_llm)
    self.currency_specialist = CurrencyExchangeSpecialist(shared_llm)
    self.loan_advisor = StudentLoanAdvisor(shared_llm)
    self.career_planner = CareerFinancePlanner(shared_llm)
    self.legal_advisor = LegalFinanceAdvisor(shared_llm)
    self.tax_specialist = TaxSpecialist(shared_llm)

    # Domain code -> specialist instance.
    self.specialists = dict(
        banking=self.banking_advisor,
        credit=self.credit_builder,
        budget=self.budget_manager,
        currency=self.currency_specialist,
        loans=self.loan_advisor,
        career=self.career_planner,
        legal=self.legal_advisor,
        tax=self.tax_specialist,
    )
def _identify_relevant_specialists(self, query: str) -> List[str]:
    """Choose the specialist domains that should handle ``query``.

    The LLM is asked for a comma-separated list of domain codes. The reply
    is split on commas, semicolons and newlines, each piece is stripped of
    surrounding quotes, bullets, numbering and punctuation, and only codes
    present in ``self.specialists`` are kept, without duplicates. Keyword
    rules then add ``budget``, ``tax``, ``legal`` and ``career`` when the
    query mentions them.

    Args:
        query: The student's question.

    Returns:
        Domain codes in the order the LLM gave them, followed by any
        keyword-triggered additions. May be empty. If the LLM call raises,
        ``["banking", "budget"]`` plus ``tax``/``credit`` when the query
        mentions them.
    """
    import re  # local import: ``re`` is not among this file's top-level imports

    log_workflow("Analyzing query to identify relevant specialists")
    relevance_prompt = f"""
    Based on this financial query from an international student:
    "{query}"
    Which of the following specialist advisors should be consulted? Choose only the relevant ones.
    - banking (Banking Advisor: bank accounts, account types, transfers, documentation)
    - credit (Credit Builder: credit cards, credit scores, credit history)
    - budget (Budget Manager: expense tracking, savings, stipend management)
    - currency (Currency Exchange Specialist: exchange rates, money transfers)
    - loans (Student Loan Advisor: educational loans, repayment strategies)
    - career (Career Finance Planner: internships, CPT/OPT, job preparation)
    - legal (Legal Finance Advisor: visa regulations, tax implications)
    - tax (Tax Specialist: income taxes, tax treaties, FBAR, tax forms)
    Return a comma-separated list of ONLY the relevant domain codes (e.g., "banking,credit").
    """
    lowered_query = query.lower()
    try:
        response = self.llm.invoke(relevance_prompt)

        valid_domains = []
        for piece in re.split(r"[,;\n]+", response.content):
            # The prompt's own example is quoted, so replies often come back
            # quoted, bulleted, numbered or with a trailing period.
            code = piece.strip(" \t\r\"'`.*-:()[]0123456789").lower()
            if code in self.specialists and code not in valid_domains:
                valid_domains.append(code)

        # "cpt"/"opt" must match as whole words; as substrings they also
        # fire on "options", "adopt", "optimal", and so on.
        query_words = set(re.findall(r"[a-z]+", lowered_query))
        keyword_rules = (
            ("budget", "stipend" in lowered_query or "expense" in lowered_query),
            ("tax", "tax" in lowered_query),
            ("legal", "visa" in lowered_query),
            ("career", "internship" in lowered_query or bool(query_words & {"cpt", "opt"})),
        )
        for domain, triggered in keyword_rules:
            if triggered and domain not in valid_domains:
                valid_domains.append(domain)

        log_workflow("Identified relevant specialists", {"domains": valid_domains})
        return valid_domains
    except Exception as e:
        log_workflow("Error identifying specialists", str(e))
        # Default to essential domains if there's an error
        default_domains = ["banking", "budget"]
        if "tax" in lowered_query:
            default_domains.append("tax")
        if "credit" in lowered_query:
            default_domains.append("credit")
        return default_domains
def _conduct_vote(self, question: str, options: List[str], country: str) -> Dict[str, Any]:
    """Let the relevant specialists vote on ``options`` for ``question``.

    The domains returned by ``_identify_relevant_specialists(question)``
    each cast one ballot. Every ballot is an LLM call whose prompt names the
    voting specialist, so the recommendation is made from that role. The
    reply is read as the first integer that is a valid option number; if
    none is found the ballot counts for the first option. A ballot whose
    LLM call raises is logged and skipped.

    Args:
        question: The decision being voted on.
        options: Candidate answers, numbered from 1 in the prompt.
        country: Student's home country, included in the prompt.

    Returns:
        A dict with ``"winner"`` (the option with the most votes, the
        earliest such option on a tie, or ``None`` when ``options`` is
        empty), ``"votes"`` (option -> count) and ``"specialist_votes"``
        (domain -> chosen option).
    """
    import re  # local import: ``re`` is not among this file's top-level imports

    log_workflow("Voting-based Cooperation: Specialists voting on options",
                 {"question": question[:50], "options": options})
    voting_results = {option: 0 for option in options}
    specialist_votes = {}

    if not options:
        # Nothing to choose between; previously this path raised IndexError.
        log_workflow("Voting skipped: no options supplied")
        return {
            "winner": None,
            "votes": voting_results,
            "specialist_votes": specialist_votes,
        }

    options_text = "\n".join(f"{number}. {option}" for number, option in enumerate(options, start=1))

    # Select appropriate specialists for voting
    relevant_domains = self._identify_relevant_specialists(question)
    for domain in relevant_domains:
        specialist = self.specialists[domain]
        try:
            # Name the role so each ballot reflects that specialist's view
            # instead of repeating one identical, role-less prompt.
            voting_prompt = f"""
            As the {specialist.name} advising international students from {country}, which of the following options would you recommend?
            QUESTION: {question}
            OPTIONS:
            {options_text}
            Analyze the options carefully, then respond with ONLY the number of your recommendation (e.g., "1" or "2").
            """
            response = self.llm.invoke(voting_prompt)
            vote_text = response.content.strip()

            # Whole-number match: a substring test would read "10" as "1".
            vote = options[0]  # default to the first option if parsing fails
            for token in re.findall(r"\d+", vote_text):
                number = int(token)
                if 1 <= number <= len(options):
                    vote = options[number - 1]
                    break

            voting_results[vote] += 1
            specialist_votes[domain] = vote
            log_workflow(f"{domain.capitalize()} voted for: {vote}")
        except Exception as e:
            log_workflow(f"Error during voting from {domain}", str(e))

    # max() returns the earliest option among those tied for the top count.
    winning_option = max(voting_results, key=voting_results.get)
    log_workflow("Voting complete, winner determined",
                 {"winner": winning_option, "vote_count": voting_results[winning_option]})
    return {
        "winner": winning_option,
        "votes": voting_results,
        "specialist_votes": specialist_votes,
    }