# HuggingFace Spaces deployment (page-status residue removed during cleanup)
#
import os
import secrets
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import YoutubeLoader, UnstructuredPDFLoader, WebBaseLoader
from langchain_community.document_loaders import OnlinePDFLoader
# FastAPI application instance for this text-extraction service.
app = FastAPI()

# Shared secret clients must present as the bearer token.
# Raises KeyError at import time if API_KEY is not set in the environment.
API_KEY = os.environ["API_KEY"]

# Standard OAuth2 bearer scheme: clients send "Authorization: Bearer <token>".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def validate_token(token: str = Depends(oauth2_scheme)) -> None:
    """Reject requests whose bearer token does not match ``API_KEY``.

    Intended for use as a FastAPI dependency.

    Raises:
        HTTPException: 401 when the presented token is invalid.
    """
    # compare_digest is constant-time, so the comparison does not leak
    # how much of the key prefix matched via response timing.
    if not secrets.compare_digest(token, API_KEY):
        raise HTTPException(status_code=401, detail="Invalid API Key")
def extract_text(url: str, language: str = "ja", length: int = 150000, use_jina: bool = True):
    """Fetch *url* and return its textual content.

    Dispatches on the URL: YouTube links load the transcript in *language*,
    ``.pdf`` links go through ``OnlinePDFLoader``, and any other URL is read
    either via the r.jina.ai reader proxy (``use_jina=True``) or by the local
    HTML-to-markdown converter.

    Args:
        url: Target URL (YouTube, PDF, or generic web page).
        language: Transcript language code for YouTube videos.
        length: Maximum number of characters returned; longer texts keep the
            first and last ``length // 2`` characters and drop the middle.
        use_jina: Whether to use the r.jina.ai proxy for generic pages.

    Returns:
        ``{"text_content": str}`` on success, or ``{"message": str}`` with the
        error text on failure — callers must check which key is present.
    """
    try:
        if "youtube.com" in url or "youtu.be" in url:
            # YouTube: load transcript plus video info.
            loader = YoutubeLoader.from_youtube_url(
                youtube_url=url,
                add_video_info=True,
                language=[language],
            )
            docs = loader.load()
            # Join page contents; str(docs) would embed Document reprs and
            # metadata in the output instead of just the transcript text.
            text_content = "\n".join(doc.page_content for doc in docs)
        elif url.endswith(".pdf"):
            # PDF: download and extract the text of the document.
            loader = OnlinePDFLoader(url)
            docs = loader.load()
            text_content = docs[0].page_content
        else:
            # Generic web page.
            if use_jina:
                # r.jina.ai returns a cleaned, markdown-like rendering of the
                # page; timeout keeps a stalled proxy from hanging the worker.
                response = requests.get("https://r.jina.ai/" + url, timeout=30)
                text_content = response.text
            else:
                text_content = str(fetch_and_convert_to_markdown(url))
        if len(text_content) < length:
            return {"text_content": text_content}
        # Too long: keep head and tail, drop the middle.
        half = length // 2
        return {
            "text_content": text_content[:half]
            + text_content[len(text_content) - half:]
        }
    except Exception as e:
        # Deliberate best-effort contract: report the error in-band
        # instead of raising, so the endpoint always returns a dict.
        return {"message": str(e)}
def fetch_and_convert_to_markdown(url):
    """Fetch *url* and convert its HTML body to a simple markdown string.

    Only headings (h1-h6), paragraphs, links and lists are converted; all
    other markup is ignored. Relative link targets are resolved against *url*.

    Args:
        url: Page to fetch.

    Returns:
        The markdown text, or a (Japanese) error string containing the HTTP
        status code when the response is not 200.
    """
    response = requests.get(url, timeout=10)
    if response.status_code != 200:
        return f"エラー: ステータスコード {response.status_code}"
    soup = BeautifulSoup(response.text, 'html.parser')
    markdown = ""
    # Page title becomes the top-level heading. An empty <title> yields
    # .string == None, which would crash .strip() — guard both.
    if soup.title and soup.title.string:
        markdown += f"# {soup.title.string.strip()}\n\n"
    # Main content: everything inside <body>.
    main_content = soup.body
    if main_content:
        for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']):
            if element.name.startswith('h'):
                # h1..h6 → '#' repeated to the heading level.
                level = int(element.name[1])
                markdown += f"{'#' * level} {element.get_text().strip()}\n\n"
            elif element.name == 'p':
                markdown += f"{element.get_text().strip()}\n\n"
            elif element.name == 'a':
                href = element.get('href')
                if href:
                    # Resolve relative hrefs against the page URL.
                    full_url = urljoin(url, href)
                    markdown += f"[{element.get_text().strip()}]({full_url})\n\n"
            elif element.name in ['ul', 'ol']:
                # Flatten both ordered and unordered lists to "- item" lines.
                for li in element.find_all('li'):
                    markdown += f"- {li.get_text().strip()}\n"
                markdown += "\n"
    return markdown