# newsintel-agent / aws.py
# (Hosting-page residue: uploaded by Hasitha16 -- "Upload 11 files", commit ed42ca4, verified.)
import os, mimetypes, json, tempfile, time, pathlib
import boto3
from botocore.exceptions import BotoCoreError, ClientError
# ---------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------
# Region used for every client built below (S3 bucket, SES, Comprehend).
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-2")
S3_BUCKET = os.environ.get("AWS_S3_BUCKET")
SES_SENDER = os.environ.get("SES_SENDER_EMAIL")

# Opt-in flag for public-read uploads; the bucket itself must also allow
# public access for the resulting URLs to work.
AWS_S3_PUBLIC = os.environ.get("AWS_S3_PUBLIC", "0").lower() in ("1", "true", "yes")

# Per-day free-tier safety rails; each can be overridden through the environment.
FREE_S3_MAX_UPLOADS = int(os.environ.get("FREE_S3_MAX_UPLOADS", "10"))
FREE_S3_MAX_MB = int(os.environ.get("FREE_S3_MAX_MB", "25"))
FREE_SES_MAX_EMAILS = int(os.environ.get("FREE_SES_MAX_EMAILS", "10"))

# A client is only constructed when an access key is present in the
# environment; otherwise the name is bound to None and the helpers that use
# it fall back to their "disabled" return values.
s3_client, ses_client, comp_client = (
    boto3.client(service, region_name=AWS_REGION)
    if os.environ.get("AWS_ACCESS_KEY_ID")
    else None
    for service in ("s3", "ses", "comprehend")
)
# ---------------------------------------------------------------------
# Small daily counters (temp dir) to avoid exceeding free tier
# ---------------------------------------------------------------------
# Counter files live in the system temp directory, one JSON file per counter.
_COUNTER_DIR = pathlib.Path(tempfile.gettempdir()) / "newsintel_counters"
_COUNTER_DIR.mkdir(exist_ok=True)


def _rollover_counter(name: str):
    """Load the daily counter *name*, resetting it when the day has changed.

    Returns a ``(path, blob)`` tuple where ``path`` is the counter's JSON file
    and ``blob`` is ``{"day": "YYYY-MM-DD", "count": int}``. Nothing is written
    here; callers persist the blob themselves.

    A missing, unreadable, corrupt or malformed counter file is treated as a
    fresh counter instead of raising, so a damaged temp file cannot crash the
    upload/email helpers that consult the counter.
    """
    path = _COUNTER_DIR / f"{name}.json"
    today = time.strftime("%Y-%m-%d")
    fresh = {"day": today, "count": 0}

    # Read directly instead of checking exists() first: avoids the race where
    # the file disappears between the check and the read.
    try:
        blob = json.loads(path.read_text() or "{}")
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: invalid JSON or bytes
        # that cannot be decoded as text.
        return path, fresh

    if not isinstance(blob, dict) or blob.get("day") != today:
        return path, fresh

    count = blob.get("count")
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(count, bool) or not isinstance(count, int) or count < 0:
        return path, fresh
    return path, blob
def _bump(name: str) -> int:
    """Increment today's counter *name*, persist it, and return the new value."""
    counter_file, state = _rollover_counter(name)
    new_total = state["count"] + 1
    state["count"] = new_total
    # Persist the whole blob so the stored day stamp stays in step with the count.
    counter_file.write_text(json.dumps(state))
    return new_total
# ---------------------------------------------------------------------
# Comprehend helpers
# ---------------------------------------------------------------------
# Language codes detect_language() may return; anything else falls back to "en".
SUPPORTED = {"en", "es", "de", "fr", "it", "pt"}

# Upper bound on the text sent to Comprehend, in UTF-8 *bytes*. Comprehend's
# size limits are expressed in bytes, so this is kept below the smallest
# limit used here (DetectSentiment's 5 KB) -- see the AWS API reference.
_COMP_MAX = 4500


def _safe_text(t: str) -> str:
    """Return *t* stripped and truncated to at most ``_COMP_MAX`` UTF-8 bytes.

    ``None`` and other falsy input yield ``""``. Truncation never splits a
    multi-byte character: a partial trailing character is dropped. For pure
    ASCII text this is the same as cutting at ``_COMP_MAX`` characters.
    """
    # A character is at least one byte, so slicing by characters first is a
    # cheap upper bound that avoids encoding very large inputs in full.
    t = (t or "").strip()[:_COMP_MAX]
    raw = t.encode("utf-8", errors="replace")
    if len(raw) <= _COMP_MAX:
        return t
    # errors="ignore" discards the incomplete character left at the cut point.
    return raw[:_COMP_MAX].decode("utf-8", errors="ignore")
def detect_language(text: str) -> str:
    """Return the dominant language code of *text*, restricted to ``SUPPORTED``.

    Falls back to ``"en"`` when the Comprehend client is not configured, the
    text is empty after cleaning, the API call fails, no language is reported,
    or the detected language is not in ``SUPPORTED``.
    """
    if not comp_client:
        return "en"

    cleaned = _safe_text(text)
    if not cleaned:
        # Nothing to analyse; skip a request that could only fail.
        return "en"

    try:
        langs = comp_client.detect_dominant_language(Text=cleaned).get("Languages", [])
    except (BotoCoreError, ClientError):
        return "en"
    if not langs:
        return "en"

    # Pick the highest-confidence candidate rather than relying on response
    # order; max() keeps the first entry on ties, so an already-sorted
    # response gives the same answer as taking element 0.
    best = max(langs, key=lambda item: item.get("Score", 0.0))
    code = best.get("LanguageCode", "en")
    return code if code in SUPPORTED else "en"
