|
|
|
|
|
import json
import os
import random
import re
import tempfile
from datetime import datetime
from typing import Dict, List
from urllib.parse import urlparse

import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.schema import Document
from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceHub
from langchain_text_splitters import CharacterTextSplitter
|
|
|
|
|
# Browser-tab title, icon and wide layout for this Streamlit page.
st.set_page_config(
    page_title="Facebook Data Extractor",
    page_icon="π",
    layout="wide",
)
|
|
|
|
|
class FacebookRealExtractor:
    """Best-effort extractor for publicly reachable Facebook content.

    ``extract_data`` tries three strategies in order -- a desktop-style
    request, a mobile-style request, and a text-only proxy -- and falls back
    to a clearly labelled demo payload when every strategy fails.
    """

    # Substrings that mark boilerplate text (cookie banners, login prompts...).
    _BOILERPLATE_WORDS = ('cookie', 'login', 'sign up', 'facebook')

    def __init__(self):
        """Create the shared HTTP session and the user-agent pool."""
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        """Populate ``self.user_agents`` with desktop browser identities."""
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
        ]

    def extract_data(self, url: str, data_type: str) -> Dict:
        """Run every extraction strategy in order and return the first success.

        Args:
            url: Address of the Facebook resource to fetch.
            data_type: Caller-chosen label stored under ``"data_type"``.

        Returns:
            A payload with ``"status": "success"``. ``"source"`` is ``"real"``
            when a strategy worked and ``"demo_fallback"`` otherwise.
        """
        st.info(f"π Attempting real extraction: {url}")

        methods = [
            self._try_direct_extraction,
            self._try_mobile_extraction,
            self._try_text_only_extraction,
        ]

        failure_reasons = []
        for method in methods:
            result = method(url)
            if result.get("status") == "success":
                st.success("✅ Real Facebook data extracted!")
                result["source"] = "real"
                result["data_type"] = data_type
                return result
            failure_reasons.append(str(result.get("reason", "unknown error")))

        st.error("β All real extraction methods failed. Facebook has strong anti-bot protection.")
        # Surface why each strategy failed instead of discarding the reasons.
        st.caption("Attempts: " + "; ".join(failure_reasons))
        st.info("""
            **Why this happens:**
            - Facebook blocks automated requests
            - Requires JavaScript execution
            - Needs cookies and session management
            - Heavy anti-bot detection

            **For your university project, you can:**
            1. Use the demo data to demonstrate functionality
            2. Explain these technical limitations in your report
            3. Show that LinkedIn works (no restrictions)
            4. Discuss platform security differences
            """)

        return self._get_minimal_demo_data(url, data_type)

    def _try_direct_extraction(self, url: str) -> Dict:
        """Fetch ``url`` with a randomly chosen desktop user agent.

        Returns the parsed payload on HTTP 200, otherwise an error dict with
        a ``"reason"``. Never raises.
        """
        try:
            headers = {
                'User-Agent': random.choice(self.user_agents),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/avif,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                # 'br' is deliberately not advertised: the response body can
                # only be decoded when an optional brotli package is present.
                'Accept-Encoding': 'gzip, deflate',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Cache-Control': 'max-age=0',
            }

            response = self.session.get(
                url,
                headers=headers,
                timeout=15,
                allow_redirects=True,
            )

            if response.status_code == 200:
                return self._parse_facebook_response(response, url)
            return {"status": "error", "reason": f"HTTP {response.status_code}"}

        except Exception as e:  # best-effort strategy: report, never raise
            return {"status": "error", "reason": str(e)}

    def _try_mobile_extraction(self, url: str) -> Dict:
        """Fetch ``url`` while presenting an Android Chrome user agent.

        Returns the parsed payload on HTTP 200, otherwise an error dict with
        a ``"reason"``. Never raises.
        """
        try:
            mobile_headers = {
                'User-Agent': 'Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                # See _try_direct_extraction for why 'br' is not listed.
                'Accept-Encoding': 'gzip, deflate',
            }

            response = self.session.get(url, headers=mobile_headers, timeout=15)

            if response.status_code == 200:
                return self._parse_facebook_response(response, url)
            return {"status": "error", "reason": f"Mobile HTTP {response.status_code}"}

        except Exception as e:  # best-effort strategy: report, never raise
            return {"status": "error", "reason": str(e)}

    def _try_text_only_extraction(self, url: str) -> Dict:
        """Fetch a text rendering of ``url`` through the r.jina.ai proxy.

        Returns the parsed payload on HTTP 200, otherwise an error dict with
        a ``"reason"``. Never raises.
        """
        try:
            textise_url = f"https://r.jina.ai/{url}"
            response = self.session.get(textise_url, timeout=20)

            if response.status_code == 200:
                return self._parse_textise_response(response, url)
            return {"status": "error", "reason": "Textise failed"}

        except Exception as e:  # best-effort strategy: report, never raise
            return {"status": "error", "reason": str(e)}

    def _make_block(self, block_id: int, text: str, content_type: str = None) -> Dict:
        """Build one content-block record.

        ``length`` and ``word_count`` are always computed from ``text`` so
        they cannot drift from the content. ``content_type`` defaults to the
        keyword classification of ``text``.
        """
        return {
            "id": block_id,
            "content": text,
            "length": len(text),
            "word_count": len(text.split()),
            "content_type": content_type or self._classify_content(text),
            "is_public_content": True,
        }

    def _parse_facebook_response(self, response, url: str) -> Dict:
        """Turn an HTML response into page info plus up to ten content blocks.

        Args:
            response: Object exposing ``text`` and ``status_code``.
            url: The address that was requested (echoed into the payload).

        Returns:
            A success payload, or an error dict when no usable text is found
            or parsing fails.
        """
        try:
            soup = BeautifulSoup(response.text, 'html.parser')

            def meta_content(**criteria) -> str:
                # Tolerate meta tags that lack a ``content`` attribute.
                tag = soup.find('meta', **criteria)
                value = tag.get('content') if tag else None
                return value.strip() if value else ""

            title_tag = soup.find('title')
            page_title = (
                meta_content(property='og:title')
                or (title_tag.get_text().strip() if title_tag else "")
                or "Facebook Content"
            )
            page_description = (
                meta_content(property='og:description')
                or meta_content(attrs={'name': 'description'})
            )

            seen = set()
            meaningful_text = []
            for element in soup.find_all(['p', 'div', 'span'], string=True):
                text = element.get_text().strip()
                # A wrapper and its only child yield the same string; keep one.
                if text in seen:
                    continue
                if len(text) <= 20 or len(text.split()) <= 3:
                    continue
                lowered = text.lower()
                if any(word in lowered for word in self._BOILERPLATE_WORDS):
                    continue
                seen.add(text)
                meaningful_text.append(text)

            content_blocks = [
                self._make_block(index, text)
                for index, text in enumerate(meaningful_text[:10], start=1)
            ]

            if not content_blocks:
                return {"status": "error", "reason": "No meaningful content found"}

            return {
                "page_info": {
                    "title": page_title,
                    "description": page_description,
                    "url": url,
                    "response_code": response.status_code,
                    "content_length": len(response.text),
                    "access_note": "Real data extracted successfully",
                },
                "content_blocks": content_blocks,
                "extraction_time": datetime.now().isoformat(),
                "status": "success",
            }

        except Exception as e:  # parser failures are reported, not raised
            return {"status": "error", "reason": f"Parsing error: {str(e)}"}

    def _parse_textise_response(self, response, url: str) -> Dict:
        """Turn a plain-text response into up to eight content blocks.

        Only lines longer than 30 characters (after stripping) are kept.

        Args:
            response: Object exposing ``text`` and ``status_code``.
            url: The address that was requested (echoed into the payload).
        """
        try:
            stripped_lines = (line.strip() for line in response.text.split('\n'))
            meaningful_lines = [line for line in stripped_lines if len(line) > 30]

            content_blocks = [
                self._make_block(index, line)
                for index, line in enumerate(meaningful_lines[:8], start=1)
            ]

            if not content_blocks:
                return {"status": "error", "reason": "No content from textise"}

            return {
                "page_info": {
                    "title": "Facebook Content (via Textise)",
                    "description": "Content extracted using text-only method",
                    "url": url,
                    "response_code": response.status_code,
                    "content_length": len(response.text),
                    "access_note": "Real data via text-only extraction",
                },
                "content_blocks": content_blocks,
                "extraction_time": datetime.now().isoformat(),
                "status": "success",
            }

        except Exception as e:  # report, never raise
            return {"status": "error", "reason": str(e)}

    def _classify_content(self, text: str) -> str:
        """Label ``text`` by the first keyword group it contains.

        Groups are checked in a fixed order, so text matching several groups
        gets the earliest label; text matching none is ``"general_content"``.
        """
        text_lower = text.lower()
        keyword_groups = (
            ("welcome_message", ('welcome', 'join', 'community')),
            ("event_info", ('event', 'meetup', 'schedule')),
            ("social_content", ('post', 'share', 'comment')),
            ("question_post", ('question', 'help', 'advice')),
        )
        for label, keywords in keyword_groups:
            if any(word in text_lower for word in keywords):
                return label
        return "general_content"

    def _get_minimal_demo_data(self, url: str, data_type: str) -> Dict:
        """Return a clearly labelled placeholder payload (last resort).

        The payload has the same shape as a real result but is tagged with
        ``"source": "demo_fallback"`` and a 403 response code.
        """
        st.warning("π Using minimal demo data for demonstration purposes")

        return {
            "page_info": {
                "title": "Facebook Content (Demo - Real extraction blocked)",
                "description": "This would show real Facebook data if not blocked by platform restrictions",
                "url": url,
                "response_code": 403,
                "content_length": 0,
                "access_note": "DEMO: Facebook blocked real data extraction",
            },
            "content_blocks": [
                self._make_block(
                    1,
                    "This is a demonstration of what real Facebook data would look like. Actual extraction is blocked by Facebook's anti-bot protection.",
                    "demo_notice",
                ),
                self._make_block(
                    2,
                    "For your university project, you can discuss these technical limitations and how social media platforms implement security measures.",
                    "educational_note",
                ),
            ],
            "url_type": "Facebook Content",
            "extraction_time": datetime.now().isoformat(),
            "data_type": data_type,
            "status": "success",
            "source": "demo_fallback",
        }
