from fastapi import FastAPI, Request, Query from fastapi.responses import JSONResponse, RedirectResponse from fastapi.middleware.cors import CORSMiddleware import requests from bs4 import BeautifulSoup import os import logging import time from typing import Optional # Configuração do app FastAPI app = FastAPI(title="StreamFlix Addon", version="1.0.0") # Configuração do logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Middleware CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Configurações (usar variáveis de ambiente no Hugging Face) BASE_URL = os.getenv("BASE_URL", "") TMDB_API_KEY = os.getenv("TMDB_API_KEY", "") TMDB_API_URL = "https://api.themoviedb.org/3" USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" REQUEST_TIMEOUT = 15 MAX_RETRIES = 2 # Manifesto do Addon (atualizado para Hugging Face) MANIFEST = { "id": "community.streamflix.hf", "version": "1.0.0", "catalogs": [ { "type": "movie", "id": "streamflix_movies", "name": "StreamFlix - Filmes", "extra": [{"name": "search", "isRequired": False}] }, { "type": "series", "id": "streamflix_series", "name": "StreamFlix - Séries", "extra": [{"name": "search", "isRequired": False}] } ], "resources": ["catalog", "stream", "meta"], "types": ["movie", "series"], "name": "StreamFlix (HF)", "description": "Addon StreamFlix hospedado no Hugging Face", "logo": "https://i.imgur.com/8t14k1R.png", "background": "https://i.imgur.com/y6fryeO.jpeg", "idPrefixes": ["tt", "tmdb"], "behaviorHints": {"adult": False} } class VOD: def __init__(self): self.base = BASE_URL self.session = requests.Session() self.session.headers.update({"User-Agent": USER_AGENT}) self.cache = {} self.cache_timeout = 3600 # 1 hora def _request(self, url, method="get", data=None, referer=None, retry=0): headers = {"User-Agent": USER_AGENT} if referer: headers["Referer"] = referer if method == "post": headers.update({ "X-Requested-With": "XMLHttpRequest", "Content-Type": "application/x-www-form-urlencoded" }) try: if method == "get": response = self.session.get(url, headers=headers, timeout=REQUEST_TIMEOUT) else: response = self.session.post(url, data=data, headers=headers, timeout=REQUEST_TIMEOUT) response.raise_for_status() return response except requests.exceptions.RequestException as e: if retry < MAX_RETRIES: logger.warning(f"Retrying ({retry+1}/{MAX_RETRIES}) for {url}: {e}") time.sleep(1) return self._request(url, method, data, referer, retry+1) logger.error(f"Request failed: {e}") return None def _get_soup(self, url): cache_key = f"soup_{url}" if cache_key in self.cache: return self.cache[cache_key] response = self._request(url) if not response: return None soup = BeautifulSoup(response.text, 'html.parser') self.cache[cache_key] = soup return soup def _post_api(self, api_url, data, referer=None): cache_key = f"api_{api_url}_{str(data)}" if cache_key in self.cache: return self.cache[cache_key] response = self._request(api_url, "post", data, referer) if not response: return None try: result = response.json() self.cache[cache_key] = result return result except ValueError: logger.error("Invalid JSON response") return None def _get_stream_data(self, player_data, referer_url): if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'): return None video_url = player_data['data']['video_url'] parsed_url = requests.utils.urlparse(video_url) video_hash = parsed_url.path.strip('/').split('/')[-1] origin = f"{parsed_url.scheme}://{parsed_url.netloc}" player_endpoint = f"{origin}/player/index.php?data={video_hash}&do=getVideo" stream_data = self._post_api( player_endpoint, {'hash': video_hash, 'r': f"{self.base}/"}, referer=origin ) if stream_data and stream_data.get('videoSource'): return { "url": stream_data['videoSource'], "headers": { "User-Agent": USER_AGENT, "Referer": origin } } return None def tvshows(self, tmdb_id, season, episode): cache_key = f"tv_{tmdb_id}_{season}_{episode}" if cache_key in self.cache: return self.cache[cache_key] try: url = f"{self.base}/serie/{tmdb_id}/{season}/{episode}" soup = self._get_soup(url) if not soup: return None div = soup.find('episode-item', class_='episodeOption active') or \ soup.find('div', class_='episodeOption active') if not div: return None data_contentid = div.get('data-contentid') if not data_contentid: return None api_url = f"{self.base}/api" options_data = self._post_api(api_url, {'action': 'getOptions', 'contentid': data_contentid}, referer=url) if not options_data or not options_data.get('data') or not options_data['data'].get('options'): return None # Find Portuguese option selected_option = None for option in options_data['data']['options']: title = option.get('title', '').lower() if 'português' in title or 'portuguese' in title or 'dublado' in title: selected_option = option break if not selected_option and options_data['data']['options']: selected_option = options_data['data']['options'][0] if not selected_option: return None id_ = selected_option['ID'] player_data = self._post_api(api_url, {'action': 'getPlayer', 'video_id': id_}, referer=url) if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'): return None result = self._get_stream_data(player_data, url) self.cache[cache_key] = result return result except Exception as e: logger.error(f"Error in tvshows: {e}", exc_info=True) return None def movie(self, imdb_id): cache_key = f"movie_{imdb_id}" if cache_key in self.cache: return self.cache[cache_key] try: url = f"{self.base}/filme/{imdb_id}" soup = self._get_soup(url) if not soup: return None players_div = soup.find('div', {'class': 'players_select'}) if not players_div: return None player_items = players_div.find_all('div', {'class': 'player_select_item'}) if not player_items: return None # Find Portuguese player selected_player = None for player in player_items: title = player.get_text(strip=True).lower() if 'português' in title or 'portuguese' in title or 'dublado' in title: selected_player = player break if not selected_player and player_items: selected_player = player_items[0] if not selected_player: return None data_id = selected_player.get('data-id') if not data_id: return None api_url = f"{self.base}/api" player_data = self._post_api(api_url, {'action': 'getPlayer', 'video_id': data_id}, referer=url) if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'): return None result = self._get_stream_data(player_data, url) self.cache[cache_key] = result return result except Exception as e: logger.error(f"Error in movie: {e}", exc_info=True) return None vod_api = VOD() def get_tmdb_media(media_type, page=1, search_query=None): cache_key = f"tmdb_{media_type}_{page}_{search_query}" if cache_key in vod_api.cache: return vod_api.cache[cache_key] try: if search_query: url = f"{TMDB_API_URL}/search/{media_type}" params = { "api_key": TMDB_API_KEY, "query": search_query, "page": page, "language": "pt-BR" } else: url = f"{TMDB_API_URL}/discover/{media_type}" params = { "api_key": TMDB_API_KEY, "sort_by": "popularity.desc", "page": page, "language": "pt-BR" } response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT) response.raise_for_status() data = response.json() vod_api.cache[cache_key] = data return data except Exception as e: logger.error(f"TMDB request failed: {e}") return None def extract_tmdb_id(id_str): if id_str.startswith("tmdb:"): return id_str.split(":")[1] if id_str.isdigit(): return id_str return None def convert_tmdb_to_imdb(tmdb_id, media_type="movie"): cache_key = f"tmdb_to_imdb_{media_type}_{tmdb_id}" if cache_key in vod_api.cache: return vod_api.cache[cache_key] try: url = f"{TMDB_API_URL}/{media_type}/{tmdb_id}/external_ids" params = {"api_key": TMDB_API_KEY} response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT) response.raise_for_status() data = response.json() imdb_id = data.get("imdb_id", "") vod_api.cache[cache_key] = imdb_id return imdb_id except Exception as e: logger.error(f"TMDB conversion failed: {e}") return "" # Rotas do FastAPI @app.get("/") async def root(): return RedirectResponse(url="/manifest.json") @app.get("/manifest.json") async def get_manifest(): return JSONResponse(MANIFEST) @app.get("/catalog/{type}/{id}.json") async def get_catalog( type: str, id: str, request: Request, skip: int = Query(0), search: str = Query(None) ): logger.info(f"Catalog request: {type}/{id}?skip={skip}&search={search}") if id not in ["streamflix_movies", "streamflix_series"]: return JSONResponse({"metas": []}) page = (skip // 20) + 1 media_type = "movie" if id == "streamflix_movies" else "tv" tmdb_data = get_tmdb_media(media_type, page, search) if not tmdb_data or "results" not in tmdb_data: return JSONResponse({"metas": []}) metas = [] for item in tmdb_data["results"]: if not item.get("poster_path"): continue meta = { "id": f"tmdb:{item['id']}", "type": media_type, "name": item.get("title") if media_type == "movie" else item.get("name"), "genres": [genre["name"] for genre in item.get("genres", [])][:3], "description": item.get("overview", "Sem descrição disponível")[:300], "poster": f"https://image.tmdb.org/t/p/w500{item['poster_path']}", "background": f"https://image.tmdb.org/t/p/original{item['backdrop_path']}" if item.get("backdrop_path") else MANIFEST["background"], "releaseInfo": item.get("release_date", "")[:4] if media_type == "movie" else item.get("first_air_date", "")[:4], "imdbRating": round(item.get("vote_average", 0), 1) } metas.append(meta) return JSONResponse({"metas": metas}) @app.get("/meta/{type}/{id}.json") async def get_meta(type: str, id: str): try: tmdb_id = extract_tmdb_id(id) media_type = "movie" if type == "movie" else "tv" if not tmdb_id and type == "movie" and id.startswith("tt"): imdb_id = id elif tmdb_id: imdb_id = convert_tmdb_to_imdb(tmdb_id, media_type) if type == "movie" else "" else: raise ValueError("Invalid ID format") if not tmdb_id and type != "movie": return JSONResponse({ "meta": { "id": id, "type": type, "name": id, "poster": MANIFEST["logo"], "description": "Conteúdo disponível no StreamFlix" } }) url = f"{TMDB_API_URL}/{media_type}/{tmdb_id}" params = { "api_key": TMDB_API_KEY, "language": "pt-BR", "append_to_response": "external_ids" } response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT) response.raise_for_status() details = response.json() meta = { "id": id, "type": type, "name": details.get("title") if media_type == "movie" else details.get("name"), "genres": [genre["name"] for genre in details.get("genres", [])], "description": details.get("overview", "Sem descrição disponível"), "poster": f"https://image.tmdb.org/t/p/w500{details['poster_path']}" if details.get("poster_path") else MANIFEST["logo"], "background": f"https://image.tmdb.org/t/p/original{details['backdrop_path']}" if details.get("backdrop_path") else MANIFEST["background"], "releaseInfo": details.get("release_date", "")[:4] if media_type == "movie" else details.get("first_air_date", "")[:4], "imdbRating": round(details.get("vote_average", 0), 1), "director": ", ".join( [crew["name"] for crew in details.get("crew", []) if crew.get("job") == "Director" ][:2]) if media_type == "movie" else None } if media_type == "tv": meta["type"] = "series" meta["videos"] = [] for season in details.get("seasons", []): season_number = season["season_number"] if season_number > 0: meta["videos"].append({ "season": season_number, "episode": 1, "id": f"{id}:{season_number}:1", "title": f"Temporada {season_number} - Episódio 1", "overview": season.get("overview", f"Primeiro episódio da temporada {season_number}") or "" }) return JSONResponse({"meta": meta}) except Exception as e: logger.error(f"Meta request failed: {e}") return JSONResponse({ "meta": { "id": id, "type": type, "name": id, "poster": MANIFEST["logo"], "description": "Conteúdo disponível no StreamFlix" } }, status_code=200) @app.get("/stream/{type}/{id}.json") async def get_stream(type: str, id: str): try: if type == "movie": if id.startswith("tmdb:"): tmdb_id = extract_tmdb_id(id) imdb_id = convert_tmdb_to_imdb(tmdb_id, "movie") if not imdb_id: logger.warning(f"Failed to convert TMDB to IMDB: {id}") return JSONResponse({"streams": []}) stream_data = vod_api.movie(imdb_id) else: stream_data = vod_api.movie(id) elif type == "series": parts = id.split(":") if len(parts) < 3: return JSONResponse({"streams": []}) if parts[0] == "tmdb" and len(parts) >= 4: tmdb_id = parts[1] season = parts[2] episode = parts[3] else: tmdb_id = parts[0] season = parts[1] episode = parts[2] stream_data = vod_api.tvshows(tmdb_id, season, episode) if stream_data: return JSONResponse({ "streams": [ { "url": stream_data["url"], "title": "StreamFlix", "quality": "HD", "behaviorHints": { "notWebReady": True, "proxyHeaders": { "request": { "User-Agent": stream_data["headers"]["User-Agent"], "Referer": stream_data["headers"]["Referer"] } } } } ] }) except Exception as e: logger.error(f"Stream request failed: {e}") return JSONResponse({"streams": []})