# Ailernbot / free_web_bot.py
# dicksinyass's picture
# Rename Free_web_bot.py to free_web_bot.py
# 33288a0 verified
import json
import os
import random
import re
import numpy as np
from datetime import datetime, date
import math
import hashlib
import requests
from collections import deque
from bs4 import BeautifulSoup
import urllib.parse
import feedparser
class FreeWebBot:
    """Keyword-driven chatbot with free web lookups and a small learned-pattern memory.

    A message is answered, in order, by: built-in factual replies (time, date,
    arithmetic, self-description), free web sources (RSS feeds, Wikipedia,
    DuckDuckGo HTML search, wttr.in), and finally previously stored responses
    or a canned conversational reply.  Learned state is persisted as JSON in
    ``state_file``.
    """

    # Number of learning updates between automatic saves of the state file.
    _SAVE_EVERY = 10
    # Upper bound on the number of digits a power result may have; keeps
    # inputs like "9^999999999" from consuming unbounded CPU and memory.
    _MAX_POWER_DIGITS = 1000

    # "<number> <operator> <number>" with both operands captured next to the
    # operator that actually joins them.
    _ARITHMETIC_RE = re.compile(r'(\d+(?:\.\d+)?)\s*([+\-*/×÷])\s*(\d+(?:\.\d+)?)')
    _SQRT_RE = re.compile(r'sqrt\s*\(?\s*(-?\d+(?:\.\d+)?)\s*\)?', re.IGNORECASE)
    _POWER_RE = re.compile(r'(\d+)\s*\^\s*(\d+)')
    # Sentence boundary: terminal punctuation followed by whitespace, so that
    # decimals ("3.12") and dotted abbreviations are not split apart.
    _SENTENCE_SPLIT_RE = re.compile(r'(?<=[.!?])\s+')

    _SEARCH_TRIGGERS = (
        'news', 'current', 'latest', 'today', 'recent', 'update',
        'weather', 'forecast', 'temperature',
        'sports', 'score', 'game', 'match',
        'stock', 'crypto', 'bitcoin', 'price',
        'how to', 'tutorial', 'guide', 'explain',
        'what is', 'who is', 'where is', 'when was',
        'breaking', 'headlines',
    )

    _TECH_FEED_KEYWORDS = ('tech', 'technology', 'ai', 'computer', 'software')
    _SPORTS_FEED_KEYWORDS = ('sports', 'game', 'score', 'match', 'team')

    # Ordered (not a set) so that the chosen location is deterministic when a
    # message mentions more than one city.
    _KNOWN_LOCATIONS = (
        'new york', 'london', 'paris', 'tokyo', 'berlin', 'sydney', 'toronto',
        'mumbai', 'beijing', 'moscow', 'dubai', 'rome', 'madrid', 'amsterdam',
        'chicago', 'los angeles', 'san francisco', 'seattle', 'boston', 'miami',
    )

    _TOPIC_KEYWORDS = {
        'technology': ['tech', 'computer', 'ai', 'software', 'code', 'internet', 'programming'],
        'science': ['science', 'research', 'discover', 'physics', 'biology', 'chemistry'],
        'sports': ['sports', 'game', 'team', 'player', 'score', 'match', 'tournament'],
        'entertainment': ['movie', 'music', 'show', 'celebrity', 'film', 'song'],
        'health': ['health', 'medical', 'fitness', 'diet', 'exercise', 'nutrition'],
    }

    _POSITIVE_WORDS = ('love', 'like', 'good', 'great', 'awesome', 'happy', 'excited', 'amazing')
    _NEGATIVE_WORDS = ('hate', 'bad', 'terrible', 'awful', 'sad', 'angry', 'upset')

    def __init__(self, state_file="/tmp/free_bot_state.json"):
        """Create the bot and load any previously saved state.

        Args:
            state_file: Path of the JSON file used by ``save_state`` and
                ``load_state``.
        """
        self.state_file = state_file
        self.conversation_memory = deque(maxlen=200)
        self.learned_patterns = {}
        self.response_memory = {}
        self.reward_history = deque(maxlen=300)
        self._updates_since_save = 0

        # Learning parameters (stored on the instance; not read by any method
        # of this class).
        self.learning_rate = 0.3
        self.exploration_rate = 0.1

        # Web settings
        self.search_timeout = 10
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

        # Free news sources (RSS feeds)
        self.news_feeds = {
            "general": [
                "https://feeds.bbci.co.uk/news/rss.xml",
                "https://rss.cnn.com/rss/edition.rss",
                "https://feeds.reuters.com/reuters/topNews",
            ],
            "technology": [
                "https://feeds.arstechnica.com/arstechnica/index",
                "https://techcrunch.com/feed/",
            ],
            "sports": [
                "https://feeds.espn.com/espn/rss/news",
            ],
        }

        # Load existing state
        self.load_state()
        print(f"Free web bot initialized with {len(self.learned_patterns)} learned patterns")

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def chat(self, user_input, use_web_search=True):
        """Answer one user message.

        Args:
            user_input: The user's message; ``None`` is treated as empty.
            use_web_search: When false, the web lookup step is skipped.

        Returns:
            A ``(response, search_used, sources)`` tuple, where ``sources`` is
            a list of source labels (empty unless a web search was used).
        """
        user_input = (user_input or "").strip()
        if not user_input:
            return "Please enter a message.", False, []

        # First, try factual responses
        factual_response = self._get_factual_response(user_input)
        if factual_response:
            return factual_response, False, []

        # Try free web search for current information
        if use_web_search and self._should_search_web(user_input):
            web_content, sources = self._free_web_search(user_input)
            if web_content and web_content.strip():
                response = self._create_web_answer(user_input, web_content, sources)
                self._store_interaction(user_input, response, 0.8, sources)
                return response, True, sources

        # Fallback to learned responses
        response = self._get_learned_response(user_input)
        self._store_interaction(user_input, response, 0.5, [])
        return response, False, []

    # ------------------------------------------------------------------
    # Keyword helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _has_keyword(text, keywords, prefix=False):
        """Return True if any keyword occurs in ``text`` at a word boundary.

        Args:
            text: Text to search (callers pass it lower-cased).
            keywords: Iterable of lower-case words or phrases.
            prefix: When true, a keyword also matches as the beginning of a
                longer word ("game" matches "games").  Keywords of one or two
                characters always require a whole-word match, because short
                ones such as "ai" begin many unrelated words.
        """
        for keyword in keywords:
            closing = "" if prefix and len(keyword) > 2 else r"\b"
            if re.search(r"\b" + re.escape(keyword) + closing, text):
                return True
        return False

    @staticmethod
    def _strip_html(markup):
        """Return ``markup`` as plain text with runs of whitespace collapsed."""
        if not markup:
            return ""
        plain = BeautifulSoup(markup, 'html.parser').get_text(" ")
        return " ".join(plain.split())

    # ------------------------------------------------------------------
    # Web search
    # ------------------------------------------------------------------

    def _should_search_web(self, user_input):
        """Determine if we should search the web for this query."""
        input_lower = user_input.lower()
        return any(trigger in input_lower for trigger in self._SEARCH_TRIGGERS)

    def _free_web_search(self, query):
        """Query the free sources relevant to ``query``.

        Returns:
            ``(combined_content, sources)``: all retrieved text joined with
            single spaces, and the labels of the sources that returned text.
        """
        query_lower = query.lower()
        sources = []
        all_content = []

        # Method 1: RSS Feeds for news/current events
        if any(topic in query_lower for topic in ('news', 'current', 'latest', 'today', 'breaking')):
            feed_content = self._search_rss_feeds(query)
            if feed_content:
                all_content.extend(feed_content)
                sources.append("News Feeds")

        # Method 2: Wikipedia for factual information
        if any(word in query_lower for word in ('what is', 'who is', 'explain', 'definition')):
            wiki_content = self._search_wikipedia(query)
            if wiki_content:
                all_content.append(wiki_content)
                sources.append("Wikipedia")

        # Method 3: DuckDuckGo for general search
        ddg_content = self._search_duckduckgo(query)
        if ddg_content:
            all_content.append(ddg_content)
            sources.append("Web Search")

        # Method 4: Weather information
        if any(word in query_lower for word in ('weather', 'temperature', 'forecast')):
            weather_content = self._get_weather_info(query)
            if weather_content:
                all_content.append(weather_content)
                sources.append("Weather Service")

        # Combine all content
        return " ".join(all_content), sources

    def _search_rss_feeds(self, query):
        """Return up to three "title: summary" strings from the matching feed category.

        Feeds of the chosen category are tried in order and the search stops at
        the first feed that yields any entry.  Each feed is downloaded with the
        configured timeout and then parsed, so a stalled server cannot block
        the chat indefinitely.
        """
        query_lower = query.lower()
        query_words = query_lower.split()

        # Determine feed category based on query
        category = "general"
        if self._has_keyword(query_lower, self._TECH_FEED_KEYWORDS, prefix=True):
            category = "technology"
        elif self._has_keyword(query_lower, self._SPORTS_FEED_KEYWORDS, prefix=True):
            category = "sports"

        feed_urls = self.news_feeds.get(category) or self.news_feeds.get("general", [])
        content = []
        for feed_url in feed_urls:
            try:
                response = requests.get(
                    feed_url,
                    headers={'User-Agent': self.user_agent},
                    timeout=self.search_timeout,
                )
                response.raise_for_status()
                feed = feedparser.parse(response.content)
                for entry in feed.entries[:5]:  # Top 5 entries
                    title = entry.get('title', '')
                    summary = self._strip_html(entry.get('summary', ''))
                    entry_text = f"{title} {summary}".lower()
                    # Single-word queries accept every entry.
                    if len(query_words) < 2 or any(word in entry_text for word in query_words):
                        content.append(f"{title}: {summary}")
                        if len(content) >= 3:  # Limit to 3 results
                            break
            except Exception as e:
                # Best effort: a broken feed must not prevent trying the next one.
                print(f"Error parsing feed {feed_url}: {e}")
                continue
            if content:
                break
        return content

    def _search_wikipedia(self, query):
        """Return the Wikipedia summary extract for the query subject, or ""."""
        try:
            # Clean query for Wikipedia
            clean_query = re.sub(
                r'(what is|who is|explain|definition of)', '', query, flags=re.IGNORECASE
            ).strip()
            clean_query = clean_query.replace('?', '').strip()
            if not clean_query:
                return ""

            # safe='' so that "/" in a title is escaped instead of being read
            # as a path separator by the REST endpoint.
            title = urllib.parse.quote(clean_query, safe='')
            url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title}"
            response = requests.get(
                url,
                headers={'User-Agent': self.user_agent},
                timeout=self.search_timeout,
            )
            if response.status_code == 200:
                extract = response.json().get('extract', '')
                if extract:
                    return f"According to Wikipedia: {extract}"
        except Exception as e:
            print(f"Wikipedia search error: {e}")
        return ""

    def _search_duckduckgo(self, query):
        """Return the top three DuckDuckGo HTML results as "title: snippet" text, or ""."""
        try:
            url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}"
            headers = {
                'User-Agent': self.user_agent,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            }
            response = requests.get(url, headers=headers, timeout=self.search_timeout)
            # Do not scrape error pages.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            results = []
            for result in soup.find_all('div', class_='result')[:3]:
                title_elem = result.find('a', class_='result__a')
                snippet_elem = result.find('a', class_='result__snippet')
                if title_elem and snippet_elem:
                    title = title_elem.get_text().strip()
                    snippet = snippet_elem.get_text().strip()
                    results.append(f"{title}: {snippet}")
            return " ".join(results)
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")
            return ""

    def _get_weather_info(self, query):
        """Return a one-line weather report from wttr.in, or "" on failure.

        The location is taken from the query when it names a known city and
        defaults to New York otherwise.
        """
        try:
            location = self._extract_location(query) or "New York"  # Default location
            url = f"https://wttr.in/{urllib.parse.quote(location)}?format=%C+%t+%w+%h"
            response = requests.get(url, timeout=self.search_timeout)
            if response.status_code == 200:
                weather_data = response.text.strip()
                return f"Weather in {location.title()}: {weather_data}"
        except Exception as e:
            print(f"Weather error: {e}")
        return ""

    def _extract_location(self, text):
        """Return the first known city (lower-case) named in ``text``, or "".

        Cities are checked in the fixed order of ``_KNOWN_LOCATIONS`` and must
        appear as whole words, so "chrome" does not match "rome".
        """
        text_lower = text.lower()
        for location in self._KNOWN_LOCATIONS:
            if self._has_keyword(text_lower, (location,)):
                return location
        return ""

    def _create_web_answer(self, user_input, web_content, sources):
        """Build a short answer from retrieved web text.

        Args:
            user_input: The original question (not used for ranking).
            web_content: Combined text returned by the web sources.
            sources: Labels of the sources, appended as an attribution line.

        Returns:
            The first three sentences longer than ten characters, followed by
            a "Sources:" line when ``sources`` is non-empty, or an apology when
            nothing usable was found.
        """
        if not web_content or not web_content.strip():
            return "I searched but couldn't find current information about that. Could you try rephrasing your question?"

        # Split on sentence boundaries only; original punctuation is kept.
        sentences = self._SENTENCE_SPLIT_RE.split(web_content.strip())
        meaningful_sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
        if not meaningful_sentences:
            return "I found some information but couldn't extract a clear answer. Try asking more specifically."

        answer = " ".join(meaningful_sentences[:3])

        # Ensure the answer ends with proper punctuation
        if not answer.endswith(('.', '!', '?')):
            answer += "."

        # Add source attribution if available
        if sources:
            source_text = ", ".join(sources)
            answer += f"\n\nSources: {source_text}"
        return answer

    # ------------------------------------------------------------------
    # Built-in factual replies
    # ------------------------------------------------------------------

    def _get_factual_response(self, user_input):
        """Answer time, date, arithmetic and self-description questions locally.

        Keywords are matched as whole words or phrases, so "update" does not
        trigger the date reply.  "today" alone is not a trigger; it only counts
        inside an explicit date question, which leaves queries such as
        "latest news today" for the web search.

        Returns:
            The reply text, or "" when no built-in reply applies.
        """
        input_lower = user_input.lower()

        # Time and date responses
        if self._has_keyword(input_lower, ('time', 'clock', 'hour')):
            current_time = datetime.now().strftime("%I:%M %p")
            return f"The current time is {current_time}."
        if self._has_keyword(
            input_lower,
            ('date', 'day month', 'what is today', "what's today", 'what day is today'),
        ):
            current_date = date.today().strftime("%A, %B %d, %Y")
            return f"Today is {current_date}."
        if self._has_keyword(input_lower, ('day of week', 'day of the week', 'what day')):
            current_day = date.today().strftime("%A")
            return f"Today is {current_day}."

        # Math calculations
        math_result = self._solve_math(user_input)
        if math_result:
            return math_result

        # About the bot
        if self._has_keyword(input_lower, ('your name', 'who are you')):
            return "I'm Phoenix AI, a completely free chatbot with web search capabilities!"
        if self._has_keyword(input_lower, ('what can you do', 'capabilities')):
            return "I can: Answer questions, search the web for free, do math, tell time/date, and learn from our conversations!"
        if self._has_keyword(input_lower, ('help', 'what can you help with')):
            return "I can help you with: current news, weather information, factual questions, calculations, and general conversation. I learn from our chats too!"
        return ""

    @staticmethod
    def _parse_number(token):
        """Convert a matched numeric token to ``int``, or ``float`` if it has a decimal point."""
        return float(token) if '.' in token else int(token)

    @staticmethod
    def _format_number(value):
        """Render a number without a spurious ".0" or float rounding noise."""
        if isinstance(value, float):
            if value.is_integer():
                return str(int(value))
            return f"{value:.10g}"
        return str(value)

    def _solve_math(self, user_input):
        """Evaluate the first simple expression found in ``user_input``.

        Supported forms: ``a <op> b`` with ``+ - * / × ÷`` (decimals allowed),
        ``sqrt(x)`` and ``a^b``.  The operands are the two numbers directly
        around the operator, not simply the first two numbers in the message.

        Returns:
            A formatted result string, or "" when nothing could be evaluated.
        """
        try:
            # Simple arithmetic
            arithmetic = self._ARITHMETIC_RE.search(user_input)
            if arithmetic:
                a = self._parse_number(arithmetic.group(1))
                b = self._parse_number(arithmetic.group(3))
                operator = arithmetic.group(2)
                a_text, b_text = self._format_number(a), self._format_number(b)
                if operator == '+':
                    return f"{a_text} + {b_text} = {self._format_number(a + b)}"
                if operator == '-':
                    return f"{a_text} - {b_text} = {self._format_number(a - b)}"
                if operator in ('*', '×'):
                    return f"{a_text} × {b_text} = {self._format_number(a * b)}"
                # Remaining operators are '/' and '÷'.
                if b == 0:
                    return "I cannot divide by zero - that's mathematically undefined!"
                return f"{a_text} ÷ {b_text} = {a / b:.2f}"

            # Square roots
            sqrt_match = self._SQRT_RE.search(user_input)
            if sqrt_match:
                num = self._parse_number(sqrt_match.group(1))
                if num < 0:
                    return "I cannot calculate square roots of negative numbers!"
                return f"√{self._format_number(num)} = {math.sqrt(num):.2f}"

            # Powers
            power_match = self._POWER_RE.search(user_input)
            if power_match:
                base, exponent = int(power_match.group(1)), int(power_match.group(2))
                # digits(base ** exponent) <= exponent * digits(base)
                if exponent * len(power_match.group(1)) > self._MAX_POWER_DIGITS:
                    return "That power is too large for me to calculate."
                return f"{base}^{exponent} = {base ** exponent}"
        except (ValueError, OverflowError, ZeroDivisionError) as e:
            print(f"Math solving error: {e}")
        return ""

    # ------------------------------------------------------------------
    # Learned and generated replies
    # ------------------------------------------------------------------

    def _get_learned_response(self, user_input):
        """Return a stored response for a similar past input, else a generated one.

        A stored response is reused only when its pattern's score exceeds 0.6.
        """
        similar_patterns = self._find_similar_patterns(user_input)
        if similar_patterns:
            _, best = max(similar_patterns, key=lambda item: item[1].get('score', 0))
            # .get(): entries loaded from an older or hand-edited state file
            # may lack keys.
            if best.get('score', 0) > 0.6 and best.get('response'):
                return best['response']

        # Generate contextual response
        return self._generate_contextual_response(user_input)

    def _find_similar_patterns(self, text):
        """Return ``(pattern, data)`` pairs whose word overlap with ``text`` exceeds 0.3.

        Overlap is shared words divided by the union of both word sets, both
        lower-cased.
        """
        similar = []
        text_words = set(text.lower().split())
        for pattern, data in self.learned_patterns.items():
            pattern_words = set(pattern.lower().split())
            common_words = text_words & pattern_words
            if not common_words:
                continue
            similarity = len(common_words) / len(text_words | pattern_words)
            if similarity > 0.3:
                similar.append((pattern, data))
        return similar

    def _generate_contextual_response(self, user_input):
        """Pick a canned reply; questions get a lead-in that names a detected topic."""
        context = self._analyze_input(user_input)
        if context['has_question']:
            responses = [
                "That's an interesting question. Based on what I know, ",
                "I appreciate your question. From my understanding, ",
                "That's a great question. I've been learning that ",
            ]
            base_response = random.choice(responses)
            if context['topics']:
                topic = random.choice(context['topics'])
                return base_response + f"{topic} is quite fascinating. What specifically would you like to know?"
            return base_response + "this topic has many interesting aspects. Could you tell me more about what you're curious about?"

        # Conversational responses for statements
        conversational_responses = [
            "I understand. Tell me more about that.",
            "That's interesting. What are your thoughts on this?",
            "I appreciate you sharing that. How do you feel about it?",
            "That's fascinating. I'm learning from our conversation.",
            "I see. Could you elaborate on that?",
        ]
        return random.choice(conversational_responses)

    def _analyze_input(self, text):
        """Return words, topics, question flag, sentiment and word count for ``text``."""
        words = text.split()
        return {
            'words': words,
            'topics': self._extract_topics(text),
            'has_question': '?' in text,
            'sentiment': self._analyze_sentiment(text),
            'word_count': len(words),
        }

    def _extract_topics(self, text):
        """Return the topic names whose keywords start a word in ``text``.

        Matching at word starts keeps "ai" from matching inside "again" or
        "Spain" while still letting "game" match "games".
        """
        text_lower = text.lower()
        return [
            topic
            for topic, keywords in self._TOPIC_KEYWORDS.items()
            if self._has_keyword(text_lower, keywords, prefix=True)
        ]

    def _analyze_sentiment(self, text):
        """Return "positive", "negative" or "neutral" from counts of cue words.

        Cue words must start a word, so "dislike" and "unlikely" are not
        counted as "like".
        """
        text_lower = text.lower()
        positive_count = sum(
            1 for word in self._POSITIVE_WORDS
            if self._has_keyword(text_lower, (word,), prefix=True)
        )
        negative_count = sum(
            1 for word in self._NEGATIVE_WORDS
            if self._has_keyword(text_lower, (word,), prefix=True)
        )
        if positive_count > negative_count:
            return "positive"
        if negative_count > positive_count:
            return "negative"
        return "neutral"

    # ------------------------------------------------------------------
    # Learning and persistence
    # ------------------------------------------------------------------

    def _store_interaction(self, user_input, response, reward, sources):
        """Append the exchange to conversation memory and update learned data."""
        interaction = {
            'input': user_input,
            'response': response,
            'reward': reward,
            'sources': sources,
            'timestamp': datetime.now().isoformat(),
        }
        self.conversation_memory.append(interaction)
        self._update_learning(user_input, response, reward)

    def _update_learning(self, user_input, response, reward):
        """Fold one (input, response, reward) observation into the learned state.

        The pattern key is the first four words of the input longer than three
        characters.  A new pattern stores the response; an existing pattern
        keeps its response and only has its running-average score updated.
        """
        # Extract key phrases from input for pattern learning
        words = [word for word in user_input.split() if len(word) > 3][:4]
        if words:
            pattern = ' '.join(words)
            entry = self.learned_patterns.get(pattern)
            if entry is None:
                self.learned_patterns[pattern] = {
                    'response': response,
                    'score': reward,
                    'count': 1,
                }
            else:
                count = entry.get('count', 0)
                entry['score'] = (entry.get('score', 0) * count + reward) / (count + 1)
                entry['count'] = count + 1

        # Store in response memory (MD5 is used only as a short dictionary key)
        response_hash = hashlib.md5(response.encode()).hexdigest()[:8]
        memory = self.response_memory.get(response_hash)
        if memory is None:
            self.response_memory[response_hash] = {
                'response': response,
                'total_score': reward,
                'count': 1,
                'avg_score': reward,
            }
        else:
            memory['total_score'] += reward
            memory['count'] += 1
            memory['avg_score'] = memory['total_score'] / memory['count']

        # Store reward
        self.reward_history.append(reward)

        # Save state periodically.  An explicit counter is used because the
        # length of the bounded conversation deque stops changing once it is
        # full, which would make a length-based check fire on every update.
        self._updates_since_save = getattr(self, '_updates_since_save', 0) + 1
        if self._updates_since_save >= self._SAVE_EVERY:
            self.save_state()
            self._updates_since_save = 0

    def learn_from_feedback(self, user_input, reward):
        """Apply explicit user feedback to the most recent interaction.

        Args:
            user_input: Accepted for interface compatibility; the stored input
                of the most recent interaction is the one that gets updated.
            reward: Feedback score recorded for that interaction.
        """
        if self.conversation_memory:
            # Update the most recent interaction
            recent_interaction = self.conversation_memory[-1]
            recent_interaction['reward'] = reward
            self._update_learning(
                recent_interaction['input'], recent_interaction['response'], reward
            )

    def get_learning_stats(self):
        """Return counts and the mean of the last ten rewards (0.5 when there are none)."""
        recent_rewards = list(self.reward_history)[-10:] or [0.5]
        return {
            'patterns': len(self.learned_patterns),
            'memory_size': len(self.conversation_memory),
            'avg_score': float(np.mean(recent_rewards)),
            'recent_rewards': sum(1 for r in recent_rewards if r > 0.7),
        }

    def save_state(self):
        """Write the learning state to ``state_file`` as JSON.

        The data is written to a sibling temporary file and then moved into
        place, so an interrupted save does not leave a truncated state file.
        Failures are reported and otherwise ignored.
        """
        state = {
            'learned_patterns': self.learned_patterns,
            'response_memory': self.response_memory,
            'conversation_memory': list(self.conversation_memory),
            'reward_history': list(self.reward_history),
            'last_saved': datetime.now().isoformat(),
        }
        tmp_path = f"{self.state_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, self.state_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving state: {e}")

    def load_state(self):
        """Load the learning state from ``state_file`` if it exists.

        A missing file is not an error.  An unreadable or malformed file is
        reported and the current in-memory state is kept.
        """
        try:
            if not os.path.exists(self.state_file):
                return
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
            if not isinstance(state, dict):
                raise ValueError("state file does not contain a JSON object")
            self.learned_patterns = state.get('learned_patterns', {})
            self.response_memory = state.get('response_memory', {})
            self.conversation_memory = deque(state.get('conversation_memory', []), maxlen=200)
            self.reward_history = deque(state.get('reward_history', []), maxlen=300)
            print(f"Loaded state with {len(self.learned_patterns)} patterns")
        except (OSError, ValueError, TypeError) as e:
            print(f"Error loading state: {e}")