def analyze_text(text: str, lang: str = "en"):
    """Run Comprehend sentiment, entity and key-phrase detection on *text*.

    Args:
        text: Text to analyse; it is cleaned and truncated by ``_safe_text``.
        lang: Language code passed to Comprehend as ``LanguageCode``.

    Returns:
        A dict with keys ``"sentiment"`` (str), ``"entities"`` (list) and
        ``"key_phrases"`` (list). Each field keeps its neutral default
        (``"NEUTRAL"`` / ``[]``) when the client is not configured, the text
        is empty, or that particular API call fails.
    """
    result = {"sentiment": "NEUTRAL", "entities": [], "key_phrases": []}
    if not comp_client:
        return result

    cleaned = _safe_text(text)
    if not cleaned:
        # Empty input cannot be analysed; skip three requests that would fail.
        return result

    # (result key, API method, response key)
    calls = (
        ("sentiment", comp_client.detect_sentiment, "Sentiment"),
        ("entities", comp_client.detect_entities, "Entities"),
        ("key_phrases", comp_client.detect_key_phrases, "KeyPhrases"),
    )
    for out_key, api_call, resp_key in calls:
        # Each call is guarded separately so one failure does not throw away
        # results that the other calls returned successfully.
        try:
            response = api_call(Text=cleaned, LanguageCode=lang)
        except (BotoCoreError, ClientError):
            continue
        result[out_key] = response.get(resp_key, result[out_key])
    return result
# ---------------------------------------------------------------------
# S3 upload (with safety guards)
# ---------------------------------------------------------------------
def s3_upload(file_path: str, key_prefix: str = "newsintel/") -> str | None:
    """
    Upload a file to S3.

    - If AWS_S3_PUBLIC=1 (and bucket policy allows), returns an https URL
      (the object key is percent-encoded in the URL).
    - Otherwise returns an s3:// URI (private).
    - Free-tier guard: caps per-day uploads and file size.

    The object key is ``key_prefix`` followed by the file's base name.

    Returns None when S3 is not configured, ``file_path`` is not an existing
    file, a free-tier cap would be exceeded, or the upload fails.
    """
    if not (s3_client and S3_BUCKET and file_path and os.path.isfile(file_path)):
        return None

    # size guard
    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if size_mb > FREE_S3_MAX_MB:
        return None

    # daily count guard
    _, counter = _rollover_counter("s3")
    if counter["count"] >= FREE_S3_MAX_UPLOADS:
        return None

    key = f"{key_prefix}{os.path.basename(file_path)}"
    extra = {"ContentType": mimetypes.guess_type(file_path)[0] or "application/octet-stream"}
    if AWS_S3_PUBLIC:
        extra["ACL"] = "public-read"

    # Imported locally so this function does not depend on new file-level imports.
    from urllib.parse import quote
    from boto3.exceptions import S3UploadFailedError

    try:
        s3_client.upload_file(file_path, S3_BUCKET, key, ExtraArgs=extra)
    except (BotoCoreError, ClientError, S3UploadFailedError):
        # upload_file reports a failed transfer as S3UploadFailedError, which
        # is not a botocore exception, so it has to be listed explicitly.
        return None

    # Only successful uploads count against the daily cap.
    _bump("s3")
    if AWS_S3_PUBLIC:
        # Percent-encode the key (keeping "/") so names with spaces or other
        # reserved characters still produce a valid URL.
        return f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{quote(key)}"
    return f"s3://{S3_BUCKET}/{key}"
# ---------------------------------------------------------------------
# SES email (with safety guard)
# ---------------------------------------------------------------------
def ses_send_email(recipient: str, subject: str, html_body: str) -> bool:
    """
    Send an HTML email via SES. Requires SES_SENDER to be verified (and recipient if in sandbox).
    Free-tier guard: caps per-day emails.

    Subject and body are sent with an explicit UTF-8 charset so non-ASCII
    text is handled correctly.

    Returns True when SES accepted the message; False when SES is not
    configured, ``recipient`` is empty, the daily cap has been reached, or
    the send call fails.
    """
    if not (ses_client and SES_SENDER and recipient):
        return False

    # daily count guard
    _, counter = _rollover_counter("ses")
    if counter["count"] >= FREE_SES_MAX_EMAILS:
        return False

    try:
        ses_client.send_email(
            Source=SES_SENDER,
            Destination={"ToAddresses": [recipient]},
            Message={
                # Without a Charset, SES treats the content as 7-bit ASCII.
                "Subject": {"Data": subject, "Charset": "UTF-8"},
                "Body": {"Html": {"Data": html_body, "Charset": "UTF-8"}},
            },
        )
    except (BotoCoreError, ClientError):
        return False

    # Only successfully accepted messages count against the daily cap.
    _bump("ses")
    return True