# News Whisper — delivery service for the on-demand search API.
# Origin: commit 2cb327c ("feat: deploy News Whisper on-demand search API
# (FastAPI + Docker)") by Devang1290.
import json
import os
import sys
from pathlib import Path
from datetime import datetime
from typing import List, Dict
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
from backend.core.logger import logger
from backend.core.config import config
from backend.services.cloud import upload_file
from backend.services.database import DatabaseManager
class DeliveryService:
    """Coordinates final delivery of processed articles.

    Responsibilities: upload TTS audio files to Cloudinary, persist the
    cleaned article list as JSON (locally and to Cloudinary), and insert
    each article into the Supabase database.
    """

    def __init__(self):
        # Single database handle reused for dedup filtering and inserts.
        self.db = DatabaseManager()

    def _get_timestamp_folder(self) -> str:
        """Return a folder-safe local-time stamp, e.g. ``7_mar_9_05_pm``.

        Uses the 12-hour clock with the leading zero stripped from the hour.
        NOTE(review): uses naive local time (`datetime.now()`); successive
        calls around a minute boundary yield different folder names.
        """
        now = datetime.now()
        day = now.day
        month = now.strftime("%b").lower()
        hour = now.strftime("%I").lstrip("0")  # %I is 01..12, so only a leading zero is stripped
        minute = now.strftime("%M")
        am_pm = now.strftime("%p").lower()
        return f"{day}_{month}_{hour}_{minute}_{am_pm}"

    def _build_path_components(self, target: str, is_search: bool) -> tuple:
        """Return ``(parent_folder, safe_target, timestamp)`` for path building.

        Factored out of ``get_audio_output_dir`` and ``deliver`` so the two
        code paths cannot drift apart in how they derive folder names.
        """
        parent_folder = "search_queries" if is_search else "categories"
        safe_target = target.replace(" ", "_").lower()
        return parent_folder, safe_target, self._get_timestamp_folder()

    def get_audio_output_dir(self, language: str, target: str, is_search: bool) -> Path:
        """Determine (and create) the local directory for saving TTS audios.

        Layout: ``<AUDIOS_DIR>/<language>/<parent>/<target>/<timestamp>``.
        """
        parent_folder, safe_target, timestamp = self._build_path_components(target, is_search)
        path = config.AUDIOS_DIR / language / parent_folder / safe_target / timestamp
        path.mkdir(parents=True, exist_ok=True)
        return path

    def filter_processed(self, articles: List[Dict], disable_dedup: bool = False) -> List[Dict]:
        """Filter articles against the Supabase deduplication registry.

        When ``disable_dedup`` is True the input list is returned unchanged.
        """
        if disable_dedup:
            return articles
        return self.db.filter_unprocessed(articles)

    def deliver(self, articles: List[Dict], language: str, target: str, is_search: bool) -> None:
        """
        Coordinates final delivery:
        1. Cloudinary Audio Uploads
        2. Clean up dicts (remove local paths)
        3. Save JSON locally
        4. Upload JSON to Cloudinary
        5. Insert to Supabase

        NOTE: mutates the dicts in ``articles`` in place — adds ``audio_url``
        and removes ``local_audio_path``.
        """
        if not articles:
            logger.info("No articles to deliver.")
            return
        logger.info(f"Starting delivery phase for {len(articles)} articles...")
        parent_folder, safe_target, timestamp = self._build_path_components(target, is_search)
        # 1. Cloudinary Audio Uploads
        for article in articles:
            local_audio = article.get("local_audio_path")
            if local_audio and os.path.exists(local_audio):
                # Remote folder path, e.g. audios/english/categories/sports/<timestamp>
                cloud_folder = f"audios/{language}/{parent_folder}/{safe_target}/{timestamp}"
                audio_url = upload_file(local_audio, cloud_folder, resource_type="auto")
                if audio_url:
                    article["audio_url"] = audio_url
        # 2. Clean dicts: drop local-only paths before persisting anywhere.
        for article in articles:
            article.pop("local_audio_path", None)
        # 3. Save JSON locally
        json_dir = config.SUMMARIZED_DIR / language / parent_folder / safe_target
        json_dir.mkdir(parents=True, exist_ok=True)
        json_path = json_dir / f"{timestamp}.json"
        try:
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(articles, f, indent=2, ensure_ascii=False)
            logger.success(f"Saved final JSON to {json_path}")
            # 4. Upload JSON to Cloudinary
            json_cloud_folder = f"summarized-articles/{language}/{parent_folder}/{safe_target}"
            upload_file(str(json_path), json_cloud_folder, resource_type="raw")
        except Exception as e:
            # Best-effort: a failed JSON save/upload should not block DB inserts.
            logger.error(f"Failed to save/upload local JSON: {e}")
        # 5. Insert to Supabase DB
        db_success_count = 0
        for article in articles:
            # We insert to DB regardless of audio_url presence (some articles might not have audio)
            if self.db.insert_article(article):
                db_success_count += 1
        logger.success(f"Successfully inserted {db_success_count}/{len(articles)} articles into Database.")
        logger.success("Delivery Phase Complete.")