Spaces:
Sleeping
Sleeping
| import os | |
| from typing import Optional, Dict, Any | |
| from urllib.parse import urlparse | |
| import requests | |
| import vk_api | |
class VKMemeParser:
    """Collect single-photo posts ("memes") from a VK community wall.

    Posts are listed through a ``vk_api`` session; the images they reference
    can be saved to disk with :meth:`download_image`, which uses ``requests``.
    """

    # Number of posts requested from wall.get per call.
    _PAGE_SIZE = 100
    # Seconds to wait on connect/read before a download is abandoned.
    _DOWNLOAD_TIMEOUT = 30

    def __init__(self, token: str):
        """
        Initialize the VK Meme Parser.

        Args:
            token (str): VK API access token.
        """
        self.vk_session = vk_api.VkApi(token=token)
        self.vk = self.vk_session.get_api()

    def _process_post(self, post: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Extract the id, text and largest image URL from a single wall post.

        A post is accepted only if it is not marked as an ad, carries no
        ``is_pinned`` or ``copy_history`` key, and has exactly one attachment
        of type ``"photo"`` with at least one size entry.

        Args:
            post (Dict[str, Any]): A dictionary containing post data.

        Returns:
            Optional[Dict[str, Any]]: ``{"id", "text", "image_url"}`` for an
            accepted post, None otherwise.
        """
        if (post.get("marked_as_ads")
                or "is_pinned" in post
                or "copy_history" in post):
            return None

        # `or []` also covers an explicit None value for the key.
        attachments = post.get("attachments") or []
        if len(attachments) != 1 or attachments[0].get("type") != "photo":
            return None

        sizes = attachments[0].get("photo", {}).get("sizes") or []
        if not sizes:
            # max() would raise on an empty list; there is no image to use.
            return None

        # The largest rendition is the one with the greatest pixel area.
        largest = max(sizes, key=lambda size: size["width"] * size["height"])

        return {
            "id": post["id"],
            "text": post.get("text", "").strip(),
            "image_url": largest["url"],
        }

    def get_memes(self, public_id: str) -> Dict[str, Any]:
        """
        Retrieve and process all meme posts from a specified public page.

        Args:
            public_id (str): Numeric ID (with or without a leading "-") or
                short name of the public page.

        Returns:
            Dict[str, Any]: ``{"name": <community name>, "posts": [...]}``
            where each post is a dictionary produced by ``_process_post``.
        """
        is_numeric = public_id.isdigit() or (
            public_id.startswith("-") and public_id[1:].isdigit())

        if is_numeric:
            # wall.get identifies a community by a NEGATIVE owner_id, while
            # groups.getById expects the positive id, so normalise both
            # regardless of the sign the caller supplied.
            group_id = str(abs(int(public_id)))
            wall_params: Dict[str, Any] = {"owner_id": -int(group_id)}
        else:
            group_id = public_id
            wall_params = {"domain": public_id}

        # Fetch the public's display name.
        group_info = self.vk.groups.getById(group_id=group_id)[0]
        group_name = group_info["name"]

        memes = []
        offset = 0
        while True:
            response = self.vk.wall.get(
                count=self._PAGE_SIZE, offset=offset, **wall_params)
            posts = response["items"]

            # _process_post returns None for rejected posts; drop those.
            memes.extend(filter(None, map(self._process_post, posts)))

            # A short (or empty) page means the wall has been exhausted.
            if len(posts) < self._PAGE_SIZE:
                break

            # wall.get pages by a numeric offset; its response carries no
            # "next_from" cursor, so advance by the number of posts received.
            offset += len(posts)

        return {
            "name": group_name,
            "posts": memes,
        }

    def download_image(
            self,
            image_url: str,
            folder_path: str) -> Optional[str]:
        """
        Download an image from the given URL and save it to the specified folder.

        The file is stored under the last path component of the URL. URLs
        whose last path component has no file extension are skipped.

        Args:
            image_url (str): The URL of the image to download.
            folder_path (str): The path to the folder where the image should
                be saved; it is created if it does not exist.

        Returns:
            Optional[str]: The file name of the saved image (relative to
            ``folder_path``, not the full path), or None if the URL was
            skipped or the download failed.
        """
        try:
            # Create the folder if it doesn't exist.
            os.makedirs(folder_path, exist_ok=True)

            filename = os.path.basename(urlparse(image_url).path)
            if not os.path.splitext(filename)[1]:
                return None

            image_path = os.path.join(folder_path, filename)

            # The timeout keeps a stalled server from blocking forever, and
            # the context manager releases the streamed connection even when
            # an error interrupts the transfer.
            with requests.get(
                    image_url,
                    stream=True,
                    timeout=VKMemeParser._DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()  # Raise for bad status codes.
                with open(image_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)

            return filename
        except Exception as e:
            # Deliberate best-effort boundary: any failure is reported and
            # signalled to the caller as None rather than raised.
            print(f"Error downloading image from {image_url}: {str(e)}")
            return None