from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional

import os

import hrequests
import pdfkit
import trafilatura
from html4docx import HtmlToDocx
from pytrends.request import TrendReq

from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache


app = FastAPI()


class URLRequest(BaseModel):
    url: str


@app.post("/scrape")
async def scrape(url_request: URLRequest):
    try:
        # Fetch the page with a Chrome browser fingerprint to reduce blocking.
        response = hrequests.get(url_request.url, browser='chrome')
        return {"content": response.text}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


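# Illustrative /scrape call (hypothetical client code; assumes the server is
# running locally on port 7860, as configured in the __main__ block below):
#
#   import requests
#   r = requests.post("http://localhost:7860/scrape",
#                     json={"url": "https://example.com"})
#   print(r.json()["content"][:200])

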
| @app.get("/extract-article") |
| def extract_article( |
| url: str, |
| record_id: Optional[str] = Query(None, description="Add an ID to the metadata."), |
| no_fallback: Optional[bool] = Query(False, description="Skip the backup extraction with readability-lxml and justext."), |
| favor_precision: Optional[bool] = Query(False, description="Prefer less text but correct extraction."), |
| favor_recall: Optional[bool] = Query(False, description="When unsure, prefer more text."), |
| include_comments: Optional[bool] = Query(True, description="Extract comments along with the main text."), |
| output_format: Optional[str] = Query('txt', description="Define an output format: 'csv', 'json', 'markdown', 'txt', 'xml', 'xmltei'.", enum=["csv", "json", "markdown", "txt", "xml", "xmltei"]), |
| target_language: Optional[str] = Query(None, description="Define a language to discard invalid documents (ISO 639-1 format)."), |
| include_tables: Optional[bool] = Query(True, description="Take into account information within the HTML <table> element."), |
| include_images: Optional[bool] = Query(False, description="Take images into account (experimental)."), |
| include_links: Optional[bool] = Query(False, description="Keep links along with their targets (experimental)."), |
| deduplicate: Optional[bool] = Query(False, description="Remove duplicate segments and documents."), |
| max_tree_size: Optional[int] = Query(None, description="Discard documents with too many elements.") |
| ): |
| response = hrequests.get(url) |
| filecontent = response.text |
| extracted = trafilatura.extract( |
| filecontent, |
| url=url, |
| record_id=record_id, |
| no_fallback=no_fallback, |
| favor_precision=favor_precision, |
| favor_recall=favor_recall, |
| include_comments=include_comments, |
| output_format=output_format, |
| target_language=target_language, |
| include_tables=include_tables, |
| include_images=include_images, |
| include_links=include_links, |
| deduplicate=deduplicate, |
| max_tree_size=max_tree_size |
| ) |
| |
| if extracted: |
| return {"article": trafilatura.utils.sanitize(extracted)} |
| else: |
| return {"error": "Could not extract the article"} |
|
|
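# Illustrative /extract-article call (hypothetical client code; the query
# parameters mirror trafilatura.extract's keyword arguments):
#
#   import requests
#   r = requests.get("http://localhost:7860/extract-article",
#                    params={"url": "https://example.com",
#                            "output_format": "markdown"})
#   print(r.json().get("article"))

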
# Allow cross-origin requests from any origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


pytrends = TrendReq()


@app.on_event("startup")
async def startup():
    # Initialize an in-memory backend for the @cache decorator.
    FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")


@app.get("/realtime_trending_searches")
@cache(expire=3600)  # Cache results for one hour to limit Google Trends requests.
async def get_realtime_trending_searches(pn: str = Query('US', description="Country code for trending searches")):
    trending_searches = pytrends.realtime_trending_searches(pn=pn)
    return trending_searches.to_dict(orient='records')


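# Illustrative /realtime_trending_searches call (hypothetical client code;
# pn takes a pytrends country code such as "US" or "IN"):
#
#   import requests
#   r = requests.get("http://localhost:7860/realtime_trending_searches",
#                    params={"pn": "US"})
#   print(r.json())

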
| @app.get("/", tags=["Home"]) |
| def api_home(): |
| return {'detail': 'Welcome to Web-Scraping API! Visit https://pvanand-web-scraping.hf.space/docs to test'} |
|
|
| class HTMLRequest(BaseModel): |
| html_content: str |
|
|
| @app.post("/html_to_pdf") |
| async def convert_to_pdf(request: HTMLRequest): |
| try: |
| options = { |
| 'page-size': 'A4', |
| 'margin-top': '0.75in', |
| 'margin-right': '0.75in', |
| 'margin-bottom': '0.75in', |
| 'margin-left': '0.75in', |
| 'encoding': "UTF-8", |
| } |
| |
| pdf = pdfkit.from_string(request.html_content, False, options=options) |
| return Response(content=pdf, media_type="application/pdf") |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
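# Illustrative /html_to_pdf call (hypothetical client code; pdfkit requires
# the wkhtmltopdf binary to be installed on the server):
#
#   import requests
#   r = requests.post("http://localhost:7860/html_to_pdf",
#                     json={"html_content": "<h1>Hello</h1>"})
#   with open("out.pdf", "wb") as f:
#       f.write(r.content)

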
class HTMLInput(BaseModel):
    html: str


# Folder for intermediate .docx files; documents are saved to disk before
# being streamed back to the client.
TEMP_FOLDER = "/app/temp"
os.makedirs(TEMP_FOLDER, exist_ok=True)


| @app.post("/convert") |
| async def convert_html_to_docx(input_data: HTMLInput): |
| temp_filename = None |
| try: |
| |
| parser = HtmlToDocx() |
| |
| |
| docx = parser.parse_html_string(input_data.html) |
| |
| |
| temp_filename = os.path.join(TEMP_FOLDER, f"temp_{os.urandom(8).hex()}.docx") |
| |
| |
| docx.save(temp_filename) |
| |
| |
| with open(temp_filename, 'rb') as file: |
| file_contents = file.read() |
| |
| |
| return Response( |
| content=file_contents, |
| media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document", |
| headers={"Content-Disposition": "attachment; filename=converted.docx"} |
| ) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
| finally: |
| |
| if temp_filename and os.path.exists(temp_filename): |
| os.remove(temp_filename) |
|
|
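# Illustrative /convert call (hypothetical client code):
#
#   import requests
#   r = requests.post("http://localhost:7860/convert",
#                     json={"html": "<p>Hello <b>world</b></p>"})
#   with open("converted.docx", "wb") as f:
#       f.write(r.content)

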
| @app.post("/html_to_docx") |
| async def convert_html_to_docx(input_data: HTMLRequest): |
| temp_filename = None |
| try: |
| |
| parser = HtmlToDocx() |
| |
| |
| docx = parser.parse_html_string(input_data.html_content) |
| |
| |
| temp_filename = os.path.join(TEMP_FOLDER, f"temp_{os.urandom(8).hex()}.docx") |
| |
| |
| docx.save(temp_filename) |
| |
| |
| with open(temp_filename, 'rb') as file: |
| file_contents = file.read() |
| |
| |
| return Response( |
| content=file_contents, |
| media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document", |
| headers={"Content-Disposition": "attachment; filename=converted.docx"} |
| ) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
| finally: |
| |
| if temp_filename and os.path.exists(temp_filename): |
| os.remove(temp_filename) |
|
|
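# Illustrative /html_to_docx call (hypothetical client code; note the payload
# key is html_content here, not html):
#
#   import requests
#   r = requests.post("http://localhost:7860/html_to_docx",
#                     json={"html_content": "<p>Hello</p>"})
#   with open("converted.docx", "wb") as f:
#       f.write(r.content)

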
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)