|
|
|
|
|
|
|
|
def get_embeddings():
    """Load a sentence-transformers embedding model, trying several candidates.

    Each candidate is loaded on CPU and probed with a short test query; the
    first one that yields a non-empty vector is returned. If none works, one
    more attempt is made with the library's default cache location.

    Returns:
        A ``HuggingFaceEmbeddings`` instance, or ``None`` when every attempt
        failed. Progress and failures are reported through Streamlit.
    """
    model_options = [
        "sentence-transformers/all-MiniLM-L6-v2",
        "sentence-transformers/paraphrase-MiniLM-L3-v2",
        "sentence-transformers/all-mpnet-base-v2",
    ]

    try:
        # A persistent directory (rather than a TemporaryDirectory that is
        # deleted on exit) so model files are downloaded once, not per call.
        cache_dir = os.path.join(tempfile.gettempdir(), "facebook_extractor_hf_cache")
        os.makedirs(cache_dir, exist_ok=True)

        for model_name in model_options:
            try:
                st.info(f"π Trying embedding model: {model_name}")

                embeddings = HuggingFaceEmbeddings(
                    model_name=model_name,
                    cache_folder=cache_dir,
                    model_kwargs={'device': 'cpu'},
                )

                # Smoke test: a usable model returns a non-empty vector.
                if embeddings.embed_query("Hello world"):
                    st.success(f"✅ Loaded embeddings: {model_name.split('/')[-1]}")
                    return embeddings

                st.warning(f"⚠️ {model_name} returned an empty test embedding")

            except Exception as e:  # try the next candidate
                st.warning(f"⚠️ Failed to load {model_name}: {str(e)}")

        st.warning("π Trying fallback embedding method...")
        try:
            embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
            st.success("✅ Loaded fallback embeddings")
            return embeddings
        except Exception as e:
            st.error(f"β All embedding models failed: {e}")
            return None

    except Exception as e:
        st.error(f"β Embeddings error: {e}")
        return None
