# app.py
"""Gradio chat application: Google Gemini assistant enriched with AskNews context.

The assistant targets AFP journalists: user queries are optionally enriched with
real-time AFP wire context fetched through the AskNews SDK, then answered by a
streaming Gemini model. Sources backing the answer are listed in a side panel.
"""

import datetime
import logging
import os
from typing import Any, Dict, List, Optional, Tuple

from dotenv import load_dotenv

# Load .env before the third-party imports below, in case they read env vars
# at import time (and so credentials are available to the SDK helpers).
load_dotenv()

import gradio as gr
from google import genai
from google.genai import types
from asknews_sdk import AskNewsSDK

DEFAULT_MODEL = "gemini-2.0-flash"

# Default system prompt (French, user-editable in the UI); sent to Gemini as
# the system instruction for every request.
DEFAULT_SYSTEM_PROMPT = """Tu es un assistant virtuel conçu pour aider des journalistes d’agence (Agence France-Presse) dans leurs recherches d’information.

Sources :
- Tu disposes d’un agent de recherche en langage naturel (Asknews) qui interroge en temps réel le flux des dépêches AFP.
- Tu dois répondre uniquement avec des informations issues de ces dépêches.

Mission :
- Comprendre les requêtes d’un journaliste (souvent courtes, imprécises, ou en langage naturel).
- Transformer ces requêtes en recherches efficaces dans les dépêches AFP, avec Asknews.
- Résumer les résultats en style journalistique : factuel, concis, hiérarchisé, neutre.
- Proposer, si pertinent, des angles complémentaires (ex. contexte historique, réactions, comparaisons, chiffres clés).
- Permettre au journaliste de raffiner la recherche (par période, sujet, acteurs, pays).
- Citer les dépêches AFP en retour (référence et date/heure).

Contraintes :
- Toujours rester factuel, éviter toute spéculation.
- Si la question est ambiguë, demander des précisions.
- Si aucun résultat n’est trouvé, proposer des formulations alternatives de recherche.
- Résumer les informations de manière actionnable (pour rédaction immédiate).

Style :
- Réponses brèves et efficaces.
- Donner un résumé clair d’abord (les 2–3 points clés).
- Ajouter ensuite plus de détails, ou des pistes pour approfondir.
- Toujours indiquer les sources/dépêches AFP d’où viennent les infos.
"""

INITIAL_SOURCES_MARKDOWN = "*Aucune source pour l'instant.*"

