Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| ScribdHarvester - Cookie-Based Document & Image Extraction | |
| ============================================================= | |
| Features: | |
| - Automatically reads cookies from Chrome browser (no login needed!) | |
| - Extracts favorites/saved items from Scribd | |
| - Downloads documents and extracts images for presentations | |
| - Deduplication via MD5 hashing | |
| - Stores metadata in Neo4j AuraDB Cloud | |
| Usage: | |
| pip install -r scribd_requirements.txt | |
| python scribd_harvester.py | |
| @author WidgeTDC Neural Network | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import hashlib | |
| import requests | |
| import re | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Any | |
| from dataclasses import dataclass, asdict | |
| from urllib.parse import urljoin, urlparse | |
| import time | |
| # Neo4j | |
| from neo4j import GraphDatabase | |
| # Cookie extraction | |
| try: | |
| import browser_cookie3 | |
| HAS_BROWSER_COOKIES = True | |
| except ImportError: | |
| HAS_BROWSER_COOKIES = False | |
| print("β οΈ browser_cookie3 not installed. Run: pip install browser_cookie3") | |
| # HTML parsing | |
| from bs4 import BeautifulSoup | |
| # Image processing | |
| try: | |
| from PIL import Image | |
| import io | |
| HAS_PIL = True | |
| except ImportError: | |
| HAS_PIL = False | |
| # PDF handling | |
| try: | |
| import fitz # PyMuPDF | |
| HAS_PYMUPDF = True | |
| except ImportError: | |
| HAS_PYMUPDF = False | |
@dataclass
class ScribdDocument:
    """Metadata record for one saved Scribd item.

    Declared as a dataclass so it can be built with keyword arguments (as
    ``ScribdHarvester.process_document`` does) and serialized with ``asdict``.
    """

    id: str  # numeric id taken from the URL, or a short hash of the URL
    title: str
    author: str
    url: str
    doc_type: str  # book, document, audiobook, sheet_music
    thumbnail: str  # thumbnail/cover image URL, '' when unknown
    description: str
    content_hash: str  # MD5 of "<title>-<url>", used for deduplication
    saved_at: str  # ISO-8601 timestamp string
@dataclass
class ExtractedImage:
    """Metadata record for one image downloaded from a Scribd document page.

    Declared as a dataclass so it can be built with keyword arguments (as
    ``ScribdHarvester.extract_images_from_document`` does).
    """

    id: str  # "<doc id>_img_<index of the <img> element>"
    source_doc_id: str  # id of the ScribdDocument the image came from
    url: str  # absolute image URL
    caption: str  # figcaption text, else the alt/title attribute
    page_number: int  # 1-based position of the <img> element in the page HTML
    content_hash: str  # MD5 of the image URL, used for deduplication
    local_path: str  # where the downloaded file was written
    width: int
    height: int