|
|
|
|
|
def get_llm():
    """Create a HuggingFace Hub LLM, trying several hosted models in order.

    The API token is read from the ``HUGGINGFACEHUB_API_TOKEN`` environment
    variable. Each candidate is probed with a one-word prompt and accepted
    only if it returns non-blank text.

    Returns:
        A ``HuggingFaceHub`` instance, or ``None`` when the token is missing
        or every candidate failed. Outcomes are reported through Streamlit.
    """
    try:
        api_key = os.getenv('HUGGINGFACEHUB_API_TOKEN')
        if not api_key:
            st.error("HuggingFace API Key not found")
            return None

        model_options = [
            "mistralai/Mistral-7B-Instruct-v0.1",
            "google/flan-t5-large",
            "microsoft/DialoGPT-large",
        ]

        for model_id in model_options:
            try:
                st.info(f"π Trying LLM: {model_id}")

                llm = HuggingFaceHub(
                    repo_id=model_id,
                    huggingfacehub_api_token=api_key,
                    model_kwargs={
                        "temperature": 0.7,
                        "max_length": 512,
                        "max_new_tokens": 256,
                    },
                )

                # Smoke test: only accept a model that actually answers.
                test_response = llm.invoke("Hello")
                if test_response and test_response.strip():
                    st.success(f"✅ Loaded LLM: {model_id.split('/')[-1]}")
                    return llm

                # Previously an empty answer was skipped without any notice.
                st.warning(f"⚠️ {model_id} returned an empty test response")

            except Exception as e:  # try the next candidate
                st.warning(f"⚠️ Failed to load {model_id}: {str(e)}")

        st.error("β All LLMs failed to load")
        return None

    except Exception as e:
        st.error(f"β LLM error: {e}")
        return None
