File size: 5,393 Bytes
2ee51ff 6ef3c36 a768471 1fd5f04 6b02d5a b29fe08 e82f9ce 7a0c4e5 239c04d 6ef3c36 eb8dde2 7a0c4e5 1fd5f04 7a0c4e5 6b02d5a eb8dde2 d0a97bd eb8dde2 d0a97bd 6fabfc5 7a0c4e5 6b02d5a c373a8d c8bcd4e c373a8d 6fabfc5 c373a8d c8bcd4e 6fabfc5 6b02d5a 6fabfc5 6b02d5a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import os
import requests
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import YoutubeLoader, UnstructuredPDFLoader, WebBaseLoader
from langchain_community.document_loaders import OnlinePDFLoader
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import httpx
# FastAPI application exposing the text-extraction endpoints defined below.
app = FastAPI()
# Shared secret for bearer-token auth; os.environ[...] raises KeyError at
# import time if the API_KEY environment variable is not set (fail fast).
API_KEY = os.environ["API_KEY"]
# OAuth2 bearer scheme; the presented token is compared against API_KEY
# in validate_token rather than exchanged at the tokenUrl.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def validate_token(token: str = Depends(oauth2_scheme)):
    """Reject requests whose bearer token does not match the configured API key.

    Used as a route dependency; returns nothing on success.

    Raises:
        HTTPException: 401 when the presented token differs from API_KEY.
    """
    if token != API_KEY:
        # FIX: a bearer-auth 401 should include a WWW-Authenticate header
        # (RFC 6750); FastAPI's security docs use the same pattern.
        raise HTTPException(
            status_code=401,
            detail="Invalid API Key",
            headers={"WWW-Authenticate": "Bearer"},
        )
@app.post("/extract_text", tags=["Text Extraction"], dependencies=[Depends(validate_token)])
def extract_text(url: str, language: str = "ja", length: int = 150000, use_jina: bool = True):
    """Extract text content from a URL (YouTube transcript, PDF, or web page).

    Args:
        url: Source URL. YouTube and ``.pdf`` URLs get dedicated loaders.
        language: Transcript language code for YouTube videos.
        length: Maximum number of characters returned; longer text is
            truncated to the first and last ``length/2`` characters.
        use_jina: When True, fetch web pages through the r.jina.ai reader
            proxy; otherwise fetch directly and convert the HTML to markdown.

    Returns:
        ``{"text_content": ...}`` on success, ``{"message": <error>}`` on failure.
    """
    try:
        if "youtube.com" in url or "youtu.be" in url:
            # YouTube: load the transcript in the requested language.
            loader = YoutubeLoader.from_youtube_url(
                youtube_url=url,
                add_video_info=True,
                language=[language],
            )
            text_content = str(loader.load())
        elif url.endswith(".pdf"):
            # PDF: take the extracted text of the document.
            docs = OnlinePDFLoader(url).load()
            text_content = docs[0].page_content
        elif use_jina:
            # Web page via the jina.ai reader proxy (returns markdown-ish text).
            # BUG FIX: the request previously had no timeout and could hang
            # the worker indefinitely; the direct-fetch branch already used one.
            response = requests.get("https://r.jina.ai/" + url, timeout=30)
            text_content = response.text
        else:
            # Plain fetch, then convert the HTML to markdown ourselves.
            response = requests.get(url, timeout=10)
            text_content = str(convert_to_markdown(response.text, url))
        if len(text_content) < length:
            return {"text_content": text_content}
        # Too long: keep the head and tail halves so both ends survive.
        half = int(length / 2)
        return {"text_content": text_content[:half] + text_content[len(text_content) - half:]}
    except Exception as e:
        # Best-effort endpoint: surface the error message instead of a 500.
        return {"message": str(e)}
