Spaces:
Sleeping
Sleeping
File size: 8,796 Bytes
322ae63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
import os
import shutil
def email_decorator(func):
    """Wrap a message-building function so that calling it also sends the email.

    The decorated function must return an ``email.message.EmailMessage``.
    The wrapper sets the ``From`` header to the configured sender, connects
    to ``smtp.gmail.com:587``, upgrades the connection with STARTTLS, logs
    in and sends the message.

    Requires environment variables:
        SENDER_EMAIL: Email account for sending
        SENDER_PASSWORD: App password for email account

    Args:
        func: Callable that builds and returns the message to send.

    Returns:
        A wrapper with the same call signature as ``func``. The wrapper
        returns True when the message was sent and False when building or
        sending it failed (the reason is printed).

    Raises:
        ValueError: Raised by the wrapper when either credential variable
            is missing or empty.
    """
    import os
    import smtplib
    from functools import wraps

    smtp_host = 'smtp.gmail.com'
    smtp_port = 587
    # Without a timeout an unreachable server blocks the caller indefinitely.
    smtp_timeout_seconds = 30

    @wraps(func)
    def wrapper(*args, **kwargs):
        sender_email = os.environ.get('SENDER_EMAIL')
        sender_password = os.environ.get('SENDER_PASSWORD')
        if not (sender_email and sender_password):
            raise ValueError("Missing email credentials in environment variables")

        try:
            # Execute the decorated function to get the EmailMessage object.
            msg = func(*args, **kwargs)
            # EmailMessage rejects a second 'From' header, so drop any value
            # the decorated function may already have set. Deleting a header
            # that is not present is a no-op.
            del msg['From']
            msg['From'] = sender_email
            with smtplib.SMTP(smtp_host, smtp_port,
                              timeout=smtp_timeout_seconds) as server:
                server.starttls()
                server.login(sender_email, sender_password)
                server.send_message(msg)
            return True
        except smtplib.SMTPAuthenticationError:
            print("Authentication failed. Check your email credentials.")
            return False
        except Exception as e:
            # Deliberate best-effort boundary: report the failure and signal
            # it through the return value instead of propagating.
            print(f"Error sending email: {str(e)}")
            return False

    return wrapper
def get_files_in_folder(root_dir):
    """Recursively scan a folder and return a list of files with full paths.

    Args:
        root_dir: Directory to scan. A relative path is resolved against the
            current working directory before walking, so the returned paths
            are absolute either way.

    Returns:
        List of absolute file paths, in the order ``os.walk`` yields them.
        The list is empty when ``root_dir`` does not exist or contains no
        files.
    """
    # Resolve once up front so every joined path below is absolute, as the
    # contract promises, even when the caller passed a relative directory.
    abs_root = os.path.abspath(root_dir)

    file_list = []
    for dirpath, _, filenames in os.walk(abs_root):
        file_list.extend(os.path.join(dirpath, name) for name in filenames)
    return file_list
def get_file_types(file_list):
    """Return a dictionary mapping file paths to their file extensions.

    Args:
        file_list: Iterable of file paths.

    Returns:
        Dictionary with each file path as key and its extension as value.
        Extensions are lowercased and have no leading dot. Paths without a
        usable extension (``"README"``, ``".bashrc"``, ``"name."``) map to
        ``'no_extension'``.
    """
    type_dict = {}
    for file_path in file_list:
        _, ext = os.path.splitext(file_path)
        # `or` also covers a bare trailing dot ("name."), where splitext
        # returns "." and stripping it would leave an empty type name.
        type_dict[file_path] = ext.lower().lstrip('.') or 'no_extension'
    return type_dict