|
|
|
|
|
def simple_chat_analysis(user_input: str, extracted_data: Dict) -> str:
    """Answer a question about extracted data using keyword rules only.

    The question is matched (case-insensitively, first match wins) against
    summary, purpose and activity keywords; anything else gets a generic
    answer with suggested questions.

    Args:
        user_input: The user's question.
        extracted_data: Extraction payload; ``page_info``, ``content_blocks``,
            ``url_type`` and ``source`` are read when present.

    Returns:
        A Markdown-formatted answer. Never raises: unexpected failures are
        returned as an ``"Analysis error: ..."`` string.
    """
    try:
        if not extracted_data:
            return "No data available for analysis."

        page_info = extracted_data.get('page_info') or {}
        content_blocks = extracted_data.get('content_blocks') or []
        url_type = extracted_data.get('url_type') or 'Facebook Content'
        source = str(extracted_data.get('source') or 'unknown')

        # Lower-cased type label per block; blocks without one count as "".
        block_types = [
            str(block.get('content_type') or '').lower()
            for block in content_blocks
        ]

        def count_types(*fragments: str) -> int:
            """Count blocks whose type label contains any of the fragments."""
            return sum(
                1 for label in block_types
                if any(fragment in label for fragment in fragments)
            )

        user_input_lower = user_input.lower()

        def asks_for(*keywords: str) -> bool:
            return any(word in user_input_lower for word in keywords)

        if asks_for('summary', 'summarize', 'overview'):
            # Sorted so the answer is stable from one run to the next.
            distinct_types = sorted({label for label in block_types if label})
            response_lines = [
                f"**π Summary of {page_info.get('title', 'Facebook Content')}**",
                "",
                f"**Type:** {url_type}",
                f"**Data Source:** {source.upper()}",
                f"**Description:** {page_info.get('description', 'No description available')}",
                "",
                f"This appears to be a {url_type.lower()} with {len(content_blocks)} content blocks.",
                "",
                "**Key Content Types:**",
                f"{', '.join(distinct_types)}",
                "",
                "The content focuses on community engagement and social interactions.",
            ]
            return "\n".join(response_lines)

        if asks_for('purpose', 'about', 'what is'):
            # Fragments cover the labels the extractor really emits
            # (welcome_message, event_info, social_content, question_post).
            community_posts = count_types('community', 'welcome')
            announcement_posts = count_types('announcement', 'event')
            member_posts = count_types('post', 'social', 'question')

            response_lines = [
                "**🎯 Purpose Analysis**",
                "",
                f"Based on the extracted data, this {url_type.lower()} appears to be focused on:",
                "",
                f"- **Community Building:** {community_posts} community-related posts",
                f"- **Information Sharing:** {announcement_posts} announcements",
                f"- **Member Engagement:** {member_posts} member posts",
                "",
                f"**Overall Purpose:** {page_info.get('description', 'Community engagement and content sharing')}",
            ]
            return "\n".join(response_lines)

        if asks_for('activity', 'engagement', 'active'):
            active_blocks = count_types('post', 'question', 'event', 'social')
            info_blocks = len(content_blocks) - active_blocks

            response_lines = [
                "**π Activity Analysis**",
                "",
                "**Content Activity Level:**",
                f"- Total Content Blocks: {len(content_blocks)}",
                f"- Active Engagement Posts: {active_blocks}",
                f"- Informational Posts: {info_blocks}",
                "",
                f"The {url_type.lower()} shows a good mix of member engagement and informational content, suggesting an active community.",
            ]
            return "\n".join(response_lines)

        response_lines = [
            "**🤖 Analysis Response**",
            "",
            f"I've analyzed the {url_type.lower()} data for you.",
            "",
            f"**Your question:** \"{user_input}\"",
            f"**Content Source:** {source.upper()} data",
            f"**Content Type:** {url_type}",
            "",
            f"This {url_type.lower()} contains {len(content_blocks)} pieces of content focusing on community engagement and information sharing.",
            "",
            "**Try asking:**",
            "- \"What is the main purpose of this group/page?\"",
            "- \"Summarize the content and activities\"",
            "- \"What kind of engagement does this content show?\"",
        ]
        return "\n".join(response_lines)

    except Exception as e:  # user-facing fallback: always return text
        return f"Analysis error: {str(e)}"
