import json
import os
import threading
import time
import uuid
from datetime import datetime
from typing import Dict, List

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse

# Load environment variables from .env file before the Gemini client is imported
load_dotenv()

from langchain_google_genai import GoogleGenerativeAI

app = FastAPI()
# Global state tracking the progress of the background answering job
answering_status = {
    "is_running": False,
    "start_time": None,
    "processed_questions": 0,
    "total_questions": 0,
    "answers_generated": 0,
    "completed": False,
    "result_file": None,
    "error": None,
}
answering_lock = threading.Lock()
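# answering_status is shared between the worker thread and the request
# handlers, so every read or write of it below is guarded by answering_lock.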
def generate_answer_for_question(
    question_data: Dict, model: str = "gemini-2.0-flash", with_citations: bool = False
) -> Dict:
    """
    Generate an answer for a given question using the Gemini API.

    Args:
        question_data (Dict): The question data including question, chunk_text, and type.
        model (str): The Gemini model to use.
        with_citations (bool): Whether to ask for specific references to the source text.

    Returns:
        Dict: The question data with an added answer field.
    """
    citation_instruction = (
        "Incluez des références spécifiques au texte dans la réponse."
        if with_citations
        else ""
    )
    # The prompt is kept in French because the source guide and dataset are French
    prompt = f"""
À partir du texte suivant et de la question donnée, fournissez une réponse précise et concise en français.
{citation_instruction}
Retournez uniquement la réponse au format texte.
Texte : {question_data["chunk_text"]}
Question : {question_data["question"]}
Type : {question_data["type"]}
"""
    try:
        llm = GoogleGenerativeAI(
            model=model,
            google_api_key=os.getenv("GOOGLE_API_KEY"),
        )
        response = llm.invoke(prompt)
        answer_text = str(response).strip()
        # Attach the generated answer to the question data
        question_data["answer"] = answer_text
        # Update the shared progress counter
        with answering_lock:
            answering_status["answers_generated"] += 1
        return question_data
    except Exception as e:
        question_id = question_data.get("question_id", "unknown")
        print(f"Error generating the answer for question {question_id}: {e}")
        # Fallback kept in French so the generated dataset stays monolingual
        question_data["answer"] = "Erreur lors de la génération de la réponse."
        return question_data
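# A minimal usage sketch (hypothetical data; requires GOOGLE_API_KEY in the environment):
#   sample = {
#       "question_id": "demo-1",
#       "chunk_text": "Le BCG protège contre les formes graves de tuberculose.",
#       "question": "Contre quoi le BCG protège-t-il ?",
#       "type": "factual",
#   }
#   print(generate_answer_for_question(sample)["answer"])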
def answer_questions_in_background(questions: List[Dict]):
    """
    Generate answers in a background thread and update status.
    """
    global answering_status
    try:
        all_answered_questions = []
        with answering_lock:
            answering_status["total_questions"] = len(questions)
            answering_status["processed_questions"] = 0
            answering_status["answers_generated"] = 0
        for i, question in enumerate(questions):
            print(f"Processing question {i + 1}/{len(questions)}...")
            answered_question = generate_answer_for_question(question)
            all_answered_questions.append(answered_question)
            with answering_lock:
                answering_status["processed_questions"] = i + 1
            time.sleep(9)  # Rate limiting between Gemini calls
        dataset = {
            "dataset_info": {
                "title": "Vaccine Guide Question-Answer Dataset with Answers",
                "description": "A dataset of question-answer pairs with answers generated from a vaccine guide for AI language model training.",
                "version": "1.0.0",
                "created_date": datetime.utcnow().isoformat(),
                "source": "Guide-pratique-de-mise-en-oeuvre-du-calendrier-national-de-vaccination-2023.pdf",
                "generated_by": "Gemini API",
                "total_questions": len(all_answered_questions),
                "intended_use": "Fine-tuning medical language models for knowledge recall and reasoning",
            },
            "questions": all_answered_questions,
        }
        # Save the dataset to a timestamped JSON file
        filename = f"vaccine_answers_{int(time.time())}.json"
        with open(f"./{filename}", "w", encoding="utf-8") as f:
            json.dump(dataset, f, indent=4, ensure_ascii=False)
        # Mark the job as completed
        with answering_lock:
            answering_status["completed"] = True
            answering_status["is_running"] = False
            answering_status["result_file"] = filename
    except Exception as e:
        print(f"Error in background answering: {e}")
        with answering_lock:
            answering_status["error"] = str(e)
            answering_status["is_running"] = False
@app.post("/generate-answers")  # Route path assumed; the original snippet registered no routes
async def generate_answers():
    """
    Endpoint to generate answers for questions from the dataset.
    """
    global answering_status
    # Refuse to start a second job while one is already running
    with answering_lock:
        if answering_status["is_running"]:
            return {
                "status": "running",
                "message": "Answer generation already in progress",
                "current_status": answering_status,
            }
    try:
        # Reset status for the new run
        with answering_lock:
            answering_status["is_running"] = True
            answering_status["start_time"] = datetime.utcnow().isoformat()
            answering_status["processed_questions"] = 0
            answering_status["answers_generated"] = 0
            answering_status["completed"] = False
            answering_status["result_file"] = None
            answering_status["error"] = None
        # Load questions from the dataset
        with open("./vaccine_questions.json", "r", encoding="utf-8") as f:
            dataset = json.load(f)
        if not dataset or "questions" not in dataset:
            raise HTTPException(status_code=404, detail="Questions file not found or invalid format")
        questions = dataset["questions"]
        # Start answering in a daemon background thread
        thread = threading.Thread(target=answer_questions_in_background, args=(questions,))
        thread.daemon = True
        thread.start()
        return {
            "status": "started",
            "message": "Answer generation started in background",
            "current_status": answering_status,
        }
    except HTTPException:
        # Re-raise as-is so the 404 above is not swallowed by the generic handler below
        with answering_lock:
            answering_status["is_running"] = False
        raise
    except Exception as e:
        with answering_lock:
            answering_status["is_running"] = False
            answering_status["error"] = str(e)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/status")  # Route path assumed
async def get_answering_status():
    """
    Endpoint to check the current status of answering.
    """
    with answering_lock:
        status_copy = answering_status.copy()
    return status_copy
@app.get("/download/{filename}")  # Route path assumed
async def download_file(filename: str):
    """
    Endpoint to download generated files.
    """
    # Strip any directory components to prevent path traversal
    safe_name = os.path.basename(filename)
    file_path = f"./{safe_name}"
    if os.path.exists(file_path):
        return FileResponse(file_path, media_type="application/json", filename=safe_name)
    raise HTTPException(status_code=404, detail="File not found")
@app.get("/")
async def root():
    """
    Root endpoint that serves the HTML UI from the index.html file.
    """
    return FileResponse("./index.html", media_type="text/html")
@app.post("/ask")  # Route path assumed
async def ask_question(question: str, with_citations: bool = False):
    """
    Endpoint to answer a specific question using the Gemini API.

    Args:
        question (str): The question to answer.
        with_citations (bool): Whether to include citations in the response.

    Returns:
        Dict: The question with the generated answer.
    """
    try:
        # For simplicity, use a default chunk for context (in practice, match the question to a relevant chunk)
        with open("./vaccine_questions.json", "r", encoding="utf-8") as f:
            dataset = json.load(f)
        default_chunk = dataset["questions"][0]["chunk_text"] if dataset["questions"] else ""
        question_data = {
            "question_id": str(uuid.uuid4()),
            "chunk_id": 0,
            "chunk_text": default_chunk,
            "question": question,
            "type": "applied",
            "difficulty": "hard",
            "training_purpose": "Reasoning",
            "validated": False,
        }
        answered_question = generate_answer_for_question(question_data, with_citations=with_citations)
        return answered_question
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error generating the answer: {str(e)}")
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)
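# A rough usage sketch, assuming the route paths declared above and the default port:
#   curl -X POST http://localhost:7860/generate-answers
#   curl http://localhost:7860/status
#   curl "http://localhost:7860/ask?question=Quand%20administrer%20le%20BCG&with_citations=true"
#   curl -O http://localhost:7860/download/<result_file reported by /status>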