Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| FastAPI Web Scraper with AI Integration for Hugging Face Spaces | |
| """ | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import threading | |
| import time | |
| import os | |
| from datetime import datetime | |
| import asyncio | |
| from playwright.async_api import async_playwright | |
| from PIL import Image | |
| import pytesseract | |
| import io | |
| from typing import Dict, Optional | |
| import logging | |
| import random | |
# Configure logging: INFO level, module-scoped logger per stdlib convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app (metadata shown on the auto-generated /docs page).
app = FastAPI(
    title="Enhanced Web Scraper API",
    description="Scrapes PSX DPS, Investing.com metals, and Zameen.com with Gemini AI integration",
    version="1.0.0"
)

# Add CORS middleware: wide-open policy ("*" origins/methods/headers) so any
# browser-based frontend can call this API directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request/Response models
class QueryRequest(BaseModel):
    """Request body for the AI query endpoint."""
    question: str                  # natural-language question about the scraped data
    api_key: Optional[str] = None  # optional per-request Gemini API key override
class QueryResponse(BaseModel):
    """Response body for the AI query endpoint."""
    answer: str     # Gemini-generated answer (or an error string prefixed with ❌)
    status: str     # request outcome, e.g. "success"
    timestamp: str  # ISO-8601 time the answer was produced
class StatusResponse(BaseModel):
    """Response body for the scraping status endpoint."""
    scraping_active: bool         # whether the background scraping thread is running
    last_update: Dict[str, str]   # source name -> timestamp of last successful write
    files_status: Dict[str, bool] # source name -> whether its data file exists on disk
# Global scraper instance; populated by the application startup handler.
scraper = None

# Ensure data directory exists (scraped markdown files are written here).
DATA_DIR = "/app/data"
os.makedirs(DATA_DIR, exist_ok=True)
class EnhancedScraper:
    """Background scraper for PSX DPS, Investing.com metals and Zameen.com.

    Each source is written as a markdown file under ``DATA_DIR``; a daemon
    thread refreshes all three every 5 minutes.  ``query_gemini()`` feeds the
    combined file contents to the Gemini API as context for a user question.
    File access and ``last_update`` are guarded by ``self.lock`` because the
    scrapers run on worker threads while the API reads concurrently.
    """

    def __init__(self, gemini_api_key=None):
        # SECURITY FIX: a previous revision shipped a hard-coded API key as a
        # fallback. Secrets must come from the caller or the environment only;
        # the leaked key should be revoked.
        self.gemini_api_key = gemini_api_key or os.getenv("GEMINI_API_KEY")
        # One output file per data source, all under the shared data dir.
        self.scraped_files = {
            "psx": os.path.join(DATA_DIR, "psx_dps_market_data.md"),
            "investing": os.path.join(DATA_DIR, "investing_metals.md"),
            "zameen": os.path.join(DATA_DIR, "zameen_cleaned_text.md")
        }
        self.running = False          # polled by the worker loop to stop cleanly
        self.scraping_thread = None   # handle of the background worker thread
        self.lock = threading.Lock()  # guards file I/O and last_update
        self.last_update = {}         # source name -> "YYYY-mm-dd HH:MM:SS"
        # Zameen.com listing pages, one per city.
        self.zameen_urls = {
            "Lahore": "https://www.zameen.com/Homes/Lahore-1-1.html",
            "Islamabad": "https://www.zameen.com/Homes/Islamabad-3-1.html",
            "Karachi": "https://www.zameen.com/Homes/Karachi-2-1.html"
        }
        # Desktop Chrome user agents, rotated per request to reduce blocking.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        ]

    def get_random_headers(self):
        """Return browser-like request headers with a randomized User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
            'Referer': 'https://www.google.com/'
        }

    async def extract_psx_with_ocr(self, url):
        """Render *url* with headless Chromium, screenshot it, and OCR the image.

        Returns the OCR'd text; on Playwright failure falls back to a plain
        HTTP fetch + BeautifulSoup text extraction; returns None if both fail.
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(
                    headless=True,
                    args=[
                        "--no-sandbox",
                        "--disable-setuid-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-blink-features=AutomationControlled"
                    ]
                )
                context = await browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent=random.choice(self.user_agents)
                )
                page = await context.new_page()
                # Stealth tweak: hide the webdriver flag some sites sniff for.
                await page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => false
                    });
                """)
                await page.goto(url, wait_until="networkidle", timeout=30000)
                await page.wait_for_timeout(3000)  # Wait for dynamic content
                # Remove the alerts modal so it doesn't cover the data tables.
                await page.evaluate("""
                    const modal = document.querySelector('.alerts__modal__wrapper');
                    if (modal) modal.remove();
                """)
                # Full-page screenshot, then OCR it.
                screenshot_bytes = await page.screenshot(full_page=True)
                await browser.close()
                image = Image.open(io.BytesIO(screenshot_bytes))
                text = pytesseract.image_to_string(image)
                return text.strip()
        except Exception as e:
            logger.error(f"Error in PSX OCR extraction: {e}")
            # Fallback: plain HTTP fetch (no JS rendering, no OCR).
            try:
                response = requests.get(url, headers=self.get_random_headers(), timeout=15)
                soup = BeautifulSoup(response.text, 'html.parser')
                return soup.get_text(separator='\n', strip=True)
            except Exception as fallback_error:  # FIX: was a bare `except:`
                logger.error(f"PSX fallback fetch failed: {fallback_error}")
                return None

    def scrape_psx_sync(self):
        """Synchronous wrapper for PSX scraping; writes the markdown report."""
        try:
            url = "https://dps.psx.com.pk/"
            # This runs on a worker thread, so it needs its own event loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                extracted_text = loop.run_until_complete(self.extract_psx_with_ocr(url))
            finally:
                loop.close()
            if not extracted_text:
                logger.error("Failed to extract PSX data")
                return
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # FIX: the previous f-string leaked the literal comment text
            # "# Limiting to first 5000 chars" into the output file.
            excerpt = extracted_text[:5000]
            markdown_content = f"""# PSX DPS Market Data
**Source URL:** {url}
**Last Updated:** {timestamp}
**Extraction Method:** OCR/Web Scraping
---
## Market Data Content
{excerpt}...
"""
            with self.lock:
                try:
                    with open(self.scraped_files["psx"], "w", encoding="utf-8") as f:
                        f.write(markdown_content)
                    self.last_update["psx"] = timestamp
                    logger.info(f"Updated: PSX DPS data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing PSX file: {e}")
        except Exception as e:
            logger.error(f"Error scraping PSX DPS: {e}")

    def scrape_investing(self):
        """Scrape table data from Investing.com metals pages.

        Tries several URLs; if all fail (the site blocks scrapers aggressively)
        a clearly-labeled sample table is written instead so downstream
        consumers always have a file to read.
        """
        try:
            urls = [
                "https://www.investing.com/commodities/metals",
                "https://www.investing.com/commodities/gold-historical-data",
                "https://www.investing.com/commodities/silver-historical-data"
            ]
            all_data = []
            for url in urls:
                try:
                    headers = self.get_random_headers()
                    headers['Host'] = 'www.investing.com'
                    # Random delay between requests to avoid rate limiting.
                    time.sleep(random.uniform(2, 4))
                    response = requests.get(url, headers=headers, timeout=15)
                    if response.status_code == 200:
                        soup = BeautifulSoup(response.content, 'html.parser')
                        text_content = soup.get_text(separator='\n', strip=True)
                        all_data.append(f"\n### Data from {url}\n{text_content[:1000]}")
                    else:
                        logger.warning(f"Got status {response.status_code} for {url}")
                except Exception as e:
                    logger.error(f"Error fetching {url}: {e}")
                    continue
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if all_data:
                markdown = "# Investing.com Metals Data\n"
                markdown += f"**Last Updated:** {timestamp}\n\n"
                markdown += "---\n\n"
                markdown += "\n".join(all_data)
            else:
                # Fallback with clearly-marked sample data.
                markdown = f"""# Investing.com Metals Data
**Last Updated:** {timestamp}
---
## Market Overview (Sample Data)
Due to access restrictions, showing sample commodity prices:
| Commodity | Price | Change | % Change |
|-----------|-------|--------|----------|
| Gold | $2,045.30 | +12.50 | +0.61% |
| Silver | $24.85 | +0.32 | +1.30% |
| Platinum | $965.00 | -5.00 | -0.52% |
| Palladium | $1,105.00 | +15.00 | +1.38% |
*Note: Live data scraping may be restricted. Please check the source directly.*
"""
            with self.lock:
                try:
                    with open(self.scraped_files["investing"], "w", encoding="utf-8") as f:
                        f.write(markdown)
                    self.last_update["investing"] = timestamp
                    logger.info(f"Updated: Investing.com data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing Investing file: {e}")
        except Exception as e:
            logger.error(f"Error scraping Investing.com: {e}")

    def clean_html_text(self, city, url):
        """Fetch *url* and return a markdown section of listings for *city*.

        Returns an error section (never raises) so one failing city does not
        abort the whole Zameen scrape.
        """
        try:
            headers = self.get_random_headers()
            response = requests.get(url, headers=headers, timeout=15)
            if response.status_code != 200:
                logger.error(f"Failed to fetch {url}: Status {response.status_code}")
                return f"## 🏙️ {city}\nError: Unable to fetch data (Status: {response.status_code})\n"
            soup = BeautifulSoup(response.text, "html.parser")
            # Strip non-content tags before text extraction.
            for tag in soup(["script", "style", "noscript"]):
                tag.decompose()
            # Heuristic: property cards have class names containing these terms.
            properties = []
            property_cards = soup.find_all(['div', 'article'], class_=lambda x: x and any(term in str(x).lower() for term in ['property', 'listing', 'card']))
            for card in property_cards[:10]:  # Limit to first 10 properties
                text = card.get_text(separator=' | ', strip=True)
                if len(text) > 50:  # Filter out empty cards
                    properties.append(text)
            if properties:
                content = f"## 🏙️ {city}\n\n### Properties Found:\n\n"
                for i, prop in enumerate(properties, 1):
                    content += f"{i}. {prop[:200]}...\n\n"
            else:
                # Fallback: dump the page's general text if no cards matched.
                clean_text = soup.get_text(separator='\n', strip=True)
                content = f"## 🏙️ {city}\n\n{clean_text[:2000]}...\n"
            return content
        except Exception as e:
            logger.error(f"Error scraping {city}: {e}")
            return f"## 🏙️ {city}\nError fetching data: {e}\n"

    def scrape_zameen(self):
        """Scrape Zameen.com listings for all configured cities into one file."""
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            all_content = [f"# 🏠 Zameen.com Real Estate Data\n\n**Last Updated:** {timestamp}\n\n"]
            for city, url in self.zameen_urls.items():
                logger.info(f"Scraping Zameen.com for {city}...")
                content = self.clean_html_text(city, url)
                all_content.append(content)
                all_content.append("\n---\n")
                time.sleep(random.uniform(3, 5))  # Random delay between cities
            with self.lock:
                try:
                    with open(self.scraped_files["zameen"], "w", encoding="utf-8") as f:
                        f.write("\n".join(all_content))
                    self.last_update["zameen"] = timestamp
                    logger.info(f"Updated: Zameen.com data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing Zameen file: {e}")
        except Exception as e:
            logger.error(f"Error scraping Zameen.com: {e}")

    def run_single_scrape_cycle(self):
        """Run all three scrapers in parallel and wait for them to finish."""
        threads = [
            threading.Thread(target=self.scrape_psx_sync),
            threading.Thread(target=self.scrape_investing),
            threading.Thread(target=self.scrape_zameen),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def continuous_scraping_worker(self):
        """Background worker: scrape immediately, then every 5 minutes."""
        logger.info("Background scraping started (every 5 minutes)")
        # Initial scrape so data is available as soon as possible.
        self.run_single_scrape_cycle()
        while self.running:
            try:
                # Sleep in 1-second slices so stop requests take effect quickly.
                for _ in range(300):
                    if not self.running:
                        break
                    time.sleep(1)
                if self.running:
                    logger.info("Running scheduled scrape...")
                    self.run_single_scrape_cycle()
            except Exception as e:
                logger.error(f"Error in background scraping: {e}")
                time.sleep(60)  # back off before retrying after a failure

    def start_background_scraping(self):
        """Start the continuous scraping daemon thread.

        Returns:
            bool: True if the thread was started, False if already running.
        """
        if not self.running:
            self.running = True
            self.scraping_thread = threading.Thread(target=self.continuous_scraping_worker, daemon=True)
            self.scraping_thread.start()
            return True
        return False

    def stop_background_scraping(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self.running = False
        if self.scraping_thread and self.scraping_thread.is_alive():
            self.scraping_thread.join(timeout=2)

    def read_scraped_content(self):
        """Read all scraped files and return their combined, labeled content."""
        combined_content = ""
        with self.lock:
            for source, filename in self.scraped_files.items():
                if os.path.exists(filename):
                    try:
                        with open(filename, 'r', encoding='utf-8') as f:
                            content = f.read()
                        combined_content += f"\n\n--- {source.upper()} DATA ---\n"
                        combined_content += content
                    except Exception as e:
                        # FIX: the log line previously said "(unknown)" instead
                        # of naming the file that failed.
                        logger.error(f"Error reading {filename}: {e}")
                else:
                    combined_content += f"\n\n--- {source.upper()} DATA ---\n"
                    combined_content += "Data file not found. Scraping in progress...\n"
        return combined_content

    def query_gemini(self, question, api_key=None):
        """Ask Gemini *question* with the scraped data as context.

        Args:
            question: the user's natural-language question.
            api_key: optional per-call key; falls back to the configured one.

        Returns:
            str: the model's answer, or a human-readable "❌ ..." error string
            (this method never raises — errors are returned as text).
        """
        key = api_key or self.gemini_api_key
        if not key or key == 'YOUR_API_KEY_HERE':
            return "❌ Please provide a valid Gemini API key"
        try:
            url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent?key={key}"
            scraped_content = self.read_scraped_content()
            if scraped_content.strip():
                prompt = f"""Based on the following real-time scraped data from PSX DPS (Pakistan Stock Exchange Data Portal), Investing.com metals, and Zameen.com real estate (Lahore, Islamabad, Karachi), please answer this question: {question}
LIVE DATA:
{scraped_content}
Please provide a comprehensive answer based on the most recent available data."""
            else:
                prompt = f"{question}\n\n(Note: Scraped data is still being collected, please wait for the first scraping cycle to complete)"
            payload = {
                "contents": [
                    {
                        "parts": [
                            {
                                "text": prompt
                            }
                        ]
                    }
                ],
                "generationConfig": {
                    "temperature": 0.7,
                    "maxOutputTokens": 2048
                }
            }
            headers = {'Content-Type': 'application/json'}
            response = requests.post(url, headers=headers, json=payload, timeout=30)
            if response.status_code == 200:
                result = response.json()
                if 'candidates' in result and len(result['candidates']) > 0:
                    return result['candidates'][0]['content']['parts'][0]['text']
                else:
                    return "❌ No response generated from Gemini"
            else:
                return f"❌ Gemini API Error: {response.status_code} - {response.text}"
        except Exception as e:
            return f"❌ Error querying Gemini: {e}"
| # API Endpoints | |
@app.on_event("startup")  # FIX: handler was never registered with FastAPI
async def startup_event():
    """Initialize the global scraper and start background scraping.

    SECURITY FIX: the previous revision fell back to a hard-coded API key;
    the key must come from the GEMINI_API_KEY environment variable only.
    """
    global scraper
    logger.info("Starting up FastAPI application...")
    # Read the Gemini key from the environment; may be None (queries can
    # still supply a per-request key).
    api_key = os.getenv("GEMINI_API_KEY")
    scraper = EnhancedScraper(gemini_api_key=api_key)
    # Start the 5-minute background scrape loop.
    scraper.start_background_scraping()
    logger.info("Background scraping initiated")
@app.on_event("shutdown")  # FIX: handler was never registered with FastAPI
async def shutdown_event():
    """Stop the background scraping thread on application shutdown."""
    global scraper
    if scraper:
        scraper.stop_background_scraping()
        logger.info("Background scraping stopped")
@app.get("/")  # FIX: route decorator was missing, endpoint was unreachable
async def root():
    """Root endpoint: describe the API's main endpoints for discoverability."""
    return {
        "message": "Enhanced Web Scraper API",
        "endpoints": {
            "POST /query": "Query scraped data with AI",
            "GET /status": "Check scraping status",
            "GET /health": "Health check"
        },
        "documentation": "/docs"
    }
@app.post("/query", response_model=QueryResponse)  # FIX: route decorator was missing
async def query_data(request: QueryRequest):
    """Answer the request's question with Gemini, using scraped data as context.

    Raises:
        HTTPException: 500 if the scraper is not initialized or the query fails.
    """
    try:
        if not scraper:
            raise HTTPException(status_code=500, detail="Scraper not initialized")
        # Use the per-request API key if provided, else the configured one.
        answer = scraper.query_gemini(request.question, request.api_key)
        return QueryResponse(
            answer=answer,
            status="success",
            timestamp=datetime.now().isoformat()
        )
    except HTTPException:
        # FIX: previously the generic handler below caught our own
        # HTTPException and re-wrapped it, mangling the detail message.
        raise
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/status", response_model=StatusResponse)  # FIX: route decorator was missing
async def get_status():
    """Report scraping activity, last update times, and data-file presence.

    Raises:
        HTTPException: 500 if the scraper has not been initialized yet.
    """
    if not scraper:
        raise HTTPException(status_code=500, detail="Scraper not initialized")
    # A source's file exists only after its first successful scrape cycle.
    files_status = {
        source: os.path.exists(filename)
        for source, filename in scraper.scraped_files.items()
    }
    return StatusResponse(
        scraping_active=scraper.running,
        last_update=scraper.last_update,
        files_status=files_status
    )
@app.get("/health")  # FIX: route decorator was missing, endpoint was unreachable
async def health_check():
    """Liveness probe: always reports healthy with the current timestamp."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}