import asyncio
import json
import logging
import os
import secrets
import tempfile
from typing import List, Dict

import aiohttp
import boto3
import textract
from botocore.exceptions import NoCredentialsError
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from fastapi import FastAPI, HTTPException, UploadFile, Depends, Query
from fastapi.security import APIKeyHeader
|
|
app = FastAPI()


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Name of the HTTP header every endpoint checks for the API key.
API_KEY_NAME = "X-API-Key"
# auto_error=False so a missing header reaches verify_api_key (which
# rejects it with 401) instead of FastAPI returning 403 automatically.
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)


# Public Invidious mirrors, tried in order until one responds.
INVIDIOUS_INSTANCES = [
    "https://invidious.privacydev.net",
    "https://invidious.reallyaweso.me",
    "https://invidious.adminforge.de"
]
API_KEY = os.environ.get("API_KEY")


# S3 credentials and destination for uploaded audio files.
S3_ACCESS_KEY_ID = os.environ.get("S3_ACCESS_KEY_ID")
S3_SECRET_ACCESS_KEY = os.environ.get("S3_SECRET_ACCESS_KEY")
S3_BUCKET = os.environ.get("S3_BUCKET")
S3_REGION = os.environ.get("S3_REGION")


# Fail fast at import time rather than erroring on the first request.
if not all([API_KEY, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY, S3_BUCKET, S3_REGION]):
    raise ValueError("Missing required environment variables")
|
|
|
|
async def search_and_get_videos(query: str, num_videos: int = 2) -> List[Dict]:
    """Search Invidious instances for videos matching *query*.

    Tries each instance in INVIDIOUS_INSTANCES in order and returns the
    result set from the first one that answers successfully.

    Args:
        query: Free-text search string (URL-encoded automatically).
        num_videos: Maximum number of results to return.

    Returns:
        A list of dicts with "id", "title" and "thumbnail" keys, or an
        empty list if every instance failed.
    """
    for instance in INVIDIOUS_INSTANCES:
        url = f"{instance}/api/v1/search"
        try:
            async with aiohttp.ClientSession() as session:
                # Pass the query via `params` so aiohttp URL-encodes it; the
                # original f-string interpolation broke queries containing
                # '&', '#', '+' or spaces.
                async with session.get(url, params={"q": query, "type": "video"}) as response:
                    response.raise_for_status()
                    search_results = await response.json()
                    videos = [
                        {
                            "id": video.get("videoId"),
                            "title": video.get("title"),
                            "thumbnail": video["videoThumbnails"][0]["url"]
                            if video.get("videoThumbnails")
                            else "",
                        }
                        for video in search_results[:num_videos]
                    ]
                    return videos
        except aiohttp.ClientError as e:
            logger.error(f"Error performing video search on {instance}: {e}")
    logger.error("All Invidious instances failed")
    return []
|
|
async def get_youtube_audio(video_id: str, max_retries: int = 3) -> Dict:
    """Download the audio track of a YouTube video via Invidious.

    Tries every instance in INVIDIOUS_INSTANCES; each instance gets up to
    *max_retries* attempts with linear backoff before moving on.

    Args:
        video_id: YouTube video id (inserted verbatim into the API path).
        max_retries: Attempts per instance before trying the next one.

    Returns:
        {'success': True, 'temp_file_path': <path to .m4a temp file>} on
        success — the caller is responsible for deleting that file — or
        {'success': False, 'error': <message>} when all instances fail.
    """
    for instance in INVIDIOUS_INSTANCES:
        for attempt in range(max_retries):
            try:
                url = f"{instance}/api/v1/videos/{video_id}"

                async with aiohttp.ClientSession() as session:
                    async with session.get(url) as response:
                        response.raise_for_status()
                        video_data = await response.json()

                    # Pick the first audio/mp4 adaptive stream, if any.
                    audio_format = next((format for format in video_data.get('adaptiveFormats', [])
                                         if format.get('type', '').startswith('audio/mp4')), None)

                    if audio_format:
                        audio_url = audio_format.get('url')
                        if audio_url:
                            try:
                                async with session.get(audio_url) as audio_response:
                                    audio_content = await audio_response.read()

                                # delete=False: the caller uploads the file to
                                # S3 and unlinks it afterwards.
                                with tempfile.NamedTemporaryFile(delete=False, suffix='.m4a') as temp_file:
                                    temp_file.write(audio_content)
                                    temp_file_path = temp_file.name

                                return {'success': True, 'temp_file_path': temp_file_path}
                            except aiohttp.ServerDisconnectedError:
                                if attempt == max_retries - 1:
                                    logger.error(f"Max retries reached for video ID {video_id} on {instance}")
                                    break  # give up on this instance
                                await asyncio.sleep(1 * (attempt + 1))  # linear backoff
                                continue  # retry the same instance

                    # No usable audio stream here; move on to the next instance.
                    logger.warning(f"No suitable audio format found for video ID {video_id} on {instance}")
                    break
            except aiohttp.ClientError as e:
                logger.error(f"Network error fetching YouTube audio for video ID {video_id} on {instance}: {e}")
            except json.JSONDecodeError:
                logger.error(f"Error decoding JSON response for video ID {video_id} on {instance}")
            except Exception as e:
                logger.error(f"Unexpected error fetching YouTube audio for video ID {video_id} on {instance}: {e}")
            # After a caught error: back off and retry, unless this was the
            # final attempt for this instance.
            if attempt == max_retries - 1:
                break
            await asyncio.sleep(1 * (attempt + 1))

    return {'success': False, 'error': "Failed to fetch audio after multiple attempts on all instances"}
