# Provenance: Refat81 — "Update pages/facebook_extractor.py" (commit dfdb161, verified)
# pages/facebook_extractor.py
import streamlit as st
import requests
from bs4 import BeautifulSoup
import json
import re
from datetime import datetime
from typing import List, Dict
import os
import tempfile
import random
# Import your existing AI components
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.schema import Document
from langchain_community.llms import HuggingFaceHub
# Configure the Streamlit page; must run before any other st.* call.
st.set_page_config(
    page_title="Facebook Data Extractor",
    page_icon="πŸ“˜",
    layout="wide"
)
class FacebookRealExtractor:
    """Aggressive Facebook data extractor that tries multiple approaches.

    Tries a sequence of HTTP-based strategies (desktop headers, mobile
    headers, then a text-only reader proxy) and only falls back to demo
    data when every real-extraction attempt is blocked by Facebook.
    """

    def __init__(self):
        # A single shared session keeps cookies between retry strategies.
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        """Setup requests session with rotating headers."""
        # Pool of realistic desktop user agents; one is picked at random
        # per request so consecutive attempts look less like a bot.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0'
        ]

    def extract_data(self, url: str, data_type: str) -> Dict:
        """Extract real Facebook data, trying each method until one succeeds.

        Args:
            url: Public Facebook URL (group, page, event, or post).
            data_type: Caller-supplied label describing the content type.

        Returns:
            Result dict with ``page_info``/``content_blocks`` on success
            (``source`` == "real"); otherwise minimal demo data with
            ``source`` == "demo_fallback".
        """
        st.info(f"πŸ” Attempting real extraction: {url}")
        # Extraction strategies, in order of preference.
        methods = [
            self._try_direct_extraction,
            self._try_mobile_extraction,
            self._try_text_only_extraction
        ]
        for method in methods:
            result = method(url)
            if result.get("status") == "success":
                st.success("βœ… Real Facebook data extracted!")
                result["source"] = "real"
                result["data_type"] = data_type
                return result
        # All methods failed: explain why before falling back to demo data.
        st.error("❌ All real extraction methods failed. Facebook has strong anti-bot protection.")
        st.info("""
**Why this happens:**
- Facebook blocks automated requests
- Requires JavaScript execution
- Needs cookies and session management
- Heavy anti-bot detection
**For your university project, you can:**
1. Use the demo data to demonstrate functionality
2. Explain these technical limitations in your report
3. Show that LinkedIn works (no restrictions)
4. Discuss platform security differences
""")
        # Only use demo data as last resort.
        return self._get_minimal_demo_data(url, data_type)

    def _try_direct_extraction(self, url: str) -> Dict:
        """Try direct extraction with a randomly chosen desktop user agent."""
        try:
            headers = {
                'User-Agent': random.choice(self.user_agents),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/avif,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Cache-Control': 'max-age=0',
            }
            response = self.session.get(
                url,
                headers=headers,
                timeout=15,
                allow_redirects=True
            )
            if response.status_code == 200:
                return self._parse_facebook_response(response, url)
            return {"status": "error", "reason": f"HTTP {response.status_code}"}
        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _try_mobile_extraction(self, url: str) -> Dict:
        """Try the request again pretending to be a mobile browser."""
        try:
            mobile_headers = {
                'User-Agent': 'Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
            }
            response = self.session.get(url, headers=mobile_headers, timeout=15)
            if response.status_code == 200:
                return self._parse_facebook_response(response, url)
            return {"status": "error", "reason": f"Mobile HTTP {response.status_code}"}
        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _try_text_only_extraction(self, url: str) -> Dict:
        """Try a text-only rendering via the r.jina.ai reader proxy.

        NOTE: method/message names say "textise" for historical reasons;
        the actual service used is https://r.jina.ai/.
        """
        try:
            textise_url = f"https://r.jina.ai/{url}"
            response = self.session.get(textise_url, timeout=20)
            if response.status_code == 200:
                return self._parse_textise_response(response, url)
            return {"status": "error", "reason": "Textise failed"}
        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _parse_facebook_response(self, response, url: str) -> Dict:
        """Parse an HTML Facebook response into the result dict format."""
        try:
            soup = BeautifulSoup(response.text, 'html.parser')
            # Basic page metadata; any of these tags (or their 'content'
            # attribute) may be missing, so access defensively with .get().
            title_tag = soup.find('title')
            description = soup.find('meta', attrs={'name': 'description'})
            og_title = soup.find('meta', property='og:title')
            og_description = soup.find('meta', property='og:description')

            # Collect meaningful, deduplicated text fragments, skipping
            # boilerplate (cookie banners, login prompts, etc.).
            content_elements = soup.find_all(['p', 'div', 'span'], string=True)
            meaningful_text = []
            seen = set()
            for element in content_elements:
                text = element.get_text().strip()
                if (len(text) > 20 and
                        text not in seen and
                        not any(word in text.lower() for word in ['cookie', 'login', 'sign up', 'facebook']) and
                        len(text.split()) > 3):
                    seen.add(text)
                    meaningful_text.append(text)

            # Build content blocks from the first 10 meaningful texts.
            content_blocks = []
            for i, text in enumerate(meaningful_text[:10]):
                content_blocks.append({
                    "id": i + 1,
                    "content": text,
                    "length": len(text),
                    "word_count": len(text.split()),
                    "content_type": self._classify_content(text),
                    "is_public_content": True
                })

            if not content_blocks:
                return {"status": "error", "reason": "No meaningful content found"}

            # Prefer OpenGraph metadata, fall back to plain HTML tags.
            if og_title and og_title.get('content'):
                page_title = og_title['content']
            elif title_tag:
                page_title = title_tag.text
            else:
                page_title = "Facebook Content"
            if og_description and og_description.get('content'):
                page_description = og_description['content']
            elif description and description.get('content'):
                page_description = description['content']
            else:
                page_description = ""

            return {
                "page_info": {
                    "title": page_title,
                    "description": page_description,
                    "url": url,
                    "response_code": response.status_code,
                    "content_length": len(response.text),
                    "access_note": "Real data extracted successfully"
                },
                "content_blocks": content_blocks,
                "extraction_time": datetime.now().isoformat(),
                "status": "success"
            }
        except Exception as e:
            return {"status": "error", "reason": f"Parsing error: {str(e)}"}

    def _parse_textise_response(self, response, url: str) -> Dict:
        """Parse a plain-text reader-proxy response into the result format."""
        try:
            # The reader proxy returns cleaner line-oriented text; keep
            # only substantial lines (> 30 chars), up to 8 blocks.
            lines = response.text.split('\n')
            meaningful_lines = [line.strip() for line in lines if len(line.strip()) > 30]
            content_blocks = []
            for i, line in enumerate(meaningful_lines[:8]):
                content_blocks.append({
                    "id": i + 1,
                    "content": line,
                    "length": len(line),
                    "word_count": len(line.split()),
                    "content_type": self._classify_content(line),
                    "is_public_content": True
                })
            if content_blocks:
                return {
                    "page_info": {
                        "title": "Facebook Content (via Textise)",
                        "description": "Content extracted using text-only method",
                        "url": url,
                        "response_code": response.status_code,
                        "content_length": len(response.text),
                        "access_note": "Real data via text-only extraction"
                    },
                    "content_blocks": content_blocks,
                    "extraction_time": datetime.now().isoformat(),
                    "status": "success"
                }
            return {"status": "error", "reason": "No content from textise"}
        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _classify_content(self, text: str) -> str:
        """Classify a text fragment by simple keyword matching.

        Returns one of: welcome_message, event_info, social_content,
        question_post, general_content. First matching rule wins.
        """
        text_lower = text.lower()
        if any(word in text_lower for word in ['welcome', 'join', 'community']):
            return "welcome_message"
        elif any(word in text_lower for word in ['event', 'meetup', 'schedule']):
            return "event_info"
        elif any(word in text_lower for word in ['post', 'share', 'comment']):
            return "social_content"
        elif any(word in text_lower for word in ['question', 'help', 'advice']):
            return "question_post"
        else:
            return "general_content"

    def _get_minimal_demo_data(self, url: str, data_type: str) -> Dict:
        """Build minimal demo data; used only when real extraction fails."""
        st.warning("πŸ”„ Using minimal demo data for demonstration purposes")
        # Fix: length/word_count are computed from the actual strings
        # instead of hard-coded values that did not match the content.
        demo_blocks = [
            ("This is a demonstration of what real Facebook data would look like. Actual extraction is blocked by Facebook's anti-bot protection.",
             "demo_notice"),
            ("For your university project, you can discuss these technical limitations and how social media platforms implement security measures.",
             "educational_note"),
        ]
        return {
            "page_info": {
                "title": "Facebook Content (Demo - Real extraction blocked)",
                "description": "This would show real Facebook data if not blocked by platform restrictions",
                "url": url,
                "response_code": 403,
                "content_length": 0,
                "access_note": "DEMO: Facebook blocked real data extraction"
            },
            "content_blocks": [
                {
                    "id": i + 1,
                    "content": text,
                    "length": len(text),
                    "word_count": len(text.split()),
                    "content_type": content_type,
                    "is_public_content": True
                }
                for i, (text, content_type) in enumerate(demo_blocks)
            ],
            "url_type": "Facebook Content",
            "extraction_time": datetime.now().isoformat(),
            "data_type": data_type,
            "status": "success",
            "source": "demo_fallback"
        }
