#!/usr/bin/env python3
"""
FastAPI Web Scraper with AI Integration for Hugging Face Spaces.

Continuously scrapes three sources in background threads (PSX DPS via
Playwright + OCR, Investing.com metals, Zameen.com real-estate listings),
persists each as a markdown file under DATA_DIR, and answers user questions
about the scraped data through the Gemini generateContent REST API.
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import threading
import time
import os
from datetime import datetime
import asyncio
from playwright.async_api import async_playwright
from PIL import Image
import pytesseract
import io
from typing import Dict, Optional
import logging
import random

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="Enhanced Web Scraper API",
    description="Scrapes PSX DPS, Investing.com metals, and Zameen.com with Gemini AI integration",
    version="1.0.0"
)

# Add CORS middleware (wide open — acceptable for a public demo Space,
# tighten allow_origins for production use)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Request/Response models
class QueryRequest(BaseModel):
    question: str
    api_key: Optional[str] = None  # optional per-request Gemini key override


class QueryResponse(BaseModel):
    answer: str
    status: str
    timestamp: str


class StatusResponse(BaseModel):
    scraping_active: bool
    last_update: Dict[str, str]
    files_status: Dict[str, bool]


# Global scraper instance (initialized on startup)
scraper = None

# Ensure data directory exists
DATA_DIR = "/app/data"
os.makedirs(DATA_DIR, exist_ok=True)


class EnhancedScraper:
    """Background scraper for PSX DPS, Investing.com metals, and Zameen.com.

    Each source is written to its own markdown file; a single lock guards
    all file reads/writes so the query path never sees a half-written file.
    """

    def __init__(self, gemini_api_key=None):
        # SECURITY: never hardcode credentials — the key must come from the
        # caller or the GEMINI_API_KEY environment variable.
        self.gemini_api_key = gemini_api_key or os.getenv("GEMINI_API_KEY")
        self.scraped_files = {
            "psx": os.path.join(DATA_DIR, "psx_dps_market_data.md"),
            "investing": os.path.join(DATA_DIR, "investing_metals.md"),
            "zameen": os.path.join(DATA_DIR, "zameen_cleaned_text.md")
        }
        self.running = False
        self.scraping_thread = None
        self.lock = threading.Lock()   # guards file I/O and last_update
        self.last_update = {}          # source name -> timestamp string

        # Zameen.com URLs for multiple cities
        self.zameen_urls = {
            "Lahore": "https://www.zameen.com/Homes/Lahore-1-1.html",
            "Islamabad": "https://www.zameen.com/Homes/Islamabad-3-1.html",
            "Karachi": "https://www.zameen.com/Homes/Karachi-2-1.html"
        }

        # User agents for rotation
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        ]

    def get_random_headers(self):
        """Get randomized headers to avoid blocking."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
            'Referer': 'https://www.google.com/'
        }

    async def extract_psx_with_ocr(self, url):
        """Extract text from the PSX webpage using Playwright + OCR.

        Renders the page headlessly, screenshots it, and OCRs the image with
        pytesseract. Falls back to a plain requests + BeautifulSoup text dump
        if the browser pipeline fails. Returns the text or None on failure.
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(
                    headless=True,
                    args=[
                        "--no-sandbox",
                        "--disable-setuid-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-blink-features=AutomationControlled"
                    ]
                )
                context = await browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent=random.choice(self.user_agents)
                )
                page = await context.new_page()

                # Add stealth mode: hide the webdriver flag from bot detection
                await page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', { get: () => false });
                """)

                await page.goto(url, wait_until="networkidle", timeout=30000)
                await page.wait_for_timeout(3000)  # Wait for dynamic content

                # Remove modal that would cover the data in the screenshot
                await page.evaluate("""
                    const modal = document.querySelector('.alerts__modal__wrapper');
                    if (modal) modal.remove();
                """)

                # Take screenshot
                screenshot_bytes = await page.screenshot(full_page=True)
                await browser.close()

                # Convert to PIL Image and perform OCR
                image = Image.open(io.BytesIO(screenshot_bytes))
                text = pytesseract.image_to_string(image)
                return text.strip()
        except Exception as e:
            logger.error(f"Error in PSX OCR extraction: {e}")
            # Fallback to simple requests
            try:
                response = requests.get(url, headers=self.get_random_headers(), timeout=15)
                soup = BeautifulSoup(response.text, 'html.parser')
                return soup.get_text(separator='\n', strip=True)
            except Exception as fallback_error:  # was a bare except: — log, don't swallow silently
                logger.error(f"PSX requests fallback failed: {fallback_error}")
                return None

    def scrape_psx_sync(self):
        """Synchronous wrapper for PSX scraping (runs in a worker thread)."""
        try:
            url = "https://dps.psx.com.pk/"

            # Create new event loop for thread — worker threads have no
            # running loop, so we cannot use asyncio.run-time defaults here.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                extracted_text = loop.run_until_complete(self.extract_psx_with_ocr(url))
            finally:
                loop.close()

            if not extracted_text:
                logger.error("Failed to extract PSX data")
                return

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Limit to the first 5000 chars to keep the file (and the Gemini
            # prompt built from it) bounded.
            markdown_content = f"""# PSX DPS Market Data

**Source URL:** {url}
**Last Updated:** {timestamp}
**Extraction Method:** OCR/Web Scraping

---

## Market Data Content

{extracted_text[:5000]}...
"""

            with self.lock:
                try:
                    with open(self.scraped_files["psx"], "w", encoding="utf-8") as f:
                        f.write(markdown_content)
                    self.last_update["psx"] = timestamp
                    logger.info(f"Updated: PSX DPS data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing PSX file: {e}")
        except Exception as e:
            logger.error(f"Error scraping PSX DPS: {e}")

    def scrape_investing(self):
        """Scrape table data from Investing.com metals pages.

        Tries several URLs; if all fail (site frequently blocks scrapers),
        writes a sample-data fallback so the query path still has content.
        """
        try:
            # Try different approaches
            urls = [
                "https://www.investing.com/commodities/metals",
                "https://www.investing.com/commodities/gold-historical-data",
                "https://www.investing.com/commodities/silver-historical-data"
            ]

            all_data = []
            for url in urls:
                try:
                    headers = self.get_random_headers()
                    headers['Host'] = 'www.investing.com'

                    # Add delay to avoid rate limiting
                    time.sleep(random.uniform(2, 4))

                    response = requests.get(url, headers=headers, timeout=15)
                    if response.status_code == 200:
                        soup = BeautifulSoup(response.content, 'html.parser')
                        # Extract any text content
                        text_content = soup.get_text(separator='\n', strip=True)
                        all_data.append(f"\n### Data from {url}\n{text_content[:1000]}")
                    else:
                        logger.warning(f"Got status {response.status_code} for {url}")
                except Exception as e:
                    logger.error(f"Error fetching {url}: {e}")
                    continue

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            if all_data:
                markdown = "# Investing.com Metals Data\n"
                markdown += f"**Last Updated:** {timestamp}\n\n"
                markdown += "---\n\n"
                markdown += "\n".join(all_data)
            else:
                # Fallback with dummy data
                markdown = f"""# Investing.com Metals Data

**Last Updated:** {timestamp}

---

## Market Overview (Sample Data)

Due to access restrictions, showing sample commodity prices:

| Commodity | Price | Change | % Change |
|-----------|-------|--------|----------|
| Gold | $2,045.30 | +12.50 | +0.61% |
| Silver | $24.85 | +0.32 | +1.30% |
| Platinum | $965.00 | -5.00 | -0.52% |
| Palladium | $1,105.00 | +15.00 | +1.38% |

*Note: Live data scraping may be restricted. Please check the source directly.*
"""

            with self.lock:
                try:
                    with open(self.scraped_files["investing"], "w", encoding="utf-8") as f:
                        f.write(markdown)
                    self.last_update["investing"] = timestamp
                    logger.info(f"Updated: Investing.com data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing Investing file: {e}")
        except Exception as e:
            logger.error(f"Error scraping Investing.com: {e}")

    def clean_html_text(self, city, url):
        """Fetch and clean listing text for a specific city.

        Returns a markdown section; on failure the section contains the
        error message instead of raising (so one bad city doesn't abort
        the whole Zameen run).
        """
        try:
            headers = self.get_random_headers()
            response = requests.get(url, headers=headers, timeout=15)

            if response.status_code != 200:
                logger.error(f"Failed to fetch {url}: Status {response.status_code}")
                return f"## 🏙️ {city}\nError: Unable to fetch data (Status: {response.status_code})\n"

            soup = BeautifulSoup(response.text, "html.parser")
            for tag in soup(["script", "style", "noscript"]):
                tag.decompose()

            # Extract property listings: any div/article whose class mentions
            # property/listing/card (site markup is not stable, so match loosely)
            properties = []
            property_cards = soup.find_all(
                ['div', 'article'],
                class_=lambda x: x and any(
                    term in str(x).lower() for term in ['property', 'listing', 'card']
                )
            )

            for card in property_cards[:10]:  # Limit to first 10 properties
                text = card.get_text(separator=' | ', strip=True)
                if len(text) > 50:  # Filter out empty cards
                    properties.append(text)

            if properties:
                content = f"## 🏙️ {city}\n\n### Properties Found:\n\n"
                for i, prop in enumerate(properties, 1):
                    content += f"{i}. {prop[:200]}...\n\n"
            else:
                # Fallback to general text
                clean_text = soup.get_text(separator='\n', strip=True)
                content = f"## 🏙️ {city}\n\n{clean_text[:2000]}...\n"

            return content
        except Exception as e:
            logger.error(f"Error scraping {city}: {e}")
            return f"## 🏙️ {city}\nError fetching data: {e}\n"

    def scrape_zameen(self):
        """Scrape Zameen.com data from multiple cities into one markdown file."""
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            all_content = [f"# 🏠 Zameen.com Real Estate Data\n\n**Last Updated:** {timestamp}\n\n"]

            for city, url in self.zameen_urls.items():
                logger.info(f"Scraping Zameen.com for {city}...")
                content = self.clean_html_text(city, url)
                all_content.append(content)
                all_content.append("\n---\n")
                time.sleep(random.uniform(3, 5))  # Random delay between cities

            with self.lock:
                try:
                    with open(self.scraped_files["zameen"], "w", encoding="utf-8") as f:
                        f.write("\n".join(all_content))
                    self.last_update["zameen"] = timestamp
                    logger.info(f"Updated: Zameen.com data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing Zameen file: {e}")
        except Exception as e:
            logger.error(f"Error scraping Zameen.com: {e}")

    def run_single_scrape_cycle(self):
        """Run all three scrapers in parallel for one cycle and wait for all."""
        threads = [
            threading.Thread(target=self.scrape_psx_sync),
            threading.Thread(target=self.scrape_investing),
            threading.Thread(target=self.scrape_zameen),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def continuous_scraping_worker(self):
        """Background worker that scrapes data every 5 minutes."""
        logger.info("Background scraping started (every 5 minutes)")

        # Initial scrape
        self.run_single_scrape_cycle()

        while self.running:
            try:
                # Wait 5 minutes, in 1-second ticks so stop requests are
                # noticed quickly instead of after a full sleep(300)
                for _ in range(300):
                    if not self.running:
                        break
                    time.sleep(1)

                if self.running:
                    logger.info("Running scheduled scrape...")
                    self.run_single_scrape_cycle()
            except Exception as e:
                logger.error(f"Error in background scraping: {e}")
                time.sleep(60)

    def start_background_scraping(self):
        """Start continuous scraping in a daemon thread.

        Returns True if a new worker was started, False if already running.
        """
        if not self.running:
            self.running = True
            self.scraping_thread = threading.Thread(
                target=self.continuous_scraping_worker, daemon=True
            )
            self.scraping_thread.start()
            return True
        return False

    def stop_background_scraping(self):
        """Stop background scraping (best-effort join with a short timeout)."""
        self.running = False
        if self.scraping_thread and self.scraping_thread.is_alive():
            self.scraping_thread.join(timeout=2)

    def read_scraped_content(self):
        """Read all scraped files and combine their content into one string."""
        combined_content = ""
        with self.lock:
            for source, filename in self.scraped_files.items():
                if os.path.exists(filename):
                    try:
                        with open(filename, 'r', encoding='utf-8') as f:
                            content = f.read()
                            combined_content += f"\n\n--- {source.upper()} DATA ---\n"
                            combined_content += content
                    except Exception as e:
                        # include the filename so failures are diagnosable
                        logger.error(f"Error reading {filename}: {e}")
                else:
                    combined_content += f"\n\n--- {source.upper()} DATA ---\n"
                    combined_content += f"Data file not found. Scraping in progress...\n"
        return combined_content

    def query_gemini(self, question, api_key=None):
        """Query the Gemini API with the scraped data as context.

        Returns the model's answer, or a human-readable ❌ error string
        (this method never raises — callers return its string directly).
        """
        key = api_key or self.gemini_api_key
        if not key or key == 'YOUR_API_KEY_HERE':
            return "❌ Please provide a valid Gemini API key"

        try:
            url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent?key={key}"

            scraped_content = self.read_scraped_content()

            if scraped_content.strip():
                prompt = f"""Based on the following real-time scraped data from PSX DPS (Pakistan Stock Exchange Data Portal), Investing.com metals, and Zameen.com real estate (Lahore, Islamabad, Karachi), please answer this question: {question}

LIVE DATA:
{scraped_content}

Please provide a comprehensive answer based on the most recent available data."""
            else:
                prompt = f"{question}\n\n(Note: Scraped data is still being collected, please wait for the first scraping cycle to complete)"

            payload = {
                "contents": [
                    {
                        "parts": [
                            {
                                "text": prompt
                            }
                        ]
                    }
                ],
                "generationConfig": {
                    "temperature": 0.7,
                    "maxOutputTokens": 2048
                }
            }

            headers = {'Content-Type': 'application/json'}
            response = requests.post(url, headers=headers, json=payload, timeout=30)

            if response.status_code == 200:
                result = response.json()
                if 'candidates' in result and len(result['candidates']) > 0:
                    return result['candidates'][0]['content']['parts'][0]['text']
                else:
                    return "❌ No response generated from Gemini"
            else:
                return f"❌ Gemini API Error: {response.status_code} - {response.text}"
        except Exception as e:
            return f"❌ Error querying Gemini: {e}"


# API Endpoints

@app.on_event("startup")
async def startup_event():
    """Initialize scraper and start background scraping on startup."""
    global scraper
    logger.info("Starting up FastAPI application...")

    # Initialize scraper with API key from environment only — no hardcoded
    # fallback credential (a missing key surfaces as a ❌ message on /query).
    api_key = os.getenv("GEMINI_API_KEY")
    scraper = EnhancedScraper(gemini_api_key=api_key)

    # Start background scraping
    scraper.start_background_scraping()
    logger.info("Background scraping initiated")


@app.on_event("shutdown")
async def shutdown_event():
    """Stop background scraping on shutdown."""
    global scraper
    if scraper:
        scraper.stop_background_scraping()
        logger.info("Background scraping stopped")


@app.get("/")
async def root():
    """Root endpoint with API information."""
    return {
        "message": "Enhanced Web Scraper API",
        "endpoints": {
            "POST /query": "Query scraped data with AI",
            "GET /status": "Check scraping status",
            "GET /health": "Health check"
        },
        "documentation": "/docs"
    }


@app.post("/query", response_model=QueryResponse)
async def query_data(request: QueryRequest):
    """Query the scraped data using Gemini AI."""
    try:
        if not scraper:
            raise HTTPException(status_code=500, detail="Scraper not initialized")

        # Use provided API key or fall back to configured one
        answer = scraper.query_gemini(request.question, request.api_key)

        return QueryResponse(
            answer=answer,
            status="success",
            timestamp=datetime.now().isoformat()
        )
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/status", response_model=StatusResponse)
async def get_status():
    """Get current scraping status and file information."""
    if not scraper:
        raise HTTPException(status_code=500, detail="Scraper not initialized")

    files_status = {}
    for source, filename in scraper.scraped_files.items():
        files_status[source] = os.path.exists(filename)

    return StatusResponse(
        scraping_active=scraper.running,
        last_update=scraper.last_update,
        files_status=files_status
    )


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}