Zeggai Abdellah
fix the index.html
778a327
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
import json
from dotenv import load_dotenv
import time
import uuid
from typing import List, Dict, Optional
from datetime import datetime
from huggingface_hub import HfApi # For file persistence in Spaces
import os
import threading
import glob
import random
from langchain_google_genai import GoogleGenerativeAI
# Load environment variables from .env file
load_dotenv()
app = FastAPI()
# Global variables to track generation status
generation_status = {
"is_running": False,
"start_time": None,
"processed_chunks": 0,
"total_chunks": 0,
"questions_generated": 0,
"completed": False,
"result_file": None,
"progress_file": None, # New: track progress file
"error": None,
"current_api_key_index": 0, # New: track current API key
"failed_chunks": [], # New: track failed chunks for retry
"partial_results": [] # New: store partial results
}
generation_lock = threading.Lock()
def get_api_keys() -> List[str]:
"""
Get all available Google API keys from environment variables.
Supports GOOGLE_API_KEY, GOOGLE_API_KEY_1, GOOGLE_API_KEY_2, etc.
"""
api_keys = []
# Check for primary key
primary_key = os.getenv("GOOGLE_API_KEY")
if primary_key:
api_keys.append(primary_key)
# Check for numbered keys
i = 1
while True:
key = os.getenv(f"GOOGLE_API_KEY_{i}")
if key:
api_keys.append(key)
i += 1
else:
break
if not api_keys:
raise ValueError("No Google API keys found in environment variables")
return api_keys
def get_next_api_key() -> tuple[str, int]:
"""
Get the next API key in rotation and update the current index.
Returns tuple of (api_key, key_index)
"""
global generation_status
api_keys = get_api_keys()
with generation_lock:
current_index = generation_status["current_api_key_index"]
next_index = (current_index + 1) % len(api_keys)
generation_status["current_api_key_index"] = next_index
return api_keys[next_index], next_index
def save_progress_file():
"""
Save current progress to a file that can be downloaded at any time.
"""
global generation_status
with generation_lock:
progress_data = {
"generation_info": {
"status": "in_progress" if generation_status["is_running"] else "completed",
"start_time": generation_status["start_time"],
"processed_chunks": generation_status["processed_chunks"],
"total_chunks": generation_status["total_chunks"],
"questions_generated": generation_status["questions_generated"],
"completed": generation_status["completed"],
"current_time": datetime.utcnow().isoformat(),
"failed_chunks": generation_status["failed_chunks"].copy(),
"error": generation_status["error"]
},
"partial_dataset": {
"dataset_info": {
"title": "Vaccine Guide Question-Answer Dataset (Partial)",
"description": "Partial dataset of question-answer pairs generated from a vaccine guide.",
"version": "1.1.0",
"created_date": generation_status["start_time"],
"source": "Guide-pratique-de-mise-en-oeuvre-du-calendrier-national-de-vaccination-2023.pdf",
"generated_by": "Gemini API",
"total_questions": len(generation_status["partial_results"]),
"intended_use": "Fine-tuning medical language models for knowledge recall and reasoning",
"note": "This is a partial dataset. Generation may still be in progress."
},
"questions": generation_status["partial_results"].copy()
}
}
# Save progress file
progress_filename = f"vaccine_questions_progress_{int(time.time())}.json"
generation_status["progress_file"] = progress_filename
try:
with open(f"./{progress_filename}", 'w', encoding='utf-8') as f:
json.dump(progress_data, f, indent=4, ensure_ascii=False)
print(f"Progress saved to {progress_filename}")
except Exception as e:
print(f"Error saving progress file: {e}")
def estimate_difficulty(question: str, q_type: str) -> str:
"""
Estimate question difficulty based on type and content.
Args:
question (str): The question text.
q_type (str): Question type (factual, conceptual, applied).
Returns:
str: Difficulty level (easy, medium, hard).
"""
if q_type == "factual":
return "easy"
elif q_type == "conceptual":
return "medium"
return "hard" # applied
def generate_questions_for_chunk(chunk: str, chunk_id: int, model="gemini-2.0-flash", max_retries=3) -> List[Dict]:
"""
Generate French questions for a given document chunk using the Gemini API.
Now includes retry logic with different API keys.
"""
prompt = f"""
À partir du texte suivant d'un guide sur les vaccins en français, générez 3 questions variées (factual, conceptual, applied) qui couvrent le contenu de manière exhaustive.
Fournissez uniquement les questions, sans réponses, en français. Retournez le résultat au format JSON, entouré de ```json\n...\n```.
Texte : {chunk}
Exemple de sortie :
```json
[
{{
"question": "Combien de structures sanitaires de proximité sont impliquées dans le suivi de la vaccination ?",
"type": "factual"
}},
{{
"question": "Quel est l'impact de la réglementation de la vaccination sur la couverture vaccinale ?",
"type": "conceptual"
}},
{{
"question": "Quelles seraient les conséquences si les établissements privés ne suivaient plus la réglementation vaccinale ?",
"type": "applied"
}}
]
```
"""
last_error = None
for attempt in range(max_retries):
try:
# Get next API key for this attempt
api_key, key_index = get_next_api_key()
print(f"Chunk {chunk_id}, attempt {attempt + 1}: Using API key index {key_index}")
llm = GoogleGenerativeAI(
model=model,
google_api_key=api_key
)
response = llm.invoke(prompt)
questions_text = str(response) # Convert response to string
# Strip Markdown code fences
if questions_text.startswith("```json\n") and questions_text.endswith("\n```"):
questions_text = questions_text[7:-4].strip()
elif questions_text.startswith("```") and questions_text.endswith("```"):
questions_text = questions_text[3:-3].strip()
if not questions_text:
raise ValueError(f"Empty response for chunk {chunk_id}")
questions = json.loads(questions_text)
formatted_questions = []
for q in questions:
question_id = str(uuid.uuid4())
difficulty = estimate_difficulty(q["question"], q["type"])
formatted_questions.append({
"question_id": question_id,
"chunk_id": chunk_id,
"chunk_text": chunk,
"question": q["question"],
"type": q["type"],
"difficulty": difficulty,
"training_purpose": "Knowledge Recall" if q["type"] == "factual" else "Reasoning",
"validated": False,
"api_key_used": key_index, # Track which key was used
"generation_attempt": attempt + 1
})
# Update the global status and add to partial results
with generation_lock:
generation_status["questions_generated"] += len(formatted_questions)
generation_status["partial_results"].extend(formatted_questions)
# Save progress after each successful chunk
save_progress_file()
print(f"Successfully generated {len(formatted_questions)} questions for chunk {chunk_id}")
return formatted_questions
except Exception as e:
last_error = e
print(f"Attempt {attempt + 1} failed for chunk {chunk_id}: {e}")
# If this is not the last attempt, wait before retrying
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 5 # Increasing wait time
print(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
continue
# All attempts failed
print(f"All {max_retries} attempts failed for chunk {chunk_id}. Last error: {last_error}")
# Add to failed chunks list
with generation_lock:
generation_status["failed_chunks"].append({
"chunk_id": chunk_id,
"error": str(last_error),
"attempts": max_retries
})
return []
def generate_questions_in_background(chunks: List[str]):
"""
Generate questions in a background thread and update status.
Enhanced with better error handling and progress tracking.
"""
global generation_status
try:
all_questions = []
with generation_lock:
generation_status["total_chunks"] = len(chunks)
generation_status["processed_chunks"] = 0
generation_status["questions_generated"] = 0
generation_status["partial_results"] = []
generation_status["failed_chunks"] = []
# Save initial progress file
save_progress_file()
for i, chunk in enumerate(chunks):
print(f"Processing chunk {i+1}/{len(chunks)}...")
questions = generate_questions_for_chunk(chunk, i)
if questions: # Only add if generation was successful
all_questions.extend(questions)
with generation_lock:
generation_status["processed_chunks"] = i + 1
# Rate limiting - slightly randomized to avoid hitting limits
sleep_time = random.uniform(8, 11) # Random between 8-11 seconds
time.sleep(sleep_time)
# Create final dataset
dataset = {
"dataset_info": {
"title": "Vaccine Guide Question-Answer Dataset",
"description": "A dataset of question-answer pairs generated from a vaccine guide for AI language model training.",
"version": "1.1.0",
"created_date": datetime.utcnow().isoformat(),
"source": "Guide-pratique-de-mise-en-oeuvre-du-calendrier-national-de-vaccination-2023.pdf",
"generated_by": "Gemini API",
"total_questions": len(all_questions),
"intended_use": "Fine-tuning medical language models for knowledge recall and reasoning",
"total_chunks_processed": len(chunks),
"successful_chunks": len(chunks) - len(generation_status["failed_chunks"]),
"failed_chunks": len(generation_status["failed_chunks"]),
"failed_chunk_details": generation_status["failed_chunks"].copy()
},
"questions": all_questions
}
# Save the final dataset
filename = f"vaccine_questions_final_{int(time.time())}.json"
with open(f"./{filename}", 'w', encoding='utf-8') as f:
json.dump(dataset, f, indent=4, ensure_ascii=False)
# Update status to completed
with generation_lock:
generation_status["completed"] = True
generation_status["is_running"] = False
generation_status["result_file"] = filename
# Save final progress file
save_progress_file()
success_rate = (len(chunks) - len(generation_status["failed_chunks"])) / len(chunks) * 100
print(f"Generation completed! Success rate: {success_rate:.1f}% ({len(all_questions)} questions generated)")
except Exception as e:
print(f"Error in background generation: {e}")
with generation_lock:
generation_status["error"] = str(e)
generation_status["is_running"] = False
# Save progress even if there was an error
save_progress_file()
def save_dataset_to_space(dataset: Dict, filename: str):
"""
Save dataset to a file in the Space's persistent storage
"""
persistent_path = f"./{filename}"
with open(persistent_path, 'w', encoding='utf-8') as f:
json.dump(dataset, f, indent=4, ensure_ascii=False)
print(f"Dataset saved to {persistent_path}")
@app.get("/generate-questions")
async def generate_questions():
"""
Endpoint to generate questions from all JSON files in the data folder
Enhanced with multi-key support validation
"""
global generation_status
# Check if generation is already running
with generation_lock:
if generation_status["is_running"]:
return {
"status": "running",
"message": "Generation already in progress",
"current_status": generation_status
}
try:
# Validate API keys before starting
api_keys = get_api_keys()
print(f"Found {len(api_keys)} API keys for rotation")
# Reset status
with generation_lock:
generation_status["is_running"] = True
generation_status["start_time"] = datetime.utcnow().isoformat()
generation_status["processed_chunks"] = 0
generation_status["questions_generated"] = 0
generation_status["completed"] = False
generation_status["result_file"] = None
generation_status["progress_file"] = None
generation_status["error"] = None
generation_status["current_api_key_index"] = 0
generation_status["failed_chunks"] = []
generation_status["partial_results"] = []
# Load all JSON files from data folder
json_files = glob.glob("./chunk/*.json")
if not json_files:
raise HTTPException(status_code=404, detail="No JSON files found in chunk folder")
all_chunks = []
for json_file in json_files:
with open(json_file, "r", encoding="utf-8") as f:
chunks_data = json.load(f)
if isinstance(chunks_data, list):
# If it's a list of chunks
for chunk in chunks_data:
if isinstance(chunk, dict) and "text" in chunk:
all_chunks.append(chunk["text"])
elif isinstance(chunk, str):
all_chunks.append(chunk)
elif isinstance(chunks_data, dict):
# If it's a dict, try to extract text content
if "text" in chunks_data:
all_chunks.append(chunks_data["text"])
elif "content" in chunks_data:
all_chunks.append(chunks_data["content"])
if not all_chunks:
raise HTTPException(status_code=404, detail="No text content found in JSON files")
# Start generation in background thread
thread = threading.Thread(target=generate_questions_in_background, args=(all_chunks,))
thread.daemon = True
thread.start()
return {
"status": "started",
"message": f"Question generation started for {len(json_files)} JSON files with {len(all_chunks)} chunks",
"api_keys_available": len(api_keys),
"current_status": generation_status
}
except Exception as e:
with generation_lock:
generation_status["is_running"] = False
generation_status["error"] = str(e)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/generation-status")
async def get_generation_status():
"""
Endpoint to check the current status of generation
Enhanced with more detailed status information
"""
with generation_lock:
status_copy = generation_status.copy()
# Calculate additional metrics
if status_copy["total_chunks"] > 0:
progress_percentage = (status_copy["processed_chunks"] / status_copy["total_chunks"]) * 100
status_copy["progress_percentage"] = round(progress_percentage, 2)
else:
status_copy["progress_percentage"] = 0
# Add estimated time remaining if generation is running
if status_copy["is_running"] and status_copy["start_time"] and status_copy["processed_chunks"] > 0:
start_time = datetime.fromisoformat(status_copy["start_time"].replace('Z', '+00:00'))
elapsed_time = (datetime.utcnow() - start_time.replace(tzinfo=None)).total_seconds()
chunks_per_second = status_copy["processed_chunks"] / elapsed_time if elapsed_time > 0 else 0
if chunks_per_second > 0:
remaining_chunks = status_copy["total_chunks"] - status_copy["processed_chunks"]
estimated_remaining_seconds = remaining_chunks / chunks_per_second
status_copy["estimated_remaining_minutes"] = round(estimated_remaining_seconds / 60, 2)
else:
status_copy["estimated_remaining_minutes"] = None
return status_copy
@app.get("/download-progress")
async def download_progress():
"""
New endpoint to download current progress at any time
"""
global generation_status
# Force save current progress
save_progress_file()
with generation_lock:
progress_file = generation_status["progress_file"]
if progress_file and os.path.exists(f"./{progress_file}"):
return FileResponse(f"./{progress_file}", media_type="application/json", filename=progress_file)
else:
raise HTTPException(status_code=404, detail="No progress file available")
@app.get("/download/{filename}")
async def download_file(filename: str):
"""
Endpoint to download generated files
Enhanced with better error handling
"""
file_path = f"./{filename}"
if os.path.exists(file_path):
return FileResponse(file_path, media_type="application/json", filename=filename)
raise HTTPException(status_code=404, detail=f"File {filename} not found")
@app.get("/retry-failed")
async def retry_failed_chunks():
"""
New endpoint to retry only the failed chunks
"""
global generation_status
with generation_lock:
if generation_status["is_running"]:
return {
"status": "error",
"message": "Cannot retry while generation is running"
}
failed_chunks = generation_status["failed_chunks"].copy()
if not failed_chunks:
return {
"status": "success",
"message": "No failed chunks to retry"
}
# This would require implementing the retry logic
# For now, just return the failed chunks info
return {
"status": "info",
"message": f"Found {len(failed_chunks)} failed chunks",
"failed_chunks": failed_chunks,
"note": "Retry functionality can be implemented based on requirements"
}
@app.get("/api-keys-status")
async def get_api_keys_status():
"""
New endpoint to check API keys status
"""
try:
api_keys = get_api_keys()
return {
"status": "success",
"total_keys": len(api_keys),
"current_key_index": generation_status["current_api_key_index"],
"message": f"{len(api_keys)} API keys configured for rotation"
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
@app.get("/")
async def root():
"""
Root endpoint that serves the HTML UI from the index.html file.
"""
print("Serving index.html") # Debug log to confirm serving
return FileResponse("./index.html", media_type="text/html")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)