# Rest of the functions remain the same (get_embeddings, get_llm, simple_chat_analysis, etc.)
def get_embeddings():
    """Initialize embeddings with better error handling and cache management.

    Tries several sentence-transformer models in order of preference,
    smoke-testing each with a trial query before accepting it. Falls back
    to the default cache location as a last resort. Returns the embeddings
    object, or None when every attempt fails.
    """
    try:
        # Candidate models, smallest/most reliable first.
        candidate_models = [
            "sentence-transformers/all-MiniLM-L6-v2",
            "sentence-transformers/paraphrase-MiniLM-L3-v2",
            "sentence-transformers/all-mpnet-base-v2",
        ]
        for candidate in candidate_models:
            try:
                st.info(f"πŸ”„ Trying embedding model: {candidate}")
                # Cache into a throwaway directory to avoid permission issues.
                with tempfile.TemporaryDirectory() as scratch_dir:
                    model = HuggingFaceEmbeddings(
                        model_name=candidate,
                        cache_folder=scratch_dir,
                        model_kwargs={'device': 'cpu'},
                    )
                    # Smoke-test before accepting this model.
                    probe = model.embed_query("Hello world")
                    if probe and len(probe) > 0:
                        st.success(f"βœ… Loaded embeddings: {candidate.split('/')[-1]}")
                        return model
            except Exception as e:
                st.warning(f"⚠️ Failed to load {candidate}: {str(e)}")
                continue
        # Every cached attempt failed: retry with default settings.
        st.warning("πŸ”„ Trying fallback embedding method...")
        try:
            model = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
            st.success("βœ… Loaded fallback embeddings")
            return model
        except Exception as e:
            st.error(f"❌ All embedding models failed: {e}")
            return None
    except Exception as e:
        st.error(f"❌ Embeddings error: {e}")
        return None
