# pages/facebook_extractor.py
import streamlit as st
import requests
from bs4 import BeautifulSoup
import json
import re
from datetime import datetime
from typing import List, Dict
import os
import tempfile
import random

# Import your existing AI components
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.schema import Document
from langchain_community.llms import HuggingFaceHub

st.set_page_config(
    page_title="Facebook Data Extractor",
    page_icon="📘",
    layout="wide"
)


class FacebookRealExtractor:
    """Best-effort extractor for publicly reachable Facebook content.

    ``extract_data`` tries three strategies in order (desktop request,
    mobile request, text-only proxy) and returns the first successful
    result. If every strategy fails it reports the failure in the
    Streamlit UI and returns a small demo payload instead.

    Every result is a dict with a ``"status"`` key. Failures carry a
    ``"reason"`` string; successes carry ``"page_info"``,
    ``"content_blocks"`` and ``"extraction_time"``.
    """

    # Substrings that mark a text fragment as navigation/legal boilerplate
    # rather than page content.
    _NOISE_WORDS = ('cookie', 'login', 'sign up', 'facebook')

    # Ordered (label, keywords) pairs; the first label with a matching
    # keyword wins, so the order here is significant.
    _CONTENT_RULES = (
        ("welcome_message", ('welcome', 'join', 'community')),
        ("event_info", ('event', 'meetup', 'schedule')),
        ("social_content", ('post', 'share', 'comment')),
        ("question_post", ('question', 'help', 'advice')),
    )

    def __init__(self):
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        """Populate the pool of desktop user agents used for rotation."""
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0'
        ]

    def extract_data(self, url: str, data_type: str) -> Dict:
        """Run each extraction strategy in turn and return the first success.

        Args:
            url: The Facebook URL to fetch.
            data_type: Caller-chosen label (e.g. "group", "page") that is
                copied into the result under ``"data_type"``.

        Returns:
            The successful result tagged with ``"source": "real"``, or the
            demo payload (``"source": "demo_fallback"``) when every
            strategy fails.
        """
        st.info(f"🔍 Attempting real extraction: {url}")

        # Try multiple extraction methods
        methods = [
            self._try_direct_extraction,
            self._try_mobile_extraction,
            self._try_text_only_extraction
        ]

        for method in methods:
            result = method(url)
            if result.get("status") == "success":
                st.success("✅ Real Facebook data extracted!")
                result["source"] = "real"
                result["data_type"] = data_type
                return result

        # If all methods fail, provide better error info
        st.error("❌ All real extraction methods failed. Facebook has strong anti-bot protection.")
        st.info("\n".join([
            "**Why this happens:**",
            "- Facebook blocks automated requests",
            "- Requires JavaScript execution",
            "- Needs cookies and session management",
            "- Heavy anti-bot detection",
            "",
            "**For your university project, you can:**",
            "1. Use the demo data to demonstrate functionality",
            "2. Explain these technical limitations in your report",
            "3. Show that LinkedIn works (no restrictions)",
            "4. Discuss platform security differences",
        ]))

        # Only use demo data as last resort
        return self._get_minimal_demo_data(url, data_type)

    def _try_direct_extraction(self, url: str) -> Dict:
        """Fetch ``url`` with a randomly chosen desktop user agent.

        Returns the parsed result on HTTP 200, otherwise an error dict.
        Any exception is reported as an error dict rather than raised so
        the caller can move on to the next strategy.
        """
        try:
            # No explicit Accept-Encoding: advertising "br" makes servers
            # answer with Brotli, which is only decoded when an optional
            # Brotli package is installed. Leaving the header to the
            # session keeps it in line with what can actually be decoded.
            headers = {
                'User-Agent': random.choice(self.user_agents),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/avif,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Cache-Control': 'max-age=0',
            }

            response = self.session.get(
                url,
                headers=headers,
                timeout=15,
                allow_redirects=True
            )

            if response.status_code == 200:
                return self._parse_facebook_response(response, url)
            return {"status": "error", "reason": f"HTTP {response.status_code}"}

        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _try_mobile_extraction(self, url: str) -> Dict:
        """Fetch ``url`` while presenting an Android Chrome user agent.

        Returns the parsed result on HTTP 200, otherwise an error dict.
        """
        try:
            # Accept-Encoding is intentionally left to the session; see
            # the note in _try_direct_extraction.
            mobile_headers = {
                'User-Agent': 'Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
            }

            response = self.session.get(url, headers=mobile_headers, timeout=15)

            if response.status_code == 200:
                return self._parse_facebook_response(response, url)
            return {"status": "error", "reason": f"Mobile HTTP {response.status_code}"}

        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _try_text_only_extraction(self, url: str) -> Dict:
        """Fetch a text rendering of ``url`` through the r.jina.ai proxy.

        Returns the parsed result on HTTP 200, otherwise an error dict.
        """
        try:
            # The target URL is appended verbatim to the proxy endpoint.
            reader_url = f"https://r.jina.ai/{url}"
            response = self.session.get(reader_url, timeout=20)

            if response.status_code == 200:
                return self._parse_textise_response(response, url)
            return {"status": "error", "reason": "Textise failed"}

        except Exception as e:
            return {"status": "error", "reason": str(e)}

    @staticmethod
    def _meta_content(tag):
        """Return the ``content`` attribute of a meta tag, or None if absent."""
        return tag.get('content') if tag is not None else None

    def _build_content_blocks(self, texts: List[str]) -> List[Dict]:
        """Wrap each text in the content-block dict used across the app."""
        return [
            {
                "id": index,
                "content": text,
                "length": len(text),
                "word_count": len(text.split()),
                "content_type": self._classify_content(text),
                "is_public_content": True
            }
            for index, text in enumerate(texts, start=1)
        ]

    def _parse_facebook_response(self, response, url: str) -> Dict:
        """Extract title, description and up to ten text fragments from HTML.

        A fragment is kept when it is longer than 20 characters, has more
        than three words and contains none of ``_NOISE_WORDS``.

        Returns a success dict, or an error dict when nothing usable was
        found or parsing raised.
        """
        try:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract basic information
            title = soup.find('title')
            description = soup.find('meta', attrs={'name': 'description'})
            og_title = soup.find('meta', property='og:title')
            og_description = soup.find('meta', property='og:description')

            meaningful_text = []
            seen = set()
            for element in soup.find_all(['p', 'div', 'span'], string=True):
                text = element.get_text().strip()
                # Nested wrappers can yield the same text more than once;
                # keep only the first occurrence.
                if text in seen:
                    continue
                lowered = text.lower()
                if (len(text) > 20
                        and len(text.split()) > 3
                        and not any(word in lowered for word in self._NOISE_WORDS)):
                    seen.add(text)
                    meaningful_text.append(text)

            # Limit to first 10 meaningful texts
            content_blocks = self._build_content_blocks(meaningful_text[:10])
            if not content_blocks:
                return {"status": "error", "reason": "No meaningful content found"}

            # Prefer Open Graph metadata, then the plain tags, then a
            # default. A meta tag lacking ``content`` falls through instead
            # of aborting the whole parse.
            page_title = (
                self._meta_content(og_title)
                or (title.text if title is not None else None)
                or "Facebook Content"
            )
            page_description = (
                self._meta_content(og_description)
                or self._meta_content(description)
                or ""
            )

            return {
                "page_info": {
                    "title": page_title,
                    "description": page_description,
                    "url": url,
                    "response_code": response.status_code,
                    "content_length": len(response.text),
                    "access_note": "Real data extracted successfully"
                },
                "content_blocks": content_blocks,
                "extraction_time": datetime.now().isoformat(),
                "status": "success"
            }

        except Exception as e:
            return {"status": "error", "reason": f"Parsing error: {str(e)}"}

    def _parse_textise_response(self, response, url: str) -> Dict:
        """Turn a plain-text response into content blocks.

        Keeps the first eight lines whose stripped length exceeds 30
        characters. Returns a success dict, or an error dict when no line
        qualifies or parsing raised.
        """
        try:
            stripped_lines = (line.strip() for line in response.text.split('\n'))
            meaningful_lines = [line for line in stripped_lines if len(line) > 30]

            content_blocks = self._build_content_blocks(meaningful_lines[:8])
            if not content_blocks:
                return {"status": "error", "reason": "No content from textise"}

            return {
                "page_info": {
                    "title": "Facebook Content (via Textise)",
                    "description": "Content extracted using text-only method",
                    "url": url,
                    "response_code": response.status_code,
                    "content_length": len(response.text),
                    "access_note": "Real data via text-only extraction"
                },
                "content_blocks": content_blocks,
                "extraction_time": datetime.now().isoformat(),
                "status": "success"
            }

        except Exception as e:
            return {"status": "error", "reason": str(e)}

    def _classify_content(self, text: str) -> str:
        """Label ``text`` by the first keyword group it matches.

        Matching is a case-insensitive substring test; text matching no
        group is labelled ``"general_content"``.
        """
        text_lower = text.lower()
        for label, keywords in self._CONTENT_RULES:
            if any(word in text_lower for word in keywords):
                return label
        return "general_content"

    def _get_minimal_demo_data(self, url: str, data_type: str) -> Dict:
        """Return a two-block placeholder payload and warn in the UI.

        Used only when every real extraction strategy has failed. The
        payload has the same shape as a real result and is marked with
        ``"source": "demo_fallback"``.
        """
        st.warning("🔄 Using minimal demo data for demonstration purposes")

        demo_texts = (
            (
                "demo_notice",
                "This is a demonstration of what real Facebook data would look like. Actual extraction is blocked by Facebook's anti-bot protection.",
            ),
            (
                "educational_note",
                "For your university project, you can discuss these technical limitations and how social media platforms implement security measures.",
            ),
        )

        # Sizes are computed from the text so they cannot drift from it.
        content_blocks = [
            {
                "id": index,
                "content": text,
                "length": len(text),
                "word_count": len(text.split()),
                "content_type": content_type,
                "is_public_content": True
            }
            for index, (content_type, text) in enumerate(demo_texts, start=1)
        ]

        return {
            "page_info": {
                "title": "Facebook Content (Demo - Real extraction blocked)",
                "description": "This would show real Facebook data if not blocked by platform restrictions",
                "url": url,
                "response_code": 403,
                "content_length": 0,
                "access_note": "DEMO: Facebook blocked real data extraction"
            },
            "content_blocks": content_blocks,
            "url_type": "Facebook Content",
            "extraction_time": datetime.now().isoformat(),
            "data_type": data_type,
            "status": "success",
            "source": "demo_fallback"
        }


