import os
from urllib.parse import urljoin

import httpx
import requests
from bs4 import BeautifulSoup
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import (
    OnlinePDFLoader,
    WebBaseLoader,
    YoutubeLoader,
)

app = FastAPI()
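
# Auth: a single shared API key read from the environment at startup; clients
# send it as a bearer token, and OAuth2PasswordBearer only extracts it from
# the Authorization header (no real OAuth2 flow is involved).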
API_KEY = os.environ["API_KEY"]

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def validate_token(token: str = Depends(oauth2_scheme)):
    if token != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")
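

# /extract_text dispatches on the URL: YouTube links go through YoutubeLoader,
# PDF links through OnlinePDFLoader, and everything else is fetched either via
# the Jina Reader proxy (r.jina.ai) or a plain GET converted to markdown.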
@app.post("/extract_text", tags=["Text Extraction"], dependencies=[Depends(validate_token)])
def extract_text(url: str, language: str = "ja", length: int = 150000, use_jina: bool = True):
    try:
        if "youtube.com" in url or "youtu.be" in url:
            # Fetch the transcript (plus video metadata) in the requested language.
            loader = YoutubeLoader.from_youtube_url(
                youtube_url=url,
                add_video_info=True,
                language=[language],
            )
            docs = loader.load()
            text_content = str(docs)
        elif url.endswith(".pdf"):
            loader = OnlinePDFLoader(url)
            docs = loader.load()
            text_content = docs[0].page_content
        else:
            if use_jina:
                # Jina Reader returns a markdown rendering of the target page;
                # the timeout keeps a slow upstream from hanging the endpoint.
                response = requests.get("https://r.jina.ai/" + url, timeout=30)
                text_content = response.text
            else:
                response = requests.get(url, timeout=10)
                text_content = str(convert_to_markdown(response.text, url))

        if len(text_content) < length:
            return {"text_content": text_content}
        # Over budget: return the first and last length/2 characters, dropping the middle.
        return {
            "text_content": text_content[: length // 2]
            + text_content[len(text_content) - length // 2 :]
        }
    except Exception as e:
        return {"message": str(e)}
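

# /httpx_bs is a simpler variant: fetch with httpx and always convert the HTML
# to markdown with the BeautifulSoup-based helper below.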
@app.post("/httpx_bs", tags=["Text Extraction and beautiful soup"], dependencies=[Depends(validate_token)])
def httpx_bs(url: str, length: int = 150000):
    try:
        response = httpx.get(url)
        # convert_to_markdown expects the response body, not the Response object.
        text_content = str(convert_to_markdown(response.text, url))

        if len(text_content) < length:
            return {"text_content": text_content}
        # Over budget: return the first and last length/2 characters, dropping the middle.
        return {
            "text_content": text_content[: length // 2]
            + text_content[len(text_content) - length // 2 :]
        }
    except Exception as e:
        return {"message": str(e)}
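

# /extract_from_url exposes the fetching backends individually so callers can
# pick between Jina Reader, httpx, requests, and LangChain's WebBaseLoader.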
@app.post("/extract_from_url", tags=["Text Extraction from URL"], dependencies=[Depends(validate_token)])
def extract_from_url(url: str, length: int = 150000, tool: str = "httpx"):
    try:
        if tool == "jina":
            response = requests.get("https://r.jina.ai/" + url, timeout=30)
            text_content = response.text
        elif tool == "httpx":
            response = httpx.get(url)
            text_content = str(convert_to_markdown(response.text, url))
        elif tool == "requests":
            response = requests.get(url, timeout=10)
            text_content = str(convert_to_markdown(response.text, url))
        elif tool == "webbaseloader":
            loader = WebBaseLoader(url)
            docs = loader.load()
            text_content = docs[0].page_content
        else:
            raise ValueError("Invalid tool specified. Choose from 'jina', 'httpx', 'requests', or 'webbaseloader'.")

        if len(text_content) < length:
            return {"text_content": text_content}
        # Over budget: return the first and last length/2 characters, dropping the middle.
        return {
            "text_content": text_content[: length // 2]
            + text_content[len(text_content) - length // 2 :]
        }
    except Exception as e:
        return {"message": str(e)}
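

# Minimal HTML-to-markdown conversion: walk the <body> and emit headings,
# paragraphs, links (resolved to absolute URLs with urljoin), and list items.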
def convert_to_markdown(response_text, url):
    soup = BeautifulSoup(response_text, 'html.parser')
    markdown = ""

    # <title>.string is None when the tag is missing or contains nested markup,
    # so guard before calling .strip().
    if soup.title and soup.title.string:
        markdown += f"# {soup.title.string.strip()}\n\n"

    main_content = soup.body
    if main_content:
        for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']):
            if element.name.startswith('h'):
                # Map <hN> to a markdown heading of the same level.
                level = int(element.name[1])
                markdown += f"{'#' * level} {element.get_text().strip()}\n\n"
            elif element.name == 'p':
                markdown += f"{element.get_text().strip()}\n\n"
            elif element.name == 'a':
                href = element.get('href')
                if href:
                    # Resolve relative links against the page URL.
                    full_url = urljoin(url, href)
                    markdown += f"[{element.get_text().strip()}]({full_url})\n\n"
            elif element.name in ['ul', 'ol']:
                # Both list types are rendered with "-" bullets for simplicity.
                for li in element.find_all('li'):
                    markdown += f"- {li.get_text().strip()}\n"
                markdown += "\n"

    return markdown
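

# Example usage (assuming this module is saved as main.py and API_KEY is set):
#   uvicorn main:app --reload
#   curl -X POST "http://localhost:8000/extract_text?url=https://example.com" \
#        -H "Authorization: Bearer $API_KEY"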