# pages/linkedin_extractor.py
import os
import time

import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_core.documents import Document
from langchain_community.llms import HuggingFaceHub

st.set_page_config(
    page_title="LinkedIn AI Analyzer",
    page_icon="💼",
    layout="wide"
)


def get_embeddings():
    """Initialize embeddings, trying several models before giving up."""
    try:
        # Try multiple embedding models in order of preference
        model_options = [
            "sentence-transformers/all-MiniLM-L6-v2",
            "sentence-transformers/all-mpnet-base-v2",
            "BAAI/bge-small-en-v1.5",
            "sentence-transformers/paraphrase-MiniLM-L6-v2",
        ]
        for model_name in model_options:
            try:
                st.info(f"🔄 Trying to load: {model_name}")
                embeddings = HuggingFaceEmbeddings(
                    model_name=model_name,
                    model_kwargs={"device": "cpu"},
                    encode_kwargs={
                        "normalize_embeddings": True,
                        "batch_size": 32,
                    },
                )
                # Smoke-test the embeddings before accepting them
                test_embedding = embeddings.embed_query("Hello world")
                if test_embedding and len(test_embedding) > 0:
                    st.success(f"✅ Loaded embeddings: {model_name.split('/')[-1]}")
                    return embeddings
            except Exception as e:
                st.warning(f"⚠️ Failed to load {model_name}: {e}")
                continue

        # If all models fail, retry the default with an explicit cache folder
        st.warning("🔄 Trying fallback embedding method...")
        try:
            embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                cache_folder="/tmp/embeddings",
            )
            st.success("✅ Loaded fallback embeddings")
            return embeddings
        except Exception as e:
            st.error(f"❌ Fallback also failed: {e}")
            return None
    except Exception as e:
        st.error(f"❌ Embeddings error: {e}")
        return None


def get_llm():
    """Initialize a hosted LLM, preferring Mistral 7B with fallbacks."""
    try:
        api_key = os.getenv("HUGGINGFACEHUB_API_TOKEN")
        if not api_key:
            st.error("""
            ❌ HuggingFace API key not found!

            Please add your API key:
            1. Go to Space Settings → Variables and Secrets
            2. Add: HUGGINGFACEHUB_API_TOKEN = "your_hf_token_here"
            3. Restart the Space

            Get a free API key: https://huggingface.co/settings/tokens
            """)
            return None

        # Try multiple hosted models in order of preference
        model_options = [
            "mistralai/Mistral-7B-Instruct-v0.1",
            "HuggingFaceH4/zephyr-7b-beta",
            "google/flan-t5-large",
        ]
        for model_id in model_options:
            try:
                st.info(f"🔄 Trying to load: {model_id}")
                llm = HuggingFaceHub(
                    repo_id=model_id,
                    huggingfacehub_api_token=api_key,
                    model_kwargs={
                        "temperature": 0.7,
                        "max_length": 2048,
                        "max_new_tokens": 512,
                        "top_p": 0.95,
                        "repetition_penalty": 1.1,
                        "do_sample": True,
                    },
                )
                # Smoke-test the model before accepting it
                test_response = llm.invoke("Hello")
                if test_response:
                    st.success(f"✅ Loaded model: {model_id.split('/')[-1]}")
                    return llm
            except Exception as e:
                st.warning(f"⚠️ Failed to load {model_id}: {e}")
                continue

        st.error("❌ All AI models failed to load")
        return None
    except Exception as e:
        st.error(f"❌ AI model error: {e}")
        return None


def simple_chat_analysis(user_input, extracted_data):
    """Rule-based chat fallback used when embeddings or the LLM are unavailable."""
    try:
        if not extracted_data:
            return "No data available for analysis."
        content_blocks = extracted_data.get("content_blocks", [])
        page_info = extracted_data.get("page_info", {})

        # Simple keyword-based routing over the extracted data
        user_input_lower = user_input.lower()
        if any(word in user_input_lower for word in ["summary", "summarize", "overview"]):
            first_block = content_blocks[0][:200] if content_blocks else "No content available"
            return (
                "Based on the LinkedIn data, here's a summary:\n\n"
                f"Title: {page_info.get('title', 'N/A')}\n"
                f"Content Type: {extracted_data.get('data_type', 'N/A')}\n"
                f"Total Content Blocks: {len(content_blocks)}\n"
                f"Key Content: {first_block}..."
            )
        elif any(word in user_input_lower for word in ["skills", "expertise", "technologies"]):
            return (
                "I can analyze the content for skills and expertise. The extracted data "
                "shows professional information that can be reviewed for specific skills "
                "mentioned in the content blocks."
            )
        elif any(word in user_input_lower for word in ["experience", "background", "career"]):
            return (
                "The LinkedIn data contains professional experience information. I can help "
                "you analyze the career background and work history mentioned in the profile."
            )
        else:
            return (
                f"I've analyzed the LinkedIn data. {page_info.get('title', 'The profile')} "
                f"contains {len(content_blocks)} content blocks with professional information. "
                "You can ask me about summaries, skills, experience, or specific details "
                "from the extracted content."
            )
    except Exception as e:
        return f"Analysis error: {e}"


def extract_linkedin_data(url, data_type):
    """Fetch a public LinkedIn URL and extract its visible text content."""
    try:
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }

        st.info(f"🌐 Accessing: {url}")
        response = requests.get(url, headers=headers, timeout=25)
        if response.status_code != 200:
            return {
                "error": f"Failed to access page (Status: {response.status_code})",
                "status": "error",
            }

        soup = BeautifulSoup(response.text, "html.parser")

        # Remove non-content elements
        for tag in soup(["script", "style", "meta", "link", "nav", "header", "footer"]):
            tag.decompose()

        # Extract and normalize the visible text
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        clean_text = " ".join(chunk for chunk in chunks if chunk)

        # Keep only sentence-like fragments with some substance
        paragraphs = [p.strip() for p in clean_text.split(".") if len(p.strip()) > 40]
        if not paragraphs:
            return {
                "error": (
                    "No meaningful content found. The page might require login "
                    "or have restricted access."
                ),
                "status": "error",
            }
        # Page title
        title = soup.find("title")
        page_title = title.text.strip() if title else "LinkedIn Page"

        # Structure the extracted data
        extracted_data = {
            "page_info": {
                "title": page_title,
                "url": url,
                "response_code": response.status_code,
                "content_length": len(clean_text),
            },
            "content_blocks": paragraphs,
            "extraction_time": time.strftime("%Y-%m-%d %H:%M:%S"),
            "data_type": data_type,
            "status": "success",
        }
        return extracted_data

    except requests.exceptions.Timeout:
        return {"error": "Request timed out. Please try again.", "status": "error"}
    except requests.exceptions.ConnectionError:
        return {"error": "Connection failed. Please check the URL and try again.", "status": "error"}
    except Exception as e:
        return {"error": f"Extraction error: {e}", "status": "error"}


def process_extracted_data(extracted_data):
    """Build a vector store from extracted data, falling back to plain text."""
    if not extracted_data or extracted_data.get("status") != "success":
        return None, []
    try:
        page_info = extracted_data["page_info"]
        content_blocks = extracted_data["content_blocks"]

        # Assemble a single report string for chunking and retrieval
        all_text = "LINKEDIN DATA ANALYSIS REPORT\n"
        all_text += "=" * 70 + "\n\n"
        all_text += "📄 PAGE INFORMATION:\n"
        all_text += f"Title: {page_info['title']}\n"
        all_text += f"URL: {page_info['url']}\n"
        all_text += f"Type: {extracted_data['data_type'].upper()}\n"
        all_text += f"Extracted: {extracted_data['extraction_time']}\n"
        all_text += f"Response Code: {page_info['response_code']}\n"
        all_text += f"Content Length: {page_info['content_length']} characters\n\n"
        all_text += "📊 CONTENT ANALYSIS:\n"
        all_text += f"Total Content Blocks: {len(content_blocks)}\n\n"

        # Add content blocks (limited for performance)
        for i, block in enumerate(content_blocks[:10]):
            all_text += f"--- CONTENT BLOCK {i+1} ---\n"
            all_text += f"Words: {len(block.split())} | Characters: {len(block)}\n"
            all_text += f"Content: {block}\n\n"
        all_text += "=" * 70 + "\n"
        all_text += "END OF EXTRACTION REPORT"

        # Try to create a vector store
        embeddings = get_embeddings()
        if embeddings is None:
            st.warning("⚠️ Using simple text processing (embeddings unavailable)")
            # "simple" marks the no-embeddings path for the rest of the app
            documents = [Document(page_content=all_text)]
            return "simple", documents

        # Split into chunks (kept small for better performance)
        splitter = CharacterTextSplitter(
            separator="\n",
            chunk_size=800,
            chunk_overlap=100,
            length_function=len,
        )
        chunks = splitter.split_text(all_text)
        documents = [Document(page_content=chunk) for chunk in chunks]

        # Create the vector store
        vectorstore = FAISS.from_documents(documents, embeddings)
        return vectorstore, chunks
    except Exception as e:
        st.error(f"❌ Processing failed: {e}")
        # Fallback: return a minimal document so chat can still work
        if extracted_data:
            simple_doc = Document(page_content=f"LinkedIn Data: {extracted_data['page_info']['title']}")
            return "simple", [simple_doc]
        return None, []


def create_chatbot(vectorstore):
    """Create a conversational retrieval chain, or return "simple" as a fallback."""
    try:
        # Without a real vector store, retrieval-based chat is not possible
        if vectorstore is None or vectorstore == "simple":
            return "simple"
        llm = get_llm()
        if llm is None:
            st.warning("⚠️ Using simple chat analysis (AI model unavailable)")
            return "simple"
        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer",
        )
        chain = ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
            memory=memory,
            return_source_documents=True,
            output_key="answer",
        )
        return chain
    except Exception as e:
        st.error(f"❌ Chatbot creation failed: {e}")
        return "simple"


def clear_chat_history():
    """Clear chat history while keeping extracted data"""
    st.session_state.chat_history = []
    st.success("🔄 Chat history cleared! Starting fresh conversation.")


def display_metrics(extracted_data):
    """Display extraction metrics"""
    if not extracted_data:
        return
    page_info = extracted_data["page_info"]
    content_blocks = extracted_data["content_blocks"]

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Content Blocks", len(content_blocks))
    with col2:
        total_words = sum(len(block.split()) for block in content_blocks)
        st.metric("Total Words", total_words)
    with col3:
        st.metric("Characters", f"{page_info['content_length']:,}")
    with col4:
        st.metric("Response Code", page_info["response_code"])


def main():
    st.title("💼 LinkedIn AI Analyzer")
    if st.button("← Back to Main Dashboard"):
        st.switch_page("app.py")

    # Initialize session state
    if "extracted_data" not in st.session_state:
        st.session_state.extracted_data = None
    if "vectorstore" not in st.session_state:
        st.session_state.vectorstore = None
    if "chatbot" not in st.session_state:
        st.session_state.chatbot = None
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "processing" not in st.session_state:
        st.session_state.processing = False
    if "current_url" not in st.session_state:
        st.session_state.current_url = ""

    # Sidebar
    with st.sidebar:
        st.markdown("### ⚙️ Configuration")

        # Data type selection
        data_type = st.selectbox(
            "📊 Content Type",
            ["profile", "company", "post"],
            help="Select the type of LinkedIn content",
        )

        # URL input
        url_placeholder = {
            "profile": "https://www.linkedin.com/in/username/",
            "company": "https://www.linkedin.com/company/companyname/",
            "post": "https://www.linkedin.com/posts/username_postid/",
        }
        linkedin_url = st.text_input(
            "🌐 LinkedIn URL",
            placeholder=url_placeholder[data_type],
            help="Enter a public LinkedIn URL",
        )

        # Suggested URLs
        st.markdown("### 🚀 Quick Test")
        suggested_urls = {
            "Microsoft": "https://www.linkedin.com/company/microsoft/",
            "Google": "https://www.linkedin.com/company/google/",
            "Apple": "https://www.linkedin.com/company/apple/",
            "Amazon": "https://www.linkedin.com/company/amazon/",
        }
        for name, url in suggested_urls.items():
            if st.button(f"🏢 {name}", key=name, use_container_width=True):
                st.session_state.current_url = url
                st.rerun()

        # Extract button
        if st.button("🚀 Extract & Analyze", type="primary", use_container_width=True):
            url_to_use = linkedin_url.strip() or st.session_state.current_url
            if not url_to_use:
                st.warning("⚠️ Please enter a LinkedIn URL")
            elif not url_to_use.startswith("https://www.linkedin.com/"):
                st.error("❌ Please enter a valid LinkedIn URL")
            else:
                st.session_state.processing = True
                with st.spinner("🔄 Extracting and analyzing data..."):
                    extracted_data = extract_linkedin_data(url_to_use, data_type)
                    if extracted_data.get("status") == "success":
                        st.session_state.extracted_data = extracted_data
                        st.session_state.current_url = url_to_use

                        # Process for AI (with fallbacks); the first element is
                        # None only when processing failed outright
                        vectorstore, chunks = process_extracted_data(extracted_data)
                        if vectorstore is not None:
                            st.session_state.vectorstore = vectorstore

                            # Create chatbot (with fallbacks)
                            chatbot = create_chatbot(vectorstore)
                            st.session_state.chatbot = chatbot
                            st.session_state.chat_history = []
                            if chatbot == "simple":
                                st.warning("⚠️ Using simple chat mode (AI features limited)")
                            else:
                                st.success(
                                    f"✅ AI analysis ready! Processed "
                                    f"{len(chunks) if chunks else 1} content chunks."
                                )
                                st.balloons()
                        else:
                            st.error("❌ Failed to process data for analysis")
                    else:
                        error_msg = extracted_data.get("error", "Unknown error occurred")
                        st.error(f"❌ Extraction failed: {error_msg}")
                st.session_state.processing = False

        # Chat management
        if st.session_state.extracted_data and st.session_state.extracted_data.get("status") == "success":
            st.markdown("---")
            st.subheader("💬 Chat Management")
            if st.button("🗑️ Clear Chat History", type="secondary", use_container_width=True):
                clear_chat_history()

        # Debug info
        if st.checkbox("🔧 Show Debug Info", False):
            st.markdown("### Debug Information")
            st.write("Extracted Data:", st.session_state.extracted_data is not None)
            st.write(
                "Vectorstore Type:",
                type(st.session_state.vectorstore).__name__ if st.session_state.vectorstore else "None",
            )
            st.write(
                "Chatbot Type:",
                "simple" if st.session_state.chatbot == "simple"
                else type(st.session_state.chatbot).__name__ if st.session_state.chatbot
                else "None",
            )
            st.write("Chat History Length:", len(st.session_state.chat_history))
            st.write("Processing:", st.session_state.processing)

    # Main content area
    col1, col2 = st.columns([1, 1])

    with col1:
        st.markdown("### 📊 Extraction Results")
        if st.session_state.processing:
            st.info("🔄 Processing LinkedIn data...")
        elif st.session_state.extracted_data:
            data = st.session_state.extracted_data
            page_info = data["page_info"]
            content_blocks = data["content_blocks"]

            st.success("✅ Extraction Complete")

            # Metrics
            display_metrics(data)

            # Page info
            st.markdown("#### 🏷️ Page Information")
            st.write(f"**Title:** {page_info['title']}")
            st.write(f"**URL:** {page_info['url']}")
            st.write(f"**Data Type:** {data['data_type'].title()}")
            st.write(f"**Content Blocks:** {len(content_blocks)}")
            st.write(f"**Extraction Time:** {data['extraction_time']}")

            # Sample content
            st.markdown("#### 📝 Sample Content")
            for i, block in enumerate(content_blocks[:3]):
                with st.expander(f"Content Block {i+1} ({len(block.split())} words)"):
                    st.write(block)
            if len(content_blocks) > 3:
                st.info(f"📄 And {len(content_blocks) - 3} more content blocks...")
        else:
            st.info("""
            👋 **Welcome to LinkedIn AI Analyzer!**

            **To get started:**
            1. Select content type
            2. Enter a LinkedIn URL or click a suggested company
            3. Click "Extract & Analyze"
            4. Chat with AI about the extracted content

            **Supported URLs:**
            - 👤 Public Profiles
            - 🏢 Company Pages
            - 📝 Public Posts

            **Features:**
            - Content extraction
            - Basic analysis
            - Interactive chat
            - Data insights
            """)

    with col2:
        st.markdown("### 💬 AI Chat Analysis")
        has_extracted_data = (
            st.session_state.extracted_data
            and st.session_state.extracted_data.get("status") == "success"
        )
        if has_extracted_data:
            st.success("💬 Chat ready! Ask questions about the LinkedIn data.")
            # Display chat history
            for chat in st.session_state.chat_history:
                if chat["role"] == "user":
                    with st.chat_message("user"):
                        st.write(chat["content"])
                elif chat["role"] == "assistant":
                    with st.chat_message("assistant"):
                        st.write(chat["content"])

            # Chat input
            user_input = st.chat_input("Ask about the LinkedIn data...")
            if user_input:
                # Add user message to history
                st.session_state.chat_history.append({"role": "user", "content": user_input})

                # Generate a response with whatever capabilities are available
                if st.session_state.chatbot == "simple" or st.session_state.chatbot is None:
                    # Rule-based fallback
                    with st.spinner("🤔 Analyzing..."):
                        response = simple_chat_analysis(user_input, st.session_state.extracted_data)
                        st.session_state.chat_history.append({"role": "assistant", "content": response})
                        st.rerun()
                else:
                    # Retrieval-augmented chatbot
                    with st.spinner("🤔 AI is analyzing..."):
                        try:
                            response = st.session_state.chatbot.invoke({"question": user_input})
                            answer = response.get(
                                "answer",
                                "I couldn't generate a response based on the available data.",
                            )
                            st.session_state.chat_history.append({"role": "assistant", "content": answer})
                            st.rerun()
                        except Exception as e:
                            # Degrade gracefully to the rule-based fallback
                            error_msg = f"❌ AI Error: {e}. Using simple analysis."
                            simple_response = simple_chat_analysis(user_input, st.session_state.extracted_data)
                            st.session_state.chat_history.append(
                                {"role": "assistant", "content": f"{error_msg}\n\n{simple_response}"}
                            )
                            st.rerun()

            # Suggested questions
            if len(st.session_state.chat_history) == 0:
                st.markdown("#### 💡 Try asking:")
                suggestions = [
                    "Summarize the main information",
                    "What are the key highlights?",
                    "Analyze the professional focus",
                    "What insights can you extract?",
                    "Tell me about the experience",
                ]
                for suggestion in suggestions:
                    if st.button(suggestion, key=f"suggest_{suggestion}", use_container_width=True):
                        st.info(f"💡 Type in chat: '{suggestion}'")
        elif st.session_state.processing:
            st.info("🔄 Extracting and processing LinkedIn data...")
        else:
            st.info("🔍 Extract LinkedIn data to enable analysis")

    # Features section
    st.markdown("---")
    st.markdown("### 🚀 Analysis Features")
    feature_cols = st.columns(3)
    with feature_cols[0]:
        st.markdown("""
        **📊 Content Extraction**
        - LinkedIn data scraping
        - Text processing
        - Content analysis
        """)
    with feature_cols[1]:
        st.markdown("""
        **💬 Smart Chat**
        - Interactive conversation
        - Data-driven responses
        - Context awareness
        """)
    with feature_cols[2]:
        st.markdown("""
        **🔍 Insights**
        - Content summarization
        - Pattern recognition
        - Professional analysis
        """)


if __name__ == "__main__":
    main()
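
# ---------------------------------------------------------------------------
# Setup note (inferred from the imports and page layout above; the package
# list is an assumption, not pinned by the original file):
#
#   pip install streamlit requests beautifulsoup4 langchain langchain-community \
#       langchain-core langchain-text-splitters faiss-cpu sentence-transformers
#
# This file lives in pages/, so Streamlit should pick it up as a page of the
# multipage app launched with `streamlit run app.py` (the dashboard that
# "← Back to Main Dashboard" switches to). The hosted LLM additionally
# requires the HUGGINGFACEHUB_API_TOKEN environment variable.
# ---------------------------------------------------------------------------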