# FastAPI service that extracts readable text from YouTube videos, PDFs, and web pages.
import os
import secrets
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from langchain_community.document_loaders import OnlinePDFLoader, YoutubeLoader

app = FastAPI()

# Shared secret for request authentication; raises KeyError at startup if unset
API_KEY = os.environ["API_KEY"]

# OAuth2PasswordBearer is used only to pull the Bearer token out of the
# Authorization header; this service does not expose an actual /token endpoint
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def validate_token(token: str = Depends(oauth2_scheme)):
    # Constant-time comparison avoids leaking the key through response timing
    if not secrets.compare_digest(token, API_KEY):
        raise HTTPException(status_code=401, detail="Invalid API Key")

@app.post("/extract_text", tags=["Text Extraction"], dependencies=[Depends(validate_token)])
def extract_text(url: str, language: str = "ja", length: int = 150000, use_jina: bool = True):
    try:
        if "youtube.com" in url or "youtu.be" in url:
            # YouTube URL: pull the transcript with YoutubeLoader
            loader = YoutubeLoader.from_youtube_url(
                youtube_url=url,
                add_video_info=True,
                language=[language],
            )
            docs = loader.load()
            # Join the transcript text instead of dumping the raw Document repr
            text_content = "\n".join(doc.page_content for doc in docs)
        elif url.endswith(".pdf"):
            # Direct link to a PDF file
            loader = OnlinePDFLoader(url)
            docs = loader.load()
            text_content = docs[0].page_content
        else:
            # Any other URL: proxy through the Jina Reader API, or fall back
            # to a local HTML-to-Markdown conversion
            if use_jina:
                response = requests.get("https://r.jina.ai/"+ url)
                text_content = response.text
            else:
                text_content = fetch_and_convert_to_markdown(url)

        # If the text exceeds the limit, keep the first and last length/2
        # characters and drop the middle
        if len(text_content) < length:
            return {"text_content": text_content}
        else:
            return {
                "text_content": text_content[: int(length / 2)]
                + text_content[len(text_content) - int(length / 2) :]
            }
    except Exception as e:
        # Errors are reported in the response body with HTTP 200; callers
        # must check for the "message" key
        return {"message": str(e)}

def fetch_and_convert_to_markdown(url):
    """Fetch a page and convert a rough subset of its HTML to Markdown."""
    response = requests.get(url, timeout=10)
    if response.status_code != 200:
        return f"Error: status code {response.status_code}"

    soup = BeautifulSoup(response.text, 'html.parser')
    markdown = ""

    # Page title becomes a top-level heading
    if soup.title:
        markdown += f"# {soup.title.string.strip()}\n\n"

    # Main content (this example only walks elements inside the body tag)
    main_content = soup.body
    if main_content:
        for element in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'ul', 'ol']):
            if element.name.startswith('h'):
                level = int(element.name[1])
                markdown += f"{'#' * level} {element.get_text().strip()}\n\n"
            elif element.name == 'p':
                markdown += f"{element.get_text().strip()}\n\n"
            elif element.name == 'a':
                href = element.get('href')
                if href:
                    full_url = urljoin(url, href)
                    markdown += f"[{element.get_text().strip()}]({full_url})\n\n"
            elif element.name in ['ul', 'ol']:
                # Ordered and unordered lists are both flattened to "-" bullets
                for li in element.find_all('li'):
                    markdown += f"- {li.get_text().strip()}\n"
                markdown += "\n"

    return markdown
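
# A minimal sketch of a local entry point, assuming uvicorn is installed
# alongside fastapi; the host and port here are illustrative
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example request against that server, assuming API_KEY is exported in the
# calling shell; url, language, length and use_jina are query parameters,
# and the key travels as a Bearer token:
#
#   curl -X POST "http://localhost:8000/extract_text?url=https://example.com" \
#        -H "Authorization: Bearer $API_KEY"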