LOG_LEVEL = os.getenv("ASKNEWS_LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))
logger = logging.getLogger("asknews_app")


def format_pub_date(published: Any) -> str:
    """Render a publication date as ``YYYY-MM-DD``, or ``"unknown date"``.

    Accepts ``datetime.date``/``datetime.datetime`` objects or ISO-format
    strings; anything else (or an unparseable string) yields "unknown date".
    """
    # datetime.datetime subclasses datetime.date, so one check covers both.
    if isinstance(published, datetime.date):
        return published.strftime("%Y-%m-%d")
    if isinstance(published, str):
        try:
            return datetime.datetime.fromisoformat(published).strftime("%Y-%m-%d")
        except ValueError:
            return "unknown date"
    return "unknown date"


def _parse_csv(csv_input: str) -> List[str]:
    """Split a comma-separated string into stripped, non-empty tokens."""
    return [token.strip() for token in csv_input.split(",") if token.strip()]


# ---- AskNews setup ----
def get_asknews_sdk() -> Optional[AskNewsSDK]:
    """Initialize the AskNews SDK from environment variables.

    Reads ASKNEWS_CLIENT_ID / ASKNEWS_CLIENT_SECRET; returns None when the
    credentials are missing or the SDK constructor fails.
    """
    client_id = os.getenv("ASKNEWS_CLIENT_ID", "").strip()
    client_secret = os.getenv("ASKNEWS_CLIENT_SECRET", "").strip()
    if not client_id or not client_secret:
        logger.warning("AskNews credentials are missing; skipping SDK init.")
        return None
    try:
        sdk = AskNewsSDK(
            client_id=client_id,
            client_secret=client_secret,
            scopes=["news"],
        )
        logger.info("AskNews SDK initialised successfully.")
        return sdk
    except Exception as exc:
        logger.exception("Failed to initialise AskNews SDK: %s", exc)
        return None


def fetch_asknews_context(
    sdk: AskNewsSDK,
    query: str,
    hours_back: int,
    n_articles: int,
    domains: List[str],
    method: str,
    diversify_sources: bool,
    languages: List[str],
) -> Tuple[str, List[Dict[str, Any]]]:
    """Fetch news context from AskNews for *query*.

    Calls ``sdk.news.search_news`` with ``return_type="both"`` and returns a
    ``(context_text, articles)`` tuple: the pre-formatted context string plus
    the article metadata as plain dicts. On any failure, returns ``("", [])``.
    """
    logger.info(
        "Fetching AskNews context: query=%s, hours_back=%s, n_articles=%s, "
        "domains=%s, method=%s, diversify=%s, languages=%s",
        query,
        hours_back,
        n_articles,
        domains,
        method,
        diversify_sources,
        languages,
    )
    try:
        kwargs: Dict[str, Any] = {
            "query": query,
            "hours_back": hours_back,
            "n_articles": n_articles,
            "historical": True,
            "premium": True,
            "method": method,
            "domain_url": domains if domains else None,
            "return_type": "both",
        }
        if diversify_sources:
            kwargs["diversify_sources"] = True
        if languages:
            kwargs["languages"] = languages

        response = sdk.news.search_news(**kwargs)
        context_text = getattr(response, "as_string", "") or ""

        raw_dicts = getattr(response, "as_dicts", None)
        articles: List[Dict[str, Any]] = []
        if isinstance(raw_dicts, list):
            # Articles may be plain dicts or pydantic models (v1 or v2);
            # try each conversion strategy in turn and keep going on failure.
            parsed_articles: List[Dict[str, Any]] = []
            for item in raw_dicts:
                if isinstance(item, dict):
                    parsed_articles.append(item)
                    continue
                if hasattr(item, "model_dump"):  # pydantic v2
                    try:
                        data = item.model_dump(by_alias=True)
                        if isinstance(data, dict):
                            parsed_articles.append(data)
                            continue
                    except Exception:
                        logger.debug(
                            "model_dump(by_alias=True) failed for article",
                            exc_info=True,
                        )
                if hasattr(item, "dict"):  # pydantic v1
                    try:
                        data = item.dict(by_alias=True)
                        if isinstance(data, dict):
                            parsed_articles.append(data)
                            continue
                    except Exception:
                        logger.debug(
                            "dict(by_alias=True) failed for article",
                            exc_info=True,
                        )
                try:
                    parsed_articles.append(dict(item))
                except Exception:
                    logger.debug(
                        "Fallback dict() conversion failed for article",
                        exc_info=True,
                    )
            articles = parsed_articles

        logger.info(
            "AskNews context received (%s chars, %s articles)",
            len(context_text),
            len(articles),
        )
        return context_text, articles
    except Exception:
        logger.exception("AskNews context fetch failed.")
        return "", []


def parse_languages_csv(csv_input: str) -> List[str]:
    """Parse the language-codes CSV field from the UI into a list of codes."""
    return _parse_csv(csv_input)


def format_sources_markdown(articles: List[Dict[str, Any]]) -> str:
    """Format AskNews article dicts as a Markdown source list for the panel."""
    if not articles:
        return "*Aucune source disponible pour cette requête.*"
    lines: List[str] = []
    for article in articles:
        title = article.get("title")
        source = article.get("markdown_citation")
        key = article.get("as_string_key")
        published = article.get("pub_date")
        line = f"{key}. {format_pub_date(published)} - {title}"
        if source:
            line += f"\n   {source}"
        lines.append(line)
    return "\n\n".join(lines)


# ---- Chat respond function ----
def respond(
    message: str,
    history: Optional[List[Tuple[str, str]]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    model_name: str,
    google_api_key: str,
    use_asknews: bool,
    asknews_hours_back: int,
    asknews_n_articles: int,
    asknews_domains_csv: str,
    asknews_method: str,
    asknews_diversify_sources: bool,
    asknews_languages_csv: str,
):
    """Stream chat responses from Google Gemini, optionally enriched with AskNews.

    Generator: yields ``(chatbot_history, state_history, sources_markdown)``
    tuples so the Gradio UI can update the conversation and sources panel
    incrementally. Handles missing API keys and SDK/generation failures by
    appending an explanatory message to the conversation instead of raising.
    """
    conversation_history: List[Tuple[str, str]] = list(history or [])
    user_message = (message or "").strip()
    if not user_message:
        logger.debug("Empty user message received.")
        yield conversation_history, conversation_history, format_sources_markdown([])
        return

    # Explicit key field takes precedence over the environment variable.
    api_key = (google_api_key or "").strip() or os.getenv("GOOGLE_API_KEY", "").strip()
    if not api_key:
        warning = (
            "Définissez GOOGLE_API_KEY dans votre environnement ou saisissez la clé API Google Gemini dans le champ dédié."
        )
        logger.warning("Missing Google API key.")
        conversation_history.append((user_message, warning))
        yield conversation_history, conversation_history, format_sources_markdown([])
        return

    try:
        genai_client = genai.Client(api_key=api_key)
    except Exception as exc:
        logger.exception("Failed to initialise Google GenAI client: %s", exc)
        error_msg = f"Échec d'initialisation du client Google GenAI: {exc}"
        conversation_history.append((user_message, error_msg))
        yield conversation_history, conversation_history, format_sources_markdown([])
        return

    # Normalise the AskNews search options coming from the UI.
    domains = _parse_csv(asknews_domains_csv or "")
    method = (asknews_method or "both").lower()
    if method not in {"nl", "kw", "both"}:
        method = "both"
    languages = parse_languages_csv(asknews_languages_csv or "")
    diversify_sources = bool(asknews_diversify_sources)

    asknews_context_text = ""
    asknews_articles: List[Dict[str, Any]] = []
    asknews_notice = ""
    if use_asknews:
        sdk = get_asknews_sdk()
        if sdk is None:
            asknews_notice = (
                "[AskNews non configuré: définissez ASKNEWS_CLIENT_ID et ASKNEWS_CLIENT_SECRET dans l'environnement.]"
            )
            logger.warning("AskNews SDK unavailable while use_asknews is True.")
        else:
            asknews_context_text, asknews_articles = fetch_asknews_context(
                sdk=sdk,
                query=user_message,
                hours_back=int(asknews_hours_back),
                n_articles=int(asknews_n_articles),
                domains=domains,
                method=method,
                diversify_sources=diversify_sources,
                languages=languages,
            )
            if asknews_context_text:
                logger.info(
                    "AskNews context ready (chars=%s, articles=%s)",
                    len(asknews_context_text),
                    len(asknews_articles),
                )
            else:
                logger.warning("AskNews context is empty after fetch.")
    else:
        asknews_notice = "[AskNews désactivé pour cette requête.]"

    # Build the sources panel content for this turn.
    if use_asknews:
        if asknews_articles:
            sources_markdown = format_sources_markdown(asknews_articles)
        elif asknews_notice:
            sources_markdown = asknews_notice + "\n\n" + INITIAL_SOURCES_MARKDOWN
        else:
            sources_markdown = format_sources_markdown([])
    else:
        sources_markdown = "*AskNews désactivé.*"

    base_system = system_message.strip() if system_message else DEFAULT_SYSTEM_PROMPT

    # Show the (possibly empty) assistant placeholder immediately so the UI
    # reflects the pending turn before streaming starts.
    conversation_history.append((user_message, ""))
    assistant_reply = ""
    if asknews_notice:
        assistant_reply += asknews_notice.strip()
    conversation_history[-1] = (user_message, assistant_reply)
    yield conversation_history, conversation_history, sources_markdown

    system_instruction = base_system
    if asknews_context_text:
        system_instruction += (
            "\n\nUtilise le contexte AskNews suivant pour ta réponse. Si la question est sans rapport, ignore ce contexte.\n"
            f"{asknews_context_text}"
        )

    # Convert the (user, assistant) tuple history into GenAI Content turns,
    # excluding the in-progress turn appended above.
    conversation: List[types.Content] = []
    for past_user, past_assistant in conversation_history[:-1]:
        past_user_clean = (past_user or "").strip()
        past_assistant_clean = (past_assistant or "").strip()
        if past_user_clean:
            conversation.append(
                types.Content(role="user", parts=[types.Part.from_text(text=past_user_clean)])
            )
        if past_assistant_clean:
            conversation.append(
                types.Content(role="model", parts=[types.Part.from_text(text=past_assistant_clean)])
            )
    conversation.append(
        types.Content(role="user", parts=[types.Part.from_text(text=user_message)])
    )

    # google-genai config fields are snake_case (camelCase is only the wire alias).
    generation_config = types.GenerateContentConfig(
        system_instruction=system_instruction,
        temperature=float(temperature),
        top_p=float(top_p),
        max_output_tokens=int(max_tokens),
    )

    assistant_full_reply = assistant_reply
    try:
        stream = genai_client.models.generate_content_stream(
            model=(model_name or DEFAULT_MODEL).strip() or DEFAULT_MODEL,
            contents=conversation,
            config=generation_config,
        )
        for chunk in stream:
            token = getattr(chunk, "text", None)
            if not token and getattr(chunk, "candidates", None):
                # Some chunks carry text only inside candidate parts.
                pieces: List[str] = []
                for candidate in chunk.candidates:
                    content = getattr(candidate, "content", None)
                    if content and getattr(content, "parts", None):
                        for part in content.parts:
                            text_piece = getattr(part, "text", None)
                            if text_piece:
                                pieces.append(text_piece)
                token = "".join(pieces)
            if not token:
                continue
            assistant_full_reply += token
            conversation_history[-1] = (user_message, assistant_full_reply)
            yield conversation_history, conversation_history, sources_markdown
    except Exception as exc:
        logger.exception("Google GenAI generation failed: %s", exc)
        error_suffix = f"\n\n[Erreur: {exc}]"
        assistant_full_reply = (assistant_full_reply or "") + error_suffix
        conversation_history[-1] = (user_message, assistant_full_reply)
        yield conversation_history, conversation_history, sources_markdown


def clear_conversation() -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], str]:
    """Reset the chat history and sources panel."""
    return [], [], INITIAL_SOURCES_MARKDOWN