# Rest of the functions remain the same (get_embeddings, get_llm, simple_chat_analysis, etc.)
def get_embeddings():
    """Load a HuggingFace sentence-embedding model, trying several in turn.

    Each candidate is loaded on CPU and checked with a trial
    ``embed_query`` call. If none of the candidates works, one more
    attempt is made with the default cache location.

    Returns:
        A working ``HuggingFaceEmbeddings`` instance, or None when every
        attempt failed (the failure is reported in the Streamlit UI).
    """
    try:
        model_options = [
            "sentence-transformers/all-MiniLM-L6-v2",
            "sentence-transformers/paraphrase-MiniLM-L3-v2",
            "sentence-transformers/all-mpnet-base-v2"
        ]

        for model_name in model_options:
            try:
                st.info(f"🔄 Trying embedding model: {model_name}")

                # Use temporary directory for cache to avoid permission issues.
                # NOTE(review): the directory is deleted when this `with`
                # block exits, i.e. as soon as the embeddings are returned,
                # so presumably each call fetches the model again — confirm
                # whether a persistent cache directory would be preferable.
                with tempfile.TemporaryDirectory() as temp_cache:
                    embeddings = HuggingFaceEmbeddings(
                        model_name=model_name,
                        cache_folder=temp_cache,
                        model_kwargs={'device': 'cpu'}
                    )

                    # Test the embeddings
                    test_embedding = embeddings.embed_query("Hello world")

                    if test_embedding and len(test_embedding) > 0:
                        st.success(f"✅ Loaded embeddings: {model_name.split('/')[-1]}")
                        return embeddings

            except Exception as e:
                st.warning(f"⚠️ Failed to load {model_name}: {str(e)}")
                continue

        # If all models fail, try without cache
        st.warning("🔄 Trying fallback embedding method...")
        try:
            embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
            st.success("✅ Loaded fallback embeddings")
            return embeddings
        except Exception as e:
            st.error(f"❌ All embedding models failed: {e}")
            return None

    except Exception as e:
        st.error(f"❌ Embeddings error: {e}")
        return None


