streamflix / app.py
Bsweb1's picture
Update app.py
4f056bf verified
from fastapi import FastAPI, Request, Query
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
import requests
from bs4 import BeautifulSoup
import os
import logging
import time
from typing import Optional
# Configuração do app FastAPI
app = FastAPI(title="StreamFlix Addon", version="1.0.0")
# Configuração do logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Middleware CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configurações (usar variáveis de ambiente no Hugging Face)
BASE_URL = os.getenv("BASE_URL", "")
TMDB_API_KEY = os.getenv("TMDB_API_KEY", "")
TMDB_API_URL = "https://api.themoviedb.org/3"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
REQUEST_TIMEOUT = 15
MAX_RETRIES = 2
# Manifesto do Addon (atualizado para Hugging Face)
MANIFEST = {
"id": "community.streamflix.hf",
"version": "1.0.0",
"catalogs": [
{
"type": "movie",
"id": "streamflix_movies",
"name": "StreamFlix - Filmes",
"extra": [{"name": "search", "isRequired": False}]
},
{
"type": "series",
"id": "streamflix_series",
"name": "StreamFlix - Séries",
"extra": [{"name": "search", "isRequired": False}]
}
],
"resources": ["catalog", "stream", "meta"],
"types": ["movie", "series"],
"name": "StreamFlix (HF)",
"description": "Addon StreamFlix hospedado no Hugging Face",
"logo": "https://i.imgur.com/8t14k1R.png",
"background": "https://i.imgur.com/y6fryeO.jpeg",
"idPrefixes": ["tt", "tmdb"],
"behaviorHints": {"adult": False}
}
class VOD:
def __init__(self):
self.base = BASE_URL
self.session = requests.Session()
self.session.headers.update({"User-Agent": USER_AGENT})
self.cache = {}
self.cache_timeout = 3600 # 1 hora
def _request(self, url, method="get", data=None, referer=None, retry=0):
headers = {"User-Agent": USER_AGENT}
if referer:
headers["Referer"] = referer
if method == "post":
headers.update({
"X-Requested-With": "XMLHttpRequest",
"Content-Type": "application/x-www-form-urlencoded"
})
try:
if method == "get":
response = self.session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
else:
response = self.session.post(url, data=data, headers=headers, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
if retry < MAX_RETRIES:
logger.warning(f"Retrying ({retry+1}/{MAX_RETRIES}) for {url}: {e}")
time.sleep(1)
return self._request(url, method, data, referer, retry+1)
logger.error(f"Request failed: {e}")
return None
def _get_soup(self, url):
cache_key = f"soup_{url}"
if cache_key in self.cache:
return self.cache[cache_key]
response = self._request(url)
if not response:
return None
soup = BeautifulSoup(response.text, 'html.parser')
self.cache[cache_key] = soup
return soup
def _post_api(self, api_url, data, referer=None):
cache_key = f"api_{api_url}_{str(data)}"
if cache_key in self.cache:
return self.cache[cache_key]
response = self._request(api_url, "post", data, referer)
if not response:
return None
try:
result = response.json()
self.cache[cache_key] = result
return result
except ValueError:
logger.error("Invalid JSON response")
return None
def _get_stream_data(self, player_data, referer_url):
if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'):
return None
video_url = player_data['data']['video_url']
parsed_url = requests.utils.urlparse(video_url)
video_hash = parsed_url.path.strip('/').split('/')[-1]
origin = f"{parsed_url.scheme}://{parsed_url.netloc}"
player_endpoint = f"{origin}/player/index.php?data={video_hash}&do=getVideo"
stream_data = self._post_api(
player_endpoint,
{'hash': video_hash, 'r': f"{self.base}/"},
referer=origin
)
if stream_data and stream_data.get('videoSource'):
return {
"url": stream_data['videoSource'],
"headers": {
"User-Agent": USER_AGENT,
"Referer": origin
}
}
return None
def tvshows(self, tmdb_id, season, episode):
cache_key = f"tv_{tmdb_id}_{season}_{episode}"
if cache_key in self.cache:
return self.cache[cache_key]
try:
url = f"{self.base}/serie/{tmdb_id}/{season}/{episode}"
soup = self._get_soup(url)
if not soup:
return None
div = soup.find('episode-item', class_='episodeOption active') or \
soup.find('div', class_='episodeOption active')
if not div:
return None
data_contentid = div.get('data-contentid')
if not data_contentid:
return None
api_url = f"{self.base}/api"
options_data = self._post_api(api_url, {'action': 'getOptions', 'contentid': data_contentid}, referer=url)
if not options_data or not options_data.get('data') or not options_data['data'].get('options'):
return None
# Find Portuguese option
selected_option = None
for option in options_data['data']['options']:
title = option.get('title', '').lower()
if 'português' in title or 'portuguese' in title or 'dublado' in title:
selected_option = option
break
if not selected_option and options_data['data']['options']:
selected_option = options_data['data']['options'][0]
if not selected_option:
return None
id_ = selected_option['ID']
player_data = self._post_api(api_url, {'action': 'getPlayer', 'video_id': id_}, referer=url)
if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'):
return None
result = self._get_stream_data(player_data, url)
self.cache[cache_key] = result
return result
except Exception as e:
logger.error(f"Error in tvshows: {e}", exc_info=True)
return None
def movie(self, imdb_id):
cache_key = f"movie_{imdb_id}"
if cache_key in self.cache:
return self.cache[cache_key]
try:
url = f"{self.base}/filme/{imdb_id}"
soup = self._get_soup(url)
if not soup:
return None
players_div = soup.find('div', {'class': 'players_select'})
if not players_div:
return None
player_items = players_div.find_all('div', {'class': 'player_select_item'})
if not player_items:
return None
# Find Portuguese player
selected_player = None
for player in player_items:
title = player.get_text(strip=True).lower()
if 'português' in title or 'portuguese' in title or 'dublado' in title:
selected_player = player
break
if not selected_player and player_items:
selected_player = player_items[0]
if not selected_player:
return None
data_id = selected_player.get('data-id')
if not data_id:
return None
api_url = f"{self.base}/api"
player_data = self._post_api(api_url, {'action': 'getPlayer', 'video_id': data_id}, referer=url)
if not player_data or not player_data.get('data') or not player_data['data'].get('video_url'):
return None
result = self._get_stream_data(player_data, url)
self.cache[cache_key] = result
return result
except Exception as e:
logger.error(f"Error in movie: {e}", exc_info=True)
return None
vod_api = VOD()
def get_tmdb_media(media_type, page=1, search_query=None):
cache_key = f"tmdb_{media_type}_{page}_{search_query}"
if cache_key in vod_api.cache:
return vod_api.cache[cache_key]
try:
if search_query:
url = f"{TMDB_API_URL}/search/{media_type}"
params = {
"api_key": TMDB_API_KEY,
"query": search_query,
"page": page,
"language": "pt-BR"
}
else:
url = f"{TMDB_API_URL}/discover/{media_type}"
params = {
"api_key": TMDB_API_KEY,
"sort_by": "popularity.desc",
"page": page,
"language": "pt-BR"
}
response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
data = response.json()
vod_api.cache[cache_key] = data
return data
except Exception as e:
logger.error(f"TMDB request failed: {e}")
return None
def extract_tmdb_id(id_str):
if id_str.startswith("tmdb:"):
return id_str.split(":")[1]
if id_str.isdigit():
return id_str
return None
def convert_tmdb_to_imdb(tmdb_id, media_type="movie"):
cache_key = f"tmdb_to_imdb_{media_type}_{tmdb_id}"
if cache_key in vod_api.cache:
return vod_api.cache[cache_key]
try:
url = f"{TMDB_API_URL}/{media_type}/{tmdb_id}/external_ids"
params = {"api_key": TMDB_API_KEY}
response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
data = response.json()
imdb_id = data.get("imdb_id", "")
vod_api.cache[cache_key] = imdb_id
return imdb_id
except Exception as e:
logger.error(f"TMDB conversion failed: {e}")
return ""
# Rotas do FastAPI
@app.get("/")
async def root():
return RedirectResponse(url="/manifest.json")
@app.get("/manifest.json")
async def get_manifest():
return JSONResponse(MANIFEST)
@app.get("/catalog/{type}/{id}.json")
async def get_catalog(
type: str,
id: str,
request: Request,
skip: int = Query(0),
search: str = Query(None)
):
logger.info(f"Catalog request: {type}/{id}?skip={skip}&search={search}")
if id not in ["streamflix_movies", "streamflix_series"]:
return JSONResponse({"metas": []})
page = (skip // 20) + 1
media_type = "movie" if id == "streamflix_movies" else "tv"
tmdb_data = get_tmdb_media(media_type, page, search)
if not tmdb_data or "results" not in tmdb_data:
return JSONResponse({"metas": []})
metas = []
for item in tmdb_data["results"]:
if not item.get("poster_path"):
continue
meta = {
"id": f"tmdb:{item['id']}",
"type": media_type,
"name": item.get("title") if media_type == "movie" else item.get("name"),
"genres": [genre["name"] for genre in item.get("genres", [])][:3],
"description": item.get("overview", "Sem descrição disponível")[:300],
"poster": f"https://image.tmdb.org/t/p/w500{item['poster_path']}",
"background": f"https://image.tmdb.org/t/p/original{item['backdrop_path']}" if item.get("backdrop_path") else MANIFEST["background"],
"releaseInfo": item.get("release_date", "")[:4] if media_type == "movie" else item.get("first_air_date", "")[:4],
"imdbRating": round(item.get("vote_average", 0), 1)
}
metas.append(meta)
return JSONResponse({"metas": metas})
@app.get("/meta/{type}/{id}.json")
async def get_meta(type: str, id: str):
try:
tmdb_id = extract_tmdb_id(id)
media_type = "movie" if type == "movie" else "tv"
if not tmdb_id and type == "movie" and id.startswith("tt"):
imdb_id = id
elif tmdb_id:
imdb_id = convert_tmdb_to_imdb(tmdb_id, media_type) if type == "movie" else ""
else:
raise ValueError("Invalid ID format")
if not tmdb_id and type != "movie":
return JSONResponse({
"meta": {
"id": id,
"type": type,
"name": id,
"poster": MANIFEST["logo"],
"description": "Conteúdo disponível no StreamFlix"
}
})
url = f"{TMDB_API_URL}/{media_type}/{tmdb_id}"
params = {
"api_key": TMDB_API_KEY,
"language": "pt-BR",
"append_to_response": "external_ids"
}
response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
response.raise_for_status()
details = response.json()
meta = {
"id": id,
"type": type,
"name": details.get("title") if media_type == "movie" else details.get("name"),
"genres": [genre["name"] for genre in details.get("genres", [])],
"description": details.get("overview", "Sem descrição disponível"),
"poster": f"https://image.tmdb.org/t/p/w500{details['poster_path']}" if details.get("poster_path") else MANIFEST["logo"],
"background": f"https://image.tmdb.org/t/p/original{details['backdrop_path']}" if details.get("backdrop_path") else MANIFEST["background"],
"releaseInfo": details.get("release_date", "")[:4] if media_type == "movie" else details.get("first_air_date", "")[:4],
"imdbRating": round(details.get("vote_average", 0), 1),
"director": ", ".join(
[crew["name"] for crew in details.get("crew", [])
if crew.get("job") == "Director"
][:2]) if media_type == "movie" else None
}
if media_type == "tv":
meta["type"] = "series"
meta["videos"] = []
for season in details.get("seasons", []):
season_number = season["season_number"]
if season_number > 0:
meta["videos"].append({
"season": season_number,
"episode": 1,
"id": f"{id}:{season_number}:1",
"title": f"Temporada {season_number} - Episódio 1",
"overview": season.get("overview", f"Primeiro episódio da temporada {season_number}") or ""
})
return JSONResponse({"meta": meta})
except Exception as e:
logger.error(f"Meta request failed: {e}")
return JSONResponse({
"meta": {
"id": id,
"type": type,
"name": id,
"poster": MANIFEST["logo"],
"description": "Conteúdo disponível no StreamFlix"
}
}, status_code=200)
@app.get("/stream/{type}/{id}.json")
async def get_stream(type: str, id: str):
try:
if type == "movie":
if id.startswith("tmdb:"):
tmdb_id = extract_tmdb_id(id)
imdb_id = convert_tmdb_to_imdb(tmdb_id, "movie")
if not imdb_id:
logger.warning(f"Failed to convert TMDB to IMDB: {id}")
return JSONResponse({"streams": []})
stream_data = vod_api.movie(imdb_id)
else:
stream_data = vod_api.movie(id)
elif type == "series":
parts = id.split(":")
if len(parts) < 3:
return JSONResponse({"streams": []})
if parts[0] == "tmdb" and len(parts) >= 4:
tmdb_id = parts[1]
season = parts[2]
episode = parts[3]
else:
tmdb_id = parts[0]
season = parts[1]
episode = parts[2]
stream_data = vod_api.tvshows(tmdb_id, season, episode)
if stream_data:
return JSONResponse({
"streams": [
{
"url": stream_data["url"],
"title": "StreamFlix",
"quality": "HD",
"behaviorHints": {
"notWebReady": True,
"proxyHeaders": {
"request": {
"User-Agent": stream_data["headers"]["User-Agent"],
"Referer": stream_data["headers"]["Referer"]
}
}
}
}
]
})
except Exception as e:
logger.error(f"Stream request failed: {e}")
return JSONResponse({"streams": []})