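"""FreeWebBot: a small self-contained chatbot that combines canned factual
answers, free web sources (RSS feeds, the Wikipedia REST API, DuckDuckGo's
HTML endpoint, and wttr.in for weather), and lightweight pattern learning
persisted to a JSON state file."""
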
import json
import os
import random
import re
import numpy as np
from datetime import datetime, date
import math
import hashlib
import requests
from collections import deque
from bs4 import BeautifulSoup
import urllib.parse
import feedparser

class FreeWebBot:
    def __init__(self, state_file="/tmp/free_bot_state.json"):
        self.state_file = state_file
        self.conversation_memory = deque(maxlen=200)
        self.learned_patterns = {}
        self.response_memory = {}
        self.reward_history = deque(maxlen=300)

        # Learning parameters
        self.learning_rate = 0.3
        self.exploration_rate = 0.1

        # Web settings
        self.search_timeout = 10
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

        # Free news sources (RSS feeds)
        self.news_feeds = {
            "general": [
                "https://feeds.bbci.co.uk/news/rss.xml",
                "https://rss.cnn.com/rss/edition.rss",
                "https://feeds.reuters.com/reuters/topNews",
            ],
            "technology": [
                "https://feeds.arstechnica.com/arstechnica/index",
                "https://techcrunch.com/feed/",
            ],
            "sports": [
                "https://feeds.espn.com/espn/rss/news",
            ],
        }

        # Load existing state
        self.load_state()
        print(f"Free web bot initialized with {len(self.learned_patterns)} learned patterns")

    def chat(self, user_input, use_web_search=True):
        """Main chat method - returns (response, search_used, sources)"""
        user_input = user_input.strip()
        if not user_input:
            return "Please enter a message.", False, []

        # First, try factual responses
        factual_response = self._get_factual_response(user_input)
        if factual_response:
            return factual_response, False, []

        # Try free web search for current information
        if use_web_search and self._should_search_web(user_input):
            web_content, sources = self._free_web_search(user_input)
            if web_content and web_content.strip():
                response = self._create_web_answer(user_input, web_content, sources)
                self._store_interaction(user_input, response, 0.8, sources)
                return response, True, sources

        # Fall back to learned responses
        response = self._get_learned_response(user_input)
        self._store_interaction(user_input, response, 0.5, [])
        return response, False, []

    def _should_search_web(self, user_input):
        """Determine if we should search the web for this query"""
        input_lower = user_input.lower()
        search_triggers = [
            'news', 'current', 'latest', 'today', 'recent', 'update',
            'weather', 'forecast', 'temperature',
            'sports', 'score', 'game', 'match',
            'stock', 'crypto', 'bitcoin', 'price',
            'how to', 'tutorial', 'guide', 'explain',
            'what is', 'who is', 'where is', 'when was',
            'breaking', 'headlines'
        ]
        return any(trigger in input_lower for trigger in search_triggers)

    def _free_web_search(self, query):
        """Perform free web search using multiple methods"""
        sources = []
        all_content = []

        # Method 1: RSS feeds for news/current events
        if any(topic in query.lower() for topic in ['news', 'current', 'latest', 'today', 'breaking']):
            feed_content = self._search_rss_feeds(query)
            all_content.extend(feed_content)
            if feed_content:
                sources.append("News Feeds")

        # Method 2: Wikipedia for factual information
        if any(word in query.lower() for word in ['what is', 'who is', 'explain', 'definition']):
            wiki_content = self._search_wikipedia(query)
            if wiki_content:
                all_content.append(wiki_content)
                sources.append("Wikipedia")

        # Method 3: DuckDuckGo for general search
        ddg_content = self._search_duckduckgo(query)
        if ddg_content:
            all_content.append(ddg_content)
            sources.append("Web Search")

        # Method 4: Weather information
        if any(word in query.lower() for word in ['weather', 'temperature', 'forecast']):
            weather_content = self._get_weather_info(query)
            if weather_content:
                all_content.append(weather_content)
                sources.append("Weather Service")

        # Combine all content
        combined_content = " ".join(all_content)
        return combined_content, sources

    def _search_rss_feeds(self, query):
        """Search RSS feeds for current information"""
        content = []
        query_words = query.lower().split()

        # Determine feed category based on query
        category = "general"
        if any(word in query.lower() for word in ['tech', 'technology', 'ai', 'computer', 'software']):
            category = "technology"
        elif any(word in query.lower() for word in ['sports', 'game', 'score', 'match', 'team']):
            category = "sports"

        try:
            for feed_url in self.news_feeds.get(category, self.news_feeds["general"]):
                try:
                    feed = feedparser.parse(feed_url)
                    for entry in feed.entries[:5]:  # Top 5 entries
                        title = entry.get('title', '')
                        summary = entry.get('summary', '')
                        # Check if entry matches query
                        entry_text = f"{title} {summary}".lower()
                        if any(word in entry_text for word in query_words) or len(query_words) < 2:
                            content.append(f"{title}: {summary}")
                        if len(content) >= 3:  # Limit to 3 results
                            break
                except Exception as e:
                    print(f"Error parsing feed {feed_url}: {e}")
                    continue
                if content:
                    break
        except Exception as e:
            print(f"RSS feed error: {e}")
        return content

    def _search_wikipedia(self, query):
        """Search Wikipedia for factual information"""
        try:
            # Clean query for Wikipedia
            clean_query = re.sub(r'(what is|who is|explain|definition of)', '', query, flags=re.IGNORECASE).strip()
            clean_query = clean_query.replace('?', '').strip()
            if not clean_query:
                return ""

            # Wikipedia API (completely free)
            url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{urllib.parse.quote(clean_query)}"
            response = requests.get(
                url,
                headers={'User-Agent': self.user_agent},
                timeout=self.search_timeout
            )
            if response.status_code == 200:
                data = response.json()
                extract = data.get('extract', '')
                if extract:
                    return f"According to Wikipedia: {extract}"
        except Exception as e:
            print(f"Wikipedia search error: {e}")
        return ""

    def _search_duckduckgo(self, query):
        """Search DuckDuckGo for general information"""
        try:
            url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}"
            headers = {
                'User-Agent': self.user_agent,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            }
            response = requests.get(url, headers=headers, timeout=self.search_timeout)
            soup = BeautifulSoup(response.text, 'html.parser')

            results = []
            for result in soup.find_all('div', class_='result')[:3]:
                title_elem = result.find('a', class_='result__a')
                snippet_elem = result.find('a', class_='result__snippet')
                if title_elem and snippet_elem:
                    title = title_elem.get_text().strip()
                    snippet = snippet_elem.get_text().strip()
                    results.append(f"{title}: {snippet}")
            return " ".join(results) if results else ""
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")
        return ""

    def _get_weather_info(self, query):
        """Get weather information from free sources"""
        try:
            # Extract location from query
            location = self._extract_location(query)
            if not location:
                location = "New York"  # Default location

            # Use free weather API
            url = f"http://wttr.in/{urllib.parse.quote(location)}?format=%C+%t+%w+%h"
            response = requests.get(url, timeout=self.search_timeout)
            if response.status_code == 200:
                weather_data = response.text.strip()
                return f"Weather in {location}: {weather_data}"
        except Exception as e:
            print(f"Weather error: {e}")
        return ""

    def _extract_location(self, text):
        """Extract location from text"""
        locations = {
            'new york', 'london', 'paris', 'tokyo', 'berlin', 'sydney', 'toronto',
            'mumbai', 'beijing', 'moscow', 'dubai', 'rome', 'madrid', 'amsterdam',
            'chicago', 'los angeles', 'san francisco', 'seattle', 'boston', 'miami'
        }
        text_lower = text.lower()
        for location in locations:
            if location in text_lower:
                return location
        return ""

    def _create_web_answer(self, user_input, web_content, sources):
        """Create answer using web content"""
        if not web_content or not web_content.strip():
            return "I searched but couldn't find current information about that. Could you try rephrasing your question?"

        # Clean and format the content
        sentences = re.split(r'[.!?]+', web_content)
        meaningful_sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
        if not meaningful_sentences:
            return "I found some information but couldn't extract a clear answer. Try asking more specifically."

        # Use the most relevant sentences
        answer_sentences = meaningful_sentences[:3]
        answer = ". ".join(answer_sentences)

        # Ensure the answer ends with proper punctuation
        if not answer.endswith(('.', '!', '?')):
            answer += "."

        # Add source attribution if available
        if sources:
            source_text = ", ".join(sources)
            answer += f"\n\nSources: {source_text}"
        return answer

    def _get_factual_response(self, user_input):
        """Provide factual responses without web search"""
        input_lower = user_input.lower()

        # Time and date responses. Use word boundaries so substrings like
        # "sometimes" or "update" don't accidentally trigger these answers,
        # and so queries like "today's news" still reach the web search.
        if re.search(r'\b(time|clock|hour)\b', input_lower):
            current_time = datetime.now().strftime("%I:%M %p")
            return f"The current time is {current_time}."

        if re.search(r'\bdate\b', input_lower):
            current_date = date.today().strftime("%A, %B %d, %Y")
            return f"Today is {current_date}."

        if any(phrase in input_lower for phrase in ['day of week', 'what day']):
            current_day = date.today().strftime("%A")
            return f"Today is {current_day}."

        # Math calculations
        math_result = self._solve_math(user_input)
        if math_result:
            return math_result

        # About the bot
        if any(phrase in input_lower for phrase in ['your name', 'who are you']):
            return "I'm Phoenix AI, a completely free chatbot with web search capabilities!"
        if any(phrase in input_lower for phrase in ['what can you do', 'capabilities']):
            return "I can answer questions, search the web for free, do math, tell the time and date, and learn from our conversations!"
        if any(phrase in input_lower for phrase in ['help', 'what can you help with']):
            return "I can help you with current news, weather information, factual questions, calculations, and general conversation. I learn from our chats too!"
        return ""

    def _solve_math(self, user_input):
        """Solve simple mathematical expressions"""
        try:
            # Simple arithmetic. Include × and ÷ in the operator class,
            # since the branches below already handle those symbols.
            if re.search(r'\d+\s*[+\-*/×÷]\s*\d+', user_input):
                numbers = re.findall(r'\d+', user_input)
                if len(numbers) >= 2:
                    a, b = int(numbers[0]), int(numbers[1])
                    if '+' in user_input:
                        return f"{a} + {b} = {a + b}"
                    elif '-' in user_input:
                        return f"{a} - {b} = {a - b}"
                    elif '*' in user_input or '×' in user_input:
                        return f"{a} × {b} = {a * b}"
                    elif '/' in user_input or '÷' in user_input:
                        if b == 0:
                            return "I cannot divide by zero - that's mathematically undefined!"
                        result = a / b
                        return f"{a} ÷ {b} = {result:.2f}"

            # Square roots
            sqrt_match = re.search(r'sqrt\(?(\d+)\)?', user_input)
            if sqrt_match:
                num = int(sqrt_match.group(1))
                if num < 0:  # Defensive only: the regex captures digits, so num can't be negative
                    return "I cannot calculate square roots of negative numbers!"
                result = math.sqrt(num)
                return f"√{num} = {result:.2f}"

            # Powers
            power_match = re.search(r'(\d+)\s*\^\s*(\d+)', user_input)
            if power_match:
                base, exponent = int(power_match.group(1)), int(power_match.group(2))
                result = base ** exponent
                return f"{base}^{exponent} = {result}"
        except Exception as e:
            print(f"Math solving error: {e}")
        return ""

    def _get_learned_response(self, user_input):
        """Get response from learned patterns or generate contextual response"""
        # Find similar patterns
        similar_patterns = self._find_similar_patterns(user_input)
        if similar_patterns:
            best_pattern = max(similar_patterns, key=lambda x: x[1]['score'])
            if best_pattern[1]['score'] > 0.6:
                return best_pattern[1]['response']

        # Generate contextual response
        return self._generate_contextual_response(user_input)

    def _find_similar_patterns(self, text):
        """Find similar learned patterns"""
        similar = []
        text_words = set(text.lower().split())
        for pattern, data in self.learned_patterns.items():
            pattern_words = set(pattern.lower().split())
            common_words = text_words.intersection(pattern_words)
            if common_words:
                similarity = len(common_words) / len(text_words.union(pattern_words))
                if similarity > 0.3:
                    similar.append((pattern, data))
        return similar

    def _generate_contextual_response(self, user_input):
        """Generate contextual response when no specific pattern matches"""
        context = self._analyze_input(user_input)

        if context['has_question']:
            responses = [
                "That's an interesting question. Based on what I know, ",
                "I appreciate your question. From my understanding, ",
                "That's a great question. I've been learning that ",
            ]
            base_response = random.choice(responses)
            if context['topics']:
                topic = random.choice(context['topics'])
                return base_response + f"{topic} is quite fascinating. What specifically would you like to know?"
            else:
                return base_response + "this topic has many interesting aspects. Could you tell me more about what you're curious about?"

        # Conversational responses for statements
        conversational_responses = [
            "I understand. Tell me more about that.",
            "That's interesting. What are your thoughts on this?",
            "I appreciate you sharing that. How do you feel about it?",
            "That's fascinating. I'm learning from our conversation.",
            "I see. Could you elaborate on that?",
        ]
        return random.choice(conversational_responses)

    def _analyze_input(self, text):
        """Analyze user input for context"""
        words = text.split()
        return {
            'words': words,
            'topics': self._extract_topics(text),
            'has_question': '?' in text,
            'sentiment': self._analyze_sentiment(text),
            'word_count': len(words)
        }

    def _extract_topics(self, text):
        """Extract topics from text"""
        topics = []
        text_lower = text.lower()
        topic_categories = {
            'technology': ['tech', 'computer', 'ai', 'software', 'code', 'internet', 'programming'],
            'science': ['science', 'research', 'discover', 'physics', 'biology', 'chemistry'],
            'sports': ['sports', 'game', 'team', 'player', 'score', 'match', 'tournament'],
            'entertainment': ['movie', 'music', 'show', 'celebrity', 'film', 'song'],
            'health': ['health', 'medical', 'fitness', 'diet', 'exercise', 'nutrition']
        }
        for topic, keywords in topic_categories.items():
            if any(keyword in text_lower for keyword in keywords):
                topics.append(topic)
        return topics

    def _analyze_sentiment(self, text):
        """Basic sentiment analysis"""
        positive_words = ['love', 'like', 'good', 'great', 'awesome', 'happy', 'excited', 'amazing']
        negative_words = ['hate', 'bad', 'terrible', 'awful', 'sad', 'angry', 'upset']
        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)
        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"

    def _store_interaction(self, user_input, response, reward, sources):
        """Store interaction in memory"""
        interaction = {
            'input': user_input,
            'response': response,
            'reward': reward,
            'sources': sources,
            'timestamp': datetime.now().isoformat()
        }
        self.conversation_memory.append(interaction)
        self._update_learning(user_input, response, reward)

    def _update_learning(self, user_input, response, reward):
        """Update learning from interaction"""
        # Extract key phrases from input for pattern learning
        words = [word for word in user_input.split() if len(word) > 3][:4]
        if words:
            pattern = ' '.join(words)
            if pattern not in self.learned_patterns:
                self.learned_patterns[pattern] = {
                    'response': response,
                    'score': reward,
                    'count': 1
                }
            else:
                old_data = self.learned_patterns[pattern]
                new_score = (old_data['score'] * old_data['count'] + reward) / (old_data['count'] + 1)
                self.learned_patterns[pattern]['score'] = new_score
                self.learned_patterns[pattern]['count'] += 1

        # Store in response memory
        response_hash = hashlib.md5(response.encode()).hexdigest()[:8]
        if response_hash not in self.response_memory:
            self.response_memory[response_hash] = {
                'response': response,
                'total_score': reward,
                'count': 1,
                'avg_score': reward
            }
        else:
            memory = self.response_memory[response_hash]
            memory['total_score'] += reward
            memory['count'] += 1
            memory['avg_score'] = memory['total_score'] / memory['count']

        # Store reward
        self.reward_history.append(reward)

        # Save state periodically
        if len(self.conversation_memory) % 10 == 0:
            self.save_state()

    def learn_from_feedback(self, user_input, reward):
        """Learn from explicit user feedback"""
        if self.conversation_memory:
            # Update the most recent interaction
            recent_interaction = self.conversation_memory[-1]
            recent_interaction['reward'] = reward
            self._update_learning(recent_interaction['input'], recent_interaction['response'], reward)

    def get_learning_stats(self):
        """Get learning statistics"""
        recent_rewards = list(self.reward_history)[-10:] or [0.5]
        return {
            'patterns': len(self.learned_patterns),
            'memory_size': len(self.conversation_memory),
            'avg_score': float(np.mean(recent_rewards)),
            'recent_rewards': len([r for r in recent_rewards if r > 0.7])
        }

    def save_state(self):
        """Save learning state to file"""
        try:
            state = {
                'learned_patterns': self.learned_patterns,
                'response_memory': self.response_memory,
                'conversation_memory': list(self.conversation_memory),
                'reward_history': list(self.reward_history),
                'last_saved': datetime.now().isoformat()
            }
            with open(self.state_file, 'w') as f:
                json.dump(state, f, indent=2)
        except Exception as e:
            print(f"Error saving state: {e}")

    def load_state(self):
        """Load learning state from file"""
        try:
            if os.path.exists(self.state_file):
                with open(self.state_file, 'r') as f:
                    state = json.load(f)
                self.learned_patterns = state.get('learned_patterns', {})
                self.response_memory = state.get('response_memory', {})
                self.conversation_memory = deque(state.get('conversation_memory', []), maxlen=200)
                self.reward_history = deque(state.get('reward_history', []), maxlen=300)
                print(f"Loaded state with {len(self.learned_patterns)} patterns")
        except Exception as e:
            print(f"Error loading state: {e}")
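

# Minimal usage sketch (an illustrative addition, not part of the class):
# exercises the factual, math, and web-search paths. Assumes the third-party
# packages imported above are installed; without network access the web
# lookups fail quietly and chat() falls back to learned/contextual replies.
if __name__ == "__main__":
    bot = FreeWebBot()
    for question in ["What time is it?", "sqrt(144)", "what is Python?"]:
        reply, used_web, sources = bot.chat(question)
        print(f"You: {question}")
        print(f"Bot: {reply} (web search: {used_web}, sources: {sources})")
    # Explicit feedback updates the score of the most recent interaction.
    bot.learn_from_feedback("what is Python?", 1.0)
    print(bot.get_learning_stats())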