def get_llm():
    """Connect to a HuggingFace Hub LLM, trying several models in turn.

    Reads the API token from the ``HUGGINGFACEHUB_API_TOKEN`` environment
    variable and verifies each candidate with a trial ``invoke("Hello")``.

    Returns:
        The first model that produces a non-blank reply, or None when the
        token is missing or every model failed.
    """
    try:
        api_key = os.getenv('HUGGINGFACEHUB_API_TOKEN')
        if not api_key:
            st.error("HuggingFace API Key not found")
            return None

        # Try multiple models
        model_options = [
            "mistralai/Mistral-7B-Instruct-v0.1",
            "google/flan-t5-large",
            "microsoft/DialoGPT-large"
        ]

        for model_id in model_options:
            try:
                st.info(f"🔄 Trying LLM: {model_id}")
                llm = HuggingFaceHub(
                    repo_id=model_id,
                    huggingfacehub_api_token=api_key,
                    model_kwargs={
                        "temperature": 0.7,
                        "max_length": 512,
                        "max_new_tokens": 256,
                    }
                )

                # Test the model
                test_response = llm.invoke("Hello")
                if test_response and len(test_response.strip()) > 0:
                    st.success(f"✅ Loaded LLM: {model_id.split('/')[-1]}")
                    return llm

            except Exception as e:
                st.warning(f"⚠️ Failed to load {model_id}: {str(e)}")
                continue

        st.error("❌ All LLMs failed to load")
        return None

    except Exception as e:
        st.error(f"❌ LLM error: {e}")
        return None


