# Spaces: Sleeping
import asyncio
import json
import logging
import os
import re
import tempfile
import uuid
from datetime import datetime
from typing import List, Optional, Dict, Any

import uvicorn
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel, Field

# Import your scrapers
from app3 import PhoneDBScraper, GSMArenaScraperAlternative
# Configure logging: INFO level on the root logger, plus a module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create the FastAPI application object that everything else attaches to.
app = FastAPI(
    title="Phone Specifications API",
    description="API for scraping phone specifications from PhoneDB and GSMArena",
    version="1.0.0",
)

# Add CORS middleware.
# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. In production, specify allowed origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files for the frontend, but only when a "static" path exists
# relative to the current working directory.
if os.path.exists("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")
| # Pydantic models | |
class PhoneSearchRequest(BaseModel):
    """Request body for looking up a single phone."""

    # Name of the phone to search for.
    phone_name: str
    # Which scraper to use: "phonedb" or "gsmarena".
    source: str = "gsmarena"
class MultiplePhoneSearchRequest(BaseModel):
    """Request body for looking up several phones in one call."""

    # Names of the phones to search for.
    phone_names: List[str]
    # Which scraper to use; defaults to "gsmarena".
    source: str = "gsmarena"
class PhoneSpecification(BaseModel):
    """Schema describing the specification record of one phone.

    NOTE(review): no handler in this file references this model; the
    endpoints return the scraper output as-is inside ``ApiResponse.data``.
    """

    name: str
    brand: str
    # presumably image URLs -- confirm against the scraper output
    images: List[str]
    # Free-form mapping of specification entries.
    specifications: Dict[str, Any]
    source_url: str
class ApiResponse(BaseModel):
    """Uniform envelope returned by the API endpoints."""

    # Whether the requested operation succeeded.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Optional payload; its shape depends on the endpoint.
    data: Optional[Any] = None
    # ISO-8601 creation time. A default_factory is required here: a plain
    # ``datetime.now().isoformat()`` default is evaluated once, when the class
    # is defined, so every response would carry the same import-time value.
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
# Global scrapers: both start as None and are assigned by startup_event().
phonedb_scraper: Optional[PhoneDBScraper] = None
gsmarena_scraper: Optional[GSMArenaScraperAlternative] = None
async def startup_event():
    """Initialize scrapers on startup.

    Each scraper is constructed in its own ``try`` block so that a failure
    in one does not prevent the other from becoming available. Failures are
    logged and leave the corresponding global untouched; nothing is raised.

    NOTE(review): no ``@app.on_event("startup")`` decorator (or any other
    registration) is visible for this function in this file -- confirm it is
    actually hooked into the application lifecycle.
    """
    global phonedb_scraper, gsmarena_scraper

    try:
        phonedb_scraper = PhoneDBScraper()
    except Exception as exc:
        logger.error(f"Error initializing PhoneDB scraper: {exc}")

    try:
        gsmarena_scraper = GSMArenaScraperAlternative()
    except Exception as exc:
        logger.error(f"Error initializing GSMArena scraper: {exc}")

    if phonedb_scraper is not None and gsmarena_scraper is not None:
        logger.info("Scrapers initialized successfully")
| # Routes | |
async def read_root():
    """Serve the main HTML page.

    Returns the contents of ``templates/index.html`` when that file exists;
    otherwise returns a small built-in page that links to ``/docs``.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    fallback_page = """
<html>
<head><title>Phone Specs API</title></head>
<body>
<h1>Phone Specifications API</h1>
<p>API is running! Visit <a href="/docs">/docs</a> for API documentation.</p>
</body>
</html>
"""
    try:
        with open("templates/index.html", "r", encoding="utf-8") as template:
            page = template.read()
    except FileNotFoundError:
        # No frontend template available: fall back to the built-in page.
        page = fallback_page
    return HTMLResponse(content=page)
async def health_check():
    """Health check endpoint.

    Reports a fixed "running" status together with a flag per scraper that
    is true when the corresponding module-level scraper is not ``None``.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    scraper_flags = {
        "phonedb": phonedb_scraper is not None,
        "gsmarena": gsmarena_scraper is not None,
    }
    payload = {"status": "running", "scrapers": scraper_flags}
    return ApiResponse(success=True, message="API is healthy", data=payload)