class ScribdHarvester:
    """
    Autonomous Scribd harvester using browser cookies.

    Loads Scribd session cookies (from a saved cookie file or directly from a
    local browser), scrapes the saved/library pages for document links,
    records each document in Neo4j, and downloads the images found on each
    document page into ``<output_dir>/images``.
    """

    # Neo4j AuraDB Cloud credentials.
    # SECURITY NOTE(review): the fallback values are secrets committed to
    # source. Set NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD in the environment
    # instead; the committed password should be rotated and then removed.
    NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j+s://054eff27.databases.neo4j.io")
    NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
    NEO4J_PASSWORD = os.environ.get(
        "NEO4J_PASSWORD", "Qrt37mkb0xBZ7_ts5tG1J70K2mVDGPMF2L7Njlm7cg8"
    )

    # Scribd URLs
    SCRIBD_BASE = "https://www.scribd.com"
    SCRIBD_SAVED_URL = "https://www.scribd.com/saved"
    SCRIBD_LIBRARY_URL = "https://www.scribd.com/library"

    # Headers to mimic browser
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    # Seconds to wait on any single HTTP request before giving up.
    REQUEST_TIMEOUT = 30

    # Images narrower or shorter than this many pixels are treated as icons.
    MIN_IMAGE_SIZE = 100

    # Substrings of an image URL that mark it as page chrome, not content.
    IMAGE_SKIP_PATTERNS = ('avatar', 'icon', 'logo', 'button', 'sprite', 'tracking', '1x1')

    # (label used in messages, browser_cookie3 loader name), tried in order.
    BROWSERS = (("Chrome", "chrome"), ("Edge", "edge"), ("Firefox", "firefox"))

    # (CSS selector, item type); 'mixed' means "derive the type from the URL".
    LINK_PATTERNS = (
        ('a[href*="/document/"]', 'document'),
        ('a[href*="/book/"]', 'book'),
        ('a[href*="/read/"]', 'book'),
        ('a[href*="/audiobook/"]', 'audiobook'),
        ('[data-object-type]', 'mixed'),
    )

    def __init__(self, output_dir: Optional[str] = None):
        """Create output directories, an HTTP session and a Neo4j driver.

        Args:
            output_dir: Root directory for harvested data; defaults to
                ``data/scribd_harvest``.
        """
        self.output_dir = Path(output_dir or "data/scribd_harvest")
        self.image_dir = self.output_dir / "images"
        self.docs_dir = self.output_dir / "documents"
        self.cookies_file = self.output_dir / "scribd_cookies.json"

        # Create directories
        for d in [self.output_dir, self.image_dir, self.docs_dir]:
            d.mkdir(parents=True, exist_ok=True)

        # Initialize session
        self.session = requests.Session()
        self.session.headers.update(self.HEADERS)

        # Initialize Neo4j
        self.driver = GraphDatabase.driver(
            self.NEO4J_URI,
            auth=(self.NEO4J_USER, self.NEO4J_PASSWORD)
        )

        # Stats
        self.stats = {
            "documents_found": 0,
            "documents_saved": 0,
            "documents_skipped": 0,
            "images_extracted": 0,
            "images_saved": 0
        }

        print("π [ScribdHarvester] Initialized")
        print(f" Output: {self.output_dir.absolute()}")

    def generate_hash(self, content: str) -> str:
        """Return the hex MD5 digest of *content* (used only for deduplication)."""
        return hashlib.md5(content.encode()).hexdigest()

    @staticmethod
    def _parse_dimension(value: Any) -> int:
        """Return an HTML width/height attribute as an int, or 0 if unusable.

        Attributes such as ``"100%"`` or ``"120px"`` are not plain integers;
        they are reported as 0 ("unknown") instead of raising.
        """
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _image_extension(content_type: str) -> str:
        """Map a Content-Type header value to a file extension (default jpg)."""
        for ext in ('png', 'gif', 'webp'):
            if ext in content_type:
                return ext
        return 'jpg'

    def load_cookies_from_browser(self) -> bool:
        """
        Load Scribd cookies from a local browser into the HTTP session.

        Chrome, Edge and Firefox are tried in that order; the first browser
        that yields at least one cookie wins and its cookies are also saved
        to the cookie file.

        Returns:
            True if cookies were loaded from some browser, False otherwise.
        """
        if not HAS_BROWSER_COOKIES:
            print("β browser_cookie3 not available")
            return False

        print("πͺ Loading cookies from Chrome browser...")
        for label, loader_name in self.BROWSERS:
            try:
                # getattr stays inside the try: a browser_cookie3 build that
                # lacks this loader is handled like any other failure.
                loader = getattr(browser_cookie3, loader_name)
                cj = loader(domain_name=".scribd.com")
                cookies_found = 0
                for cookie in cj:
                    self.session.cookies.set(cookie.name, cookie.value, domain=cookie.domain)
                    cookies_found += 1
                if cookies_found > 0:
                    print(f" β Loaded {cookies_found} cookies from {label}")
                    self._save_cookies_to_file()
                    return True
            except Exception as e:
                print(f" β οΈ {label} cookies failed: {e}")

        print("β No browser cookies found. Please login to Scribd in your browser first.")
        return False

    def _save_cookies_to_file(self):
        """Write the session cookies to the cookie file as a name -> value map."""
        # Iterate the jar instead of calling dict(jar): the mapping interface
        # raises when one cookie name exists for several domains/paths.
        cookies_dict = {cookie.name: cookie.value for cookie in self.session.cookies}
        with open(self.cookies_file, 'w', encoding='utf-8') as f:
            json.dump(cookies_dict, f, indent=2)
        print(f" πΎ Cookies saved to {self.cookies_file}")

    def load_cookies_from_file(self) -> bool:
        """Load previously saved cookies into the session.

        Returns:
            True if at least one cookie was loaded; False if the file is
            missing, empty, or unreadable.
        """
        if not self.cookies_file.exists():
            return False
        try:
            with open(self.cookies_file, 'r', encoding='utf-8') as f:
                cookies = json.load(f)
            if not cookies:
                # An empty file cannot authenticate; let the caller fall back.
                return False
            for name, value in cookies.items():
                self.session.cookies.set(name, value)
            print(f"πͺ Loaded {len(cookies)} cookies from file")
            return True
        except Exception as e:
            print(f"β οΈ Failed to load cookies from file: {e}")
            return False

    def verify_login(self) -> bool:
        """Return True if the saved-items page is reachable without a login redirect."""
        try:
            response = self.session.get(
                self.SCRIBD_SAVED_URL,
                allow_redirects=False,
                timeout=self.REQUEST_TIMEOUT,
            )

            # If redirected to login, we're not authenticated
            if response.status_code in [301, 302, 303]:
                location = response.headers.get('Location', '')
                if 'login' in location.lower():
                    print("β Not logged in - redirected to login page")
                    return False

            # Check if we can see the saved page
            if response.status_code == 200:
                page_text = response.text.lower()
                if 'saved' in page_text or 'library' in page_text:
                    print("β Successfully authenticated with Scribd!")
                    return True

            print(f"β οΈ Unexpected response: {response.status_code}")
            return False
        except Exception as e:
            print(f"β Login verification failed: {e}")
            return False

    def fetch_saved_items(self) -> List[Dict]:
        """Collect saved/favorite items from several Scribd listing pages.

        Each item is a dict with ``url``, ``title``, ``type`` and, when found,
        ``thumbnail``. Items are de-duplicated by URL across all endpoints.
        ``stats["documents_found"]`` is set to the number of items returned.
        """
        print("\nπ Fetching saved items from Scribd...")
        all_items = []
        seen_urls = set()

        def add_item(item: Dict) -> None:
            """Append *item* unless its URL was already collected."""
            if item['url'] not in seen_urls:
                seen_urls.add(item['url'])
                all_items.append(item)

        # Try multiple endpoints
        endpoints = [
            self.SCRIBD_SAVED_URL,
            self.SCRIBD_LIBRARY_URL,
            f"{self.SCRIBD_BASE}/account/saved",
            f"{self.SCRIBD_BASE}/your-library",
        ]

        for endpoint in endpoints:
            try:
                print(f" Trying: {endpoint}")
                response = self.session.get(endpoint, timeout=self.REQUEST_TIMEOUT)
                if response.status_code != 200:
                    continue

                soup = BeautifulSoup(response.text, 'html.parser')

                # Find document links - multiple patterns
                for selector, doc_type in self.LINK_PATTERNS:
                    for el in soup.select(selector):
                        href = el.get('href', '')
                        if not href or '/login' in href:
                            continue

                        # Build full URL
                        if not href.startswith('http'):
                            href = urljoin(self.SCRIBD_BASE, href)

                        # Extract info
                        item = {
                            'url': href,
                            'title': el.get_text(strip=True) or el.get('title', 'Unknown'),
                            'type': doc_type if doc_type != 'mixed' else self._detect_type(href),
                        }

                        # Find thumbnail
                        img = el.find('img')
                        if img:
                            item['thumbnail'] = img.get('src', '')

                        add_item(item)

                # Also try JSON data embedded in page
                for script in soup.find_all('script', type='application/json'):
                    try:
                        # script.string is None for empty tags -> TypeError
                        data = json.loads(script.string)
                    except (TypeError, ValueError):
                        continue
                    if isinstance(data, dict):
                        for item in self._extract_items_from_json(data):
                            add_item(item)
            except Exception as e:
                print(f" β οΈ Error fetching {endpoint}: {e}")

        print(f" π Found {len(all_items)} saved items")
        self.stats["documents_found"] = len(all_items)
        return all_items

    def _detect_type(self, url: str) -> str:
        """Detect the item type from path markers in *url* (default 'document')."""
        if '/book/' in url or '/read/' in url:
            return 'book'
        elif '/audiobook/' in url:
            return 'audiobook'
        elif '/sheet_music/' in url:
            return 'sheet_music'
        return 'document'

    def _extract_items_from_json(self, data: Dict) -> List[Dict]:
        """Walk *data* and return an item for every dict carrying a document/book id.

        A dict counts as a document when it has a ``document_id`` or
        ``book_id`` key. Entries whose id is empty or null are skipped, since
        no valid URL can be built for them. Nesting deeper than 10 levels is
        ignored.
        """
        items = []

        def traverse(obj, depth=0):
            if depth > 10:  # Bound the walk on pathological nesting
                return
            if isinstance(obj, dict):
                # Check if this looks like a document
                if 'document_id' in obj or 'book_id' in obj:
                    doc_id = obj.get('document_id') or obj.get('book_id')
                    if doc_id:
                        doc_type = 'book' if 'book_id' in obj else 'document'
                        items.append({
                            'url': f"{self.SCRIBD_BASE}/{doc_type}/{doc_id}",
                            # Coerce so callers can slice/format the title safely.
                            'title': str(obj.get('title') or 'Unknown'),
                            'type': doc_type,
                            'thumbnail': obj.get('thumbnail_url', obj.get('cover_url', '')),
                        })
                for v in obj.values():
                    traverse(v, depth + 1)
            elif isinstance(obj, list):
                for entry in obj:
                    traverse(entry, depth + 1)

        traverse(data)
        return items

    def document_exists_in_neo4j(self, content_hash: str) -> bool:
        """Return True if a ScribdDocument node with this contentHash exists."""
        with self.driver.session() as session:
            result = session.run(
                "MATCH (d:ScribdDocument {contentHash: $hash}) RETURN d LIMIT 1",
                hash=content_hash
            )
            return bool(list(result))

    def save_document_to_neo4j(self, doc: "ScribdDocument") -> bool:
        """Save *doc* to Neo4j unless a node with the same content hash exists.

        Updates ``stats["documents_saved"]`` or ``stats["documents_skipped"]``.

        Returns:
            True if the document was written, False if it was a duplicate.
        """
        if self.document_exists_in_neo4j(doc.content_hash):
            print(f" βοΈ Skipping duplicate: {doc.title[:50]}...")
            self.stats["documents_skipped"] += 1
            return False

        with self.driver.session() as session:
            session.run("""
                MERGE (d:ScribdDocument {id: $id})
                SET d.title = $title,
                d.author = $author,
                d.url = $url,
                d.type = $doc_type,
                d.thumbnail = $thumbnail,
                d.description = $description,
                d.contentHash = $content_hash,
                d.savedAt = datetime(),
                d.source = 'Scribd',
                d.harvestedBy = 'ScribdHarvester'
                MERGE (s:DataSource {name: 'Scribd'})
                SET s.type = 'DocumentPlatform',
                s.lastHarvest = datetime()
                MERGE (d)-[:HARVESTED_FROM]->(s)
                WITH d
                MERGE (cat:Category {name: $doc_type})
                MERGE (d)-[:BELONGS_TO]->(cat)
                """,
                id=doc.id,
                title=doc.title,
                author=doc.author,
                url=doc.url,
                doc_type=doc.doc_type,
                thumbnail=doc.thumbnail,
                description=doc.description,
                content_hash=doc.content_hash
            )

        print(f" β Saved: {doc.title[:50]}...")
        self.stats["documents_saved"] += 1
        return True

    def save_image_to_neo4j(self, image: "ExtractedImage", doc_title: str) -> bool:
        """Save *image* to Neo4j unless a node with the same content hash exists.

        Args:
            image: The image record to store.
            doc_title: Unused; kept so existing callers keep working.

        Returns:
            True if the image was written, False if it was a duplicate.
        """
        with self.driver.session() as session:
            # Check for duplicate
            result = session.run(
                "MATCH (i:ScribdImage {contentHash: $hash}) RETURN i LIMIT 1",
                hash=image.content_hash
            )
            if list(result):
                return False

            session.run("""
                MERGE (i:ScribdImage {id: $id})
                SET i.url = $url,
                i.caption = $caption,
                i.pageNumber = $page_number,
                i.contentHash = $content_hash,
                i.localPath = $local_path,
                i.width = $width,
                i.height = $height,
                i.savedAt = datetime(),
                i.usableForPresentations = true
                WITH i
                MATCH (d:ScribdDocument {id: $source_doc_id})
                MERGE (i)-[:EXTRACTED_FROM]->(d)
                MERGE (cat:AssetCategory {name: 'Presentation Images'})
                MERGE (i)-[:AVAILABLE_FOR]->(cat)
                """,
                id=image.id,
                url=image.url,
                caption=image.caption,
                page_number=image.page_number,
                content_hash=image.content_hash,
                local_path=image.local_path,
                width=image.width,
                height=image.height,
                source_doc_id=image.source_doc_id
            )

        self.stats["images_saved"] += 1
        return True

    def extract_images_from_document(self, doc_url: str, doc_id: str, doc_title: str) -> List["ExtractedImage"]:
        """Download the content images referenced by a Scribd document page.

        Icons, avatars and anything smaller than ``MIN_IMAGE_SIZE`` in either
        dimension are skipped. Accepted images are written to ``image_dir``
        and counted in ``stats["images_extracted"]``.

        Args:
            doc_url: URL of the document page to scan.
            doc_id: Document id, used as the prefix of each image id.
            doc_title: Title shown in progress output.

        Returns:
            One ExtractedImage per image that was downloaded and kept.
        """
        images = []
        try:
            print(f" πΌοΈ Extracting images from: {doc_title[:40]}...")
            response = self.session.get(doc_url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                return images

            soup = BeautifulSoup(response.text, 'html.parser')

            for idx, img in enumerate(soup.find_all('img')):
                src = img.get('src', '') or img.get('data-src', '')
                if not src or len(src) < 10:
                    continue

                # Skip small icons, avatars, logos
                src_lower = src.lower()
                if any(p in src_lower for p in self.IMAGE_SKIP_PATTERNS):
                    continue

                # Declared dimensions; 0 means "not declared or not an integer"
                width = self._parse_dimension(img.get('width'))
                height = self._parse_dimension(img.get('height'))

                # Skip if too small (likely icons)
                if 0 < width < self.MIN_IMAGE_SIZE or 0 < height < self.MIN_IMAGE_SIZE:
                    continue

                # Build full URL
                if not src.startswith('http'):
                    src = urljoin(doc_url, src)

                # Deduplication key is derived from the URL, not the bytes
                content_hash = self.generate_hash(src)

                # Get caption; a <figcaption> overrides alt/title
                caption = img.get('alt', '') or img.get('title', '')
                figure = img.find_parent('figure')
                if figure:
                    figcaption = figure.find('figcaption')
                    if figcaption:
                        caption = figcaption.get_text(strip=True)

                # Download image (best effort: a failure skips this image only)
                try:
                    img_response = self.session.get(src, timeout=self.REQUEST_TIMEOUT)
                    if img_response.status_code != 200:
                        continue

                    # Get actual dimensions
                    if HAS_PIL:
                        try:
                            pil_img = Image.open(io.BytesIO(img_response.content))
                            width, height = pil_img.size
                        except Exception:
                            pass  # undecodable image: keep the declared size

                    # Decide before writing so rejected images leave no file
                    if width < self.MIN_IMAGE_SIZE or height < self.MIN_IMAGE_SIZE:
                        continue

                    ext = self._image_extension(img_response.headers.get('content-type', ''))
                    image_id = f"{doc_id}_img_{idx}"
                    local_path = self.image_dir / f"{image_id}.{ext}"
                    with open(local_path, 'wb') as f:
                        f.write(img_response.content)

                    images.append(ExtractedImage(
                        id=image_id,
                        source_doc_id=doc_id,
                        url=src,
                        caption=caption,
                        page_number=idx + 1,
                        content_hash=content_hash,
                        local_path=str(local_path),
                        width=width,
                        height=height
                    ))
                    self.stats["images_extracted"] += 1
                except Exception as e:
                    print(f" Skipping image {src}: {e}")
        except Exception as e:
            print(f" β οΈ Error extracting images: {e}")

        if images:
            print(f" Found {len(images)} usable images")
        return images

    def process_document(self, item: Dict) -> Optional["ScribdDocument"]:
        """Store one harvested item and, if it is new, its images.

        Args:
            item: Dict with at least ``url``; ``title``, ``author``, ``type``,
                ``thumbnail`` and ``description`` are optional.

        Returns:
            The saved ScribdDocument, or None if it was a duplicate.
        """
        url = item['url']
        title = item.get('title', 'Unknown')

        # Extract document ID; fall back to a short hash of the URL
        match = re.search(r'/(document|book|audiobook)/(\d+)', url)
        doc_id = match.group(2) if match else self.generate_hash(url)[:12]

        # Generate content hash for deduplication
        content_hash = self.generate_hash(f"{title}-{url}")

        doc = ScribdDocument(
            id=doc_id,
            title=title,
            author=item.get('author', 'Unknown'),
            url=url,
            doc_type=item.get('type', 'document'),
            thumbnail=item.get('thumbnail', ''),
            description=item.get('description', ''),
            content_hash=content_hash,
            saved_at=datetime.now().isoformat()
        )

        # Save to Neo4j; images are only harvested for newly saved documents
        if self.save_document_to_neo4j(doc):
            images = self.extract_images_from_document(url, doc_id, doc.title)
            for img in images:
                self.save_image_to_neo4j(img, doc.title)
            return doc
        return None

    def run(self) -> Dict:
        """Run the full harvest: authenticate, list saved items, process each.

        Returns:
            The ``stats`` counters dict (also returned on early exit).
        """
        print("")
        print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
        print("β π SCRIBD HARVESTER - WidgeTDC Neural Intelligence β")
        print("β Cookie-based extraction with Neo4j Cloud storage β")
        print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
        print("")

        # Step 1: Load cookies
        print("π STEP 1: Authentication")

        # Try saved cookies first; they only count if they still authenticate
        authenticated = self.load_cookies_from_file() and self.verify_login()

        if not authenticated:
            # Saved cookies are missing or stale: drop them and re-read the
            # browser so an expired cookie file cannot block the harvest.
            self.session.cookies.clear()
            if not self.load_cookies_from_browser():
                print("")
                print("β AUTHENTICATION FAILED")
                print(" Please ensure you are logged into Scribd in Chrome browser")
                print(" Then run this script again.")
                return self.stats

            # Verify login
            if not self.verify_login():
                print("")
                print("β Session verification failed")
                print(" Try logging into Scribd in your browser again")
                return self.stats

        # Step 2: Fetch saved items
        print("\nπ₯ STEP 2: Fetching saved items")
        items = self.fetch_saved_items()
        if not items:
            print(" No saved items found. Make sure you have favorites in Scribd.")
            return self.stats

        # Step 3: Process each item
        print(f"\nβοΈ STEP 3: Processing {len(items)} documents")
        for i, item in enumerate(items, 1):
            shown_title = str(item.get('title') or 'Unknown')[:50]
            print(f"\n[{i}/{len(items)}] {shown_title}...")
            try:
                self.process_document(item)
                # Be nice to Scribd
                time.sleep(1)
            except Exception as e:
                print(f" β Error: {e}")

        # Summary
        print("")
        print("β" * 60)
        print("π HARVEST COMPLETE")
        print("β" * 60)
        print(f" π Documents found: {self.stats['documents_found']}")
        print(f" β Documents saved: {self.stats['documents_saved']}")
        print(f" βοΈ Documents skipped: {self.stats['documents_skipped']}")
        print(f" πΌοΈ Images extracted: {self.stats['images_extracted']}")
        print(f" πΎ Images saved: {self.stats['images_saved']}")
        print(f" π Output directory: {self.output_dir.absolute()}")
        print("β" * 60)
        return self.stats

    def close(self):
        """Close the Neo4j driver."""
        self.driver.close()
        print("π Resources cleaned up")
def main():
    """Build a harvester, run one harvest, and always release its resources."""
    app = ScribdHarvester()
    try:
        app.run()
    finally:
        # Runs on success and on error so the Neo4j driver is never leaked.
        app.close()


if __name__ == "__main__":
    main()