# StreamFlix Stremio addon — FastAPI service intended for hosting on a Hugging Face Space.
| from fastapi import FastAPI, Request, Query | |
| from fastapi.responses import JSONResponse, RedirectResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import os | |
| import logging | |
| import time | |
| from typing import Optional | |
# FastAPI application setup
app = FastAPI(title="StreamFlix Addon", version="1.0.0")

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# CORS middleware: Stremio clients load the addon cross-origin, so every
# origin/method/header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration (set via environment variables on the Hugging Face Space)
BASE_URL = os.getenv("BASE_URL", "")          # root URL of the scraped site (empty by default)
TMDB_API_KEY = os.getenv("TMDB_API_KEY", "")  # TMDB v3 API key
TMDB_API_URL = "https://api.themoviedb.org/3"
# Desktop-browser user agent so the scraped site serves its normal HTML.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
REQUEST_TIMEOUT = 15  # seconds per HTTP request
MAX_RETRIES = 2       # additional attempts after a failed request
# Stremio addon manifest, served at /manifest.json
MANIFEST = {
    "id": "community.streamflix.hf",
    "version": "1.0.0",
    # Two catalogs (movies and series), both supporting optional text search.
    "catalogs": [
        {
            "type": "movie",
            "id": "streamflix_movies",
            "name": "StreamFlix - Filmes",
            "extra": [{"name": "search", "isRequired": False}]
        },
        {
            "type": "series",
            "id": "streamflix_series",
            "name": "StreamFlix - Séries",
            "extra": [{"name": "search", "isRequired": False}]
        }
    ],
    "resources": ["catalog", "stream", "meta"],
    "types": ["movie", "series"],
    "name": "StreamFlix (HF)",
    "description": "Addon StreamFlix hospedado no Hugging Face",
    "logo": "https://i.imgur.com/8t14k1R.png",
    "background": "https://i.imgur.com/y6fryeO.jpeg",
    # Content ids are either IMDb ("tt...") or TMDB ("tmdb:...") prefixed.
    "idPrefixes": ["tt", "tmdb"],
    "behaviorHints": {"adult": False}
}
class VOD:
    """Scraper client for the StreamFlix site.

    Resolves playable stream URLs for movies and series episodes by
    scraping the site's HTML pages and calling its internal JSON API.

    Results are memoised in ``self.cache``.  The original implementation
    declared ``cache_timeout`` but never consulted it, so entries lived
    (and could go stale) forever and the cache grew without bound; values
    written through ``_cache_set`` now expire after ``cache_timeout``
    seconds.  ``self.cache`` is kept as a plain value dict because other
    helpers in this module read and write it directly.
    """

    def __init__(self):
        self.base = BASE_URL
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})
        # Value cache plus a parallel timestamp map used for TTL expiry.
        self.cache = {}
        self._cache_times = {}
        self.cache_timeout = 3600  # seconds (1 hour)

    def _cache_get(self, key):
        """Return the value cached under *key*, or None when absent or expired."""
        if key not in self.cache:
            return None
        stored_at = self._cache_times.get(key)
        if stored_at is not None and time.time() - stored_at > self.cache_timeout:
            # Expired: drop both the value and its timestamp.
            self.cache.pop(key, None)
            self._cache_times.pop(key, None)
            return None
        return self.cache[key]

    def _cache_set(self, key, value):
        """Cache *value* under *key*, stamping it for TTL expiry."""
        self.cache[key] = value
        self._cache_times[key] = time.time()

    @staticmethod
    def _prefers_portuguese(title):
        """True when an audio-option title looks Portuguese / dubbed."""
        t = (title or "").lower()
        return 'português' in t or 'portuguese' in t or 'dublado' in t

    def _request(self, url, method="get", data=None, referer=None, retry=0):
        """HTTP GET/POST via the shared session, with MAX_RETRIES retries.

        Returns the Response on success, None after all attempts fail.
        *retry* is the current attempt number (internal).
        """
        headers = {"User-Agent": USER_AGENT}
        if referer:
            headers["Referer"] = referer
        if method == "post":
            # The site's API expects form-encoded AJAX-style posts.
            headers.update({
                "X-Requested-With": "XMLHttpRequest",
                "Content-Type": "application/x-www-form-urlencoded"
            })
        try:
            if method == "get":
                response = self.session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
            else:
                response = self.session.post(url, data=data, headers=headers, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            if retry < MAX_RETRIES:
                logger.warning(f"Retrying ({retry+1}/{MAX_RETRIES}) for {url}: {e}")
                time.sleep(1)
                return self._request(url, method, data, referer, retry + 1)
            logger.error(f"Request failed: {e}")
            return None

    def _get_soup(self, url):
        """Fetch *url* and return its parsed BeautifulSoup tree (TTL-cached)."""
        cache_key = f"soup_{url}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached
        response = self._request(url)
        if not response:
            return None
        soup = BeautifulSoup(response.text, 'html.parser')
        self._cache_set(cache_key, soup)
        return soup

    def _post_api(self, api_url, data, referer=None):
        """POST *data* to the site API and return the decoded JSON (TTL-cached)."""
        cache_key = f"api_{api_url}_{str(data)}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached
        response = self._request(api_url, "post", data, referer)
        if not response:
            return None
        try:
            result = response.json()
        except ValueError:
            logger.error("Invalid JSON response")
            return None
        self._cache_set(cache_key, result)
        return result

    def _get_stream_data(self, player_data, referer_url):
        """Resolve the final stream URL from a getPlayer API response.

        Returns ``{"url": ..., "headers": {...}}`` or None.  *referer_url*
        is unused but kept for signature compatibility with callers.
        """
        if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'):
            return None
        video_url = player_data['data']['video_url']
        parsed_url = requests.utils.urlparse(video_url)
        # The last path segment is the player's video hash.
        video_hash = parsed_url.path.strip('/').split('/')[-1]
        origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
        player_endpoint = f"{origin}/player/index.php?data={video_hash}&do=getVideo"
        stream_data = self._post_api(
            player_endpoint,
            {'hash': video_hash, 'r': f"{self.base}/"},
            referer=origin
        )
        if stream_data and stream_data.get('videoSource'):
            # The player host requires matching UA/Referer when fetching.
            return {
                "url": stream_data['videoSource'],
                "headers": {
                    "User-Agent": USER_AGENT,
                    "Referer": origin
                }
            }
        return None

    def tvshows(self, tmdb_id, season, episode):
        """Resolve a stream for one series episode; returns a stream dict or None.

        Unlike the original, a failed (None) lookup is not cached, so
        transient failures can be retried on the next request.
        """
        cache_key = f"tv_{tmdb_id}_{season}_{episode}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached
        try:
            url = f"{self.base}/serie/{tmdb_id}/{season}/{episode}"
            soup = self._get_soup(url)
            if not soup:
                return None
            div = soup.find('episode-item', class_='episodeOption active') or \
                  soup.find('div', class_='episodeOption active')
            if not div:
                return None
            data_contentid = div.get('data-contentid')
            if not data_contentid:
                return None
            api_url = f"{self.base}/api"
            options_data = self._post_api(api_url, {'action': 'getOptions', 'contentid': data_contentid}, referer=url)
            if not options_data or not options_data.get('data') or not options_data['data'].get('options'):
                return None
            # Prefer a Portuguese/dubbed audio option, else the first one.
            options = options_data['data']['options']
            selected_option = next(
                (o for o in options if self._prefers_portuguese(o.get('title', ''))),
                options[0] if options else None,
            )
            if not selected_option:
                return None
            player_data = self._post_api(api_url, {'action': 'getPlayer', 'video_id': selected_option['ID']}, referer=url)
            if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'):
                return None
            result = self._get_stream_data(player_data, url)
            if result is not None:
                self._cache_set(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error in tvshows: {e}", exc_info=True)
            return None

    def movie(self, imdb_id):
        """Resolve a stream for one movie; returns a stream dict or None."""
        cache_key = f"movie_{imdb_id}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached
        try:
            url = f"{self.base}/filme/{imdb_id}"
            soup = self._get_soup(url)
            if not soup:
                return None
            players_div = soup.find('div', {'class': 'players_select'})
            if not players_div:
                return None
            player_items = players_div.find_all('div', {'class': 'player_select_item'})
            if not player_items:
                return None
            # Prefer a Portuguese/dubbed player, else the first one.
            selected_player = next(
                (p for p in player_items if self._prefers_portuguese(p.get_text(strip=True))),
                player_items[0],
            )
            data_id = selected_player.get('data-id')
            if not data_id:
                return None
            api_url = f"{self.base}/api"
            player_data = self._post_api(api_url, {'action': 'getPlayer', 'video_id': data_id}, referer=url)
            if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'):
                return None
            result = self._get_stream_data(player_data, url)
            if result is not None:
                self._cache_set(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error in movie: {e}", exc_info=True)
            return None
# Module-level singleton shared by the route handlers and TMDB helpers below.
vod_api = VOD()
def get_tmdb_media(media_type, page=1, search_query=None):
    """Fetch one page of TMDB results for *media_type* ("movie" or "tv").

    Uses the /search endpoint when *search_query* is given, otherwise
    /discover sorted by popularity.  Responses are cached in the shared
    ``vod_api.cache``.  Returns the decoded JSON payload or None.
    """
    cache_key = f"tmdb_{media_type}_{page}_{search_query}"
    if cache_key in vod_api.cache:
        return vod_api.cache[cache_key]
    # Common query parameters; the endpoint-specific ones are added below.
    params = {"api_key": TMDB_API_KEY, "page": page, "language": "pt-BR"}
    if search_query:
        endpoint = f"{TMDB_API_URL}/search/{media_type}"
        params["query"] = search_query
    else:
        endpoint = f"{TMDB_API_URL}/discover/{media_type}"
        params["sort_by"] = "popularity.desc"
    try:
        response = requests.get(endpoint, params=params, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
        vod_api.cache[cache_key] = payload
        return payload
    except Exception as e:
        logger.error(f"TMDB request failed: {e}")
        return None
def extract_tmdb_id(id_str):
    """Return the bare TMDB id from a 'tmdb:<id>' or all-digit string.

    Any other format (e.g. IMDb 'tt...' ids) yields None.
    """
    if id_str.startswith("tmdb:"):
        return id_str.split(":")[1]
    return id_str if id_str.isdigit() else None
def convert_tmdb_to_imdb(tmdb_id, media_type="movie"):
    """Map a TMDB id to its IMDb id via /external_ids; "" when unavailable.

    Results are cached in the shared ``vod_api.cache``.

    Fix: TMDB returns ``"imdb_id": null`` for titles with no IMDb entry,
    so the original ``data.get("imdb_id", "")`` returned (and cached)
    ``None`` instead of the documented empty string.
    """
    cache_key = f"tmdb_to_imdb_{media_type}_{tmdb_id}"
    if cache_key in vod_api.cache:
        return vod_api.cache[cache_key]
    try:
        url = f"{TMDB_API_URL}/{media_type}/{tmdb_id}/external_ids"
        params = {"api_key": TMDB_API_KEY}
        response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()
        # Normalise null/missing to "" so callers always get a string.
        imdb_id = data.get("imdb_id") or ""
        vod_api.cache[cache_key] = imdb_id
        return imdb_id
    except Exception as e:
        logger.error(f"TMDB conversion failed: {e}")
        return ""
# FastAPI routes
@app.get("/")  # Fix: handler was never registered with the app (no decorator).
async def root():
    """Redirect the bare root URL to the addon manifest."""
    return RedirectResponse(url="/manifest.json")
@app.get("/manifest.json")  # Fix: handler was never registered with the app (no decorator).
async def get_manifest():
    """Serve the static Stremio addon manifest."""
    return JSONResponse(MANIFEST)
@app.get("/catalog/{type}/{id}.json")  # Fix: handler was never registered (no decorator).
async def get_catalog(
    type: str,
    id: str,
    request: Request,
    skip: int = Query(0),
    search: Optional[str] = Query(None)
):
    """Stremio catalog endpoint backed by TMDB discover/search.

    *skip* is Stremio's pagination offset; *search* is the optional
    catalog "extra" search query.  Returns ``{"metas": [...]}``.
    """
    logger.info(f"Catalog request: {type}/{id}?skip={skip}&search={search}")
    if id not in ["streamflix_movies", "streamflix_series"]:
        return JSONResponse({"metas": []})
    page = (skip // 20) + 1  # TMDB pages hold 20 results
    media_type = "movie" if id == "streamflix_movies" else "tv"
    # Fix: Stremio's type string is "series", not TMDB's "tv" — the
    # original emitted "tv" in each meta, which does not match the
    # manifest's declared types.
    stremio_type = "movie" if media_type == "movie" else "series"
    tmdb_data = get_tmdb_media(media_type, page, search)
    if not tmdb_data or "results" not in tmdb_data:
        return JSONResponse({"metas": []})
    metas = []
    for item in tmdb_data["results"]:
        if not item.get("poster_path"):
            continue  # skip entries with no artwork
        meta = {
            "id": f"tmdb:{item['id']}",
            "type": stremio_type,
            "name": item.get("title") if media_type == "movie" else item.get("name"),
            # NOTE(review): TMDB list endpoints return "genre_ids", not
            # "genres", so this list is effectively always empty; kept
            # for output-shape compatibility.
            "genres": [genre["name"] for genre in item.get("genres", [])][:3],
            "description": item.get("overview", "Sem descrição disponível")[:300],
            "poster": f"https://image.tmdb.org/t/p/w500{item['poster_path']}",
            "background": f"https://image.tmdb.org/t/p/original{item['backdrop_path']}" if item.get("backdrop_path") else MANIFEST["background"],
            "releaseInfo": item.get("release_date", "")[:4] if media_type == "movie" else item.get("first_air_date", "")[:4],
            "imdbRating": round(item.get("vote_average", 0), 1)
        }
        metas.append(meta)
    return JSONResponse({"metas": metas})
@app.get("/meta/{type}/{id}.json")  # Fix: handler was never registered (no decorator).
async def get_meta(type: str, id: str):
    """Stremio meta endpoint: detailed metadata for one title.

    Accepts ``tmdb:<id>`` or bare-digit TMDB ids.  Any other format
    (including raw IMDb ``tt…`` ids) gets a minimal fallback meta —
    the original instead issued a doomed request to ``…/movie/None``
    and reached the same fallback only via the exception handler.
    """
    try:
        tmdb_id = extract_tmdb_id(id)
        media_type = "movie" if type == "movie" else "tv"
        if not tmdb_id:
            # No TMDB id to query: answer with a minimal placeholder meta.
            return JSONResponse({
                "meta": {
                    "id": id,
                    "type": type,
                    "name": id,
                    "poster": MANIFEST["logo"],
                    "description": "Conteúdo disponível no StreamFlix"
                }
            })
        url = f"{TMDB_API_URL}/{media_type}/{tmdb_id}"
        params = {
            "api_key": TMDB_API_KEY,
            "language": "pt-BR",
            # Fix: "credits" is required for the director lookup below —
            # the original requested only external_ids, so details["crew"]
            # never existed and "director" was always empty.
            "append_to_response": "external_ids,credits"
        }
        response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        details = response.json()
        crew = details.get("credits", {}).get("crew", [])
        meta = {
            "id": id,
            "type": type,
            "name": details.get("title") if media_type == "movie" else details.get("name"),
            "genres": [genre["name"] for genre in details.get("genres", [])],
            "description": details.get("overview", "Sem descrição disponível"),
            "poster": f"https://image.tmdb.org/t/p/w500{details['poster_path']}" if details.get("poster_path") else MANIFEST["logo"],
            "background": f"https://image.tmdb.org/t/p/original{details['backdrop_path']}" if details.get("backdrop_path") else MANIFEST["background"],
            "releaseInfo": details.get("release_date", "")[:4] if media_type == "movie" else details.get("first_air_date", "")[:4],
            "imdbRating": round(details.get("vote_average", 0), 1),
            "director": ", ".join(
                [member["name"] for member in crew if member.get("job") == "Director"][:2]
            ) if media_type == "movie" else None
        }
        if media_type == "tv":
            meta["type"] = "series"  # Stremio's type string for TV content
            # Advertise the first episode of each real season so Stremio
            # shows a season list (season 0 = specials is skipped).
            meta["videos"] = []
            for season in details.get("seasons", []):
                season_number = season["season_number"]
                if season_number > 0:
                    meta["videos"].append({
                        "season": season_number,
                        "episode": 1,
                        "id": f"{id}:{season_number}:1",
                        "title": f"Temporada {season_number} - Episódio 1",
                        "overview": season.get("overview", f"Primeiro episódio da temporada {season_number}") or ""
                    })
        return JSONResponse({"meta": meta})
    except Exception as e:
        logger.error(f"Meta request failed: {e}")
        # Best-effort fallback so clients always get a renderable meta.
        return JSONResponse({
            "meta": {
                "id": id,
                "type": type,
                "name": id,
                "poster": MANIFEST["logo"],
                "description": "Conteúdo disponível no StreamFlix"
            }
        }, status_code=200)
@app.get("/stream/{type}/{id}.json")  # Fix: handler was never registered (no decorator).
async def get_stream(type: str, id: str):
    """Stremio stream endpoint: resolve playable URLs for one title.

    Movie ids may be ``tt…`` (used directly) or ``tmdb:<id>`` (converted
    to IMDb first).  Series ids are ``<id>:<season>:<episode>`` with an
    optional ``tmdb:`` prefix.  Always returns ``{"streams": [...]}``.
    """
    try:
        # Fix: stream_data was unbound for unknown types, and a falsy
        # result fell off the end of the try block returning None (an
        # invalid response body) instead of an empty stream list.
        stream_data = None
        if type == "movie":
            if id.startswith("tmdb:"):
                tmdb_id = extract_tmdb_id(id)
                imdb_id = convert_tmdb_to_imdb(tmdb_id, "movie")
                if not imdb_id:
                    logger.warning(f"Failed to convert TMDB to IMDB: {id}")
                    return JSONResponse({"streams": []})
                stream_data = vod_api.movie(imdb_id)
            else:
                stream_data = vod_api.movie(id)
        elif type == "series":
            parts = id.split(":")
            if len(parts) < 3:
                return JSONResponse({"streams": []})
            if parts[0] == "tmdb" and len(parts) >= 4:
                tmdb_id, season, episode = parts[1], parts[2], parts[3]
            else:
                tmdb_id, season, episode = parts[0], parts[1], parts[2]
            stream_data = vod_api.tvshows(tmdb_id, season, episode)
        if stream_data:
            return JSONResponse({
                "streams": [
                    {
                        "url": stream_data["url"],
                        "title": "StreamFlix",
                        "quality": "HD",
                        "behaviorHints": {
                            "notWebReady": True,
                            # The player host checks UA/Referer, so ask the
                            # client to proxy these request headers through.
                            "proxyHeaders": {
                                "request": {
                                    "User-Agent": stream_data["headers"]["User-Agent"],
                                    "Referer": stream_data["headers"]["Referer"]
                                }
                            }
                        }
                    }
                ]
            })
        return JSONResponse({"streams": []})
    except Exception as e:
        logger.error(f"Stream request failed: {e}")
        return JSONResponse({"streams": []})