def organize_files(type_dict):
    """Move each file into a per-type subfolder of its own directory.

    A file of type ``pdf`` in ``/data`` is moved to ``/data/pdfs/``. The
    operation is best-effort per file: a failure is printed and the
    remaining files are still processed.

    Files are left where they are when:
      * they already sit in a folder named after their type (so running the
        function again does not nest ``pdfs/pdfs/...``), or
      * a file with the same name already exists in the target folder (so
        nothing is overwritten).

    Args:
        type_dict: Dictionary with file path as key and file type as value.
    """
    for file_path, file_type in type_dict.items():
        parent_dir, filename = os.path.split(file_path)

        # Pluralize for readability, e.g. 'pdf' becomes 'pdfs'.
        target_dir_name = f"{file_type}s"

        # Already organized: the file lives in its type folder.
        if os.path.basename(parent_dir) == target_dir_name:
            continue

        target_dir = os.path.join(parent_dir, target_dir_name)
        target_path = os.path.join(target_dir, filename)

        try:
            # A move onto an existing file can replace it silently, so
            # refuse rather than risk losing data.
            if os.path.exists(target_path):
                print(f"Skipped: {filename} already exists in {target_dir_name}/")
                continue
            os.makedirs(target_dir, exist_ok=True)
            shutil.move(file_path, target_path)
            print(f"Moved: {filename} -> {target_dir_name}/")
        except Exception as e:
            # Keep going with the other files.
            print(f"Error moving {filename}: {str(e)}")
def zip_folder(folder_path):
    """Create a zip archive of a folder (or single file) next to it.

    A folder ``/data/docs`` produces ``/data/docs.zip`` containing the
    ``docs/`` tree. A file ``/data/report.txt`` produces
    ``/data/report.zip`` containing ``report.txt``.

    Args:
        folder_path: Path of the folder or file to archive. Relative paths
            and paths with a trailing separator are accepted.

    Returns:
        True if the archive was created, False otherwise (the reason is
        printed).
    """
    import os
    import shutil
    import zipfile

    try:
        # Normalizing first makes dirname/basename reliable: a trailing
        # separator would otherwise give an empty name, and a bare relative
        # name would give an empty parent directory.
        source_path = os.path.abspath(folder_path)
        parent_dir, source_name = os.path.split(source_path)

        if not os.path.exists(source_path):
            print(f"Error zipping folder: {source_path} does not exist")
            return False

        if os.path.isfile(source_path):
            archive_path = os.path.splitext(source_path)[0] + '.zip'
            if archive_path == source_path:
                # The source is itself a .zip; never write over the input.
                archive_path = source_path + '.zip'
            with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive:
                archive.write(source_path, arcname=source_name)
        else:
            # root_dir/base_dir keep the folder name as the top-level entry.
            archive_path = shutil.make_archive(
                source_path, 'zip', parent_dir, source_name
            )

        print(f"Created {os.path.basename(archive_path)} in {parent_dir}")
        return True
    except FileExistsError as e:
        print(f"FileExistsError: {str(e)}")
        return False
    except Exception as e:
        print(f"Error zipping folder: {str(e)}")
        return False
@email_decorator
def send_reminder_email(email_id, task):
    """Build the reminder email for a task.

    The function only constructs the message; sending is performed by the
    ``email_decorator`` wrapper applied above.

    Args:
        email_id: this is a positional argument. Recipient email address should be passed here
        task: this is a positional argument, indicates the task description to include in the reminder.

    Returns:
        The ``EmailMessage`` with subject, recipient and body filled in.
    """
    from email.message import EmailMessage

    body_text = f'Reminder for your task:\n\n{task}'

    reminder = EmailMessage()
    reminder['Subject'] = 'Task Reminder'
    reminder['To'] = email_id
    reminder.set_content(body_text)
    return reminder
def _escape_ics_text(value):
    """Escape a value for use as an iCalendar TEXT property value."""
    text = str(value)
    # Backslash first, so the escapes added below are not escaped again.
    text = text.replace('\\', '\\\\')
    text = text.replace(';', '\\;').replace(',', '\\,')
    # A raw line break would end the property; encode it as a literal "\n".
    return text.replace('\r\n', '\\n').replace('\n', '\\n').replace('\r', '\\n')