async def search_phone(request: PhoneSearchRequest):
    """Search for a single phone.

    Args:
        request: Phone name and preferred source ("phonedb" or "gsmarena").

    Returns:
        An ``ApiResponse`` whose ``data`` is the scraper result on success,
        or ``success=False`` with ``data=None`` when the scraper returns
        nothing.

    Raises:
        HTTPException: 503 when no scraper is available; 500 when scraping
            fails unexpectedly.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    try:
        logger.info(f"Searching for phone: {request.phone_name} using {request.source}")

        # Honour an explicit "phonedb" request when that scraper exists;
        # in every other case prefer GSMArena and fall back to PhoneDB.
        if request.source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        else:
            scraper = gsmarena_scraper or phonedb_scraper
        if not scraper:
            raise HTTPException(status_code=503, detail="No scrapers available")

        # Run the scraper in the default executor to avoid blocking the
        # event loop.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            scraper.scrape_phone_by_name,
            request.phone_name,
        )

        if not result:
            return ApiResponse(
                success=False,
                message=f"No results found for {request.phone_name}",
                data=None,
            )
        return ApiResponse(
            success=True,
            message=f"Successfully found specifications for {result['name']}",
            data=result,
        )
    except HTTPException:
        # Deliberate HTTP errors (the 503 above) must keep their status code
        # instead of being rewritten as a 500 by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error searching for phone {request.phone_name}: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
async def search_multiple_phones(request: MultiplePhoneSearchRequest):
    """Search for multiple phones.

    Args:
        request: Phone names and preferred source ("phonedb" or "gsmarena").

    Returns:
        An ``ApiResponse`` whose ``data`` holds the scraped ``phones`` plus
        ``success_count`` and ``total_count``. ``success`` is true when at
        least one phone was scraped.

    Raises:
        HTTPException: 503 when no scraper is available; 500 when scraping
            fails unexpectedly.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    try:
        logger.info(f"Searching for {len(request.phone_names)} phones using {request.source}")

        # Honour an explicit "phonedb" request when that scraper exists;
        # in every other case prefer GSMArena and fall back to PhoneDB.
        if request.source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        else:
            scraper = gsmarena_scraper or phonedb_scraper
        if not scraper:
            raise HTTPException(status_code=503, detail="No scrapers available")

        # Run the scraper in the default executor to avoid blocking the
        # event loop.
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            None,
            scraper.scrape_multiple_phones,
            request.phone_names,
        )

        success_count = len(results) if results else 0
        total_count = len(request.phone_names)
        return ApiResponse(
            success=success_count > 0,
            message=f"Successfully scraped {success_count}/{total_count} phones",
            data={
                "phones": results,
                "success_count": success_count,
                "total_count": total_count,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors (the 503 above) must keep their status code
        # instead of being rewritten as a 500 by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error searching for multiple phones: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
async def get_available_sources():
    """Get available scraping sources.

    Lists one entry per scraper that is currently set (PhoneDB first, then
    GSMArena); scrapers that are unset are omitted entirely.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    candidates = (
        (phonedb_scraper, "phonedb", "PhoneDB", "PhoneDB.net database"),
        (gsmarena_scraper, "gsmarena", "GSMArena", "GSMArena.com database"),
    )
    sources = [
        {
            "id": source_id,
            "name": label,
            "description": description,
            "available": True,
        }
        for scraper, source_id, label, description in candidates
        if scraper
    ]
    return ApiResponse(
        success=True,
        message="Available sources retrieved",
        data=sources,
    )
async def export_phone_data(phone_name: str, source: str = "gsmarena"):
    """Export phone data as a downloadable JSON file.

    Args:
        phone_name: Name of the phone to scrape and export.
        source: Preferred scraper, "phonedb" or "gsmarena".

    Returns:
        A ``FileResponse`` serving the scraped data as
        ``<sanitized name>_specs.json``.

    Raises:
        HTTPException: 503 when no scraper is available, 404 when the phone
            is not found, 500 on any unexpected error.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    try:
        # Honour an explicit "phonedb" request when that scraper exists;
        # otherwise prefer GSMArena and fall back to PhoneDB, matching the
        # selection used by the search endpoints.
        if source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        else:
            scraper = gsmarena_scraper or phonedb_scraper
        if not scraper:
            raise HTTPException(status_code=503, detail="Scraper not available")

        # Run the scraper in the default executor to avoid blocking the
        # event loop.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            scraper.scrape_phone_by_name,
            phone_name,
        )
        if not result:
            raise HTTPException(status_code=404, detail="Phone not found")

        # phone_name is caller-supplied, so it must never reach a filesystem
        # path verbatim ("../" would escape the temp directory). It is only
        # used, sanitized, for the download name shown to the client.
        safe_name = re.sub(r"[^A-Za-z0-9._-]+", "_", phone_name).strip("._") or "phone"
        filename = f"{safe_name}_specs.json"

        # A uniquely named temp file avoids collisions between concurrent
        # exports of the same phone.
        tmp = tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            prefix="phone_specs_",
            suffix=".json",
            delete=False,
        )
        try:
            with tmp:
                json.dump(result, tmp, indent=2, ensure_ascii=False)
        except Exception:
            os.remove(tmp.name)
            raise

        # Delete the temp file once the response has been sent.
        cleanup = BackgroundTasks()
        cleanup.add_task(os.remove, tmp.name)
        return FileResponse(
            tmp.name,
            media_type='application/json',
            filename=filename,
            background=cleanup,
        )
    except HTTPException:
        # Keep the intended 404/503 status codes instead of rewriting them
        # as a 500 in the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error exporting phone data: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Background tasks for long-running scraping jobs.
