Spaces:
Sleeping
Sleeping
File size: 4,329 Bytes
import os
from typing import Optional, Dict, Any
from urllib.parse import urlparse
import requests
import vk_api
class VKMemeParser:
    """Collect single-photo posts from a VK community wall and download their images."""

    # Number of posts requested per wall.get call.
    _PAGE_SIZE = 100
    # Seconds to wait for the image server before a download is abandoned.
    _DOWNLOAD_TIMEOUT = 30

    def __init__(self, token: str):
        """
        Initialize the VK Meme Parser.

        Args:
            token (str): VK API access token.
        """
        self.vk_session = vk_api.VkApi(token=token)
        self.vk = self.vk_session.get_api()

    def _process_post(self, post: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Process a single post and extract relevant information.

        A post is kept only when it is not an ad, not pinned, not a repost and
        carries exactly one attachment, which must be a photo with at least one
        downloadable size.

        Args:
            post (Dict[str, Any]): A dictionary containing post data.

        Returns:
            Optional[Dict[str, Any]]: A dictionary with the keys "id", "text" and
            "image_url" if the post is valid, None otherwise.
        """
        # `or []` also covers an explicit None value, not just a missing key.
        attachments = post.get("attachments") or []
        if (post.get("marked_as_ads")
                or "is_pinned" in post
                or "copy_history" in post
                or len(attachments) != 1
                or attachments[0].get("type") != "photo"):
            return None

        # A photo attachment without any sizes has nothing to download.
        photo_sizes = (attachments[0].get("photo") or {}).get("sizes") or []
        if not photo_sizes:
            return None

        # Pick the size with the largest pixel area.
        largest_photo = max(
            photo_sizes,
            key=lambda size: size.get("width", 0) * size.get("height", 0))
        image_url = largest_photo.get("url")
        if not image_url:
            return None

        return {
            "id": post["id"],
            # Posts without a caption yield an empty string instead of failing.
            "text": (post.get("text") or "").strip(),
            "image_url": image_url,
        }

    def get_memes(self, public_id: str) -> Dict[str, Any]:
        """
        Retrieve and process all meme posts from a specified public page.

        Args:
            public_id (str): ID or short name of the public page. A numeric ID
                may be given with or without a leading "-"; either way it is
                treated as a community ID.

        Returns:
            Dict[str, Any]: A dictionary with the public's name under "name" and
            the list of processed meme posts under "posts".
        """
        # Determine whether to use domain or owner_id.
        digits = public_id[1:] if public_id.startswith("-") else public_id
        params: Dict[str, Any]
        if digits.isdecimal():
            # wall.get addresses a community wall by a NEGATIVE owner_id, while
            # groups.getById expects the positive ID, so derive both forms.
            group_ref = digits
            params = {"owner_id": -int(digits)}
        else:
            group_ref = public_id
            params = {"domain": public_id}
        params["count"] = self._PAGE_SIZE

        # Fetch public's name.
        group_info = self.vk.groups.getById(group_id=group_ref)[0]
        group_name = group_info["name"]

        # Process posts page by page.
        memes = []
        offset = 0
        while True:
            params["offset"] = offset
            response = self.vk.wall.get(**params)
            posts = response["items"]
            for post in posts:
                processed_post = self._process_post(post)
                if processed_post:
                    memes.append(processed_post)

            # `offset` is the number of posts to skip, so advance it by the
            # number of posts actually received.
            offset += len(posts)

            # Stop on a short (or empty) page, or once the reported total is reached.
            if len(posts) < self._PAGE_SIZE:
                break
            total = response.get("count")
            if total is not None and offset >= total:
                break

        return {
            "name": group_name,
            "posts": memes,
        }

    def download_image(
            self,
            image_url: str,
            folder_path: str) -> Optional[str]:
        """
        Download an image from the given URL and save it to the specified folder.

        Args:
            image_url (str): The URL of the image to download.
            folder_path (str): The path to the folder where the image should be saved.

        Returns:
            Optional[str]: The file name (not the full path) of the saved image
            inside `folder_path`, or None if the URL has no file extension or the
            download failed.
        """
        # Set once the output file has been opened, so a failed transfer can
        # clean up after itself without touching files it never wrote to.
        partial_path = None
        try:
            # Create the folder if it doesn't exist.
            os.makedirs(folder_path, exist_ok=True)
            filename = os.path.basename(urlparse(image_url).path)
            if not os.path.splitext(filename)[1]:
                return None
            image_path = os.path.join(folder_path, filename)

            # The timeout keeps a stalled server from blocking forever; the
            # context manager releases the streamed connection in every case.
            with requests.get(
                    image_url,
                    stream=True,
                    timeout=self._DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()  # Raise an exception for bad status codes
                with open(image_path, 'wb') as file:
                    partial_path = image_path
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)
            return filename
        except Exception as e:
            # Best-effort download: report the problem and signal failure with None.
            print(f"Error downloading image from {image_url}: {str(e)}")
            if partial_path is not None:
                # Do not leave a truncated image behind.
                try:
                    os.remove(partial_path)
                except OSError:
                    pass
            return None
|