|
|
|
|
|
def process_facebook_data(extracted_data):
    """Flatten an extraction payload into text chunks wrapped as Documents.

    Args:
        extracted_data: Extraction payload; it is processed only when its
            ``"status"`` is ``"success"``.

    Returns:
        ``(None, [])`` for a missing or unsuccessful payload, otherwise
        ``("simple", documents)`` where ``documents`` holds the report split
        on newlines into chunks of up to 1000 characters with 200 overlap.
    """
    if not extracted_data or extracted_data.get("status") != "success":
        return None, []

    page_info = extracted_data.get('page_info') or {}
    content_blocks = extracted_data.get('content_blocks') or []
    url_type = extracted_data.get('url_type', 'Facebook Content')
    source = str(extracted_data.get('source') or 'unknown')

    # The report is collected line by line and joined once at the end.
    report_lines = [
        "FACEBOOK DATA ANALYSIS",
        "=" * 50,
        "",
        "π PAGE INFORMATION:",
        f"Title: {page_info.get('title', 'Facebook Content')}",
        f"URL Type: {url_type}",
        f"Data Source: {source.upper()}",
        f"Access: {page_info.get('access_note', 'Public content')}",
    ]

    # Only one audience figure is reported; member count takes precedence.
    if page_info.get('member_count'):
        report_lines.append(f"Members: {page_info['member_count']}")
    elif page_info.get('follower_count'):
        report_lines.append(f"Followers: {page_info['follower_count']}")

    public_count = sum(1 for block in content_blocks if block.get('is_public_content'))
    report_lines += [
        f"Extracted: {extracted_data.get('extraction_time', 'unknown')}",
        "",
        "π CONTENT ANALYSIS:",
        f"Content Blocks: {len(content_blocks)}",
        f"Public Content: {public_count} blocks",
        "",
    ]

    for number, block in enumerate(content_blocks, start=1):
        report_lines += [
            f"--- BLOCK {number} ---",
            f"Type: {block.get('content_type', 'general_content')}",
            f"Words: {block.get('word_count', 0)} | Public: {block.get('is_public_content', False)}",
            f"Content: {block.get('content', '')}",
            "",
        ]

    report_lines.append("=" * 50)
    all_text = "\n".join(report_lines)

    splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )

    chunks = splitter.split_text(all_text)
    documents = [Document(page_content=chunk) for chunk in chunks]

    return "simple", documents