def get_llm():
    """Initialize a HuggingFace-hosted LLM.

    Requires the HUGGINGFACEHUB_API_TOKEN environment variable. Probes
    each candidate model with a trivial prompt and returns the first one
    that responds; returns None when the key is missing or all fail.
    """
    try:
        api_key = os.getenv('HUGGINGFACEHUB_API_TOKEN')
        if not api_key:
            st.error("HuggingFace API Key not found")
            return None
        # Candidate hosted models, tried in order.
        candidates = [
            "mistralai/Mistral-7B-Instruct-v0.1",
            "google/flan-t5-large",
            "microsoft/DialoGPT-large",
        ]
        for repo in candidates:
            try:
                st.info(f"πŸ”„ Trying LLM: {repo}")
                candidate_llm = HuggingFaceHub(
                    repo_id=repo,
                    huggingfacehub_api_token=api_key,
                    model_kwargs={
                        "temperature": 0.7,
                        "max_length": 512,
                        "max_new_tokens": 256,
                    },
                )
                # Probe the endpoint before committing to this model.
                probe = candidate_llm.invoke("Hello")
                if probe and len(probe.strip()) > 0:
                    st.success(f"βœ… Loaded LLM: {repo.split('/')[-1]}")
                    return candidate_llm
            except Exception as e:
                st.warning(f"⚠️ Failed to load {repo}: {str(e)}")
                continue
        st.error("❌ All LLMs failed to load")
        return None
    except Exception as e:
        st.error(f"❌ LLM error: {e}")
        return None