|
|
def extract_text_from_document(file: UploadFile) -> dict:
    """Extract plain text from an uploaded document using textract.

    The upload is spooled to a temporary file (textract needs a real path),
    which is always removed — even when extraction fails.

    Args:
        file: The uploaded document; its filename extension selects the
            textract parser.

    Returns:
        {'success': True, 'extracted_text': <str>} or
        {'success': False, 'error': <message>}.
    """
    temp_file_path = None
    try:
        suffix = os.path.splitext(file.filename)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_file.write(file.file.read())
            temp_file_path = temp_file.name

        text = textract.process(temp_file_path).decode('utf-8')

        return {
            'success': True,
            'extracted_text': text
        }
    except Exception as e:
        return {
            'success': False,
            'error': f"Error extracting text from document: {str(e)}"
        }
    finally:
        # The original only unlinked on success, leaking the temp file
        # whenever textract raised; clean up unconditionally here.
        if temp_file_path and os.path.exists(temp_file_path):
            os.unlink(temp_file_path)
|
|
def upload_to_s3(local_file, s3_file):
    """Upload a local file to the configured S3 bucket.

    Args:
        local_file: Path of the file on disk.
        s3_file: Object key to store it under in S3_BUCKET.

    Returns:
        The public HTTPS URL of the uploaded object, or None on failure.
    """
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=S3_ACCESS_KEY_ID,
        aws_secret_access_key=S3_SECRET_ACCESS_KEY,
        region_name=S3_REGION,
    )

    try:
        s3_client.upload_file(local_file, S3_BUCKET, s3_file)
        return f"https://{S3_BUCKET}.s3.{S3_REGION}.amazonaws.com/{s3_file}"
    except NoCredentialsError:
        logger.error("Credentials not available")
        return None
    except (FileNotFoundError, boto3.exceptions.S3UploadFailedError) as e:
        # The original let these escape as unhandled 500s even though
        # callers already treat None as "upload failed" — report them
        # the same way as the credentials case.
        logger.error(f"Failed to upload {local_file} to S3: {e}")
        return None
|
|
def image_search(query: str, num_results: int = 5) -> dict:
    """Search DuckDuckGo Images for *query*.

    Args:
        query: Search terms.
        num_results: Maximum number of images to return.

    Returns:
        {'success': True, 'results': [...]} where each result carries
        title/image_url/thumbnail_url/source_url/width/height keys, or
        {'success': False, 'error': <message>} on failure.
    """
    try:
        with DDGS(
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
        ) as ddgs:
            results = list(ddgs.images(query, max_results=num_results))
        # Use .get so a single malformed result can't raise KeyError and
        # turn the whole response into an error (as direct indexing did).
        formatted_results = [
            {
                'title': result.get('title', ''),
                'image_url': result.get('image', ''),
                'thumbnail_url': result.get('thumbnail', ''),
                'source_url': result.get('url', ''),
                'width': result.get('width'),
                'height': result.get('height')
            }
            for result in results
        ]
        return {
            'success': True,
            'results': formatted_results
        }
    except Exception as e:
        logger.error(f"Error performing image search: {e}")
        return {
            'success': False,
            'error': f"Error performing image search: {str(e)}"
        }
|
|
async def verify_api_key(api_key: str = Depends(api_key_header)):
    """FastAPI dependency that validates the X-API-Key header.

    Uses a constant-time comparison so the key can't be probed via timing
    side channels; a missing header (None) is rejected outright.

    Raises:
        HTTPException: 401 when the header is absent or does not match.
    """
    if api_key is None or not secrets.compare_digest(api_key, API_KEY):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api_key