def simple_chat_analysis(user_input: str, extracted_data: Dict) -> str:
    """Answer a question about extracted data using keyword rules only.

    The question is matched (case-insensitively, by substring) against
    three keyword groups — summary, purpose, activity — and a Markdown
    reply is built from the extracted data. Unmatched questions get a
    generic reply with suggested prompts.

    Args:
        user_input: The user's question.
        extracted_data: A result dict as produced by the extractor.

    Returns:
        A Markdown string. Any exception is returned as an
        "Analysis error: ..." string rather than raised.
    """
    try:
        if not extracted_data:
            return "No data available for analysis."

        page_info = extracted_data.get('page_info', {})
        content_blocks = extracted_data.get('content_blocks', [])
        url_type = extracted_data.get('url_type', 'Facebook Content')
        source = extracted_data.get('source', 'unknown')

        user_input_lower = user_input.lower()

        def asks_about(keywords) -> bool:
            return any(word in user_input_lower for word in keywords)

        def count_types(fragments) -> int:
            """Count blocks whose content type contains any fragment."""
            return sum(
                1 for block in content_blocks
                if any(fragment in block['content_type'].lower() for fragment in fragments)
            )

        if asks_about(['summary', 'summarize', 'overview']):
            # Sorted so the reply does not change order between runs.
            content_types = sorted({block['content_type'] for block in content_blocks})
            response_lines = [
                f"**📊 Summary of {page_info.get('title', 'Facebook Content')}**",
                "",
                f"**Type:** {url_type}",
                f"**Data Source:** {source.upper()}",
                f"**Description:** {page_info.get('description', 'No description available')}",
                "",
                f"This appears to be a {url_type.lower()} with {len(content_blocks)} content blocks.",
                "",
                "**Key Content Types:**",
                f"{', '.join(content_types)}",
                "",
                "The content focuses on community engagement and social interactions."
            ]

        elif asks_about(['purpose', 'about', 'what is']):
            community_posts = count_types(['community'])
            announcement_posts = count_types(['announcement'])
            member_posts = count_types(['post'])

            response_lines = [
                "**🎯 Purpose Analysis**",
                "",
                f"Based on the extracted data, this {url_type.lower()} appears to be focused on:",
                "",
                f"- **Community Building:** {community_posts} community-related posts",
                f"- **Information Sharing:** {announcement_posts} announcements",
                f"- **Member Engagement:** {member_posts} member posts",
                "",
                f"**Overall Purpose:** {page_info.get('description', 'Community engagement and content sharing')}"
            ]

        elif asks_about(['activity', 'engagement', 'active']):
            active_blocks = count_types(['post', 'question', 'event'])
            info_blocks = len(content_blocks) - active_blocks

            response_lines = [
                "**📈 Activity Analysis**",
                "",
                "**Content Activity Level:**",
                f"- Total Content Blocks: {len(content_blocks)}",
                f"- Active Engagement Posts: {active_blocks}",
                f"- Informational Posts: {info_blocks}",
                "",
                f"The {url_type.lower()} shows a good mix of member engagement and informational content, suggesting an active community."
            ]

        else:
            response_lines = [
                "**🤖 Analysis Response**",
                "",
                f"I've analyzed the {url_type.lower()} data for you.",
                "",
                f'**Your question:** "{user_input}"',
                f"**Content Source:** {source.upper()} data",
                f"**Content Type:** {url_type}",
                "",
                f"This {url_type.lower()} contains {len(content_blocks)} pieces of content focusing on community engagement and information sharing.",
                "",
                "**Try asking:**",
                '- "What is the main purpose of this group/page?"',
                '- "Summarize the content and activities"',
                '- "What kind of engagement does this content show?"'
            ]

        return "\n".join(response_lines)

    except Exception as e:
        return f"Analysis error: {str(e)}"


