| import os |
| import json |
| import requests |
| from google.auth.transport.requests import Request |
| from google.oauth2.credentials import Credentials |
| from google_auth_oauthlib.flow import InstalledAppFlow |
|
|
# OAuth 2.0 scopes requested for the Blogger API; passed both to the stored-token
# loader and to the installed-app consent flow in BloggerClient.authenticate().
| SCOPES = ['https://www.googleapis.com/auth/blogger'] |
# Blog identifier interpolated into the request URLs and the new-post payload.
# NOTE(review): the value is a placeholder literal - presumably it must be
# replaced with a real blog ID before any request can succeed; confirm.
| BLOG_ID = 'YOUR_BLOG_ID_HERE' |
|
|
class BloggerClient:
    """Small client for the Blogger v3 REST API using OAuth 2.0 user credentials.

    Credentials are loaded from ``token_file`` when it exists; otherwise (or
    when they cannot be refreshed) an interactive installed-app consent flow
    is run using ``client_secrets_file``. Authentication happens eagerly in
    the constructor and is repeated lazily whenever the cached credentials
    are no longer valid at request time.

    Args:
        client_secrets_file: Path to the OAuth client secrets JSON file.
        token_file: Path where the authorized user token is cached.
        blog_id: Blog to operate on. Defaults to the module-level ``BLOG_ID``.
    """

    # Seconds to wait on the Blogger API before giving up; without a timeout
    # `requests` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, client_secrets_file='client_secret.json',
                 token_file='token.json', blog_id=None):
        self.client_secrets_file = client_secrets_file
        self.token_file = token_file
        self.creds = None
        self.service_url = 'https://www.googleapis.com/blogger/v3/blogs'
        # Resolved here rather than in the signature so the module constant
        # is read at construction time, not at class-definition time.
        self.blog_id = BLOG_ID if blog_id is None else blog_id
        self.authenticate()

    def authenticate(self):
        """Ensure ``self.creds`` holds valid credentials and cache them on disk.

        Order of attempts: load the cached token file, refresh an expired
        token that has a refresh token, and finally fall back to the
        interactive consent flow. The token file is rewritten whenever the
        credentials were refreshed or newly obtained.
        """
        if os.path.exists(self.token_file):
            self.creds = Credentials.from_authorized_user_file(self.token_file, SCOPES)

        if self.creds and self.creds.valid:
            return

        if self.creds and self.creds.expired and self.creds.refresh_token:
            self.creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                self.client_secrets_file, SCOPES)
            # run_console() relied on the out-of-band OAuth flow, which was
            # removed from google-auth-oauthlib 1.0.0; the loopback server
            # flow is the supported replacement. port=0 picks a free port.
            self.creds = flow.run_local_server(port=0)

        with open(self.token_file, 'w', encoding='utf-8') as token:
            token.write(self.creds.to_json())

    def _auth_headers(self):
        """Return the Authorization header, re-authenticating if the token lapsed."""
        if not self.creds or not self.creds.valid:
            self.authenticate()
        return {'Authorization': f'Bearer {self.creds.token}'}

    def _blog_url(self, suffix=''):
        """Return the API URL for this client's blog, with ``suffix`` appended."""
        return f"{self.service_url}/{self.blog_id}{suffix}"

    def get_blog_info(self):
        """Fetch the blog resource and return the decoded JSON response.

        The response body is returned as-is, including API error payloads;
        HTTP error statuses are not raised.
        """
        response = requests.get(
            self._blog_url(),
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def create_post(self, title, content, labels=None):
        """Create a post on the blog and return the decoded JSON response.

        Args:
            title: Post title.
            content: Post body sent as the ``content`` field.
            labels: Optional labels for the post; ``None`` sends an empty list.

        The response body is returned as-is, including API error payloads;
        HTTP error statuses are not raised.
        """
        headers = self._auth_headers()
        headers['Content-Type'] = 'application/json'
        post_data = {
            'kind': 'blogger#post',
            'blog': {'id': self.blog_id},
            'title': title,
            'content': content,
            # None instead of a shared mutable default list in the signature.
            'labels': [] if labels is None else labels,
        }
        response = requests.post(
            self._blog_url('/posts/'),
            headers=headers,
            data=json.dumps(post_data),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()