import json import os import random import re import numpy as np from datetime import datetime, date import math import hashlib import requests from collections import deque from bs4 import BeautifulSoup import urllib.parse import feedparser class FreeWebBot: def __init__(self, state_file="/tmp/free_bot_state.json"): self.state_file = state_file self.conversation_memory = deque(maxlen=200) self.learned_patterns = {} self.response_memory = {} self.reward_history = deque(maxlen=300) # Learning parameters self.learning_rate = 0.3 self.exploration_rate = 0.1 # Web settings self.search_timeout = 10 self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" # Free news sources (RSS feeds) self.news_feeds = { "general": [ "https://feeds.bbci.co.uk/news/rss.xml", "https://rss.cnn.com/rss/edition.rss", "https://feeds.reuters.com/reuters/topNews", ], "technology": [ "https://feeds.arstechnica.com/arstechnica/index", "https://techcrunch.com/feed/", ], "sports": [ "https://feeds.espn.com/espn/rss/news", ] } # Load existing state self.load_state() print(f"Free web bot initialized with {len(self.learned_patterns)} learned patterns") def chat(self, user_input, use_web_search=True): """Main chat method - returns (response, search_used, sources)""" user_input = user_input.strip() if not user_input: return "Please enter a message.", False, [] # First, try factual responses factual_response = self._get_factual_response(user_input) if factual_response: return factual_response, False, [] # Try free web search for current information if use_web_search and self._should_search_web(user_input): web_content, sources = self._free_web_search(user_input) if web_content and web_content.strip(): response = self._create_web_answer(user_input, web_content, sources) self._store_interaction(user_input, response, 0.8, sources) return response, True, sources # Fallback to learned responses response = self._get_learned_response(user_input) self._store_interaction(user_input, response, 0.5, []) return response, False, [] def _should_search_web(self, user_input): """Determine if we should search the web for this query""" input_lower = user_input.lower() search_triggers = [ 'news', 'current', 'latest', 'today', 'recent', 'update', 'weather', 'forecast', 'temperature', 'sports', 'score', 'game', 'match', 'stock', 'crypto', 'bitcoin', 'price', 'how to', 'tutorial', 'guide', 'explain', 'what is', 'who is', 'where is', 'when was', 'breaking', 'headlines' ] return any(trigger in input_lower for trigger in search_triggers) def _free_web_search(self, query): """Perform free web search using multiple methods""" sources = [] all_content = [] # Method 1: RSS Feeds for news/current events if any(topic in query.lower() for topic in ['news', 'current', 'latest', 'today', 'breaking']): feed_content = self._search_rss_feeds(query) all_content.extend(feed_content) if feed_content: sources.append("News Feeds") # Method 2: Wikipedia for factual information if any(word in query.lower() for word in ['what is', 'who is', 'explain', 'definition']): wiki_content = self._search_wikipedia(query) if wiki_content: all_content.append(wiki_content) sources.append("Wikipedia") # Method 3: DuckDuckGo for general search ddg_content = self._search_duckduckgo(query) if ddg_content: all_content.append(ddg_content) sources.append("Web Search") # Method 4: Weather information if any(word in query.lower() for word in ['weather', 'temperature', 'forecast']): weather_content = self._get_weather_info(query) if weather_content: all_content.append(weather_content) sources.append("Weather Service") # Combine all content combined_content = " ".join(all_content) return combined_content, sources def _search_rss_feeds(self, query): """Search RSS feeds for current information""" content = [] query_words = query.lower().split() # Determine feed category based on query category = "general" if any(word in query.lower() for word in ['tech', 'technology', 'ai', 'computer', 'software']): category = "technology" elif any(word in query.lower() for word in ['sports', 'game', 'score', 'match', 'team']): category = "sports" try: for feed_url in self.news_feeds.get(category, self.news_feeds["general"]): try: feed = feedparser.parse(feed_url) for entry in feed.entries[:5]: # Top 5 entries title = entry.get('title', '') summary = entry.get('summary', '') # Check if entry matches query entry_text = f"{title} {summary}".lower() if any(word in entry_text for word in query_words) or len(query_words) < 2: content.append(f"{title}: {summary}") if len(content) >= 3: # Limit to 3 results break except Exception as e: print(f"Error parsing feed {feed_url}: {e}") continue if content: break except Exception as e: print(f"RSS feed error: {e}") return content def _search_wikipedia(self, query): """Search Wikipedia for factual information""" try: # Clean query for Wikipedia clean_query = re.sub(r'(what is|who is|explain|definition of)', '', query, flags=re.IGNORECASE).strip() clean_query = clean_query.replace('?', '').strip() if not clean_query: return "" # Wikipedia API (completely free) url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{urllib.parse.quote(clean_query)}" response = requests.get( url, headers={'User-Agent': self.user_agent}, timeout=self.search_timeout ) if response.status_code == 200: data = response.json() extract = data.get('extract', '') if extract: return f"According to Wikipedia: {extract}" except Exception as e: print(f"Wikipedia search error: {e}") return "" def _search_duckduckgo(self, query): """Search DuckDuckGo for general information""" try: url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}" headers = { 'User-Agent': self.user_agent, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', } response = requests.get(url, headers=headers, timeout=self.search_timeout) soup = BeautifulSoup(response.text, 'html.parser') results = [] for result in soup.find_all('div', class_='result')[:3]: title_elem = result.find('a', class_='result__a') snippet_elem = result.find('a', class_='result__snippet') if title_elem and snippet_elem: title = title_elem.get_text().strip() snippet = snippet_elem.get_text().strip() results.append(f"{title}: {snippet}") return " ".join(results) if results else "" except Exception as e: print(f"DuckDuckGo search error: {e}") return "" def _get_weather_info(self, query): """Get weather information from free sources""" try: # Extract location from query location = self._extract_location(query) if not location: location = "New York" # Default location # Use free weather API url = f"http://wttr.in/{urllib.parse.quote(location)}?format=%C+%t+%w+%h" response = requests.get(url, timeout=self.search_timeout) if response.status_code == 200: weather_data = response.text.strip() return f"Weather in {location}: {weather_data}" except Exception as e: print(f"Weather error: {e}") return "" def _extract_location(self, text): """Extract location from text""" locations = { 'new york', 'london', 'paris', 'tokyo', 'berlin', 'sydney', 'toronto', 'mumbai', 'beijing', 'moscow', 'dubai', 'rome', 'madrid', 'amsterdam', 'chicago', 'los angeles', 'san francisco', 'seattle', 'boston', 'miami' } text_lower = text.lower() for location in locations: if location in text_lower: return location return "" def _create_web_answer(self, user_input, web_content, sources): """Create answer using web content""" if not web_content or not web_content.strip(): return "I searched but couldn't find current information about that. Could you try rephrasing your question?" # Clean and format the content sentences = re.split(r'[.!?]+', web_content) meaningful_sentences = [s.strip() for s in sentences if len(s.strip()) > 10] if not meaningful_sentences: return "I found some information but couldn't extract a clear answer. Try asking more specifically." # Use the most relevant sentences answer_sentences = meaningful_sentences[:3] answer = ". ".join(answer_sentences) # Ensure the answer ends with proper punctuation if not answer.endswith(('.', '!', '?')): answer += "." # Add source attribution if available if sources: source_text = ", ".join(sources) answer += f"\n\nSources: {source_text}" return answer def _get_factual_response(self, user_input): """Provide factual responses without web search""" input_lower = user_input.lower() # Time and date responses if any(word in input_lower for word in ['time', 'clock', 'hour']): current_time = datetime.now().strftime("%I:%M %p") return f"The current time is {current_time}." if any(word in input_lower for word in ['date', 'today', 'day month']): current_date = date.today().strftime("%A, %B %d, %Y") return f"Today is {current_date}." if any(word in input_lower for word in ['day of week', 'what day']): current_day = date.today().strftime("%A") return f"Today is {current_day}." # Math calculations math_result = self._solve_math(user_input) if math_result: return math_result # About the bot if any(word in input_lower for word in ['your name', 'who are you']): return "I'm Phoenix AI, a completely free chatbot with web search capabilities!" if any(word in input_lower for word in ['what can you do', 'capabilities']): return "I can: Answer questions, search the web for free, do math, tell time/date, and learn from our conversations!" if any(word in input_lower for word in ['help', 'what can you help with']): return "I can help you with: current news, weather information, factual questions, calculations, and general conversation. I learn from our chats too!" return "" def _solve_math(self, user_input): """Solve mathematical expressions""" try: # Simple arithmetic if re.search(r'\d+\s*[\+\-\*\/]\s*\d+', user_input): numbers = re.findall(r'\d+', user_input) if len(numbers) >= 2: a, b = int(numbers[0]), int(numbers[1]) if '+' in user_input: return f"{a} + {b} = {a + b}" elif '-' in user_input: return f"{a} - {b} = {a - b}" elif '*' in user_input or '×' in user_input: return f"{a} × {b} = {a * b}" elif '/' in user_input or '÷' in user_input: if b == 0: return "I cannot divide by zero - that's mathematically undefined!" result = a / b return f"{a} ÷ {b} = {result:.2f}" # Square roots sqrt_match = re.search(r'sqrt\(?(\d+)\)?', user_input) if sqrt_match: num = int(sqrt_match.group(1)) if num < 0: return "I cannot calculate square roots of negative numbers!" result = math.sqrt(num) return f"√{num} = {result:.2f}" # Powers power_match = re.search(r'(\d+)\s*\^\s*(\d+)', user_input) if power_match: base, exponent = int(power_match.group(1)), int(power_match.group(2)) result = base ** exponent return f"{base}^{exponent} = {result}" except Exception as e: print(f"Math solving error: {e}") return "" def _get_learned_response(self, user_input): """Get response from learned patterns or generate contextual response""" # Find similar patterns similar_patterns = self._find_similar_patterns(user_input) if similar_patterns: best_pattern = max(similar_patterns, key=lambda x: x[1]['score']) if best_pattern[1]['score'] > 0.6: return best_pattern[1]['response'] # Generate contextual response return self._generate_contextual_response(user_input) def _find_similar_patterns(self, text): """Find similar learned patterns""" similar = [] text_words = set(text.lower().split()) for pattern, data in self.learned_patterns.items(): pattern_words = set(pattern.lower().split()) common_words = text_words.intersection(pattern_words) if common_words: similarity = len(common_words) / len(text_words.union(pattern_words)) if similarity > 0.3: similar.append((pattern, data)) return similar def _generate_contextual_response(self, user_input): """Generate contextual response when no specific pattern matches""" context = self._analyze_input(user_input) if context['has_question']: responses = [ "That's an interesting question. Based on what I know, ", "I appreciate your question. From my understanding, ", "That's a great question. I've been learning that ", ] base_response = random.choice(responses) if context['topics']: topic = random.choice(context['topics']) return base_response + f"{topic} is quite fascinating. What specifically would you like to know?" else: return base_response + "this topic has many interesting aspects. Could you tell me more about what you're curious about?" # Conversational responses for statements conversational_responses = [ "I understand. Tell me more about that.", "That's interesting. What are your thoughts on this?", "I appreciate you sharing that. How do you feel about it?", "That's fascinating. I'm learning from our conversation.", "I see. Could you elaborate on that?", ] return random.choice(conversational_responses) def _analyze_input(self, text): """Analyze user input for context""" words = text.split() return { 'words': words, 'topics': self._extract_topics(text), 'has_question': '?' in text, 'sentiment': self._analyze_sentiment(text), 'word_count': len(words) } def _extract_topics(self, text): """Extract topics from text""" topics = [] text_lower = text.lower() topic_categories = { 'technology': ['tech', 'computer', 'ai', 'software', 'code', 'internet', 'programming'], 'science': ['science', 'research', 'discover', 'physics', 'biology', 'chemistry'], 'sports': ['sports', 'game', 'team', 'player', 'score', 'match', 'tournament'], 'entertainment': ['movie', 'music', 'show', 'celebrity', 'film', 'song'], 'health': ['health', 'medical', 'fitness', 'diet', 'exercise', 'nutrition'] } for topic, keywords in topic_categories.items(): if any(keyword in text_lower for keyword in keywords): topics.append(topic) return topics def _analyze_sentiment(self, text): """Basic sentiment analysis""" positive_words = ['love', 'like', 'good', 'great', 'awesome', 'happy', 'excited', 'amazing'] negative_words = ['hate', 'bad', 'terrible', 'awful', 'sad', 'angry', 'upset'] text_lower = text.lower() positive_count = sum(1 for word in positive_words if word in text_lower) negative_count = sum(1 for word in negative_words if word in text_lower) if positive_count > negative_count: return "positive" elif negative_count > positive_count: return "negative" else: return "neutral" def _store_interaction(self, user_input, response, reward, sources): """Store interaction in memory""" interaction = { 'input': user_input, 'response': response, 'reward': reward, 'sources': sources, 'timestamp': datetime.now().isoformat() } self.conversation_memory.append(interaction) self._update_learning(user_input, response, reward) def _update_learning(self, user_input, response, reward): """Update learning from interaction""" # Extract key phrases from input for pattern learning words = [word for word in user_input.split() if len(word) > 3][:4] if words: pattern = ' '.join(words) if pattern not in self.learned_patterns: self.learned_patterns[pattern] = { 'response': response, 'score': reward, 'count': 1 } else: old_data = self.learned_patterns[pattern] new_score = (old_data['score'] * old_data['count'] + reward) / (old_data['count'] + 1) self.learned_patterns[pattern]['score'] = new_score self.learned_patterns[pattern]['count'] += 1 # Store in response memory response_hash = hashlib.md5(response.encode()).hexdigest()[:8] if response_hash not in self.response_memory: self.response_memory[response_hash] = { 'response': response, 'total_score': reward, 'count': 1, 'avg_score': reward } else: memory = self.response_memory[response_hash] memory['total_score'] += reward memory['count'] += 1 memory['avg_score'] = memory['total_score'] / memory['count'] # Store reward self.reward_history.append(reward) # Save state periodically if len(self.conversation_memory) % 10 == 0: self.save_state() def learn_from_feedback(self, user_input, reward): """Learn from explicit user feedback""" if self.conversation_memory: # Update the most recent interaction recent_interaction = self.conversation_memory[-1] recent_interaction['reward'] = reward self._update_learning(recent_interaction['input'], recent_interaction['response'], reward) def get_learning_stats(self): """Get learning statistics""" recent_rewards = list(self.reward_history)[-10:] or [0.5] return { 'patterns': len(self.learned_patterns), 'memory_size': len(self.conversation_memory), 'avg_score': float(np.mean(recent_rewards)), 'recent_rewards': len([r for r in recent_rewards if r > 0.7]) } def save_state(self): """Save learning state to file""" try: state = { 'learned_patterns': self.learned_patterns, 'response_memory': self.response_memory, 'conversation_memory': list(self.conversation_memory), 'reward_history': list(self.reward_history), 'last_saved': datetime.now().isoformat() } with open(self.state_file, 'w') as f: json.dump(state, f, indent=2) except Exception as e: print(f"Error saving state: {e}") def load_state(self): """Load learning state from file""" try: if os.path.exists(self.state_file): with open(self.state_file, 'r') as f: state = json.load(f) self.learned_patterns = state.get('learned_patterns', {}) self.response_memory = state.get('response_memory', {}) self.conversation_memory = deque(state.get('conversation_memory', []), maxlen=200) self.reward_history = deque(state.get('reward_history', []), maxlen=300) print(f"Loaded state with {len(self.learned_patterns)} patterns") except Exception as e: print(f"Error loading state: {e}")