def process_facebook_data(extracted_data):
    """Flatten an extraction result into text chunks wrapped as Documents.

    Args:
        extracted_data: A result dict as produced by the extractor.

    Returns:
        ``(None, [])`` when the input is missing or not successful;
        otherwise ``("simple", documents)`` where ``documents`` holds the
        report text split into overlapping chunks.
    """
    if not extracted_data or extracted_data.get("status") != "success":
        return None, []

    page_info = extracted_data['page_info']
    content_blocks = extracted_data['content_blocks']
    url_type = extracted_data.get('url_type', 'Facebook Content')
    source = extracted_data.get('source', 'unknown')

    divider = "=" * 50

    # Collected line by line and joined once at the end.
    lines = [
        "FACEBOOK DATA ANALYSIS",
        divider,
        "",
        "📄 PAGE INFORMATION:",
        f"Title: {page_info['title']}",
        f"URL Type: {url_type}",
        f"Data Source: {source.upper()}",
        f"Access: {page_info.get('access_note', 'Public content')}",
    ]

    if page_info.get('member_count'):
        lines.append(f"Members: {page_info['member_count']}")
    elif page_info.get('follower_count'):
        lines.append(f"Followers: {page_info['follower_count']}")

    public_count = sum(1 for b in content_blocks if b['is_public_content'])
    lines += [
        f"Extracted: {extracted_data['extraction_time']}",
        "",
        "📊 CONTENT ANALYSIS:",
        f"Content Blocks: {len(content_blocks)}",
        f"Public Content: {public_count} blocks",
        "",
    ]

    for number, block in enumerate(content_blocks, start=1):
        lines += [
            f"--- BLOCK {number} ---",
            f"Type: {block['content_type']}",
            f"Words: {block['word_count']} | Public: {block['is_public_content']}",
            f"Content: {block['content']}",
            "",
        ]

    lines.append(divider)
    all_text = "\n".join(lines)

    # Split into chunks
    splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )

    chunks = splitter.split_text(all_text)
    documents = [Document(page_content=chunk) for chunk in chunks]

    return "simple", documents