# ---- Gradio UI ----
with gr.Blocks(title="AskNews Gemini") as demo:
    gr.Markdown("# Chatbot Gemini avec contexte AskNews")
    chat_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label="Conversation", height=520)
            user_input = gr.Textbox(
                label="Message",
                placeholder="Saisissez votre requête journalistique...",
                lines=3,
            )
            with gr.Row():
                send_button = gr.Button("Envoyer", variant="primary")
                clear_button = gr.Button("Effacer la conversation")

            with gr.Accordion("Paramètres", open=False):
                system_message_box = gr.Textbox(
                    value=DEFAULT_SYSTEM_PROMPT,
                    label="System message",
                    lines=20,
                )
                max_tokens_slider = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=4096,
                    step=100,
                    label="Max new tokens",
                )
                temperature_slider = gr.Slider(
                    minimum=0.0,
                    maximum=2.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature",
                )
                top_p_slider = gr.Slider(
                    minimum=0.05,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-p",
                )
                model_name_box = gr.Textbox(value=DEFAULT_MODEL, label="Model name")
                google_api_key_box = gr.Textbox(
                    value="",
                    label="Google API Key (optionnel)",
                    type="password",
                )
                use_asknews_checkbox = gr.Checkbox(
                    value=True,
                    label="Utiliser AskNews pour le contexte",
                )
                asknews_hours_slider = gr.Slider(
                    minimum=1,
                    maximum=24 * 120,
                    value=24 * 120,
                    step=24,
                    label="AskNews: heures en arrière",
                )
                asknews_articles_slider = gr.Slider(
                    minimum=1,
                    maximum=50,
                    value=10,
                    step=1,
                    label="AskNews: nombre d'articles",
                )
                asknews_domains_box = gr.Textbox(
                    value="afp.com",
                    label="AskNews: domaines (CSV)",
                )
                asknews_method_radio = gr.Radio(
                    choices=["both", "nl", "kw"],
                    value="both",
                    label="AskNews: méthode de recherche",
                )
                asknews_diversify_checkbox = gr.Checkbox(
                    value=False,
                    label="AskNews: diversifier les sources",
                )
                asknews_languages_box = gr.Textbox(
                    value="",
                    label="AskNews: langues (codes CSV)",
                )

        with gr.Column(scale=2):
            gr.Markdown("### Sources AskNews")
            sources_panel = gr.Markdown(INITIAL_SOURCES_MARKDOWN)

    # Input order must match respond()'s positional parameters.
    input_components = [
        user_input,
        chat_state,
        system_message_box,
        max_tokens_slider,
        temperature_slider,
        top_p_slider,
        model_name_box,
        google_api_key_box,
        use_asknews_checkbox,
        asknews_hours_slider,
        asknews_articles_slider,
        asknews_domains_box,
        asknews_method_radio,
        asknews_diversify_checkbox,
        asknews_languages_box,
    ]
    output_components = [chatbot, chat_state, sources_panel]

    def _reset_input() -> str:
        """Clear the message textbox after a turn is submitted."""
        return ""

    send_event = user_input.submit(
        respond,
        inputs=input_components,
        outputs=output_components,
        queue=True,
    )
    send_event.then(_reset_input, inputs=None, outputs=user_input)

    button_event = send_button.click(
        respond,
        inputs=input_components,
        outputs=output_components,
        queue=True,
    )
    button_event.then(_reset_input, inputs=None, outputs=user_input)

    clear_button.click(clear_conversation, None, output_components).then(
        _reset_input, inputs=None, outputs=user_input
    )

demo.queue()

if __name__ == "__main__":
    demo.launch()