def simple_chat_analysis(user_input: str, extracted_data: Dict) -> str:
    """Rule-based fallback analysis used when embeddings/LLM are unavailable.

    Matches keywords in the user's question and renders a canned markdown
    report from the extracted Facebook data. Never raises: any internal
    error is returned as an "Analysis error: ..." string.
    """
    try:
        if not extracted_data:
            return "No data available for analysis."

        page_info = extracted_data.get('page_info', {})
        content_blocks = extracted_data.get('content_blocks', [])
        url_type = extracted_data.get('url_type', 'Facebook Content')
        source = extracted_data.get('source', 'unknown')
        question = user_input.lower()

        def mentions(*keywords):
            # True when the question contains any of the given keywords.
            return any(kw in question for kw in keywords)

        if mentions('summary', 'summarize', 'overview'):
            block_types = ', '.join(set(block['content_type'] for block in content_blocks))
            return "\n".join([
                f"**πŸ“Š Summary of {page_info.get('title', 'Facebook Content')}**",
                "",
                f"**Type:** {url_type}",
                f"**Data Source:** {source.upper()}",
                f"**Description:** {page_info.get('description', 'No description available')}",
                "",
                f"This appears to be a {url_type.lower()} with {len(content_blocks)} content blocks.",
                "",
                "**Key Content Types:**",
                f"{block_types}",
                "",
                "The content focuses on community engagement and social interactions.",
            ])

        if mentions('purpose', 'about', 'what is'):
            def count_type(tag):
                # Count blocks whose content_type contains the given tag.
                return len([b for b in content_blocks if tag in b['content_type'].lower()])

            return "\n".join([
                "**🎯 Purpose Analysis**",
                "",
                f"Based on the extracted data, this {url_type.lower()} appears to be focused on:",
                "",
                f"- **Community Building:** {count_type('community')} community-related posts",
                f"- **Information Sharing:** {count_type('announcement')} announcements",
                f"- **Member Engagement:** {count_type('post')} member posts",
                "",
                f"**Overall Purpose:** {page_info.get('description', 'Community engagement and content sharing')}",
            ])

        if mentions('activity', 'engagement', 'active'):
            engagement_tags = ('post', 'question', 'event')
            active_count = sum(
                1 for b in content_blocks
                if any(tag in b['content_type'].lower() for tag in engagement_tags)
            )
            return "\n".join([
                "**πŸ“ˆ Activity Analysis**",
                "",
                "**Content Activity Level:**",
                f"- Total Content Blocks: {len(content_blocks)}",
                f"- Active Engagement Posts: {active_count}",
                f"- Informational Posts: {len(content_blocks) - active_count}",
                "",
                f"The {url_type.lower()} shows a good mix of member engagement and informational content, suggesting an active community.",
            ])

        # Default: generic response with suggested follow-up questions.
        return "\n".join([
            "**πŸ€– Analysis Response**",
            "",
            f"I've analyzed the {url_type.lower()} data for you.",
            "",
            f"**Your question:** \"{user_input}\"",
            f"**Content Source:** {source.upper()} data",
            f"**Content Type:** {url_type}",
            "",
            f"This {url_type.lower()} contains {len(content_blocks)} pieces of content focusing on community engagement and information sharing.",
            "",
            "**Try asking:**",
            "- \"What is the main purpose of this group/page?\"",
            "- \"Summarize the content and activities\"",
            "- \"What kind of engagement does this content show?\"",
        ])
    except Exception as e:
        return f"Analysis error: {str(e)}"
def process_facebook_data(extracted_data):
    """Flatten extracted Facebook data into text chunks for AI analysis.

    Returns ("simple", documents) on success, or (None, []) when the
    input is missing or did not come from a successful extraction.
    """
    if not extracted_data or extracted_data.get("status") != "success":
        return None, []

    page_info = extracted_data['page_info']
    content_blocks = extracted_data['content_blocks']
    url_type = extracted_data.get('url_type', 'Facebook Content')
    source = extracted_data.get('source', 'unknown')

    divider = '=' * 50
    # Assemble the report as a list of fragments, then join once.
    parts = [
        f"FACEBOOK DATA ANALYSIS\n{divider}\n\n",
        "πŸ“„ PAGE INFORMATION:\n",
        f"Title: {page_info['title']}\n",
        f"URL Type: {url_type}\n",
        f"Data Source: {source.upper()}\n",
        f"Access: {page_info.get('access_note', 'Public content')}\n",
    ]
    if page_info.get('member_count'):
        parts.append(f"Members: {page_info['member_count']}\n")
    elif page_info.get('follower_count'):
        parts.append(f"Followers: {page_info['follower_count']}\n")
    parts.append(f"Extracted: {extracted_data['extraction_time']}\n\n")
    parts.append("πŸ“Š CONTENT ANALYSIS:\n")
    parts.append(f"Content Blocks: {len(content_blocks)}\n")
    public_total = sum(1 for b in content_blocks if b['is_public_content'])
    parts.append(f"Public Content: {public_total} blocks\n\n")
    for i, block in enumerate(content_blocks):
        parts.append(f"--- BLOCK {i+1} ---\n")
        parts.append(f"Type: {block['content_type']}\n")
        parts.append(f"Words: {block['word_count']} | Public: {block['is_public_content']}\n")
        parts.append(f"Content: {block['content']}\n\n")
    parts.append(divider)
    all_text = "".join(parts)

    # Chunk the report for retrieval-style processing.
    splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    documents = [Document(page_content=chunk) for chunk in splitter.split_text(all_text)]
    return "simple", documents