@email_decorator
def add_calendar_invite(task, date, email):
    """Create a calendar invite for a task at the given date/time.

    Builds a one-hour event as an ICS attachment. The function only
    constructs the message; sending is performed by the ``email_decorator``
    wrapper applied above.

    Args:
        task: Task description for the calendar event
        date: datetime object for the event start time. It is converted to
            UTC with ``astimezone``; a naive datetime is interpreted by
            Python as local time.
        email: Recipient email address

    Returns:
        The ``EmailMessage`` carrying the ``invite.ics`` attachment.
    """
    import datetime
    import os
    import uuid
    from email.message import EmailMessage

    sender_email = os.environ.get('SENDER_EMAIL')
    utc = datetime.timezone.utc
    ics_time_format = "%Y%m%dT%H%M%SZ"

    # The event lasts one hour from the requested start.
    start_time = date
    end_time = start_time + datetime.timedelta(hours=1)

    # Assemble the ICS line by line, so no stray indentation can leak into
    # the calendar data, and join with the CRLF line endings the format uses.
    ics_lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//Task Manager//Calendar Invite//EN",
        "BEGIN:VEVENT",
        f"UID:{uuid.uuid4()}",
        f"DTSTAMP:{datetime.datetime.now(utc).strftime(ics_time_format)}",
        f"DTSTART:{start_time.astimezone(utc).strftime(ics_time_format)}",
        f"DTEND:{end_time.astimezone(utc).strftime(ics_time_format)}",
        f"SUMMARY:{_escape_ics_text(task)}",
        f"ORGANIZER:mailto:{sender_email}",
        f"ATTENDEE:mailto:{email}",
        "END:VEVENT",
        "END:VCALENDAR",
    ]
    ics_content = "\r\n".join(ics_lines) + "\r\n"

    # Header values may not contain line breaks; collapse whitespace so a
    # multi-line task description does not make the Subject assignment raise.
    subject_task = " ".join(str(task).split())

    msg = EmailMessage()
    msg['Subject'] = f'Calendar Invite: {subject_task}'
    msg['To'] = email
    msg.set_content('Please find the calendar invite attached.')
    msg.add_attachment(ics_content, subtype='calendar', filename='invite.ics')
    return msg
import yfinance as yf
import time
import os
import smtplib
from datetime import datetime, timedelta
from email.message import EmailMessage
from apscheduler.schedulers.background import BackgroundScheduler
def get_stock_price(stock_name):
    """Look up the latest closing price of a stock.

    Args:
        stock_name: Name of stock as listed in stock market

    Returns:
        The last entry of the ``Close`` column from the one-day history
        returned for the ticker.
    """
    # NOTE(review): presumably raises if the history comes back empty
    # (e.g. unknown ticker) — confirm against callers.
    history = yf.Ticker(stock_name).history(period='1d')
    closing_prices = history['Close']
    return closing_prices.iloc[-1]
@email_decorator
def send_stock_email(email, stock_name):
    """Build an email reporting the current price of the given stock.

    The function only constructs the message; sending is performed by the
    ``email_decorator`` wrapper applied above.

    Args:
        email: Recipient email address
        stock_name: Name of stock as listed in stock market

    Returns:
        The ``EmailMessage`` with the price in both subject and body.
    """
    price = get_stock_price(stock_name)

    # Label the message with the stock that was actually requested; the
    # text used to say "NVIDIA" for every ticker.
    msg = EmailMessage()
    msg['Subject'] = f'{stock_name} Stock Price Update: ${price:.2f}'
    msg['To'] = email
    msg.set_content(f'Current {stock_name} stock price: ${price:.2f}')
    return msg
def schedule_daily_stock_email(email, stock_name="NVDA"):
    """Schedule a daily email with the price of the given stock at 5 pm.

    Nothing is sent at call time; a background scheduler is started with a
    cron job that calls ``send_stock_email`` every day at 17:00.

    Args:
        email: Recipient email address
        stock_name: Name of stock as listed in stock market, default value
            being Nvidia - NVDA

    Returns:
        The started ``BackgroundScheduler``. Keep a reference to it so the
        job can be stopped later.
    """
    # NOTE(review): no timezone is configured here, so 17:00 is presumably
    # the scheduler's default (local) timezone — confirm.
    scheduler = BackgroundScheduler()
    scheduler.add_job(
        send_stock_email,
        'cron',
        hour=17,
        minute=0,
        args=[email, stock_name],
    )
    scheduler.start()
    # Hand the scheduler back; discarding it left callers no way to stop or
    # inspect the job.
    return scheduler
|