# Maps job_id -> mutable status dict; held in this module-level dict only.
background_jobs: Dict[str, Dict[str, Any]] = {}
async def start_background_scraping(request: MultiplePhoneSearchRequest, background_tasks: BackgroundTasks):
    """Start a background scraping job for multiple phones.

    Registers an initial status entry in ``background_jobs``, schedules
    ``run_background_scraping`` on ``background_tasks`` and returns at once.

    Args:
        request: Phone names to scrape and the preferred source.
        background_tasks: Task collection the scraping job is added to.

    Returns:
        An ``ApiResponse`` whose ``data`` contains the new ``job_id``.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    started = datetime.now()
    # The timestamp alone has one-second resolution, so two jobs started in
    # the same second would share an id and overwrite each other's status.
    # The random suffix keeps ids unique while preserving the "job_" prefix.
    job_id = f"job_{started.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

    # Initialize job status
    background_jobs[job_id] = {
        "status": "started",
        "progress": 0,
        "total": len(request.phone_names),
        "results": [],
        "started_at": started.isoformat(),
    }

    # Add background task
    background_tasks.add_task(
        run_background_scraping,
        job_id,
        request.phone_names,
        request.source,
    )

    return ApiResponse(
        success=True,
        message="Background scraping job started",
        data={"job_id": job_id},
    )
async def get_scraping_status(job_id: str):
    """Get the status of a background scraping job.

    Returns the status entry stored in ``background_jobs`` for ``job_id``.

    Raises:
        HTTPException: 404 when ``job_id`` is unknown.

    NOTE(review): no route decorator is visible on this handler in this
    file -- confirm it is registered with ``app``.
    """
    try:
        job_state = background_jobs[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None

    return ApiResponse(
        success=True,
        message="Job status retrieved",
        data=job_state,
    )
async def run_background_scraping(job_id: str, phone_names: List[str], source: str):
    """Run a background scraping job.

    Scrapes each phone in turn and records progress in
    ``background_jobs[job_id]``. A failure on one phone is logged and the
    job carries on with the next one. Results are published on the job entry
    only when the whole job completes.

    Args:
        job_id: Key of an existing entry in ``background_jobs``.
        phone_names: Names of the phones to scrape, in order.
        source: Preferred scraper, "phonedb" or "gsmarena".
    """
    job = background_jobs[job_id]
    try:
        # Honour an explicit "phonedb" request when that scraper exists;
        # otherwise prefer GSMArena and fall back to PhoneDB, matching the
        # selection used by the search endpoints.
        if source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        else:
            scraper = gsmarena_scraper or phonedb_scraper
        if not scraper:
            job["status"] = "failed"
            job["error"] = "Scraper not available"
            return

        job["status"] = "running"
        loop = asyncio.get_running_loop()
        results = []
        last_index = len(phone_names) - 1

        for i, phone_name in enumerate(phone_names):
            # Update progress before starting on this phone.
            job["progress"] = i
            job["current_phone"] = phone_name

            try:
                # Run the scraper in the default executor to avoid blocking
                # the event loop.
                result = await loop.run_in_executor(
                    None,
                    scraper.scrape_phone_by_name,
                    phone_name,
                )
                if result:
                    results.append(result)
            except Exception as e:
                logger.error(f"Error scraping {phone_name} in background job: {e}")

            # Pause between consecutive requests, including after a failed
            # one, but not after the final phone, where it would only delay
            # completion.
            if i < last_index:
                await asyncio.sleep(2)

        # Update job status
        job["status"] = "completed"
        job["progress"] = len(phone_names)
        job["results"] = results
        job["completed_at"] = datetime.now().isoformat()
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
        logger.error(f"Background job {job_id} failed: {e}")
if __name__ == "__main__":
    # Launch the development server when this file is executed directly.
    # NOTE(review): the "main:app" import string presumably requires this
    # module to be named main.py, and reload=True is a development setting
    # -- confirm both for the deployed configuration.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)