def create_chatbot(vectorstore):
    """Build a retrieval chat chain over ``vectorstore``.

    Returns:
        A ``ConversationalRetrievalChain``, or the string ``"simple"``
        when no LLM is available or chain construction fails, signalling
        that the rule-based analysis should be used instead.
    """
    try:
        llm = get_llm()
        if llm is None:
            return "simple"  # Return simple mode if LLM fails

        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"
        )

        chain = ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
            memory=memory,
            return_source_documents=True,
            output_key="answer"
        )
        return chain

    except Exception as e:
        st.error(f"Chatbot creation failed: {str(e)}")
        return "simple"  # Fallback to simple mode


def _is_facebook_url(url: str) -> bool:
    """Return True when the host of ``url`` is facebook.com or a subdomain.

    The host is checked instead of searching the whole string, so a URL
    that merely mentions "facebook.com" in its path or query is rejected.
    Input without a scheme is still accepted, as it was before.
    """
    from urllib.parse import urlparse

    candidate = url.strip()
    if "://" not in candidate:
        candidate = f"https://{candidate}"
    try:
        host = urlparse(candidate).hostname or ""
    except ValueError:
        return False
    return host == "facebook.com" or host.endswith(".facebook.com")


def main():
    """Render the Streamlit page: sidebar controls, results and chat."""
    st.title("📘 Facebook Data Extractor - REAL DATA ATTEMPT")
    st.markdown("**Aggressive real data extraction - No automatic demo fallback**")

    if st.button("← Back to Main Dashboard"):
        st.switch_page("app.py")

    # Initialize session state
    if "extractor" not in st.session_state:
        st.session_state.extractor = FacebookRealExtractor()

    session_defaults = {
        "facebook_data": None,
        "vectorstore": None,
        "chatbot": None,
        "chat_history": [],
        "processing_mode": "ai",
        "last_user_input": "",
    }
    for key, default in session_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default

    # Sidebar
    with st.sidebar:
        st.header("⚙️ Facebook Configuration")

        data_type = st.selectbox(
            "Content Type",
            ["group", "page", "event", "post", "general"],
            help="Select the type of Facebook content"
        )

        facebook_url = st.text_input(
            "Facebook URL",
            placeholder="https://www.facebook.com/groups/gamersofbangladesh2",
            help="Enter any Facebook URL for REAL data extraction"
        )

        # Quick test URLs
        st.markdown("### 🚀 Test URLs")
        test_urls = {
            "Gaming Group": "https://www.facebook.com/groups/gamersofbangladesh2",
            "Tech Community": "https://www.facebook.com/groups/programmingcommunity",
            "Business Page": "https://www.facebook.com/Meta/",
        }

        for name, url in test_urls.items():
            if st.button(f"🔗 {name}", key=f"fb_{name}"):
                st.session_state.current_fb_url = url
                st.rerun()

        if st.button("🚀 EXTRACT REAL DATA", type="primary"):
            # A typed URL takes precedence over a previously chosen test URL.
            url_to_use = facebook_url or st.session_state.get('current_fb_url', '')

            if not url_to_use:
                st.error("❌ Please enter a Facebook URL")
            elif not _is_facebook_url(url_to_use):
                st.error("❌ Please enter a valid Facebook URL")
            else:
                with st.spinner("🔄 Aggressively extracting REAL Facebook data..."):
                    extracted_data = st.session_state.extractor.extract_data(url_to_use, data_type)

                    if extracted_data.get("status") == "success":
                        st.session_state.facebook_data = extracted_data
                        st.session_state.chatbot = "simple"
                        st.session_state.chat_history = []
                        st.session_state.last_user_input = ""

                        source = extracted_data.get('source', 'unknown')
                        if source == 'real':
                            st.success("🎉 SUCCESS: Real Facebook data extracted!")
                            st.balloons()
                        else:
                            st.warning("⚠️ Using fallback data - Facebook blocked real extraction")
                    else:
                        # Failure dicts carry their message under "reason";
                        # "error" is still honoured for compatibility.
                        error_msg = extracted_data.get(
                            "reason", extracted_data.get("error", "Unknown error")
                        )
                        st.error(f"❌ Extraction failed: {error_msg}")

        if st.session_state.facebook_data:
            st.markdown("---")
            if st.button("🗑️ Clear Data", type="secondary"):
                st.session_state.facebook_data = None
                st.session_state.vectorstore = None
                st.session_state.chatbot = None
                st.session_state.chat_history = []
                st.session_state.last_user_input = ""
                st.rerun()

    # Main content
    st.header("📊 Extraction Results")

    if st.session_state.facebook_data:
        data = st.session_state.facebook_data
        page_info = data['page_info']
        content_blocks = data['content_blocks']
        source = data.get('source', 'unknown')
        is_real = source == 'real'

        if is_real:
            st.success("✅ **REAL DATA** - Successfully extracted from Facebook!")
        else:
            st.warning("📝 **FALLBACK DATA** - Facebook blocked real extraction")

        # Metrics
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Content Blocks", len(content_blocks))
        with col2:
            st.metric("Data Source", "REAL" if is_real else "FALLBACK")
        with col3:
            st.metric("Status", "Success")

        # Page info
        st.subheader("🏷️ Page Information")
        st.write(f"**Title:** {page_info['title']}")
        st.write(f"**Description:** {page_info.get('description', 'No description')}")
        st.write(f"**Access Note:** {page_info.get('access_note', 'Public content')}")
        st.write(f"**Response Code:** {page_info.get('response_code', 'N/A')}")

        # Content samples
        st.subheader("📝 Content Analysis")
        for number, block in enumerate(content_blocks, start=1):
            with st.expander(f"Content {number} - {block['content_type']} ({block['word_count']} words)"):
                st.write(block['content'])
                st.caption(f"Public: {block['is_public_content']}")

    else:
        st.info("\n".join([
            "## 📘 Facebook Real Data Extractor",
            "",
            "**Aggressive Approach - No Automatic Demo**",
            "",
            "**This version:**",
            "- Tries multiple extraction methods",
            "- Uses rotating user agents",
            "- Attempts mobile versions",
            "- Tries text-only alternatives",
            "- Only uses demo data as LAST RESORT",
            "",
            "**Technical Challenges:**",
            "- Facebook has strong anti-bot protection",
            "- Requires JavaScript execution",
            "- Needs session management",
            "- Heavy rate limiting",
            "",
            "**For your project:**",
            "- Shows real technical limitations",
            "- Demonstrates platform security",
            "- Provides educational value",
        ]))

    # Chat section
    st.markdown("---")
    st.header("💬 Analysis Chat")

    chat_enabled = bool(st.session_state.chatbot and st.session_state.facebook_data)

    if chat_enabled:
        # Display chat history
        for chat in st.session_state.chat_history:
            if chat["role"] in ("user", "assistant"):
                with st.chat_message(chat["role"]):
                    st.write(chat['content'])

        # Suggested questions when no history
        if not st.session_state.chat_history:
            st.subheader("💡 Try asking:")
            suggestions = [
                "What is this Facebook content about?",
                "Summarize the extracted data",
                "What kind of information was found?",
                "Analyze the content structure"
            ]

            cols = st.columns(len(suggestions))
            for column, suggestion in zip(cols, suggestions):
                with column:
                    if st.button(suggestion, key=f"fb_suggest_{suggestion}", use_container_width=True):
                        st.info(f"Type: '{suggestion}' in the chat below")

    elif st.session_state.facebook_data:
        st.info("💬 Start chatting about the Facebook data")
    else:
        st.info("🔍 Extract Facebook data to enable analysis")

    # CHAT INPUT
    if chat_enabled:
        user_input = st.chat_input("Ask about the Facebook data...")

        if user_input and user_input != st.session_state.last_user_input:
            st.session_state.last_user_input = user_input
            st.session_state.chat_history.append({"role": "user", "content": user_input})

            with st.spinner("🤔 Analyzing..."):
                try:
                    response = simple_chat_analysis(user_input, st.session_state.facebook_data)
                except Exception as e:
                    response = f"Analysis Error: {str(e)}"

                st.session_state.chat_history.append({"role": "assistant", "content": response})
                # Called outside the try so the rerun signal can never be
                # mistaken for an analysis failure.
                st.rerun()


if __name__ == "__main__":
    main()