Spaces:
Sleeping
Sleeping
import json
import logging
import mimetypes
import os
import pathlib
import tempfile
import time
from urllib.parse import quote

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import BotoCoreError, ClientError
# ---------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------
def _env_int(name: str, default: int) -> int:
    """Return the integer value of environment variable *name*.

    Falls back to *default* when the variable is unset, blank, or not a
    valid integer, so a typo in an optional limit cannot break the import
    of this module. A malformed value is reported through ``logging``.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logging.getLogger(__name__).warning(
            "Ignoring non-integer %s=%r; using default %d", name, raw, default
        )
        return default


AWS_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-2")  # bucket/SES region
S3_BUCKET = os.getenv("AWS_S3_BUCKET")
SES_SENDER = os.getenv("SES_SENDER_EMAIL")

# Public read? (requires bucket policy and public access allowed)
# Enabled only by "1", "true" or "yes" (case-insensitive); anything else is off.
AWS_S3_PUBLIC = os.getenv("AWS_S3_PUBLIC", "0").lower() in {"1", "true", "yes"}

# Simple free-tier safety rails (per day)
FREE_S3_MAX_UPLOADS = _env_int("FREE_S3_MAX_UPLOADS", 10)  # uploads per day
FREE_S3_MAX_MB = _env_int("FREE_S3_MAX_MB", 25)  # largest file accepted, in MiB
FREE_SES_MAX_EMAILS = _env_int("FREE_SES_MAX_EMAILS", 10)  # emails per day
# Build the AWS clients only when an access key is present in the environment;
# otherwise every client is None and the helpers below return their fallbacks.
if os.getenv("AWS_ACCESS_KEY_ID"):
    s3_client = boto3.client("s3", region_name=AWS_REGION)
    ses_client = boto3.client("ses", region_name=AWS_REGION)
    comp_client = boto3.client("comprehend", region_name=AWS_REGION)
else:
    s3_client = None
    ses_client = None
    comp_client = None
# ---------------------------------------------------------------------
# Small daily counters (temp dir) to avoid exceeding free tier
# ---------------------------------------------------------------------
_COUNTER_DIR = pathlib.Path(tempfile.gettempdir()) / "newsintel_counters"
_COUNTER_DIR.mkdir(exist_ok=True)


def _rollover_counter(name: str):
    """Load today's counter state for *name* without modifying the file.

    Returns a ``(path, blob)`` tuple where *path* is the counter's JSON file
    and *blob* is a dict holding at least ``"day"`` (``YYYY-MM-DD``, local
    time) and an integer ``"count"``. The count starts again from zero when
    the stored day is not today.

    A missing, unreadable, or malformed file is treated as a fresh counter
    instead of raising, so a truncated write can never break the callers.
    """
    path = _COUNTER_DIR / f"{name}.json"
    today = time.strftime("%Y-%m-%d")
    fresh = {"day": today, "count": 0}

    try:
        blob = json.loads(path.read_text() or "{}")
    except (OSError, ValueError):
        # OSError: file absent/unreadable. ValueError: invalid JSON or bytes.
        return path, fresh

    if not isinstance(blob, dict) or blob.get("day") != today:
        return path, fresh
    if not isinstance(blob.get("count"), int):
        # Callers index blob["count"] directly; never hand back a bad value.
        return path, fresh
    return path, blob
def _bump(name: str) -> int:
    """Add one to today's counter for *name*, save it, and return the new total."""
    counter_file, state = _rollover_counter(name)
    new_total = state["count"] + 1
    state["count"] = new_total
    counter_file.write_text(json.dumps(state))
    return new_total
# ---------------------------------------------------------------------
# Comprehend helpers
# ---------------------------------------------------------------------
# Language codes accepted as-is from language detection.
SUPPORTED = {"en", "es", "de", "fr", "it", "pt"}

# Per-request text budget in UTF-8 *bytes*. Comprehend measures its size
# limits in bytes, and this stays below the 5 KB DetectSentiment cap.
_COMP_MAX = 4500


def _safe_text(t: str) -> str:
    """Return *t* stripped and truncated to at most ``_COMP_MAX`` UTF-8 bytes.

    ``None`` and empty input yield ``""``. Truncation never splits a
    multi-byte character: a partial trailing character is dropped. Text that
    already fits is returned unchanged apart from the strip.
    """
    t = (t or "").strip()
    # errors="ignore" keeps this helper from raising on unencodable input.
    raw = t.encode("utf-8", errors="ignore")
    if len(raw) <= _COMP_MAX:
        return t
    # Cutting bytes may leave half a character at the end; "ignore" drops it.
    return raw[:_COMP_MAX].decode("utf-8", errors="ignore")
