""" Chatbot engine with RAG pipeline and proactive/reactive logic. Adapted for the AI Trading Experiment with parameter-aware responses. Supports multiple LLM providers: - HuggingFace Inference API (free tier available) - DeepSeek API - Fallback rule-based responses """ import os import sys import random import requests from typing import Optional, List, Tuple, Dict, Any from dataclasses import dataclass from enum import Enum # Attempt pysqlite3 workaround for HF Spaces try: __import__('pysqlite3') sys.modules['sqlite3'] = sys.modules.pop('pysqlite3') except ImportError: pass # Not in HF Spaces, use default sqlite3 # LangChain imports - using langchain_community for newer versions try: # Try newer import paths first (langchain >= 0.1.0) from langchain_community.document_loaders import TextLoader from langchain_community.vectorstores import Chroma from langchain_community.embeddings import HuggingFaceEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain_core.language_models.llms import LLM except ImportError: # Fall back to older import paths (langchain < 0.1.0) from langchain.document_loaders import TextLoader from langchain.vectorstores import Chroma from langchain.embeddings import HuggingFaceEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain.llms.base import LLM from config import Scenario, ResearcherControlledParams, ParticipantVisibleParams # Configuration KNOWLEDGE_BASE_DIR = "knowledge_base" VECTOR_DB_DIR = "db/vectorstore" class LLMProvider(Enum): """Available LLM providers.""" HUGGINGFACE = "huggingface" DEEPSEEK = "deepseek" FALLBACK = "fallback" # ==================== LLM Provider Selection ==================== # Check which API keys are available and select provider def get_llm_provider() -> LLMProvider: """Determine which LLM provider to use based on available credentials.""" if os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_API_KEY"): return LLMProvider.HUGGINGFACE elif os.getenv("DEEPSEEK_API_KEY"): return LLMProvider.DEEPSEEK else: print("Warning: No LLM API key found. Using fallback responses.") print("Set HF_TOKEN for HuggingFace or DEEPSEEK_API_KEY for DeepSeek.") return LLMProvider.FALLBACK # ==================== HuggingFace LLM ==================== # Recommended free/cheap models (smallest to largest): # - "microsoft/Phi-3-mini-4k-instruct" # 3.8B params, very fast # - "Qwen/Qwen2-1.5B-Instruct" # 1.5B params, smallest # - "HuggingFaceH4/zephyr-7b-beta" # 7B params, good quality # - "mistralai/Mistral-7B-Instruct-v0.2" # 7B params, popular # - "meta-llama/Llama-2-7b-chat-hf" # 7B params, requires approval DEFAULT_HF_MODEL = "HuggingFaceH4/zephyr-7b-beta" class HuggingFaceLLM(LLM): """LLM wrapper for HuggingFace Inference API (free tier available).""" api_key: str = "" model_id: str = DEFAULT_HF_MODEL temperature: float = 0.7 max_tokens: int = 512 def __init__(self, model_id: str = None, **kwargs): super().__init__(**kwargs) # Try multiple possible env var names self.api_key = ( os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_API_KEY") or "" ) if model_id: self.model_id = model_id if self.api_key: print(f"Using HuggingFace model: {self.model_id}") else: print("Warning: No HuggingFace token found.") def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str: if not self.api_key: return self._fallback_response(prompt) # HuggingFace Inference API endpoint api_url = f"https://api-inference.huggingface.co/models/{self.model_id}" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } # Format prompt for instruction-tuned models formatted_prompt = f"""<|system|> You are an AI trading advisor in the TradeVerse financial ecosystem. Provide helpful, concise advice. <|user|> {prompt} <|assistant|> """ payload = { "inputs": formatted_prompt, "parameters": { "max_new_tokens": self.max_tokens, "temperature": self.temperature, "do_sample": True, "return_full_text": False } } try: response = requests.post(api_url, headers=headers, json=payload, timeout=60) # Handle model loading (HF free tier may need to load model) if response.status_code == 503: data = response.json() wait_time = data.get("estimated_time", 20) print(f"Model loading, waiting {wait_time}s...") import time time.sleep(min(wait_time, 30)) response = requests.post(api_url, headers=headers, json=payload, timeout=60) response.raise_for_status() data = response.json() # Handle different response formats if isinstance(data, list) and len(data) > 0: return data[0].get("generated_text", "").strip() elif isinstance(data, dict): return data.get("generated_text", "").strip() return self._fallback_response(prompt) except Exception as e: print(f"HuggingFace API error: {e}") return self._fallback_response(prompt) def _fallback_response(self, prompt: str) -> str: """Generate a basic response when API is unavailable.""" return FallbackLLM()._call(prompt) @property def _llm_type(self) -> str: return "huggingface_inference" # ==================== DeepSeek LLM ==================== DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions" class DeepSeekLLM(LLM): """LLM wrapper for DeepSeek API.""" api_key: str = "" temperature: float = 0.7 max_tokens: int = 512 def __init__(self, **kwargs): super().__init__(**kwargs) self.api_key = os.getenv("DEEPSEEK_API_KEY", "") if self.api_key: print("Using DeepSeek API") def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str: if not self.api_key: return FallbackLLM()._call(prompt) headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": "deepseek-chat", "messages": [ {"role": "system", "content": "You are an AI trading advisor in the TradeVerse financial ecosystem."}, {"role": "user", "content": prompt} ], "temperature": self.temperature, "max_tokens": self.max_tokens } try: response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=30) response.raise_for_status() data = response.json() return data["choices"][0]["message"]["content"].strip() except Exception as e: print(f"DeepSeek API error: {e}") return FallbackLLM()._call(prompt) @property def _llm_type(self) -> str: return "deepseek_api" # ==================== Fallback LLM (Rule-based) ==================== class FallbackLLM(LLM): """ Rule-based fallback when no API is available. Generates responses based on scenario context and parameters. """ def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str: """Generate a context-aware response without an LLM.""" prompt_lower = prompt.lower() # Detect recommendation requests if "buy" in prompt_lower and "recommend" in prompt_lower: return self._generate_buy_response(prompt) elif "sell" in prompt_lower and "recommend" in prompt_lower: return self._generate_sell_response(prompt) elif "hold" in prompt_lower and "recommend" in prompt_lower: return self._generate_hold_response(prompt) # Detect question types if "risk" in prompt_lower: return "When evaluating risk, consider the company's debt levels, market volatility, and any red flags like insider selling or unusual trading volume. The current scenario presents factors that warrant careful consideration." if "insider" in prompt_lower or "trading volume" in prompt_lower: return "Unusual insider activity or trading volume can signal that informed parties have information not yet public. This is often a warning sign that warrants caution." if "sector" in prompt_lower or "industry" in prompt_lower: return "Sector trends significantly impact individual companies. Consider broader market conditions, regulatory environment, and competitive dynamics when making your decision." # Default analytical response return "Based on the available information, I'd encourage you to weigh the key factors mentioned in the scenario. Consider both the potential opportunities and the risk factors before making your decision." def _generate_buy_response(self, prompt: str) -> str: return "Based on my analysis, buying could be appropriate here. The positive signals suggest potential upside, though you should consider your risk tolerance and the size of your position carefully." def _generate_sell_response(self, prompt: str) -> str: return "Based on my analysis, selling may be prudent. The risk factors present suggest potential downside that could outweigh staying invested. Consider protecting your capital." def _generate_hold_response(self, prompt: str) -> str: return "Based on my analysis, holding your position seems reasonable. The situation shows mixed signals, and waiting for more clarity before acting could be the wisest approach." @property def _llm_type(self) -> str: return "fallback_rules" # ==================== LLM Factory ==================== def create_llm(provider: LLMProvider = None, model_id: str = None) -> LLM: """Factory function to create the appropriate LLM instance.""" if provider is None: provider = get_llm_provider() if provider == LLMProvider.HUGGINGFACE: return HuggingFaceLLM(model_id=model_id) elif provider == LLMProvider.DEEPSEEK: return DeepSeekLLM() else: return FallbackLLM() # ==================== Chat Response ==================== @dataclass class ChatResponse: """Response from the chatbot.""" message: str is_proactive: bool confidence_level: str # "low", "medium", "high" sources_used: List[str] # ==================== Trading Chatbot ==================== class TradingChatbot: """ AI Chatbot for the trading experiment. Supports both proactive advice and reactive queries. """ def __init__(self, llm_provider: LLMProvider = None, model_id: str = None): self.llm = create_llm(llm_provider, model_id) self.vectorstore = None self.chat_history: List[Tuple[str, str]] = [] self._initialize_knowledge_base() def _initialize_knowledge_base(self): """Load and index the knowledge base documents.""" docs = [] if os.path.exists(KNOWLEDGE_BASE_DIR): for filename in os.listdir(KNOWLEDGE_BASE_DIR): if filename.endswith(".txt"): filepath = os.path.join(KNOWLEDGE_BASE_DIR, filename) try: loader = TextLoader(filepath) docs.extend(loader.load()) except Exception as e: print(f"Error loading {filename}: {e}") if not docs: print("Warning: No knowledge base documents found.") return # Split documents into chunks splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100) split_docs = splitter.split_documents(docs) texts = [doc.page_content for doc in split_docs] metadatas = [{"source": doc.metadata.get("source", "unknown")} for doc in split_docs] # Create embeddings and vectorstore try: embedding_function = HuggingFaceEmbeddings( model_name="sentence-transformers/all-MiniLM-L6-v2" ) os.makedirs(VECTOR_DB_DIR, exist_ok=True) self.vectorstore = Chroma( persist_directory=VECTOR_DB_DIR, embedding_function=embedding_function ) self.vectorstore.add_texts(texts=texts, metadatas=metadatas) self.vectorstore.persist() print(f"Knowledge base initialized with {len(texts)} chunks.") except Exception as e: print(f"Error initializing vectorstore: {e}") def _get_confidence_framing(self, level: int) -> Dict[str, str]: """Get language framing based on confidence parameter.""" if level < 34: return { "prefix": "Based on the available information, one possibility is that", "verb": "might consider", "qualifier": "though there is considerable uncertainty", "level": "low" } elif level < 67: return { "prefix": "Looking at the situation,", "verb": "suggests", "qualifier": "while noting some risk factors", "level": "medium" } else: return { "prefix": "Based on my analysis,", "verb": "strongly recommend", "qualifier": "with high confidence", "level": "high" } def _get_depth_instructions(self, level: int) -> str: """Get explanation depth instructions based on parameter.""" if level < 34: return "Provide a very brief response (1-2 sentences maximum). Focus only on the key point." elif level < 67: return "Provide a moderate explanation (3-4 sentences). Include the main reasoning and key factors." else: return "Provide a detailed analysis. Cover all relevant factors, risks, and opportunities comprehensively." def _get_risk_framing(self, level: int) -> str: """Get risk perspective based on parameter.""" if level < 34: return "Emphasize potential risks and downside scenarios. Favor capital preservation over potential gains." elif level < 67: return "Balance potential risks and opportunities. Present a measured risk-reward analysis." else: return "Emphasize potential opportunities and upside. Be willing to tolerate higher risk for potential gains." def _get_style_instructions(self, level: int) -> str: """Get communication style instructions based on parameter.""" if level < 34: return "Use formal, professional language. Be precise and measured in your statements." elif level < 67: return "Use clear, accessible language. Be professional but approachable." else: return "Use conversational, friendly language. Be direct and engaging." def _retrieve_context(self, query: str, k: int = 4) -> str: """Retrieve relevant context from the knowledge base.""" if not self.vectorstore: return "" try: docs = self.vectorstore.similarity_search(query, k=k) return "\n\n".join([doc.page_content for doc in docs]) except Exception as e: print(f"Error retrieving context: {e}") return "" def generate_proactive_advice( self, scenario: Scenario, visible_params: ParticipantVisibleParams, hidden_params: ResearcherControlledParams ) -> Optional[ChatResponse]: """ Generate proactive advice for a scenario. Returns None if proactive advice should not be shown. """ # Check if we should show proactive advice based on proactivity level proactive_threshold = hidden_params.proactivity_level / 100 if random.random() > proactive_threshold: return None # Build the prompt confidence = self._get_confidence_framing(hidden_params.confidence_framing) depth = self._get_depth_instructions(visible_params.explanation_depth) risk = self._get_risk_framing(hidden_params.risk_bias) style = self._get_style_instructions(visible_params.communication_style) # Retrieve relevant context context = self._retrieve_context( f"{scenario.company_name} {scenario.sector} trading analysis" ) # Determine what factors to highlight factors_to_mention = [] if hidden_params.risk_bias < 50: factors_to_mention = scenario.red_flags[:2] if scenario.red_flags else scenario.key_factors[:2] else: factors_to_mention = scenario.positive_signals[:2] if scenario.positive_signals else scenario.key_factors[:2] prompt = f""" {style} {depth} {risk} You are an AI trading advisor. A participant is viewing this trading scenario: Company: {scenario.company_name} ({scenario.company_symbol}) Sector: {scenario.sector} Country: {scenario.country} Current Price: {scenario.current_price} credits Situation: {scenario.situation_description} Key factors to consider: {', '.join(factors_to_mention)} Relevant knowledge: {context} You should proactively offer some initial observations about this situation. {confidence['prefix']} the situation {confidence['verb']} careful attention {confidence['qualifier']}. Your recommendation should lean toward: {scenario.ai_recommendation} Generate a brief proactive message offering your initial take on this situation. Do NOT explicitly tell them to BUY, SELL, or HOLD yet - this is an initial observation. Keep it natural, as if you're an advisor noticing something they should be aware of. """ response_text = self.llm._call(prompt) return ChatResponse( message=response_text, is_proactive=True, confidence_level=confidence["level"], sources_used=["market_context", "company_profile"] ) def generate_ai_recommendation( self, scenario: Scenario, visible_params: ParticipantVisibleParams, hidden_params: ResearcherControlledParams ) -> ChatResponse: """ Generate the AI's recommendation for a scenario. This is the main advice given before the participant decides. """ confidence = self._get_confidence_framing(hidden_params.confidence_framing) depth = self._get_depth_instructions(visible_params.explanation_depth) risk = self._get_risk_framing(hidden_params.risk_bias) style = self._get_style_instructions(visible_params.communication_style) context = self._retrieve_context( f"{scenario.company_name} {scenario.sector} {scenario.ai_recommendation}" ) prompt = f""" {style} {depth} {risk} You are an AI trading advisor. Analyze this situation and provide your recommendation: Company: {scenario.company_name} ({scenario.company_symbol}) Sector: {scenario.sector} Country: {scenario.country} Current Price: {scenario.current_price} credits Situation: {scenario.situation_description} Key factors: {', '.join(scenario.key_factors)} Warning signs: {', '.join(scenario.red_flags) if scenario.red_flags else 'None identified'} Positive signals: {', '.join(scenario.positive_signals) if scenario.positive_signals else 'None identified'} Relevant market knowledge: {context} {confidence['prefix']} I {confidence['verb']} the participant to {scenario.ai_recommendation} {confidence['qualifier']}. Generate your recommendation. Be clear about your suggested action ({scenario.ai_recommendation}). Explain your reasoning according to the depth level specified. Frame risks according to the risk perspective specified. """ response_text = self.llm._call(prompt) return ChatResponse( message=response_text, is_proactive=False, confidence_level=confidence["level"], sources_used=["market_context", "company_profile", "trading_basics"] ) def answer_query( self, query: str, scenario: Optional[Scenario], visible_params: ParticipantVisibleParams, hidden_params: ResearcherControlledParams ) -> ChatResponse: """ Answer a participant's question (reactive query). """ depth = self._get_depth_instructions(visible_params.explanation_depth) style = self._get_style_instructions(visible_params.communication_style) risk = self._get_risk_framing(hidden_params.risk_bias) confidence = self._get_confidence_framing(hidden_params.confidence_framing) # Retrieve context based on the query context = self._retrieve_context(query) # Build scenario context if available scenario_context = "" if scenario: scenario_context = f""" Current scenario: Company: {scenario.company_name} ({scenario.company_symbol}) Sector: {scenario.sector} Situation: {scenario.situation_description} """ # Include chat history for context history_context = "" if self.chat_history: recent_history = self.chat_history[-3:] # Last 3 exchanges history_context = "Recent conversation:\n" + "\n".join( [f"User: {q}\nAI: {a}" for q, a in recent_history] ) prompt = f""" {style} {depth} {risk} You are an AI trading advisor in the TradeVerse. Answer the participant's question. {scenario_context} {history_context} Relevant knowledge from your database: {context} User question: {query} Guidelines: - Only use information from the TradeVerse (fictional universe) - If asked about real-world companies or markets, politely redirect to TradeVerse - {confidence['prefix'].lower()} frame your response {confidence['qualifier']} - Be helpful but don't make decisions for the participant Provide your response: """ response_text = self.llm._call(prompt) # Update chat history self.chat_history.append((query, response_text)) return ChatResponse( message=response_text, is_proactive=False, confidence_level=confidence["level"], sources_used=["knowledge_base"] ) def clear_history(self): """Clear the chat history for a new session.""" self.chat_history = [] # Singleton instance (uses auto-detected provider) chatbot = TradingChatbot()