Spaces:
Sleeping
Sleeping
import html
import logging
import os
import re
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import gradio as gr
import requests
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from huggingface_hub import InferenceClient
# ------------------ Configuration ------------------
# Secrets are read from environment variables. For local development it is
# recommended to keep them in a .env file and load it with python-dotenv
# before this section runs, for example:
#     from dotenv import load_dotenv
#     load_dotenv()

# Set up basic logging first so configuration problems below are reported.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


def int_from_env(name: str, default: int) -> int:
    """Return environment variable *name* as an int.

    Falls back to *default* when the variable is unset, blank, or not a valid
    integer, so a malformed setting cannot crash the app at import time.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw.strip())
    except ValueError:
        logging.warning("Ignoring non-integer %s=%r; using default %d.", name, raw, default)
        return default


HF_TOKEN = os.getenv("HF_TOKEN")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CX = os.getenv("GOOGLE_CX")
EMAIL_USER = os.getenv("EMAIL_USER")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")
SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int_from_env("SMTP_PORT", 465)
MODEL_ID = "HuggingFaceH4/zephyr-7b-beta"
# ------------------ Clients Initialization ------------------
# Each client is created only when its credentials are present; a missing key
# or a constructor failure leaves the corresponding name bound to None so the
# rest of the app can degrade gracefully instead of crashing at import.
try:
    if HF_TOKEN:
        client = InferenceClient(model=MODEL_ID, token=HF_TOKEN)
    else:
        client = None
except Exception as hf_err:
    logging.error(f"Failed to initialize Hugging Face client: {hf_err}")
    client = None

try:
    if GOOGLE_API_KEY and GOOGLE_CX:
        google_service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
    else:
        google_service = None
except Exception as search_err:
    logging.error(f"Failed to initialize Google Search client: {search_err}")
    google_service = None
| # ------------------ Core Functions ------------------ | |
def generate_outline(topic: str) -> list[str]:
    """Generate a 4-point outline for *topic* using the Hugging Face chat model.

    Args:
        topic: The newsletter topic entered by the user.

    Returns:
        Up to four outline points with their leading list markers removed.
        On failure, a single-element list holding a user-facing message that
        starts with "❌" or "⚠️"; exceptions are not propagated to the caller.
    """
    if not client:
        return ["❌ Hugging Face client is not initialized."]
    sys_prompt = (
        "You are a strategic newsletter planner. Create a structured 4-point outline for a professional newsletter. "
        "Each point should be clear, concise, non-overlapping, and based on key aspects of the topic. "
        "Avoid any intro or outro. Just give the 4 outline points as bullet list or numbered list."
    )
    user_prompt = f"Generate a 4-point newsletter outline for the topic: {topic}"
    try:
        response = client.chat_completion(
            messages=[
                {"role": "system", "content": sys_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=300,
            temperature=0.5,
        )
        raw_lines = response.choices[0].message.content.strip().splitlines()
        # Remove only a LEADING list marker ("1.", "2)", "-", "•", "* ").
        # The previous pattern left the hyphen alternative unanchored, which
        # deleted hyphens inside the text (e.g. "Evidence-based").
        # "*" must be followed by whitespace so "**bold**" text is kept intact.
        marker = re.compile(r"^\s*(?:\d+[.)]\s*|[-•]\s*|\*\s+)")
        cleaned = [marker.sub("", line).strip() for line in raw_lines if line.strip()]
        cleaned = [point for point in cleaned if point]
        if not cleaned:
            return ["⚠️ The model returned an empty outline. Please try again."]
        return cleaned[:4]  # Always limit to 4 points
    except requests.exceptions.HTTPError as e:
        # e.response can be None when the error was raised without a response.
        status = getattr(e.response, "status_code", None)
        if status == 503:
            return ["⚠️ HuggingFace model server is temporarily unavailable. Please try again later."]
        return [f"❌ HTTP error during outline generation: {e}"]
    except Exception as e:
        return [f"❌ Unexpected error generating outline: {e}"]
def research_with_google(subtopics: list[str]) -> dict:
    """Run a Google Custom Search for each subtopic and collect result snippets.

    Args:
        subtopics: Outline points to research. Entries that are error messages
            from the outline step (containing "Error:" or starting with
            "❌"/"⚠️") are not searched.

    Returns:
        A dict mapping every subtopic to a newline-separated bullet list of
        snippets, or to an explanatory message when the search was skipped,
        returned nothing, or failed.
    """
    if not google_service:
        logging.error("Google Search client not initialized.")
        return {topic: "Google Search is not configured. Check GOOGLE_API_KEY and GOOGLE_CX." for topic in subtopics}
    results = {}
    # Strip whitespace from the search engine ID (CX) to prevent
    # invalid-argument errors from the API.
    clean_cx = GOOGLE_CX.strip()
    error_prefixes = ("❌", "⚠️")
    request_made = False
    for topic in subtopics:
        # Outline failures arrive as "❌ ..." / "⚠️ ..." messages; searching for
        # them would waste API quota and return irrelevant results.
        if "Error:" in topic or topic.lstrip().startswith(error_prefixes):
            results[topic] = "Skipped due to previous error."
            continue
        if request_made:
            time.sleep(1)  # Respect API rate limits between consecutive requests
        request_made = True
        try:
            res = google_service.cse().list(q=topic, cx=clean_cx, num=3).execute()
            snippets = [item.get("snippet", "No snippet available.") for item in res.get("items", [])]
            # Format snippets for better readability
            formatted_snippets = "\n".join(f"- {s.replace('...', '').strip()}" for s in snippets)
            results[topic] = formatted_snippets if formatted_snippets else "No information found for this sub-topic."
        except HttpError as e:
            logging.error(f"HTTP Error during Google search for '{topic}': {e}")
            results[topic] = f"Error: An API error occurred. Details: {e.reason}"
        except Exception as e:
            logging.error(f"An unexpected error occurred during Google search for '{topic}': {e}")
            results[topic] = "Error: An unexpected error occurred."
    return results
def draft_newsletter(topic: str, outline: list[str], research: dict) -> str:
    """Draft a newsletter from the topic, outline, and research snippets.

    Args:
        topic: The newsletter topic.
        outline: Outline points; rendered as a bullet list in the prompt.
        research: Mapping of outline point to research text; each entry is
            rendered as a bold heading followed by its text.

    Returns:
        The drafted newsletter text, or a user-facing message starting with
        "❌" or "⚠️" when the client is missing or the request fails.
        Exceptions are not propagated to the caller.
    """
    if not client:
        return "❌ Error: Hugging Face client is not initialized."
    sys_prompt = """