|
|
@app.get("/search-videos/")
async def search_videos(
    query: str,
    num_videos: int = Query(default=2, ge=1, le=10),
    api_key: str = Depends(verify_api_key)
):
    """Search Invidious for videos and return up to *num_videos* matches."""
    found = await search_and_get_videos(query, num_videos)
    if found:
        return {"videos": found}
    raise HTTPException(status_code=404, detail="No videos found or an error occurred during the search.")
|
|
@app.get("/get-audio/{video_id}")
async def get_audio(video_id: str, api_key: str = Depends(verify_api_key)):
    """Fetch a video's audio track, upload it to S3 and return its URL.

    Raises:
        HTTPException: 404 if the audio could not be fetched, 500 if the
            S3 upload failed.
    """
    result = await get_youtube_audio(video_id)
    if not result['success']:
        raise HTTPException(status_code=404, detail=result['error'])

    s3_file_name = f"{video_id}.m4a"
    try:
        s3_url = upload_to_s3(result['temp_file_path'], s3_file_name)
    finally:
        # The original only deleted the temp file on a successful upload,
        # leaking one file per failed request; always remove it here.
        if os.path.exists(result['temp_file_path']):
            os.unlink(result['temp_file_path'])

    if s3_url:
        return {"audio_url": s3_url}
    raise HTTPException(status_code=500, detail="Failed to upload audio to S3")
|
|
@app.post("/extract-text/")
async def extract_text(file: UploadFile, api_key: str = Depends(verify_api_key)):
    """Extract and return the plain text of an uploaded document."""
    outcome = extract_text_from_document(file)
    if outcome['success']:
        return {"extracted_text": outcome['extracted_text']}
    raise HTTPException(status_code=500, detail=outcome['error'])
|
|
@app.get("/image-search/")
async def image_search_endpoint(
    query: str,
    num_results: int = Query(default=5, ge=1, le=50),
    api_key: str = Depends(verify_api_key)
):
    """Run a DuckDuckGo image search and return the formatted results.

    Validates num_results with Query bounds, matching how the
    /search-videos/ endpoint constrains num_videos, instead of passing
    an arbitrary client-supplied integer straight through.
    """
    result = image_search(query, num_results)
    if not result['success']:
        raise HTTPException(status_code=500, detail=result['error'])
    return result
|
|
class DuckDuckGoSearch:
    """Minimal scraper for DuckDuckGo's HTML search endpoint."""

    BASE_URL = "https://html.duckduckgo.com/html/"

    async def search(self, query: str, num_results: int = 5) -> list:
        """Return up to *num_results* web results for *query*.

        Each result is a dict with "title", "body" and "href" keys.

        Raises:
            Exception: when DuckDuckGo responds with a non-200 status.
        """
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Referer": "https://google.com/",
            "Cookie": "kl=wt-wt",
        }

        async with aiohttp.ClientSession() as session:
            # Pass the query via `params` so aiohttp URL-encodes it; the
            # original f-string broke queries containing '&', '+' or '#'.
            async with session.get(self.BASE_URL, params={"q": query}, headers=headers) as response:
                if response.status != 200:
                    raise Exception("Failed to fetch data from DuckDuckGo")

                html = await response.text()

        # Parse outside the session context — the body is fully read.
        soup = BeautifulSoup(html, "html.parser")
        results = []

        for result in soup.select(".result"):
            title = result.select_one(".result__title .result__a")
            # Renamed from `url` — the original shadowed the request URL.
            link = result.select_one(".result__url")
            desc = result.select_one(".result__snippet")

            if title and link and desc:
                results.append({
                    "title": title.get_text(strip=True),
                    "body": desc.get_text(strip=True),
                    "href": f"https://{link.get_text(strip=True)}",
                })

            if len(results) >= num_results:
                break

        return results
|
|
async def web_search(query: str, num_results: int = 5) -> dict:
    """Run a DuckDuckGo web search, wrapping any error in a result dict."""
    searcher = DuckDuckGoSearch()
    try:
        hits = await searcher.search(query, num_results)
    except Exception as e:
        return {'success': False, 'error': str(e)}
    return {'success': True, 'results': hits}
|
|
@app.get("/web-search/")
async def web_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)):
    """Perform a DuckDuckGo web search and return the results."""
    outcome = await web_search(query, num_results)
    if outcome['success']:
        return outcome
    raise HTTPException(status_code=500, detail=outcome['error'])