@app.post("/httpx_bs", tags=["Text Extraction and beautiful soup"], dependencies=[Depends(validate_token)])
def httpx_bs(url: str, length: int = 150000):
    """Fetch a page with httpx and convert its HTML body to markdown.

    Args:
        url: Page URL to fetch.
        length: Maximum number of characters returned; longer text is
            truncated to the first and last ``length/2`` characters.

    Returns:
        ``{"text_content": ...}`` on success, ``{"message": <error>}`` on failure.
    """
    try:
        response = httpx.get(url)
        # BUG FIX: the raw Response object was passed to convert_to_markdown
        # (and thus to BeautifulSoup); it must receive the body text, matching
        # how the other endpoints call the helper.
        text_content = str(convert_to_markdown(response.text, url))
        if len(text_content) < length:
            return {"text_content": text_content}
        # Too long: keep the head and tail halves so both ends survive.
        half = int(length / 2)
        return {"text_content": text_content[:half] + text_content[len(text_content) - half:]}
    except Exception as e:
        # Best-effort endpoint: surface the error message instead of a 500.
        return {"message": str(e)}
@app.post("/extract_from_url", tags=["Text Extraction from URL"], dependencies=[Depends(validate_token)])
def extract_from_url(url: str, length: int = 150000, tool: str = "httpx"):
    """Extract page text with a caller-selected fetching tool.

    Args:
        url: Page URL to fetch.
        length: Maximum number of characters returned; longer text is
            truncated to the first and last ``length/2`` characters.
        tool: One of ``"jina"``, ``"httpx"``, ``"requests"``, ``"webbaseloader"``.

    Returns:
        ``{"text_content": ...}`` on success, ``{"message": <error>}`` on failure
        (including an invalid ``tool`` value).
    """
    try:
        if tool == "jina":
            # jina.ai reader proxy returns pre-cleaned text; no HTML conversion.
            # BUG FIX: added a timeout so a stalled proxy cannot hang the worker.
            response = requests.get("https://r.jina.ai/" + url, timeout=30)
            text_content = response.text
        elif tool == "httpx":
            # httpx applies its own default timeout.
            response = httpx.get(url)
            text_content = str(convert_to_markdown(response.text, url))
        elif tool == "requests":
            response = requests.get(url, timeout=10)
            text_content = str(convert_to_markdown(response.text, url))
        elif tool == "webbaseloader":
            loader = WebBaseLoader(url)
            text_content = loader.load()[0].page_content
        else:
            raise ValueError("Invalid tool specified. Choose from 'jina', 'httpx', 'requests', or 'webbaseloader'.")
        if len(text_content) < length:
            return {"text_content": text_content}
        # Too long: keep the head and tail halves so both ends survive.
        half = int(length / 2)
        return {"text_content": text_content[:half] + text_content[len(text_content) - half:]}
    except Exception as e:
        # Best-effort endpoint: surface the error message instead of a 500.
        return {"message": str(e)}
def convert_to_markdown(response_text, url):
    """Convert raw HTML into a simplified markdown string.

    Args:
        response_text: The HTML document as a string.
        url: Base URL used to resolve relative links into absolute ones.

    Returns:
        Markdown built from the title, headings, paragraphs, links,
        and lists found in the document body.
    """
    soup = BeautifulSoup(response_text, 'html.parser')
    markdown = ""
    # Page title becomes the top-level heading.
    # BUG FIX: soup.title.string is None when <title> is empty or contains
    # nested markup, which crashed .strip(); guard before using it.
    if soup.title and soup.title.string:
        markdown += f"# {soup.title.string.strip()}\n\n"
    # Only the <body> content is converted; head/script content is ignored.
    main_content = soup.body
    if main_content:
        for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']):
            if element.name.startswith('h'):
                # h1..h6 -> markdown heading of the same level.
                level = int(element.name[1])
                markdown += f"{'#' * level} {element.get_text().strip()}\n\n"
            elif element.name == 'p':
                markdown += f"{element.get_text().strip()}\n\n"
            elif element.name == 'a':
                href = element.get('href')
                if href:
                    # Resolve relative hrefs against the page URL.
                    full_url = urljoin(url, href)
                    markdown += f"[{element.get_text().strip()}]({full_url})\n\n"
            elif element.name in ['ul', 'ol']:
                # Both list kinds are flattened to "- item" lines.
                for li in element.find_all('li'):
                    markdown += f"- {li.get_text().strip()}\n"
                markdown += "\n"
    return markdown