def detect_language(text: str) -> str:
    """Return the dominant language code of *text*, limited to ``SUPPORTED``.

    Returns ``"en"`` when no Comprehend client is configured, when the text
    is empty after cleaning, when the API call fails, or when the detected
    language is not in ``SUPPORTED``.
    """
    if not comp_client:
        return "en"

    sample = _safe_text(text)
    if not sample:
        # Comprehend requires at least one character; skip the doomed call.
        return "en"

    try:
        response = comp_client.detect_dominant_language(Text=sample)
    except (BotoCoreError, ClientError):
        return "en"

    candidates = response.get("Languages", [])
    if not candidates:
        return "en"

    # Choose by confidence instead of relying on the response ordering.
    best = max(candidates, key=lambda item: item.get("Score", 0.0))
    code = best.get("LanguageCode", "en")
    return code if code in SUPPORTED else "en"
def analyze_text(text: str, lang: str = "en"):
    """Run sentiment, entity, and key-phrase detection on *text*.

    Returns a dict with keys ``"sentiment"`` (str), ``"entities"`` (list) and
    ``"key_phrases"`` (list). Each field keeps its neutral default
    (``"NEUTRAL"`` / ``[]``) when no Comprehend client is configured, when the
    text is empty after cleaning, or when that particular API call fails.
    The three calls are independent: one failure does not discard the
    results of the others.
    """
    result = {"sentiment": "NEUTRAL", "entities": [], "key_phrases": []}
    if not comp_client:
        return result

    t = _safe_text(text)
    if not t:
        # Comprehend requires at least one character; skip the doomed calls.
        return result

    # (result key, API method, response key)
    calls = (
        ("sentiment", comp_client.detect_sentiment, "Sentiment"),
        ("entities", comp_client.detect_entities, "Entities"),
        ("key_phrases", comp_client.detect_key_phrases, "KeyPhrases"),
    )
    for out_key, api_call, resp_key in calls:
        try:
            response = api_call(Text=t, LanguageCode=lang)
        except (BotoCoreError, ClientError):
            continue  # keep the default for this field only
        result[out_key] = response.get(resp_key, result[out_key])
    return result
# ---------------------------------------------------------------------
# S3 upload (with safety guards)
# ---------------------------------------------------------------------
def s3_upload(file_path: str, key_prefix: str = "newsintel/") -> str | None:
    """
    Upload a file to S3.

    - If AWS_S3_PUBLIC=1 (and bucket policy allows), returns an https URL
      with the object key percent-encoded.
    - Otherwise returns an s3:// URI (private).
    - Free-tier guard: caps per-day uploads and file size.

    Returns None when S3 is not configured, the file does not exist, a guard
    rejects the upload, or the upload itself fails. The object key is
    ``key_prefix`` followed by the file's base name.
    """
    if not (s3_client and S3_BUCKET and file_path and os.path.isfile(file_path)):
        return None

    # size guard
    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if size_mb > FREE_S3_MAX_MB:
        return None

    # daily count guard
    _, counter = _rollover_counter("s3")
    if counter["count"] >= FREE_S3_MAX_UPLOADS:
        return None

    key = f"{key_prefix}{os.path.basename(file_path)}"
    extra = {"ContentType": mimetypes.guess_type(file_path)[0] or "application/octet-stream"}
    if AWS_S3_PUBLIC:
        extra["ACL"] = "public-read"

    try:
        s3_client.upload_file(file_path, S3_BUCKET, key, ExtraArgs=extra)
    except (BotoCoreError, ClientError, S3UploadFailedError):
        # upload_file reports service errors as S3UploadFailedError, which is
        # not a botocore exception, so it has to be caught explicitly.
        return None

    # Count only uploads that actually succeeded.
    _bump("s3")
    if AWS_S3_PUBLIC:
        # Encode spaces and other reserved characters; "/" stays literal.
        return f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{quote(key)}"
    return f"s3://{S3_BUCKET}/{key}"
# ---------------------------------------------------------------------
# SES email (with safety guard)
# ---------------------------------------------------------------------
def ses_send_email(recipient: str, subject: str, html_body: str) -> bool:
    """
    Send an HTML email via SES. Requires SES_SENDER to be verified (and recipient if in sandbox).
    Free-tier guard: caps per-day emails.

    Subject and body are sent as UTF-8 so non-ASCII text is accepted.
    Returns True when SES accepted the message. Returns False when SES is
    not configured, the recipient is empty, the daily cap is reached, or the
    API call fails.
    """
    if not (ses_client and SES_SENDER and recipient):
        return False

    # daily count guard
    _, counter = _rollover_counter("ses")
    if counter["count"] >= FREE_SES_MAX_EMAILS:
        return False

    try:
        ses_client.send_email(
            Source=SES_SENDER,
            Destination={"ToAddresses": [recipient]},
            Message={
                # Without an explicit charset SES expects 7-bit ASCII.
                "Subject": {"Data": subject, "Charset": "UTF-8"},
                "Body": {"Html": {"Data": html_body, "Charset": "UTF-8"}},
            },
        )
    except (BotoCoreError, ClientError):
        return False

    # Count only messages that SES accepted.
    _bump("ses")
    return True