|
|
|
|
|
def create_chatbot(vectorstore):
    """Build a conversational retrieval chain over ``vectorstore``.

    Args:
        vectorstore: Store exposing ``as_retriever``; the chain retrieves the
            top 3 matches per question.

    Returns:
        A ``ConversationalRetrievalChain`` with buffer memory, or the string
        ``"simple"`` (the rule-based mode sentinel) when no vector store or
        LLM is available or chain construction fails.
    """
    # Without a store there is nothing to retrieve from; skip the (slow)
    # LLM probing and fall back to rule-based mode straight away.
    if vectorstore is None:
        return "simple"

    try:
        llm = get_llm()
        if llm is None:
            return "simple"

        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            # The chain also returns source documents, so the memory has to
            # be told which output to record.
            output_key="answer",
        )

        return ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
            memory=memory,
            return_source_documents=True,
            output_key="answer",
        )
    except Exception as e:
        st.error(f"Chatbot creation failed: {str(e)}")
        return "simple"
|
|
|
|
|
def _is_facebook_url(url: str) -> bool:
    """Return True for an http(s) URL hosted on facebook.com or a subdomain."""
    try:
        parsed = urlparse(url.strip())
    except ValueError:
        return False
    host = (parsed.hostname or "").lower()
    return parsed.scheme in ("http", "https") and (
        host == "facebook.com" or host.endswith(".facebook.com")
    )


