Spaces:
Runtime error
Runtime error
| from email.mime.text import MIMEText | |
| from google.oauth2 import service_account | |
| from googleapiclient.http import MediaFileUpload | |
| import base64 | |
| import os | |
| import base64 | |
| import streamlit as st | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from googleapiclient.discovery import build | |
| from email.mime.multipart import MIMEMultipart | |
| import requests | |
| import json | |
| from langchain.tools import tool | |
| # Constants | |
| SCOPES_DRIVE = ['https://www.googleapis.com/auth/drive'] | |
| SERVICE_ACCOUNT_FILE = 'service_account.json' | |
| PARENT_FOLDER_ID = "1REXfwxk9dcPdpZXJOFZSur3880soVN9y" | |
| # Authenticate and return credentials for Google Drive | |
| def authenticate_drive(): | |
| credentials = service_account.Credentials.from_service_account_file( | |
| SERVICE_ACCOUNT_FILE, scopes=SCOPES_DRIVE) | |
| return credentials | |
| # Upload file to Google Drive | |
| def upload_file(filepath, filename, parent_folder_id): | |
| creds = authenticate_drive() | |
| service = build('drive', 'v3', credentials=creds) | |
| file_metadata = { | |
| 'name': filename, | |
| 'parents': [parent_folder_id] | |
| } | |
| media = MediaFileUpload(filepath, resumable=True) | |
| file = service.files().create(body=file_metadata, media_body=media, fields='id').execute() | |
| print(f'File ID: {file.get("id")}') | |
| return file.get('id') | |
| SCOPES = ["https://www.googleapis.com/auth/gmail.send"] | |
| def authenticate_gmail(): | |
| """Authenticate and return the Gmail service.""" | |
| creds = None | |
| if os.path.exists("token.json"): | |
| creds = Credentials.from_authorized_user_file("token.json", SCOPES) | |
| if not creds or not creds.valid: | |
| if creds and creds.expired and creds.refresh_token: | |
| creds.refresh(Request()) | |
| else: | |
| flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES) | |
| creds = flow.run_local_server(port=0) | |
| with open("token.json", "w") as token: | |
| token.write(creds.to_json()) | |
| service = build("gmail", "v1", credentials=creds) | |
| return service | |
| def send_email_with_company_details(user_email, company_name, topic): | |
| blog_id = upload_file('blog_post.docx', 'blog post', PARENT_FOLDER_ID) | |
| video_id = upload_file('video.mp4', 'video', PARENT_FOLDER_ID) | |
| # Prepare email content | |
| blog_link = f"https://drive.google.com/file/d/{blog_id}/view?usp=sharing" | |
| video_link = f"https://drive.google.com/file/d/{video_id}/view?usp=sharing" | |
| email_subject = f"Blog and Video for the topic {topic}" | |
| email_body = f""" | |
| <html> | |
| <body> | |
| <p>Hello,</p> | |
| <p>The requested blog and video has been shared with you by <b>{company_name}</b>.</p> | |
| <p>You can view the files using the following links:</p> | |
| <b>Blog:</b> | |
| <a href="{blog_link}"> {topic}</a> | |
| <br> | |
| <br> | |
| <b>Video:</b> | |
| <a href="{video_link}"> {topic}</a> | |
| <br> | |
| <p>Best regards,<br>{company_name}</p> | |
| </body> | |
| </html> | |
| """ | |
| try: | |
| # Create message container - the correct MIME type is multipart/alternative. | |
| msg = MIMEMultipart('alternative') | |
| msg['to'] = user_email | |
| msg['subject'] = email_subject | |
| # Attach parts into message container | |
| part1 = MIMEText(email_body, 'plain') | |
| part2 = MIMEText(email_body, 'html') | |
| msg.attach(part1) | |
| msg.attach(part2) | |
| raw_string = base64.urlsafe_b64encode(msg.as_bytes()).decode() | |
| service = authenticate_gmail() | |
| sent_message = service.users().messages().send(userId='me', body={'raw': raw_string}).execute() | |
| # # Connect to the SMTP server | |
| # server = smtplib.SMTP('smtp.gmail.com', 587) | |
| # server.starttls() # Secure the connection | |
| # server.login(SENDER_EMAIL, PASSWORD) | |
| # server.sendmail(SENDER_EMAIL, user_email, msg.as_string()) | |
| print('Email sent successfully!') | |
| except Exception as e: | |
| print(f'Error sending email: {str(e)}') | |
| # Upload blog post and video, then share them and send an email | |
| # def main(): | |
| # blog_filepath = 'blog_post.docx' | |
| # video_filepath = 'video.mp4' | |
| # user_email = receiver_email | |
| # company_name = 'digiotai' | |
| # # Upload blog post | |
| # blog_file_id = upload_file(blog_filepath, 'blog post', PARENT_FOLDER_ID) | |
| # # Upload video | |
| # video_file_id = upload_file(video_filepath, 'video', PARENT_FOLDER_ID) | |
| # send_email_with_company_details(blog_file_id, video_file_id, user_email, company_name) | |
| # main() | |
| def get_urn(token): | |
| # access_token = '<your_access_token>' | |
| url = 'https://api.linkedin.com/v2/userinfo' | |
| headers = { | |
| 'Authorization': f'Bearer {token}' | |
| } | |
| response = requests.get(url, headers=headers) | |
| if response.status_code == 200: | |
| user_info = response.json() | |
| print(user_info['sub']) | |
| return user_info['sub'] | |
| else: | |
| print(f'Failed to fetch user info: {response.status_code}') | |
| print(response.text) | |
| def post_image_and_text( | |
| token: str, title: str, image_path: str, text_content: str | |
| ): | |
| """ | |
| Posts an article on LinkedIn with an image. | |
| :param token: str. LinkedIn OAuth token. | |
| :param title: str. Article title. | |
| :param text_content: str. Article content. | |
| :param image_path: str. Local file path of the image to be used as a thumbnail. | |
| """ | |
| owner = get_urn(token) | |
| # Initialize the upload to get the upload URL and image URN | |
| init_url = "https://api.linkedin.com/rest/images?action=initializeUpload" | |
| headers = { | |
| "LinkedIn-Version": "202401", | |
| "X-RestLi-Protocol-Version": "2.0.0", | |
| "Content-Type": "application/json", | |
| "Authorization": f"Bearer {token}", | |
| } | |
| init_data = json.dumps({"initializeUploadRequest": {"owner": f'urn:li:person:{owner}'}}) | |
| init_response = requests.post(init_url, headers=headers, data=init_data) | |
| print(init_response.content) | |
| if init_response.status_code != 200: | |
| raise Exception(f"Failed to initialize upload: {init_response.text}") | |
| init_response_data = init_response.json()["value"] | |
| upload_url = init_response_data["uploadUrl"] | |
| image_urn = init_response_data["image"] | |
| # Upload the file | |
| with open(image_path, "rb") as f: | |
| upload_response = requests.post(upload_url, files={"file": f}) | |
| if upload_response.status_code not in [200, 201]: | |
| raise Exception(f"Failed to upload file: {upload_response.text}") | |
| # Create the post with the uploaded image URN as thumbnail | |
| post_url = "https://api.linkedin.com/rest/posts" | |
| post_data = json.dumps( | |
| { | |
| "author": f'urn:li:person:{owner}', | |
| "commentary": text_content, | |
| "visibility": "PUBLIC", | |
| "distribution": { | |
| "feedDistribution": "MAIN_FEED", | |
| "targetEntities": [], | |
| "thirdPartyDistributionChannels": [], | |
| }, | |
| "content": { | |
| "media": { | |
| "title": title, | |
| "id": image_urn, | |
| } | |
| }, | |
| "lifecycleState": "PUBLISHED", | |
| "isReshareDisabledByAuthor": False, | |
| } | |
| ) | |
| post_response = requests.post(post_url, headers=headers, data=post_data) | |
| print(post_response.content) | |
| if post_response.status_code in [200, 201]: | |
| return "Linkedin article has been posted successfully!" | |
| else: | |
| raise Exception(f"Failed to post article: {post_response.text}") |