from datetime import datetime
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from google import genai
from dotenv import load_dotenv
import os
import json
import logging
import re
import mistune
import requests
from data_service import DataService
# Load environment variables
load_dotenv()
load_dotenv("../.env.local")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="3GPP Innovation Backend")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins; restrict to the frontend origin in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize DataService
data_service = DataService()
def ask_gemini(prompt, content):
    """Send the prompt plus content to the Gemini API, truncating oversized content."""
    MAX_LEN = 10000
    if len(prompt) + len(content) <= MAX_LEN:
        client = genai.Client(api_key=GEMINI_API_KEY)
        response = client.models.generate_content(
            model="gemma-3-27b-it",
            contents=prompt + "\n\n" + content
        )
        return response.text
    # Content exceeds the window: analyze only the first chunk. The remainder is
    # currently dropped (recursing on it is disabled to limit API calls).
    chunk = content[:MAX_LEN - len(prompt)]
    first_answer = ask_gemini(prompt, chunk)
    # rest = content[MAX_LEN - len(prompt):]
    # remaining_answer = ask_gemini(prompt, rest)
    return first_answer  # + "\n\n" + remaining_answer
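# Illustrative usage (document_text is a placeholder, not a variable defined here):
#   summary = ask_gemini("Summarize the key contribution:", document_text)
# Note that only the first MAX_LEN-character window of the content is analyzed.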
PROCESS_PROMPT = """
Task:
Using the text provided, create chunks that are dense in relevant information
and minimize near-duplicate or loosely related passages. Also provide a paragraph
on what is new in this document, wrapped between SUGGESTION START and SUGGESTION END.
"""
def format_answer(answer):
    return (
        f"We obtained the following methodology: {answer['methodology']}\n\n"
        f"The context is: {answer['context']}\n\n"
        f"The problem description is: {answer['problem']}"
    )
def extract_json(text: str) -> dict:
    """Extract the first JSON object embedded in an LLM response."""
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if not match:
        raise ValueError("No JSON object found in response")
    return json.loads(match.group())
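# Illustrative behavior (the payload values are made-up examples):
#   extract_json('Sure! {"methodology": "m", "context": "c", "problem": "p"}')
#   -> {"methodology": "m", "context": "c", "problem": "p"}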
# --- Pydantic Models ---
class ProcessRequest(BaseModel):
file_id: str
filename: str
working_group: str
meeting: str
type: str # doc.Type
status: str # doc["TDoc Status"]
agenda_item: str # doc["Agenda item description"]
url: str
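# Illustrative /process request body (all values are made-up examples):
#   {"file_id": "S2-2400001", "filename": "S2-2400001.docx",
#    "working_group": "SA2", "meeting": "SA2#161", "type": "pCR",
#    "status": "available", "agenda_item": "example agenda item",
#    "url": "https://www.3gpp.org/ftp/..."}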
class InnovationResponse(BaseModel):
id: str
file_name: str
answer: str
classification: str
class PatternResponse(BaseModel):
pattern_id: int
pattern_name: str
prompt: str
class AnalyzeRequest(BaseModel):
    file_id: Optional[str] = None
    text: Optional[str] = None
pattern_id: int
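# Illustrative /analyze request body (exactly one of file_id / text is expected;
# the file_id value is a made-up example):
#   {"file_id": "S2-2400001", "pattern_id": 1}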
class AnalyzeResponse(BaseModel):
id: int
file_name: str
content: str
methodology: str
context: str
problem: str
pattern_name: str
class ClassificationRequest(BaseModel):
result_id: int
classification: str
class ResultResponse(BaseModel):
id: int
file_name: str
content: str
classification: str
pattern_name: str
methodology: str
context: str
problem: str
# --- Helper Functions ---
def fetch_text_content(req: AnalyzeRequest):
    """
    Fetch text content from the request, falling back to the database
    (raw file content first, then refined output).
    """
    if req.text:
        return req.text
    if req.file_id:
        content = data_service.get_file_content(req.file_id)
        if content:
            return content
        refined = data_service.get_refined_output(req.file_id)
        if refined:
            return refined
    return None
# --- API Endpoints ---
@app.get("/get_all")
def get_all():
return data_service.get_all_files()
@app.get("/patterns", response_model=list[PatternResponse])
def get_patterns():
return data_service.get_patterns()
class PatternRequest(BaseModel):
pattern_name: str
prompt: str
@app.post("/patterns", response_model=PatternResponse)
def create_pattern(req: PatternRequest):
try:
pattern_id = data_service.add_pattern(req.pattern_name, req.prompt)
return {
"pattern_id": pattern_id,
"pattern_name": req.pattern_name,
"prompt": req.prompt
}
except Exception as e:
logger.error(f"Error creating pattern: {e}")
raise HTTPException(status_code=500, detail=str(e))
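# Illustrative client call for creating a pattern (names and prompt are examples only):
#   requests.post("http://localhost:8000/patterns",
#                 json={"pattern_name": "innovation-scan",
#                       "prompt": "Return JSON with methodology, context, problem."})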
@app.put("/patterns/{pattern_id}", response_model=PatternResponse)
def update_pattern(pattern_id: int, req: PatternRequest):
try:
updated = data_service.update_pattern(pattern_id, req.pattern_name, req.prompt)
if not updated:
raise HTTPException(status_code=404, detail="Pattern not found")
return {
"pattern_id": pattern_id,
"pattern_name": req.pattern_name,
"prompt": req.prompt
}
except HTTPException as he:
raise he
except Exception as e:
logger.error(f"Error updating pattern: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze", response_model=AnalyzeResponse)
async def analyze_content(req: AnalyzeRequest):
print("Start of analyse")
try:
        # 1. Check for an existing result (cache). Note: the lookup is keyed on
        #    file_id only, so a cached result is returned regardless of the
        #    requested pattern_id.
existing_result, refined_id, file_name = data_service.get_existing_result(req.file_id)
if existing_result:
# Cache Hit
return {
"id": existing_result['result_id'],
"file_name": file_name,
"content": existing_result['content'],
"methodology": existing_result['methodology'],
"context": existing_result['context'],
"problem": existing_result['problem'],
"pattern_name": existing_result['pattern_name']
}
# 2. Cache Miss - Perform Analysis
        logger.info("Cache miss - performing new analysis")
text_content = fetch_text_content(req)
if not text_content:
raise HTTPException(status_code=400, detail="No content found to analyze")
pattern = data_service.get_pattern(req.pattern_id)
if not pattern:
raise HTTPException(status_code=404, detail="Pattern not found")
pattern_name = pattern['pattern_name']
pattern_prompt = pattern['prompt']
# Call LLM
response = ask_gemini(f"Pattern: {pattern_name}\nPrompt: {pattern_prompt}\n\nContext:\n", text_content)
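        # Assumption: the stored pattern prompt instructs the model to return a
        # JSON object with "methodology", "context" and "problem" keys;
        # extract_json() raises ValueError if no JSON object is found.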
json_response = extract_json(response)
answer = format_answer(json_response)
methodology = json_response["methodology"]
context = json_response["context"]
problem = json_response["problem"]
        # Save the result. We need refined_id: get_existing_result() already
        # returned one when available; if it is missing, look it up from the
        # refined table directly.
        if not refined_id and req.file_id:
            ref_row = data_service.get_refined_by_file_id(req.file_id)
            if ref_row:
                refined_id = ref_row["refined_id"]
result_id = data_service.add_result(req.pattern_id, refined_id, answer, methodology, context, problem)
print("End of analyse")
return {
"id": result_id,
"file_name": file_name,
"content": answer,
"methodology": methodology,
"context": context,
"problem": problem,
"pattern_name": pattern_name
}
    except HTTPException:
        # Re-raise 400/404 errors as-is instead of converting them to 500s
        raise
    except Exception as e:
        logger.error(f"Error during analysis: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/classify")
def classify_result(req: ClassificationRequest):
try:
updated = data_service.update_classification(req.result_id, req.classification)
if not updated:
raise HTTPException(status_code=404, detail="Result not found")
return {"id": req.result_id, "status": "updated"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating classification: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/results", response_model=list[ResultResponse])
def get_results():
try:
return data_service.get_all_results_joined()
except Exception as e:
logger.error(f"Error fetching results: {e}")
return []
@app.post("/process", response_model=InnovationResponse)
async def process_document(req: ProcessRequest):
try:
existing_content = data_service.get_file_content(req.file_id)
text_content = ""
content = ""
if existing_content:
logger.info(f"File {req.file_id} found in DB.")
text_content = existing_content
else:
try:
                logger.info(f"Fetching document content from {req.url}")
hf_response = requests.post(
'https://organizedprogrammers-docxtract.hf.space/docs/extract_text_from_url',
json={"url": req.url},
timeout=30
)
if hf_response.status_code == 200:
data = hf_response.json()
text_content = data.get('text') or data.get('content') or ""
else:
logger.error(f"Failed to fetch content from HF: {hf_response.text}")
text_content = "Extraction failed."
except Exception as e:
logger.error(f"Error fetching content: {e}")
text_content = "Extraction error."
        logger.debug(f"Process request: {req}")
# Add file to DataService
data_service.add_file({
"file_id": req.file_id,
"working_group": req.working_group,
"meeting": req.meeting,
"type": req.type,
"status": req.status,
"agenda_item": req.agenda_item,
"content": text_content,
"filename": req.filename,
"timestamp": datetime.now().isoformat()
})
refined_output = data_service.get_refined_output(req.file_id)
md = mistune.create_markdown()
if refined_output:
content = md(refined_output)
else:
            logger.debug("No refined output cached; generating with LLM")
answer = ask_gemini(PROCESS_PROMPT, text_content)
content = md(answer)
data_service.add_refined(req.file_id, answer)
return {
"id": req.file_id,
"file_name": req.filename,
"answer": content,
"classification": "UNCLASSIFIED",
}
except Exception as e:
logger.error(f"Error processing: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Serve Static Files for Deployment (must be after API routes)
static_dir = "static"
if os.path.exists(static_dir):
# Mount assets folder
if os.path.exists(os.path.join(static_dir, "assets")):
app.mount("/assets", StaticFiles(directory=os.path.join(static_dir, "assets")), name="assets")
# Catch-all for SPA and other static files at root
@app.get("/{full_path:path}")
async def serve_frontend(full_path: str):
# Check if it's a specific file that exists
file_path = os.path.join(static_dir, full_path)
if os.path.isfile(file_path):
return FileResponse(file_path)
# Default to index.html for SPA routing
return FileResponse(os.path.join(static_dir, "index.html"))
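
# Local development entrypoint (an assumption: deployments may instead launch
# the app externally, e.g. `uvicorn main:app`; the module name is illustrative).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)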