def main():
    """Render the page: sidebar controls, extraction results and chat.

    State kept in ``st.session_state``: the extractor, the last extracted
    payload (``facebook_data``), the chat mode (``chatbot``), the chat
    history and a few auxiliary keys.
    """
    st.title("π Facebook Data Extractor - REAL DATA ATTEMPT")
    st.markdown("**Aggressive real data extraction - No automatic demo fallback**")

    if st.button("β Back to Main Dashboard"):
        st.switch_page("app.py")

    # --- Session state defaults ---------------------------------------
    if "extractor" not in st.session_state:
        st.session_state.extractor = FacebookRealExtractor()
    session_defaults = {
        "facebook_data": None,
        "vectorstore": None,
        "chatbot": None,
        "chat_history": [],
        "processing_mode": "ai",
        "last_user_input": "",
    }
    for key, default in session_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default

    # --- Sidebar: configuration and actions ---------------------------
    with st.sidebar:
        st.header("βοΈ Facebook Configuration")

        data_type = st.selectbox(
            "Content Type",
            ["group", "page", "event", "post", "general"],
            help="Select the type of Facebook content",
        )

        facebook_url = st.text_input(
            "Facebook URL",
            placeholder="https://www.facebook.com/groups/gamersofbangladesh2",
            help="Enter any Facebook URL for REAL data extraction",
        )

        st.markdown("### π Test URLs")
        test_urls = {
            "Gaming Group": "https://www.facebook.com/groups/gamersofbangladesh2",
            "Tech Community": "https://www.facebook.com/groups/programmingcommunity",
            "Business Page": "https://www.facebook.com/Meta/",
        }

        for name, url in test_urls.items():
            if st.button(f"π {name}", key=f"fb_{name}"):
                # Remembered as the URL to use when the text box is empty.
                st.session_state.current_fb_url = url
                st.rerun()

        if st.button("π EXTRACT REAL DATA", type="primary"):
            url_to_use = (
                facebook_url or st.session_state.get('current_fb_url', '')
            ).strip()

            if not url_to_use:
                st.error("β Please enter a Facebook URL")
            elif not _is_facebook_url(url_to_use):
                # The host is checked, not a substring, so another site with
                # "facebook.com" somewhere in its URL is never fetched.
                st.error("β Please enter a valid Facebook URL")
            else:
                with st.spinner("π Aggressively extracting REAL Facebook data..."):
                    extracted_data = st.session_state.extractor.extract_data(url_to_use, data_type)

                    if extracted_data.get("status") == "success":
                        st.session_state.facebook_data = extracted_data
                        st.session_state.chatbot = "simple"
                        st.session_state.chat_history = []
                        st.session_state.last_user_input = ""

                        source = extracted_data.get('source', 'unknown')
                        if source == 'real':
                            st.success("π SUCCESS: Real Facebook data extracted!")
                            st.balloons()
                        else:
                            st.warning("⚠️ Using fallback data - Facebook blocked real extraction")
                    else:
                        # The extractor reports failures under "reason";
                        # "error" is still honoured for other producers.
                        error_msg = (
                            extracted_data.get("reason")
                            or extracted_data.get("error")
                            or "Unknown error"
                        )
                        st.error(f"β Extraction failed: {error_msg}")

        if st.session_state.facebook_data:
            st.markdown("---")
            if st.button("ποΈ Clear Data", type="secondary"):
                st.session_state.facebook_data = None
                st.session_state.vectorstore = None
                st.session_state.chatbot = None
                st.session_state.chat_history = []
                st.session_state.last_user_input = ""
                st.rerun()

    # --- Main area: extraction results --------------------------------
    st.header("π Extraction Results")

    if st.session_state.facebook_data:
        data = st.session_state.facebook_data
        page_info = data['page_info']
        content_blocks = data['content_blocks']
        source = data.get('source', 'unknown')

        if source == 'real':
            st.success("✅ **REAL DATA** - Successfully extracted from Facebook!")
        else:
            st.warning("π **FALLBACK DATA** - Facebook blocked real extraction")

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Content Blocks", len(content_blocks))
        with col2:
            st.metric("Data Source", "REAL" if source == 'real' else "FALLBACK")
        with col3:
            st.metric("Status", "Success")

        st.subheader("🏷️ Page Information")
        st.write(f"**Title:** {page_info['title']}")
        st.write(f"**Description:** {page_info.get('description', 'No description')}")
        st.write(f"**Access Note:** {page_info.get('access_note', 'Public content')}")
        st.write(f"**Response Code:** {page_info.get('response_code', 'N/A')}")

        st.subheader("π Content Analysis")
        for i, block in enumerate(content_blocks):
            with st.expander(f"Content {i+1} - {block['content_type']} ({block['word_count']} words)"):
                st.write(block['content'])
                st.caption(f"Public: {block['is_public_content']}")
    else:
        st.info("""
            ## π Facebook Real Data Extractor

            **Aggressive Approach - No Automatic Demo**

            **This version:**
            - Tries multiple extraction methods
            - Uses rotating user agents
            - Attempts mobile versions
            - Tries text-only alternatives
            - Only uses demo data as LAST RESORT

            **Technical Challenges:**
            - Facebook has strong anti-bot protection
            - Requires JavaScript execution
            - Needs session management
            - Heavy rate limiting

            **For your project:**
            - Shows real technical limitations
            - Demonstrates platform security
            - Provides educational value
            """)

    # --- Chat ----------------------------------------------------------
    st.markdown("---")
    st.header("💬 Analysis Chat")

    chat_ready = bool(st.session_state.chatbot and st.session_state.facebook_data)

    if chat_ready:
        for chat in st.session_state.chat_history:
            if chat["role"] in ("user", "assistant"):
                with st.chat_message(chat["role"]):
                    st.write(chat['content'])

        if not st.session_state.chat_history:
            st.subheader("💡 Try asking:")
            suggestions = [
                "What is this Facebook content about?",
                "Summarize the extracted data",
                "What kind of information was found?",
                "Analyze the content structure",
            ]

            cols = st.columns(len(suggestions))
            for col, suggestion in zip(cols, suggestions):
                with col:
                    if st.button(suggestion, key=f"fb_suggest_{suggestion}", use_container_width=True):
                        st.info(f"Type: '{suggestion}' in the chat below")
    elif st.session_state.facebook_data:
        st.info("💬 Start chatting about the Facebook data")
    else:
        st.info("π Extract Facebook data to enable analysis")

    if chat_ready:
        user_input = st.chat_input("Ask about the Facebook data...")

        # No comparison with the previous question: that check made it
        # impossible to ask the same question twice in a row.
        # NOTE(review): relies on st.chat_input returning text only on the
        # run where it was submitted - confirm for the deployed version.
        if user_input:
            st.session_state.last_user_input = user_input
            st.session_state.chat_history.append({"role": "user", "content": user_input})

            with st.spinner("🤖 Analyzing..."):
                try:
                    reply = simple_chat_analysis(user_input, st.session_state.facebook_data)
                except Exception as e:
                    reply = f"Analysis Error: {str(e)}"

            st.session_state.chat_history.append({"role": "assistant", "content": reply})
            # Kept outside the try block so the rerun request can never be
            # handled as if it were an analysis failure.
            st.rerun()
|
|
|
|
|
# Run the page when this file is executed as a script.
if __name__ == "__main__":
    main()