You are a senior medical content writer for a public health newsletter in Australia. Your job is to create informative, professional, and human-readable newsletters for a general audience, including policymakers, practitioners, and the public.
Format:
1. **Introduction**: Start with a hook, describe why the topic matters today.
2. **Body**:
- Use 3–4 clear subheadings (e.g., 'Current State', 'Economic Impact', 'Types of Wounds', 'Prevention & Treatment')
- Write in full sentences and paragraphs. Use evidence-based insights.
- Avoid data dumps or unconnected facts.
3. **Conclusion**: Summarize key insights and include a call to action or forward-looking statement.
Rules:
- Do NOT invent or hallucinate data.
- Use ONLY the research provided.
- Avoid repetitive statements and filler phrases.
- Maintain a consistent voice — neutral, informative, and professional.
- Do not write like a chatbot or script — write like a human editorial article.
- Use clear, simple English suitable for educated adults (grade 10+).
"""
    outline_text = "\n".join(f"- {point}" for point in outline)
    research_text = "\n\n".join(f"**{key}**:\n{value}" for key, value in research.items())
    user_prompt = f"""
Write a newsletter on the topic: "{topic}".
Use this outline:
{outline_text}
And this research:
{research_text}
"""
    try:
        response = client.chat_completion(
            messages=[
                {"role": "system", "content": sys_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=2000,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()
    except requests.exceptions.HTTPError as e:
        # e.response can be None; reading .status_code directly would raise
        # inside this handler and escape as an unhandled AttributeError.
        status = getattr(e.response, "status_code", None)
        if status == 503:
            return "⚠️ The model server is temporarily unavailable (503). Please try again in a few minutes or switch to a different model."
        return f"❌ HTTP error occurred: {e}"
    except Exception as e:
        return f"❌ Error drafting newsletter: {e}"
def refine_draft(draft: str, instruction: str) -> str:
    """Revise *draft* according to the user's *instruction* via the chat model.

    Args:
        draft: The current newsletter text.
        instruction: Free-text editing instruction from the user.

    Returns:
        The revised text, or a user-facing message starting with "❌" or "⚠️"
        when the client is missing or the request fails. Exceptions are not
        propagated to the caller.
    """
    if not client:
        return "❌ Hugging Face client is not initialized."
    sys_prompt = (
        "You are an expert newsletter editor. Follow the user's instruction strictly to revise the draft. "
        "Keep structure intact unless the instruction says otherwise. Maintain clarity, professionalism, and conciseness."
    )
    user_prompt = f"Instruction: {instruction}\n\nDraft:\n{draft}"
    try:
        response = client.chat_completion(
            messages=[
                {"role": "system", "content": sys_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=2000,
            temperature=0.5,
        )
        return response.choices[0].message.content.strip()
    except requests.exceptions.HTTPError as e:
        # e.response can be None; guard so the handler itself cannot raise.
        status = getattr(e.response, "status_code", None)
        if status == 503:
            return "⚠️ Refinement model is temporarily unavailable (503). Please try again later."
        return f"❌ HTTP error during refinement: {e}"
    except Exception as e:
        return f"❌ Error refining draft: {e}"
def send_email(subject: str, body: str, recipients: str) -> str:
    """Send the newsletter to each recipient as an individual email.

    Args:
        subject: Email subject; line breaks are replaced with spaces so the
            value is a valid single-line header.
        body: Newsletter text. It is sent both as plain text and as HTML; the
            HTML part is escaped and has newlines converted to <br>.
        recipients: Comma-separated addresses. Entries without "@" are
            ignored and duplicates are sent only once.

    Returns:
        A status message describing how many emails were sent and which
        recipients (if any) failed. Exceptions are logged, not raised.
    """
    if not all([EMAIL_USER, EMAIL_PASSWORD, SMTP_SERVER, SMTP_PORT]):
        return "❌ Error: SMTP email credentials are not fully configured."

    stripped = (r.strip() for r in (recipients or "").split(","))
    # dict.fromkeys de-duplicates while preserving the order entered by the user.
    recipient_list = list(dict.fromkeys(r for r in stripped if "@" in r))
    if not recipient_list:
        return "⚠️ Warning: No valid email recipients provided."

    clean_subject = " ".join((subject or "").splitlines())
    text_body = body or ""
    # Escape before inserting <br> so "<", ">" and "&" in the draft are shown
    # literally instead of being interpreted as markup by the mail client.
    html_body = html.escape(text_body).replace("\n", "<br>")

    sent, failed_recipients = [], []
    try:
        # One authenticated connection is reused for all recipients instead of
        # reconnecting and logging in for every message.
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
            server.login(EMAIL_USER, EMAIL_PASSWORD)
            for recipient in recipient_list:
                try:
                    msg = MIMEMultipart("alternative")
                    msg["Subject"] = clean_subject
                    msg["From"] = EMAIL_USER
                    msg["To"] = recipient
                    # In multipart/alternative the last part is preferred, so
                    # the plain-text fallback goes first and HTML second.
                    msg.attach(MIMEText(text_body, "plain", "utf-8"))
                    msg.attach(MIMEText(html_body, "html", "utf-8"))
                    server.sendmail(EMAIL_USER, recipient, msg.as_string())
                    sent.append(recipient)
                except Exception as e:
                    logging.error(f"Failed to send email to {recipient}: {e}")
                    failed_recipients.append(recipient)
    except Exception as e:
        # Connection or login failure: everything not yet delivered has failed.
        logging.error(f"SMTP connection or login failed: {e}")
        failed_recipients = [r for r in recipient_list if r not in sent]

    sent_count = len(sent)
    if not failed_recipients:
        return f"✅ Successfully sent to {sent_count} recipient(s)."
    return f"✅ Sent to {sent_count}, ❌ Failed for: {', '.join(failed_recipients)}"
| # ------------------ Gradio UI ------------------ | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="orange")) as app:
    gr.Markdown("# 🧠 AI Newsletter Generator")
    gr.Markdown("Follow the steps below to generate, refine, and send your newsletter.")
    status_display = gr.Textbox(label="Status", value="Idle. Enter a topic to begin.", interactive=False)

    with gr.Tabs() as tabs:
        with gr.TabItem("Step 1: Generate Draft", id=0):
            with gr.Row():
                topic_input = gr.Textbox(label="Newsletter Topic", placeholder="e.g., The Future of Renewable Energy", scale=3)
                generate_btn = gr.Button("Generate Draft", variant="primary", scale=1)
            gr.Markdown("---")
            with gr.Row():
                outline_display = gr.Markdown(label="Generated Outline")
                research_display = gr.Markdown(label="Research Snippets")
            draft_output = gr.Textbox(label="Generated Draft", lines=20, interactive=True)
        with gr.TabItem("Step 2: Refine Draft", id=1):
            gr.Markdown("Use this section to iteratively improve your draft.")
            refined_output = gr.Textbox(label="Editable Draft", lines=20, interactive=True)
            refine_instruction = gr.Textbox(label="Refinement Instruction", placeholder="e.g., Make the tone more optimistic and add a call-to-action.")
            refine_btn = gr.Button("Refine", variant="primary")
        with gr.TabItem("Step 3: Send Email", id=2):
            gr.Markdown("Finalize the subject and recipients, then send your newsletter.")
            email_subject = gr.Textbox(label="Email Subject", value="Your Weekly AI-Generated Newsletter")
            email_input = gr.Textbox(label="Recipients (comma-separated)", placeholder="user1@example.com, user2@example.com")
            final_draft_display = gr.Textbox(label="Final Newsletter Body", lines=15, interactive=True)
            send_btn = gr.Button("Send Email", variant="primary")
    start_over_btn = gr.Button("Start Over")

    # ------------------ Gradio Event Logic ------------------
    # The handlers below are generators that yield dicts keyed by component,
    # so each step can update only the components it cares about.

    def _is_error_message(text) -> bool:
        """Return True if *text* is one of the "❌"/"⚠️" failure messages
        produced by the core functions rather than real content."""
        return isinstance(text, str) and text.lstrip().startswith(("❌", "⚠️"))

    def do_generation(topic):
        """Run outline -> research -> draft, streaming progress to the UI."""
        if not topic or not topic.strip():
            # This function is a generator, so the warning must be yielded;
            # a returned value would never reach the UI.
            yield {status_display: gr.update(value="⚠️ Please enter a topic first.")}
            return

        # Step 1: Generate Outline
        yield {status_display: gr.update(value="1/4: Generating outline...")}
        outline = generate_outline(topic)
        outline_md = "### Outline\n" + "\n".join(f"- {o}" for o in outline)
        if not outline or all(_is_error_message(o) for o in outline):
            # Stop here: researching and drafting from an error message would
            # only waste API calls and produce a meaningless draft.
            failure = outline[0] if outline else "⚠️ No outline could be generated. Please try again."
            yield {
                status_display: gr.update(value=failure),
                outline_display: gr.update(value=outline_md),
            }
            return
        yield {
            status_display: gr.update(value="2/4: Researching sub-topics..."),
            outline_display: gr.update(value=outline_md),
        }

        # Step 2: Research
        research = research_with_google(outline)
        research_md = "### Research\n" + "\n\n".join(f"**{k}**\n{v}" for k, v in research.items())
        yield {
            status_display: gr.update(value="3/4: Drafting newsletter..."),
            research_display: gr.update(value=research_md),
        }

        # Step 3: Draft
        draft = draft_newsletter(topic, outline, research)
        if _is_error_message(draft):
            # Report the failure instead of presenting it as a finished draft.
            yield {status_display: gr.update(value=draft)}
            return
        yield {
            status_display: gr.update(value="✅ Draft generated successfully! You can now refine it in Step 2."),
            draft_output: gr.update(value=draft),
            refined_output: gr.update(value=draft),  # Populate the refine tab
            final_draft_display: gr.update(value=draft),  # Populate the email tab
            tabs: gr.update(selected=1),  # Switch to refine tab
        }

    def do_refinement(draft, instruction):
        """Apply the user's instruction to the draft and show the result."""
        if not instruction or not instruction.strip():
            yield {status_display: gr.update(value="⚠️ Please enter a refinement instruction.")}
            return
        yield {status_display: gr.update(value="Refining draft...")}
        refined_text = refine_draft(draft, instruction)
        if _is_error_message(refined_text):
            # Keep the user's current draft; only surface the failure.
            yield {status_display: gr.update(value=refined_text)}
            return
        yield {
            status_display: gr.update(value="✅ Draft refined! You can refine again or proceed to send."),
            refined_output: gr.update(value=refined_text),
            final_draft_display: gr.update(value=refined_text),  # Also update the email tab
        }

    def do_send_email(subject, body, recipients):
        """Send the final newsletter and report the outcome in the status box."""
        yield {status_display: gr.update(value="Sending email...")}
        result = send_email(subject, body, recipients)
        yield {status_display: gr.update(value=result)}

    def do_start_over():
        """Clear all inputs and outputs and return to the first tab."""
        return {
            topic_input: "",
            outline_display: "",
            research_display: "",
            draft_output: "",
            refined_output: "",
            refine_instruction: "",
            final_draft_display: "",
            email_input: "",
            status_display: "Idle. Enter a topic to begin.",
            tabs: gr.update(selected=0),
        }

    # Link components to functions
    generate_btn.click(
        fn=do_generation,
        inputs=[topic_input],
        outputs=[status_display, outline_display, research_display, draft_output, refined_output, final_draft_display, tabs],
    )
    refine_btn.click(
        fn=do_refinement,
        inputs=[refined_output, refine_instruction],
        outputs=[status_display, refined_output, final_draft_display],
    )
    send_btn.click(
        fn=do_send_email,
        inputs=[email_subject, final_draft_display, email_input],
        outputs=[status_display],
    )
    start_over_btn.click(
        fn=do_start_over,
        inputs=[],
        outputs=[topic_input, outline_display, research_display, draft_output, refined_output, refine_instruction, final_draft_display, email_input, status_display, tabs],
    )

    # Sync text between draft, refine, and send tabs
    draft_output.change(lambda x: x, inputs=draft_output, outputs=refined_output)
    draft_output.change(lambda x: x, inputs=draft_output, outputs=final_draft_display)
    refined_output.change(lambda x: x, inputs=refined_output, outputs=final_draft_display)
# Script entry point: enable request queuing, then start the Gradio server
# in debug mode. Nothing runs when the module is merely imported.
if __name__ == "__main__":
    queued_app = app.queue()
    queued_app.launch(debug=True)