import os
import json
import hashlib
import time
import re
import argparse
from pathlib import Path

import markdown
from markdown.extensions import tables, fenced_code, codehilite, toc
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

BLOG_ID = os.environ["BLOG_ID"]           # Blogger blog ID (required environment variable)
TOKEN_FILE = os.environ["TOKEN_FILE"]     # path to the stored OAuth user token (required environment variable)
PUBLISHED_FILE = "published_posts.json"   # local registry of already published posts
GH_PAGES_BASE = "https://kagvi13.github.io/HMP/"
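
# published_posts.json maps post title -> {"id": <Blogger post id>, "hash": <md5 of the source file>}.
# Illustrative entry (the values below are examples only):
#   {"HMP-Spec": {"id": "1234567890", "hash": "d41d8cd98f00b204e9800998ecf8427e"}}

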
def convert_md_links(md_text: str) -> str:
    """Convert relative *.md links into absolute links to the GitHub Pages site."""
    def replacer(match):
        text = match.group(1)
        link = match.group(2)
        if link.startswith("http://") or link.startswith("https://") or not link.endswith(".md"):
            return match.group(0)
        # Strip only the trailing ".md" (and a leading "./") so nested paths stay intact.
        rel = link[:-len(".md")]
        if rel.startswith("./"):
            rel = rel[2:]
        return f"[{text}]({GH_PAGES_BASE + rel})"
    return re.sub(r"\[([^\]]+)\]\(([^)]+)\)", replacer, md_text)
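

# Illustrative rewrites performed by convert_md_links (the link targets are examples,
# not files guaranteed to exist in this repository):
#   [Spec](HMP-Spec.md)          -> [Spec](https://kagvi13.github.io/HMP/HMP-Spec)
#   [Site](https://example.com/) -> unchanged (absolute URL)
#   [Logo](img/logo.png)         -> unchanged (not a .md link)

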
def load_published():
    if Path(PUBLISHED_FILE).exists():
        with open(PUBLISHED_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    print("⚠ published_posts.json not found, starting from scratch.")
    return {}


def save_published(data):
    with open(PUBLISHED_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def file_hash(path):
    # MD5 is used only as a cheap change-detection fingerprint, not for security.
    return hashlib.md5(Path(path).read_bytes()).hexdigest()


def get_existing_posts(service):
    """Return a title -> post_id mapping for posts that can be edited."""
    existing = {}
    next_page_token = None
    while True:
        try:
            response = service.posts().list(blogId=BLOG_ID, maxResults=500, pageToken=next_page_token).execute()
            for post in response.get("items", []):
                existing[post["title"]] = post["id"]
            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        except HttpError as e:
            print(f"❌ Failed to fetch the list of posts: {e}")
            break
    return existing


def main(force: bool = False):
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, ["https://www.googleapis.com/auth/blogger"])

    # Refresh an expired token in place so the next run can reuse it.
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
        with open(TOKEN_FILE, "w") as token_file:
            token_file.write(creds.to_json())

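    # The script assumes TOKEN_FILE already holds an authorized-user token with the Blogger
    # scope. A one-time bootstrap sketch (not part of this script; assumes a client_secret.json
    # downloaded from the Google Cloud console):
    #     from google_auth_oauthlib.flow import InstalledAppFlow
    #     flow = InstalledAppFlow.from_client_secrets_file(
    #         "client_secret.json", ["https://www.googleapis.com/auth/blogger"])
    #     Path(TOKEN_FILE).write_text(flow.run_local_server(port=0).to_json())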
    service = build("blogger", "v3", credentials=creds)
    published = load_published()
    existing_posts = get_existing_posts(service)

    md_files = list(Path("docs").rglob("*.md"))
    for md_file in md_files:
        name = md_file.stem
        h = file_hash(md_file)

        md_text = md_file.read_text(encoding="utf-8")
        # Prepend a link back to the source file; as_posix() keeps sub-directory paths intact.
        source_link = f"Source: [{md_file.name}](https://github.com/kagvi13/HMP/blob/main/{md_file.as_posix()})\n\n"
        md_text = source_link + md_text
        md_text = convert_md_links(md_text)

        html_content = markdown.markdown(
            md_text,
            extensions=["tables", "fenced_code", "codehilite", "toc"]
        )

        # Inline CSS so tables and code blocks stay readable inside the Blogger template.
        style = """
<style>
table { display: block; max-width: 100%; overflow-x: auto; border-collapse: collapse; }
th, td { border: 1px solid #ccc; padding: 6px 12px; }
pre { display: block; max-width: 100%; overflow-x: auto; padding: 10px; background-color: #f8f8f8; border: 1px solid #ddd; border-radius: 6px; font-family: monospace; white-space: pre; }
</style>
"""
        html_content = style + html_content

        body = {
            "kind": "blogger#post",
            "title": name,
            "content": html_content,
        }

        try:
            post_id = existing_posts.get(name)

            if post_id:
                # Skip unchanged posts unless --force was given.
                if not force and name in published and published[name]["hash"] == h:
                    print(f"✅ Post '{name}' is unchanged, skipping.")
                    continue

                try:
                    post = service.posts().update(blogId=BLOG_ID, postId=post_id, body=body).execute()
                    print(f"♻ Updated post: {post['url']}")
                except HttpError as e:
                    if e.resp.status == 403:
                        # The post exists but is not editable with this account: create a new one.
                        post = service.posts().insert(blogId=BLOG_ID, body=body).execute()
                        print(f"⚠ Post existed but is not editable; created a new one: {post['url']}")
                        post_id = post["id"]
                    else:
                        raise
            else:
                post = service.posts().insert(blogId=BLOG_ID, body=body).execute()
                print(f"🆕 Post published: {post['url']}")
                post_id = post["id"]

            published[name] = {"id": post_id, "hash": h}
            save_published(published)

            print("⏱ Pausing 60 seconds before the next post...")
            time.sleep(60)

        except HttpError as e:
            print(f"❌ Failed to publish {name}: {e}")
            save_published(published)
            break


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--force", action="store_true", help="Update all posts, even if unchanged")
    args = parser.parse_args()

    main(force=args.force)
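
# Usage sketch (the script file name and token path below are assumptions):
#   export BLOG_ID=<your blog id> TOKEN_FILE=token.json
#   python publish_to_blogger.py            # publish new or changed docs only
#   python publish_to_blogger.py --force    # republish everything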