Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import cv2 | |
| import json | |
| import os | |
| import re | |
| from datetime import datetime | |
| from io import BytesIO | |
| import requests | |
| import shutil | |
| import vk_api | |
| from bs4 import BeautifulSoup | |
| from deepface import DeepFace | |
| from googletrans import Translator | |
| from reportlab.lib import colors | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.lib.styles import getSampleStyleSheet | |
| from reportlab.platypus import Image, SimpleDocTemplate, Table, TableStyle | |
| from ultralytics import YOLO | |
| with open("config.json", "r") as f: | |
| config = json.load(f) | |
| FACE_DET_TRESH = config["FACE_DET_TRESH"] | |
| FACE_DIST_TRESH = config["FACE_DIST_TRESH"] | |
| YOLO_WEIGHTS_URL = config["YOLO_WEIGHTS_URL"] | |
| AVATARS_URI = config["AVATARS_URI"] | |
| APP_NAME = config["APP_NAME"] | |
| APP_DESCRIPTION = config["APP_DESCRIPTION"] | |
| APP_LOGO = config["APP_LOGO"] | |
| def load_detector(): | |
| yolo_weights_filename = os.path.basename(YOLO_WEIGHTS_URL) | |
| if not os.path.exists(yolo_weights_filename): | |
| response = requests.get(YOLO_WEIGHTS_URL) | |
| with open(yolo_weights_filename, "wb") as file: | |
| file.write(response.content) | |
| return YOLO(yolo_weights_filename) | |
| model = load_detector() | |
| styles = getSampleStyleSheet() | |
| style_table = TableStyle([ | |
| ("BACKGROUND", (0, 0), (-1, 0), colors.grey), | |
| ("TEXTCOLOR", (0, 0), (-1, 0), colors.whitesmoke), | |
| ("ALIGN", (0, 0), (-1, -1), "CENTER"), | |
| ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), | |
| ("FONTSIZE", (0, 0), (-1, 0), 14), | |
| ("BOTTOMPADDING", (0, 0), (-1, 0), 12), | |
| ("BACKGROUND", (0, 1), (-1, -1), colors.beige), | |
| ("GRID", (0, 0), (-1, -1), 1, colors.black), | |
| ]) | |
| def parse_album(data): | |
| album_info = data["NGRX_STATE"]["game"]["info"]["data"]["photoSetUrl"] | |
| album_info = album_info.split("-")[-1].split("_") | |
| owner_id = - int(album_info[0]) | |
| album_id = int(album_info[1]) | |
| return owner_id, album_id | |
| def get_photos(owner_id, album_id, vk): | |
| offset = 0 | |
| total_count = float("inf") | |
| count_per_request = 50 | |
| output = [] | |
| while offset < total_count: | |
| params = { | |
| "owner_id": owner_id, | |
| "album_id": album_id, | |
| "count": count_per_request, | |
| "offset": offset, | |
| "extended": "1" | |
| } | |
| response = vk.photos.get(**params) | |
| for item in response["items"]: | |
| max_item = max(item["sizes"], key=lambda item: item["height"]) | |
| output.append(max_item["url"]) | |
| total_count = response["count"] | |
| offset += count_per_request | |
| return output | |
| def download_images(photos, players): | |
| current_datetime = datetime.now() | |
| folder_name = current_datetime.strftime("%Y-%m-%d_%H-%M-%S") | |
| os.mkdir(folder_name) | |
| players_path = os.path.join(folder_name, "players") | |
| photos_path = os.path.join(folder_name, "photos") | |
| temp_path = os.path.join(folder_name, "temp") | |
| os.mkdir(players_path) | |
| os.mkdir(photos_path) | |
| os.mkdir(temp_path) | |
| update_progress(0, "Downloading photos...") | |
| for i, photo_url in enumerate(photos): | |
| filename = f"{i}.jpg" | |
| response = requests.get(photo_url) | |
| with open(os.path.join(photos_path, filename), "wb") as file: | |
| file.write(response.content) | |
| update_progress((i+1)/len(photos), "Downloading photos...") | |
| for team_state in players.keys(): | |
| update_progress(0, f"Downloading {team_state} players' avatars...") | |
| for i, player in enumerate(players[team_state]): | |
| filename = f"{player['id']}.jpg" | |
| response = requests.get(player["avatar_url"]) | |
| with open(os.path.join(players_path, filename), "wb") as file: | |
| file.write(response.content) | |
| update_progress((i+1)/len(players[team_state]), f"Downloading {team_state} players' avatars...") | |
| return { | |
| "photos_path": photos_path, | |
| "players_path": players_path, | |
| "temp_path": temp_path, | |
| "folder_name": folder_name | |
| } | |
| def find_photos(data, vk): | |
| pattern = re.compile('<script id="axl-desktop-state" type="application/json">(.+?)</script>') | |
| script_content = pattern.search(data).group(1).replace('&q;', '"') | |
| data = json.loads(script_content) | |
| owner_id, album_id = parse_album(data) | |
| return get_photos(owner_id, album_id, vk) | |
| def translate(text): | |
| translator = Translator() | |
| output = translator.translate(text, src="ru", dest="en") | |
| return output.text | |
| def get_players(data): | |
| output = {} | |
| team_states = ["home", "away"] | |
| soup = BeautifulSoup(data, "lxml") | |
| for team_state in team_states: | |
| update_progress(0, f"Getting information about {team_state} players...") | |
| output[team_state] = [] | |
| player_roots = soup.find_all("div", {"class": f"{team_state} ng-star-inserted"}) | |
| for i, player_root in enumerate(player_roots): | |
| player_info = player_root.find("a", {"class": "wrapper ng-star-inserted"}) | |
| id = re.findall(r"\d+", player_info["href"])[-1] | |
| avatar_url = AVATARS_URI.replace("PLAYER_ID", id) | |
| name = player_info.find("span", {"class": "name"}).get_text() | |
| name = translate(name) | |
| position = player_info.find("span", {"class": "position"}).get_text() | |
| output[team_state].append({ | |
| "id": id, | |
| "name": name, | |
| "position": position, | |
| "avatar_url": avatar_url | |
| }) | |
| update_progress((i+1)/len(player_roots), f"Getting information about {team_state} players...") | |
| return output | |
| def load_players_avatars(players, images_path, face_det_tresh): | |
| for team_state in players.keys(): | |
| update_progress(0, f"Reading avatars of {team_state} team...") | |
| for i, player in enumerate(players[team_state]): | |
| image_name = f"{player['id']}.jpg" | |
| player["image"] = read_image_from_path(os.path.join(images_path, image_name)) | |
| faces = find_faces(player["image"], face_det_tresh) | |
| if faces: | |
| player["face"] = faces[0] | |
| update_progress((i+1)/len(players[team_state]), f"Reading avatars of {team_state} team...") | |
| return players | |
| def find_distance(base_face, check_face): | |
| result = DeepFace.verify(base_face, check_face, enforce_detection=False) | |
| return result["distance"] | |
| def read_image_from_path(path): | |
| return cv2.imread(path) | |
| def read_images_from_path(path): | |
| images = [] | |
| files = os.listdir(path) | |
| update_progress(0, "Reading photos...") | |
| for i, filename in enumerate(files): | |
| if filename.endswith(".jpg"): | |
| image = read_image_from_path(os.path.join(path, filename)) | |
| if image is not None: | |
| images.append(image) | |
| update_progress((i+1)/len(files), "Reading photos...") | |
| return images | |
| def cv2_to_reportlab(cv2_image): | |
| buffer = BytesIO() | |
| _, buffer = cv2.imencode(".jpg", cv2_image) | |
| io_buf = BytesIO(buffer) | |
| return Image(io_buf) | |
| def find_faces(image, face_det_tresh): | |
| outputs = model(image) | |
| faces = [] | |
| for box in outputs[0].boxes: | |
| if float(box.conf) >= face_det_tresh: | |
| x, y, w, h = [int(coord) for coord in box.xywh[0]] | |
| x_center, y_center = x + w / 2, y + h / 2 | |
| x1 = int(x_center - w) | |
| y1 = int(y_center - h) | |
| crop_img = image[y1:y1+h, x1:x1+w] | |
| faces.append(crop_img) | |
| return faces | |
| def is_face_exists(players, face, face_dist_tresh): | |
| for team_state in players.keys(): | |
| for player in players[team_state]: | |
| if "face" in player: | |
| distance = find_distance(player["face"], face) | |
| if distance <= face_dist_tresh: | |
| return player["id"], player["face"] | |
| return None, None | |
| def add_players_table(elements, players): | |
| data = [ | |
| ["Player ID", "Name", "Position", "Avatar", "Face"] | |
| ] | |
| for team_state in players.keys(): | |
| update_progress(0, f"Creating dump of {team_state}'s squad...") | |
| for i, player in enumerate(players[team_state]): | |
| face = cv2_to_reportlab(player["face"]) if "face" in player else None | |
| avatar = cv2_to_reportlab(player["image"]) | |
| line = [ | |
| player["id"], | |
| player["name"], | |
| player["position"], | |
| avatar, | |
| face | |
| ] | |
| data.append(line) | |
| update_progress((i+1)/len(players[team_state]), f"Creating dump of {team_state}'s squad...") | |
| table = Table(data) | |
| table.setStyle(style_table) | |
| elements.append(table) | |
| return elements | |
| def check_faces(elements, photos, players, face_det_tresh, face_dist_tresh): | |
| data = [ | |
| ["Face", "Player ID", "Player Face"] | |
| ] | |
| update_progress(0, "Comparing faces...") | |
| for i, photo in enumerate(photos): | |
| faces = find_faces(photo, face_det_tresh) | |
| for j, face in enumerate(faces): | |
| player_id, player_face = is_face_exists(players, face, face_dist_tresh) | |
| face = cv2_to_reportlab(face) | |
| tmp_arr = [face, player_id] | |
| if player_face is not None: | |
| player_face = cv2_to_reportlab(player_face) | |
| tmp_arr.append(player_face) | |
| data.append(tmp_arr) | |
| update_progress((j+1)/len(faces), f"[{i + 1}/{len(photos)}] Comparing faces...") | |
| table = Table(data) | |
| table.setStyle(style_table) | |
| elements.append(table) | |
| return elements | |
| def update_progress(percent, description): | |
| progress_bar.progress(percent) | |
| progress_status_text.text(description) | |
| def process(token, afl_link, face_dist_tresh, face_det_tresh): | |
| update_progress(0, "Connecting to vk...") | |
| vk_session = vk_api.VkApi(token=token) | |
| vk = vk_session.get_api() | |
| update_progress(100, "Connected to vk") | |
| update_progress(0, "Getting information from afl...") | |
| response = requests.get(afl_link) | |
| update_progress(100, "Got information from afl") | |
| update_progress(0, "Getting information about photos...") | |
| photos = find_photos(response.text, vk) | |
| update_progress(100, "Got information about photos") | |
| players = get_players(response.text) | |
| result = download_images(photos, players) | |
| photos = read_images_from_path(result["photos_path"]) | |
| players = load_players_avatars(players, result["players_path"], face_det_tresh) | |
| table_file = os.path.join(result["temp_path"], "table.pdf") | |
| doc = SimpleDocTemplate(table_file, pagesize=letter) | |
| elements = [] | |
| elements = check_faces(elements, photos, players, face_det_tresh, face_dist_tresh) | |
| elements = add_players_table(elements, players) | |
| doc.build(elements) | |
| with open(table_file, "rb") as file: | |
| pdf_bytes = file.read() | |
| shutil.rmtree(result["folder_name"]) | |
| update_progress(100, "Process completed") | |
| return pdf_bytes | |
| st.set_page_config(page_title=APP_NAME) | |
| st.title(APP_NAME) | |
| st.image(APP_LOGO, use_column_width=True) | |
| st.write(APP_DESCRIPTION) | |
| access_token = st.text_input("Your VK API access token", help="You can obtain your token from https://vkhost.github.io/") | |
| afl_url = st.text_input("AFL match url", help="Example: https://afl.ru/football/afl-moscow-8x8/afl-cup-krasnaya-presnya-3097/matches/463676") | |
| face_det_tresh = st.slider("face_det_tresh:", 0.0, 1.0, FACE_DET_TRESH, 0.01, help="Adjust the threshold value for face detection.") | |
| face_dist_tresh = st.slider("face_dist_tresh:", 0.0, 1.0, FACE_DIST_TRESH, 0.01, help="Adjust the threshold to determine the maximum acceptable distance between faces.") | |
| button_clicked = st.button("Process") | |
| if button_clicked: | |
| progress_bar = st.progress(0) | |
| progress_status_text = st.empty() | |
| pdf_bytes = process(access_token, afl_url, face_dist_tresh, face_det_tresh) | |
| st.download_button(label="Download PDF", data=pdf_bytes, file_name="output.pdf") |