def create_chatbot(vectorstore):
    """Build a conversational retrieval chain over the given vector store.

    Returns the chain on success, or the string "simple" to tell the
    caller to fall back to rule-based chat (no LLM, or setup failure).
    """
    try:
        language_model = get_llm()
        if language_model is None:
            # No LLM available: signal the caller to use rule-based chat.
            return "simple"
        chat_memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer",
        )
        return ConversationalRetrievalChain.from_llm(
            llm=language_model,
            retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
            memory=chat_memory,
            return_source_documents=True,
            output_key="answer",
        )
    except Exception as e:
        st.error(f"Chatbot creation failed: {str(e)}")
        return "simple"
def main():
    """Render the Facebook extractor page: sidebar controls, results, chat."""
    st.title("πŸ“˜ Facebook Data Extractor - REAL DATA ATTEMPT")
    st.markdown("**Aggressive real data extraction - No automatic demo fallback**")
    if st.button("← Back to Main Dashboard"):
        st.switch_page("app.py")
    # Initialize session state (survives Streamlit reruns).
    if "extractor" not in st.session_state:
        st.session_state.extractor = FacebookRealExtractor()  # Changed to real extractor
    if "facebook_data" not in st.session_state:
        st.session_state.facebook_data = None
    if "vectorstore" not in st.session_state:
        st.session_state.vectorstore = None
    if "chatbot" not in st.session_state:
        st.session_state.chatbot = None
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "processing_mode" not in st.session_state:
        st.session_state.processing_mode = "ai"
    if "last_user_input" not in st.session_state:
        st.session_state.last_user_input = ""
    # Sidebar: extraction configuration and controls.
    with st.sidebar:
        st.header("βš™οΈ Facebook Configuration")
        data_type = st.selectbox(
            "Content Type",
            ["group", "page", "event", "post", "general"],
            help="Select the type of Facebook content"
        )
        facebook_url = st.text_input(
            "Facebook URL",
            placeholder="https://www.facebook.com/groups/gamersofbangladesh2",
            help="Enter any Facebook URL for REAL data extraction"
        )
        # Quick test URLs
        st.markdown("### πŸš€ Test URLs")
        test_urls = {
            "Gaming Group": "https://www.facebook.com/groups/gamersofbangladesh2",
            "Tech Community": "https://www.facebook.com/groups/programmingcommunity",
            "Business Page": "https://www.facebook.com/Meta/",
        }
        for name, url in test_urls.items():
            if st.button(f"πŸ”— {name}", key=f"fb_{name}"):
                # Stash the chosen URL; used below if the text input is empty.
                st.session_state.current_fb_url = url
                st.rerun()
        if st.button("πŸš€ EXTRACT REAL DATA", type="primary"):
            # Typed URL takes priority over a previously clicked test URL.
            url_to_use = facebook_url or getattr(st.session_state, 'current_fb_url', '')
            if not url_to_use:
                st.error("❌ Please enter a Facebook URL")
            elif 'facebook.com' not in url_to_use:
                st.error("❌ Please enter a valid Facebook URL")
            else:
                with st.spinner("πŸ”„ Aggressively extracting REAL Facebook data..."):
                    extracted_data = st.session_state.extractor.extract_data(url_to_use, data_type)
                    if extracted_data.get("status") == "success":
                        # Reset chat state for the freshly extracted data;
                        # "simple" selects the rule-based analysis path.
                        st.session_state.facebook_data = extracted_data
                        st.session_state.chatbot = "simple"
                        st.session_state.chat_history = []
                        st.session_state.last_user_input = ""
                        source = extracted_data.get('source', 'unknown')
                        if source == 'real':
                            st.success("πŸŽ‰ SUCCESS: Real Facebook data extracted!")
                            st.balloons()
                        else:
                            st.warning("⚠️ Using fallback data - Facebook blocked real extraction")
                    else:
                        # NOTE(review): extract_data stores failure details under
                        # "reason", not "error" — confirm which key is intended.
                        error_msg = extracted_data.get("error", "Unknown error")
                        st.error(f"❌ Extraction failed: {error_msg}")
        if st.session_state.facebook_data:
            st.markdown("---")
            if st.button("πŸ—‘οΈ Clear Data", type="secondary"):
                # Wipe all extraction/chat state and redraw the page.
                st.session_state.facebook_data = None
                st.session_state.vectorstore = None
                st.session_state.chatbot = None
                st.session_state.chat_history = []
                st.session_state.last_user_input = ""
                st.rerun()
    # Main content: extraction results (or intro text when none).
    st.header("πŸ“Š Extraction Results")
    if st.session_state.facebook_data:
        data = st.session_state.facebook_data
        page_info = data['page_info']
        content_blocks = data['content_blocks']
        source = data.get('source', 'unknown')
        if source == 'real':
            st.success("βœ… **REAL DATA** - Successfully extracted from Facebook!")
        else:
            st.warning("πŸ“ **FALLBACK DATA** - Facebook blocked real extraction")
        # Metrics
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Content Blocks", len(content_blocks))
        with col2:
            st.metric("Data Source", "REAL" if source == 'real' else "FALLBACK")
        with col3:
            st.metric("Status", "Success")
        # Page info
        st.subheader("🏷️ Page Information")
        st.write(f"**Title:** {page_info['title']}")
        st.write(f"**Description:** {page_info.get('description', 'No description')}")
        st.write(f"**Access Note:** {page_info.get('access_note', 'Public content')}")
        st.write(f"**Response Code:** {page_info.get('response_code', 'N/A')}")
        # Content samples
        st.subheader("πŸ“ Content Analysis")
        for i, block in enumerate(content_blocks):
            with st.expander(f"Content {i+1} - {block['content_type']} ({block['word_count']} words)"):
                st.write(block['content'])
                st.caption(f"Public: {block['is_public_content']}")
    else:
        st.info("""
## πŸ“˜ Facebook Real Data Extractor
**Aggressive Approach - No Automatic Demo**
**This version:**
- Tries multiple extraction methods
- Uses rotating user agents
- Attempts mobile versions
- Tries text-only alternatives
- Only uses demo data as LAST RESORT
**Technical Challenges:**
- Facebook has strong anti-bot protection
- Requires JavaScript execution
- Needs session management
- Heavy rate limiting
**For your project:**
- Shows real technical limitations
- Demonstrates platform security
- Provides educational value
""")
    # Chat section
    st.markdown("---")
    st.header("πŸ’¬ Analysis Chat")
    if st.session_state.chatbot and st.session_state.facebook_data:
        # Display chat history
        for chat in st.session_state.chat_history:
            if chat["role"] == "user":
                with st.chat_message("user"):
                    st.write(chat['content'])
            elif chat["role"] == "assistant":
                with st.chat_message("assistant"):
                    st.write(chat['content'])
        # Suggested questions when no history
        if not st.session_state.chat_history:
            st.subheader("πŸ’‘ Try asking:")
            suggestions = [
                "What is this Facebook content about?",
                "Summarize the extracted data",
                "What kind of information was found?",
                "Analyze the content structure"
            ]
            cols = st.columns(len(suggestions))
            for i, suggestion in enumerate(suggestions):
                with cols[i]:
                    if st.button(suggestion, key=f"fb_suggest_{suggestion}", use_container_width=True):
                        st.info(f"Type: '{suggestion}' in the chat below")
    elif st.session_state.facebook_data:
        st.info("πŸ’¬ Start chatting about the Facebook data")
    else:
        st.info("πŸ” Extract Facebook data to enable analysis")
    # CHAT INPUT: last_user_input guards against reprocessing the same
    # message when Streamlit reruns the script.
    if st.session_state.chatbot and st.session_state.facebook_data:
        user_input = st.chat_input("Ask about the Facebook data...")
        if user_input and user_input != st.session_state.last_user_input:
            st.session_state.last_user_input = user_input
            st.session_state.chat_history.append({"role": "user", "content": user_input})
            with st.spinner("πŸ€” Analyzing..."):
                try:
                    response = simple_chat_analysis(user_input, st.session_state.facebook_data)
                    st.session_state.chat_history.append({"role": "assistant", "content": response})
                    st.rerun()
                except Exception as e:
                    error_msg = f"Analysis Error: {str(e)}"
                    st.session_state.chat_history.append({"role": "assistant", "content": error_msg})
                    st.rerun()
# Standard entry point: render the page when Streamlit executes this script.
if __name__ == "__main__":
    main()