# backend/main.py — FastAPI web scraper service (Hugging Face Space)
#!/usr/bin/env python3
"""
FastAPI Web Scraper with AI Integration for Hugging Face Spaces
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import threading
import time
import os
from datetime import datetime
import asyncio
from playwright.async_api import async_playwright
from PIL import Image
import pytesseract
import io
from typing import Dict, Optional
import logging
import random
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app (title/description/version surface in the /docs UI).
app = FastAPI(
    title="Enhanced Web Scraper API",
    description="Scrapes PSX DPS, Investing.com metals, and Zameen.com with Gemini AI integration",
    version="1.0.0"
)
# Add CORS middleware.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# permissive setting — acceptable for a public demo Space, but confirm it is
# intended before deploying anywhere sensitive.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request/Response models
class QueryRequest(BaseModel):
    """Request body for POST /query."""
    question: str
    # Optional per-request Gemini key; when omitted the server-configured
    # key is used (see EnhancedScraper.query_gemini).
    api_key: Optional[str] = None
class QueryResponse(BaseModel):
    """Response body for POST /query."""
    answer: str
    # "success" on normal completion; errors are reported via HTTPException.
    status: str
    # ISO-8601 timestamp of when the answer was produced.
    timestamp: str
class StatusResponse(BaseModel):
    """Response body for GET /status."""
    # True while the background scraping thread is running.
    scraping_active: bool
    # Source name -> "YYYY-mm-dd HH:MM:SS" of its last successful write.
    last_update: Dict[str, str]
    # Source name -> whether its markdown output file exists on disk.
    files_status: Dict[str, bool]
# Global scraper instance, created in the startup event handler.
scraper: Optional["EnhancedScraper"] = None
# Ensure data directory exists (scraped markdown files are written here).
DATA_DIR = "/app/data"
os.makedirs(DATA_DIR, exist_ok=True)
class EnhancedScraper:
    """Background scraper for PSX DPS, Investing.com metals and Zameen.com.

    Each source is written as a markdown file under ``DATA_DIR``; the three
    scrapers run on separate threads, so every file read/write is guarded by
    ``self.lock``. ``query_gemini`` answers questions with the combined
    scraped files as context.
    """

    def __init__(self, gemini_api_key=None):
        # SECURITY FIX: the previous revision shipped a real Gemini API key
        # as the fallback default. A key committed to source must be treated
        # as compromised (rotate it); read the key from the environment only.
        self.gemini_api_key = gemini_api_key or os.getenv("GEMINI_API_KEY", "")
        # One markdown output file per source.
        self.scraped_files = {
            "psx": os.path.join(DATA_DIR, "psx_dps_market_data.md"),
            "investing": os.path.join(DATA_DIR, "investing_metals.md"),
            "zameen": os.path.join(DATA_DIR, "zameen_cleaned_text.md")
        }
        self.running = False
        self.scraping_thread = None
        # Guards file I/O and last_update across the scraper threads.
        self.lock = threading.Lock()
        # Source name -> "YYYY-mm-dd HH:MM:SS" of the last successful write.
        self.last_update = {}
        # Zameen.com listing pages, one per city.
        self.zameen_urls = {
            "Lahore": "https://www.zameen.com/Homes/Lahore-1-1.html",
            "Islamabad": "https://www.zameen.com/Homes/Islamabad-3-1.html",
            "Karachi": "https://www.zameen.com/Homes/Karachi-2-1.html"
        }
        # Desktop Chrome user agents, rotated per request to reduce blocking.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        ]

    def get_random_headers(self):
        """Return browser-like request headers with a random User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
            'Referer': 'https://www.google.com/'
        }

    async def extract_psx_with_ocr(self, url):
        """Render *url* with headless Chromium, screenshot it, and OCR the image.

        Falls back to a plain HTTP fetch (no JS rendering) if Playwright/OCR
        fails. Returns the extracted text, or None if both approaches fail.
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(
                    headless=True,
                    args=[
                        "--no-sandbox",
                        "--disable-setuid-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-blink-features=AutomationControlled"
                    ]
                )
                context = await browser.new_context(
                    viewport={"width": 1920, "height": 1080},
                    user_agent=random.choice(self.user_agents)
                )
                page = await context.new_page()
                # Stealth: hide the webdriver flag some sites use for bot detection.
                await page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => false
                    });
                """)
                await page.goto(url, wait_until="networkidle", timeout=30000)
                await page.wait_for_timeout(3000)  # Wait for dynamic content
                # Remove the alert modal so it doesn't cover the data in the screenshot.
                await page.evaluate("""
                    const modal = document.querySelector('.alerts__modal__wrapper');
                    if (modal) modal.remove();
                """)
                screenshot_bytes = await page.screenshot(full_page=True)
                await browser.close()
            # Convert to PIL Image and perform OCR.
            image = Image.open(io.BytesIO(screenshot_bytes))
            text = pytesseract.image_to_string(image)
            return text.strip()
        except Exception as e:
            logger.error(f"Error in PSX OCR extraction: {e}")
            # Fallback to simple requests.
            try:
                response = requests.get(url, headers=self.get_random_headers(), timeout=15)
                soup = BeautifulSoup(response.text, 'html.parser')
                return soup.get_text(separator='\n', strip=True)
            except Exception as fallback_error:  # FIX: was a silent bare `except:`
                logger.error(f"PSX fallback fetch failed: {fallback_error}")
                return None

    def scrape_psx_sync(self):
        """Synchronous wrapper for PSX scraping (runs the async OCR extractor)."""
        try:
            url = "https://dps.psx.com.pk/"
            # Create a fresh event loop: this runs on a worker thread, which
            # has no loop of its own.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                extracted_text = loop.run_until_complete(self.extract_psx_with_ocr(url))
            finally:
                loop.close()
            if not extracted_text:
                logger.error("Failed to extract PSX data")
                return
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Keep only the first 5000 characters of the OCR text.
            # FIX: the old "# Limiting to first 5000 chars" comment lived
            # inside the f-string and leaked into the generated markdown.
            markdown_content = f"""# PSX DPS Market Data
**Source URL:** {url}
**Last Updated:** {timestamp}
**Extraction Method:** OCR/Web Scraping
---
## Market Data Content
{extracted_text[:5000]}...
"""
            with self.lock:
                try:
                    with open(self.scraped_files["psx"], "w", encoding="utf-8") as f:
                        f.write(markdown_content)
                    self.last_update["psx"] = timestamp
                    logger.info(f"Updated: PSX DPS data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing PSX file: {e}")
        except Exception as e:
            logger.error(f"Error scraping PSX DPS: {e}")

    def scrape_investing(self):
        """Scrape text from Investing.com metals pages; write sample data on failure."""
        try:
            # Several candidate pages — any that respond contribute a section.
            urls = [
                "https://www.investing.com/commodities/metals",
                "https://www.investing.com/commodities/gold-historical-data",
                "https://www.investing.com/commodities/silver-historical-data"
            ]
            all_data = []
            for url in urls:
                try:
                    headers = self.get_random_headers()
                    headers['Host'] = 'www.investing.com'
                    # Random delay between requests to avoid rate limiting.
                    time.sleep(random.uniform(2, 4))
                    response = requests.get(url, headers=headers, timeout=15)
                    if response.status_code == 200:
                        soup = BeautifulSoup(response.content, 'html.parser')
                        text_content = soup.get_text(separator='\n', strip=True)
                        all_data.append(f"\n### Data from {url}\n{text_content[:1000]}")
                    else:
                        logger.warning(f"Got status {response.status_code} for {url}")
                except Exception as e:
                    logger.error(f"Error fetching {url}: {e}")
                    continue
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if all_data:
                markdown = "# Investing.com Metals Data\n"  # FIX: was an f-string with no placeholders
                markdown += f"**Last Updated:** {timestamp}\n\n"
                markdown += "---\n\n"
                markdown += "\n".join(all_data)
            else:
                # Fallback with dummy data so downstream consumers still get a file.
                markdown = f"""# Investing.com Metals Data
**Last Updated:** {timestamp}
---
## Market Overview (Sample Data)
Due to access restrictions, showing sample commodity prices:
| Commodity | Price | Change | % Change |
|-----------|-------|--------|----------|
| Gold | $2,045.30 | +12.50 | +0.61% |
| Silver | $24.85 | +0.32 | +1.30% |
| Platinum | $965.00 | -5.00 | -0.52% |
| Palladium | $1,105.00 | +15.00 | +1.38% |
*Note: Live data scraping may be restricted. Please check the source directly.*
"""
            with self.lock:
                try:
                    with open(self.scraped_files["investing"], "w", encoding="utf-8") as f:
                        f.write(markdown)
                    self.last_update["investing"] = timestamp
                    logger.info(f"Updated: Investing.com data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing Investing file: {e}")
        except Exception as e:
            logger.error(f"Error scraping Investing.com: {e}")

    def clean_html_text(self, city, url):
        """Fetch *url* and return a markdown section of property listings for *city*.

        Always returns a string: on any failure an error section is returned
        instead of raising, so one bad city doesn't abort the whole cycle.
        """
        try:
            headers = self.get_random_headers()
            response = requests.get(url, headers=headers, timeout=15)
            if response.status_code != 200:
                logger.error(f"Failed to fetch {url}: Status {response.status_code}")
                return f"## 🏙️ {city}\nError: Unable to fetch data (Status: {response.status_code})\n"
            soup = BeautifulSoup(response.text, "html.parser")
            for tag in soup(["script", "style", "noscript"]):
                tag.decompose()
            # Heuristic: property cards carry class names containing these terms.
            properties = []
            property_cards = soup.find_all(['div', 'article'], class_=lambda x: x and any(term in str(x).lower() for term in ['property', 'listing', 'card']))
            for card in property_cards[:10]:  # Limit to first 10 properties
                text = card.get_text(separator=' | ', strip=True)
                if len(text) > 50:  # Filter out empty cards
                    properties.append(text)
            if properties:
                content = f"## 🏙️ {city}\n\n### Properties Found:\n\n"
                for i, prop in enumerate(properties, 1):
                    content += f"{i}. {prop[:200]}...\n\n"
            else:
                # Fallback: dump the page's general text when no cards matched.
                clean_text = soup.get_text(separator='\n', strip=True)
                content = f"## 🏙️ {city}\n\n{clean_text[:2000]}...\n"
            return content
        except Exception as e:
            logger.error(f"Error scraping {city}: {e}")
            return f"## 🏙️ {city}\nError fetching data: {e}\n"

    def scrape_zameen(self):
        """Scrape Zameen.com listings for all configured cities into one file."""
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            all_content = [f"# 🏠 Zameen.com Real Estate Data\n\n**Last Updated:** {timestamp}\n\n"]
            for city, url in self.zameen_urls.items():
                logger.info(f"Scraping Zameen.com for {city}...")
                content = self.clean_html_text(city, url)
                all_content.append(content)
                all_content.append("\n---\n")
                time.sleep(random.uniform(3, 5))  # Random delay between cities
            with self.lock:
                try:
                    with open(self.scraped_files["zameen"], "w", encoding="utf-8") as f:
                        f.write("\n".join(all_content))
                    self.last_update["zameen"] = timestamp
                    logger.info(f"Updated: Zameen.com data at {timestamp}")
                except Exception as e:
                    logger.error(f"Error writing Zameen file: {e}")
        except Exception as e:
            logger.error(f"Error scraping Zameen.com: {e}")

    def run_single_scrape_cycle(self):
        """Run all three scrapers in parallel and wait for them to finish."""
        threads = [
            threading.Thread(target=self.scrape_psx_sync),
            threading.Thread(target=self.scrape_investing),
            threading.Thread(target=self.scrape_zameen),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def continuous_scraping_worker(self):
        """Background worker that re-scrapes all sources every 5 minutes."""
        logger.info("Background scraping started (every 5 minutes)")
        # Initial scrape so data is available shortly after startup.
        self.run_single_scrape_cycle()
        while self.running:
            try:
                # Sleep in 1s slices so stop_background_scraping() is honored
                # promptly instead of blocking for the full 5 minutes.
                for _ in range(300):
                    if not self.running:
                        break
                    time.sleep(1)
                if self.running:
                    logger.info("Running scheduled scrape...")
                    self.run_single_scrape_cycle()
            except Exception as e:
                logger.error(f"Error in background scraping: {e}")
                time.sleep(60)

    def start_background_scraping(self):
        """Start the continuous worker on a daemon thread.

        Returns True if a new worker was started, False if one is already running.
        """
        if not self.running:
            self.running = True
            self.scraping_thread = threading.Thread(target=self.continuous_scraping_worker, daemon=True)
            self.scraping_thread.start()
            return True
        return False

    def stop_background_scraping(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self.running = False
        if self.scraping_thread and self.scraping_thread.is_alive():
            self.scraping_thread.join(timeout=2)

    def read_scraped_content(self):
        """Read all scraped files and combine their content into one string."""
        combined_content = ""
        with self.lock:
            for source, filename in self.scraped_files.items():
                if os.path.exists(filename):
                    try:
                        with open(filename, 'r', encoding='utf-8') as f:
                            content = f.read()
                        combined_content += f"\n\n--- {source.upper()} DATA ---\n"
                        combined_content += content
                    except Exception as e:
                        # FIX: the old message logged the literal "(unknown)"
                        # instead of identifying which file failed.
                        logger.error(f"Error reading {filename}: {e}")
                else:
                    combined_content += f"\n\n--- {source.upper()} DATA ---\n"
                    combined_content += "Data file not found. Scraping in progress...\n"
        return combined_content

    def query_gemini(self, question, api_key=None):
        """Ask Gemini *question* with the scraped files as context.

        Uses *api_key* when given, otherwise the configured key. Returns the
        answer text, or a "❌ ..."-prefixed error string (never raises).
        """
        key = api_key or self.gemini_api_key
        if not key or key == 'YOUR_API_KEY_HERE':
            return "❌ Please provide a valid Gemini API key"
        try:
            url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent?key={key}"
            scraped_content = self.read_scraped_content()
            if scraped_content.strip():
                prompt = f"""Based on the following real-time scraped data from PSX DPS (Pakistan Stock Exchange Data Portal), Investing.com metals, and Zameen.com real estate (Lahore, Islamabad, Karachi), please answer this question: {question}
LIVE DATA:
{scraped_content}
Please provide a comprehensive answer based on the most recent available data."""
            else:
                prompt = f"{question}\n\n(Note: Scraped data is still being collected, please wait for the first scraping cycle to complete)"
            payload = {
                "contents": [
                    {
                        "parts": [
                            {
                                "text": prompt
                            }
                        ]
                    }
                ],
                "generationConfig": {
                    "temperature": 0.7,
                    "maxOutputTokens": 2048
                }
            }
            headers = {'Content-Type': 'application/json'}
            response = requests.post(url, headers=headers, json=payload, timeout=30)
            if response.status_code == 200:
                result = response.json()
                if 'candidates' in result and len(result['candidates']) > 0:
                    return result['candidates'][0]['content']['parts'][0]['text']
                else:
                    return "❌ No response generated from Gemini"
            else:
                return f"❌ Gemini API Error: {response.status_code} - {response.text}"
        except Exception as e:
            return f"❌ Error querying Gemini: {e}"
# API Endpoints
@app.on_event("startup")
async def startup_event():
    """Create the global scraper and start background scraping on startup."""
    global scraper
    logger.info("Starting up FastAPI application...")
    # SECURITY FIX: the previous revision passed a real API key as the
    # getenv() fallback, committing the credential to source. Read from the
    # environment only; EnhancedScraper handles a missing key gracefully.
    scraper = EnhancedScraper(gemini_api_key=os.getenv("GEMINI_API_KEY"))
    # Start the 5-minute scraping loop on a daemon thread.
    scraper.start_background_scraping()
    logger.info("Background scraping initiated")
@app.on_event("shutdown")
async def shutdown_event():
    """Halt the background scraping thread when the application shuts down."""
    global scraper
    if not scraper:
        return
    scraper.stop_background_scraping()
    logger.info("Background scraping stopped")
@app.get("/")
async def root():
    """Describe the API: available endpoints and where the docs live."""
    endpoint_summary = {
        "POST /query": "Query scraped data with AI",
        "GET /status": "Check scraping status",
        "GET /health": "Health check",
    }
    return {
        "message": "Enhanced Web Scraper API",
        "endpoints": endpoint_summary,
        "documentation": "/docs",
    }
@app.post("/query", response_model=QueryResponse)
async def query_data(request: QueryRequest):
    """Answer a question about the scraped data using Gemini AI.

    Uses the request's api_key when provided, otherwise the server-configured
    key. Raises HTTP 500 if the scraper is missing or the query fails.
    """
    # FIX: this guard used to sit inside the try-block, so its HTTPException
    # was swallowed by `except Exception` and re-wrapped, mangling the detail.
    if not scraper:
        raise HTTPException(status_code=500, detail="Scraper not initialized")
    try:
        answer = scraper.query_gemini(request.question, request.api_key)
        return QueryResponse(
            answer=answer,
            status="success",
            timestamp=datetime.now().isoformat()
        )
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/status", response_model=StatusResponse)
async def get_status():
    """Report scraping activity, last-update times, and output-file presence."""
    if not scraper:
        raise HTTPException(status_code=500, detail="Scraper not initialized")
    # Map each source to whether its markdown output file exists on disk.
    file_presence = {
        source: os.path.exists(path)
        for source, path in scraper.scraped_files.items()
    }
    return StatusResponse(
        scraping_active=scraper.running,
        last_update=scraper.last_update,
        files_status=file_presence,
    )
@app.get("/health")
async def health_check():
    """Liveness probe: always reports healthy with the current timestamp."""
    now = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": now}