| from fastapi import FastAPI, Request, HTTPException,Depends,File, UploadFile, Response |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from fastapi.staticfiles import StaticFiles |
| from huggingface_hub import InferenceClient |
| import secrets |
| from typing import Optional |
| from sentence_transformers import SentenceTransformer |
| from bson.objectid import ObjectId |
| from datetime import datetime, timedelta |
| from fastapi import Request |
| import requests |
| import numpy as np |
| import argparse |
| import os |
| from pymongo import MongoClient |
| from datetime import datetime |
| from passlib.hash import bcrypt |
| import PyPDF2 |
| from io import BytesIO |
| import uuid |
|
|
# Random 64-hex-character secret, regenerated every time this module is
# imported.
SECRET_KEY = secrets.token_hex(32)

# Bind address and port: taken from the environment first, then overridable
# from the command line below. PORT is normalised to int so the constant has
# the same type whether or not the environment variable is set.
HOST = os.environ.get("API_URL", "0.0.0.0")
PORT = int(os.environ.get("PORT", 7860))

parser = argparse.ArgumentParser()
parser.add_argument("--host", default=HOST)
parser.add_argument("--port", type=int, default=PORT)
# Reload stays enabled by default. Because --reload is a store_true flag whose
# default is already True it cannot disable anything, so --no-reload is
# provided as the way to switch reloading off.
parser.add_argument("--reload", action="store_true", default=True)
parser.add_argument("--no-reload", dest="reload", action="store_false")
parser.add_argument("--ssl_certfile")
parser.add_argument("--ssl_keyfile")
# This runs at import time, so sys.argv may belong to another launcher
# (e.g. `uvicorn app:app`, a test runner). parse_known_args() ignores the
# arguments it does not recognise instead of exiting the process.
args, _unknown_args = parser.parse_known_args()
|
|
| |
# MongoDB connection settings, read from MONGODB_URI and DB_NAME.
#
# SECURITY: the fallback URI below embeds a username and password in source
# code. It is kept only so existing deployments that do not set MONGODB_URI
# keep working; the credentials should be rotated and this fallback removed
# once every environment provides MONGODB_URI.
_FALLBACK_MONGO_URI = "mongodb+srv://giffardaxel95:TQ5bfvWFqRhkHGVi@chatbotmed.qfn2kdn.mongodb.net/"

mongo_uri = os.environ.get("MONGODB_URI")
if mongo_uri is None:
    # Make the fallback visible in the logs instead of using it silently.
    print("AVERTISSEMENT: MONGODB_URI n'est pas défini, utilisation de l'URI MongoDB par défaut codée en dur.")
    mongo_uri = _FALLBACK_MONGO_URI

db_name = os.environ.get("DB_NAME", "chatmed_schizo")
mongo_client = MongoClient(mongo_uri)
db = mongo_client[db_name]
|
|
|
|
|
|
# Browser origins allowed to call this API with credentials (cookies).
_ALLOWED_ORIGINS = [
    "https://axl95-medically.hf.space",
    "https://huggingface.co",
    "http://localhost:3000",
    "http://localhost:7860",
    "http://0.0.0.0:7860",
]

# CORS policy: the explicit origin list above, with any method and any header.
_CORS_OPTIONS = {
    "allow_origins": _ALLOWED_ORIGINS,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}

app = FastAPI()
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
|
|
|
|
async def get_admin_user(request: Request):
    """Return the authenticated user, requiring the administrator role.

    Used as a dependency by the admin knowledge-base routes.

    Args:
        request: Incoming request, passed to ``get_current_user`` to resolve
            the session.

    Returns:
        The user document returned by ``get_current_user``.

    Raises:
        HTTPException: 403 when the user's role is not ``"Administrateur"``.
            Authentication errors raised by ``get_current_user`` propagate
            unchanged.
    """
    user = await get_current_user(request)
    # .get(): a user document with no "role" field is treated as a non-admin
    # (403) instead of raising KeyError and surfacing as a 500.
    if user.get("role") != "Administrateur":
        raise HTTPException(status_code=403, detail="Accès interdit: Droits d'administrateur requis")
    return user
|
|
|
|
| |
def _load_embedder():
    """Build the sentence-embedding model, or return None if loading fails.

    The failure is printed rather than raised so the application can still
    start without the model.
    """
    try:
        return SentenceTransformer('all-MiniLM-L6-v2')
    except Exception as e:
        print(f"Erreur lors du chargement du modèle d'embedding: {str(e)}")
        return None


# Shared embedding model; None when it could not be loaded.
embedder = _load_embedder()
@app.post("/api/admin/knowledge/upload")
async def upload_pdf(
    file: UploadFile = File(...),
    title: str = None,
    tags: str = None,
    current_user: dict = Depends(get_admin_user)
):
    """Store an uploaded PDF in the knowledge base (admin only).

    The PDF text is extracted, optionally embedded (first 5000 characters),
    the raw file is written to ``files/<id>.pdf`` and a document is inserted
    into ``db.connaissances``.

    Args:
        file: Uploaded file; its name must end with ``.pdf``.
        title: Optional title; defaults to the uploaded file name.
        tags: Optional comma-separated tags.
        current_user: Administrator resolved by ``get_admin_user``.

    Returns:
        ``{"success": True, "document_id": ...}`` when the inserted document
        can be read back, otherwise ``{"success": False, "error": ...}``.

    Raises:
        HTTPException: 400 when the file is not a PDF, 500 on any other error.
    """
    try:
        # The filename may be absent; the extension check is case-insensitive
        # so "REPORT.PDF" is accepted too.
        if not (file.filename or "").lower().endswith('.pdf'):
            raise HTTPException(status_code=400, detail="Le fichier doit être un PDF")

        contents = await file.read()

        pdf_reader = PyPDF2.PdfReader(BytesIO(contents))
        # Tolerate a page whose extract_text() returns nothing, and build the
        # text with a single join instead of repeated concatenation.
        text_content = "".join(
            (page.extract_text() or "") + "\n" for page in pdf_reader.pages
        )

        # The embedding is best effort: the document is stored without one if
        # the model is unavailable or encoding fails.
        embedding = None
        if embedder:
            try:
                max_length = 5000
                embedding = embedder.encode(text_content[:max_length]).tolist()
            except Exception as e:
                print(f"Erreur lors de la génération de l'embedding: {str(e)}")

        doc_id = ObjectId()

        # Keep the original file on disk, named after the document id.
        pdf_path = f"files/{str(doc_id)}.pdf"
        os.makedirs("files", exist_ok=True)
        with open(pdf_path, "wb") as f:
            f.write(contents)

        document = {
            "_id": doc_id,
            "text": text_content,
            "embedding": embedding,
            "title": title or file.filename,
            "tags": tags.split(",") if tags else [],
            "uploaded_by": str(current_user["_id"]),
            "upload_date": datetime.utcnow()
        }

        print(f"Tentative d'insertion du document avec ID: {doc_id}")
        result = db.connaissances.insert_one(document)
        print(f"Document inséré avec ID: {result.inserted_id}")

        # Read the document back to confirm the insert is visible.
        verification = db.connaissances.find_one({"_id": doc_id})
        if verification:
            print("Document vérifié et trouvé dans la base de données")
            return {"success": True, "document_id": str(doc_id)}

        print("ERREUR: Document non trouvé après insertion")
        return {"success": False, "error": "Document non trouvé après insertion"}

    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 400 above) through instead of
        # converting them into a 500.
        raise
    except Exception as e:
        import traceback
        print(f"Erreur lors de l'upload du PDF: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
|
|
@app.get("/api/admin/knowledge")
async def list_documents(current_user: dict = Depends(get_admin_user)):
    """List knowledge-base documents, newest first (admin only).

    Args:
        current_user: Administrator resolved by ``get_admin_user``.

    Returns:
        ``{"documents": [...]}`` where each entry has ``id``, ``title``,
        ``tags``, ``date`` (ISO string or None) and ``text_preview`` (first
        100 characters followed by ``...`` when the text is longer).

    Raises:
        HTTPException: 500 on any error.
    """
    try:
        # Fetch only the fields used below; the stored embedding is not needed
        # for a listing.
        cursor = db.connaissances.find(
            {},
            {"title": 1, "tags": 1, "upload_date": 1, "text": 1},
        ).sort("upload_date", -1)

        result = []
        for doc in cursor:
            # Treat a missing or null text/date as empty rather than crashing.
            text = doc.get("text") or ""
            upload_date = doc.get("upload_date")
            result.append({
                "id": str(doc["_id"]),
                "title": doc.get("title", "Sans titre"),
                "tags": doc.get("tags", []),
                "date": upload_date.isoformat() if upload_date is not None else None,
                "text_preview": text[:100] + "..." if len(text) > 100 else text,
            })

        return {"documents": result}
    except Exception as e:
        print(f"Erreur lors de la liste des documents: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
|
|
|
|
|
|
@app.delete("/api/admin/knowledge/{document_id}")
async def delete_document(document_id: str, current_user: dict = Depends(get_admin_user)):
    """Delete a knowledge-base document and its stored PDF (admin only).

    Args:
        document_id: Hex string id of the document.
        current_user: Administrator resolved by ``get_admin_user``.

    Returns:
        ``{"success": True, "message": ...}`` on success.

    Raises:
        HTTPException: 400 for an invalid id, 404 when the document does not
            exist, 500 when the deletion fails or on any other error.
    """
    from bson.errors import InvalidId

    try:
        try:
            doc_id = ObjectId(document_id)
        except (InvalidId, TypeError):
            raise HTTPException(status_code=400, detail="ID de document invalide")

        document = db.connaissances.find_one({"_id": doc_id})
        if not document:
            raise HTTPException(status_code=404, detail="Document non trouvé")

        result = db.connaissances.delete_one({"_id": doc_id})
        if result.deleted_count == 0:
            raise HTTPException(status_code=500, detail="Échec de la suppression du document")

        # Build the path from the parsed id, not the raw URL value: the upload
        # route names files str(ObjectId), so a differently-cased id in the
        # URL would otherwise miss the file.
        pdf_path = f"files/{str(doc_id)}.pdf"
        if os.path.exists(pdf_path):
            # File removal is best effort; the database entry is already gone.
            try:
                os.remove(pdf_path)
                print(f"Fichier supprimé: {pdf_path}")
            except Exception as e:
                print(f"Erreur lors de la suppression du fichier: {str(e)}")

        return {"success": True, "message": "Document supprimé avec succès"}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur lors de la suppression: {str(e)}")
|
|
|
|
@app.post("/api/login")
async def login(request: Request, response: Response):
    """Authenticate a user and open a 7-day session.

    Expects a JSON body with ``email`` and ``password``. On success a session
    is stored in ``db.sessions`` and its id is returned both in the
    ``session_id`` cookie and in the response body.

    Args:
        request: Incoming request carrying the JSON credentials.
        response: Response object on which the session cookie is set.

    Returns:
        Dict with ``success``, ``username``, ``user_id``, ``session_id`` and
        ``role``.

    Raises:
        HTTPException: 401 for missing or wrong credentials, 500 on any other
            error.
    """
    try:
        data = await request.json()
        email = data.get("email")
        password = data.get("password")

        # Missing or non-string credentials are rejected like wrong ones
        # instead of failing later inside the password check.
        credentials_ok = isinstance(email, str) and isinstance(password, str) and email and password
        user = db.users.find_one({"email": email}) if credentials_ok else None
        if not user or not bcrypt.verify(password, user["password"]):
            raise HTTPException(status_code=401, detail="Email ou mot de passe incorrect")

        session_id = secrets.token_hex(16)
        user_id = str(user["_id"])
        username = f"{user['prenom']} {user['nom']}"

        now = datetime.utcnow()
        db.sessions.insert_one({
            "session_id": session_id,
            "user_id": user_id,
            "created_at": now,
            "expires_at": now + timedelta(days=7)
        })

        # NOTE(review): httponly=False leaves the session cookie readable from
        # JavaScript. Kept as-is because clients may rely on it; consider
        # httponly=True since the id is also returned in the body.
        response.set_cookie(
            key="session_id",
            value=session_id,
            httponly=False,
            max_age=7*24*60*60,
            samesite="none",
            secure=True,
            path="/"
        )

        print(f"Session créée: {session_id} pour l'utilisateur {user_id}")

        return {
            "success": True,
            "username": username,
            "user_id": user_id,
            "session_id": session_id,
            "role": user.get("role", "user")
        }

    except HTTPException:
        # Keep the 401 above as a 401 instead of rewrapping it as a 500.
        raise
    except Exception as e:
        print(f"Erreur login: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
def _read_session_id(request):
    """Return the session id from the cookie, bearer header or query string.

    Sources are tried in that order; the first non-empty value wins. May
    return None or an empty string when no source provides a value.
    """
    from_cookie = request.cookies.get("session_id")
    print(f"Cookie de session reçu: {from_cookie[:5] if from_cookie else 'None'}")
    if from_cookie:
        return from_cookie

    from_header = None
    authorization = request.headers.get("Authorization")
    if authorization and authorization.startswith("Bearer "):
        from_header = authorization.replace("Bearer ", "")
        print(f"Session d'autorisation reçue: {from_header[:5]}...")
    if from_header:
        return from_header

    from_query = request.query_params.get("session_id")
    if from_query:
        print(f"Session des paramètres de requête: {from_query[:5]}...")
    return from_query


async def get_current_user(request: Request):
    """Resolve the user owning the request's session.

    Raises HTTPException 401 when no session id is supplied, when the session
    is unknown or expired, or when its user no longer exists.
    """
    sid = _read_session_id(request)
    if not sid:
        raise HTTPException(status_code=401, detail="Non authentifié - Aucune session trouvée")

    # Only sessions whose expiry lies in the future are accepted.
    live_session_query = {
        "session_id": sid,
        "expires_at": {"$gt": datetime.utcnow()}
    }
    live_session = db.sessions.find_one(live_session_query)
    if not live_session:
        raise HTTPException(status_code=401, detail="Session expirée ou invalide")

    owner = db.users.find_one({"_id": ObjectId(live_session["user_id"])})
    if not owner:
        raise HTTPException(status_code=401, detail="Utilisateur non trouvé")

    return owner
|
|
| |
@app.post("/api/logout")
async def logout(request: Request, response: Response):
    """Close the caller's session and expire the session cookie.

    The session id is read from the ``session_id`` cookie, or from an
    ``Authorization: Bearer`` header when no cookie is sent, so sessions used
    through the header can be revoked as well.

    Args:
        request: Incoming request carrying the session id.
        response: Response object on which the cookie is expired.

    Returns:
        ``{"success": True}`` whether or not a session was found.
    """
    session_id = request.cookies.get("session_id")
    if not session_id:
        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            session_id = auth_header[len("Bearer "):]

    if session_id:
        db.sessions.delete_one({"session_id": session_id})

    # Expire the cookie with the same attributes login used to set it
    # (path, SameSite=None, Secure). A deletion cookie without them can be
    # rejected by the browser when the API is called from another site.
    response.set_cookie(
        key="session_id",
        value="",
        max_age=0,
        expires=0,
        path="/",
        samesite="none",
        secure=True,
    )
    return {"success": True}
@app.post("/api/register")
async def register(request: Request):
    """Create a user account from a JSON body.

    Required fields: ``prenom``, ``nom``, ``email``, ``password``. An optional
    ``role`` other than ``"user"`` is honoured only when the request itself is
    authenticated as an administrator.

    Args:
        request: Incoming request carrying the JSON payload (and, for
            privileged roles, an administrator session).

    Returns:
        Dict with a confirmation ``message`` and the new ``userId``.

    Raises:
        HTTPException: 400 for a missing field, 401/403 when a privileged role
            is requested without administrator rights, 409 when the email is
            already used, 500 on any other error.
    """
    try:
        data = await request.json()

        required_fields = ["prenom", "nom", "email", "password"]
        for field in required_fields:
            if not data.get(field):
                raise HTTPException(status_code=400, detail=f"Le champ {field} est requis")

        # SECURITY: the role comes from an unauthenticated request body. Only
        # an authenticated administrator may create an account with a role
        # other than the default, otherwise anyone could self-register as
        # "Administrateur".
        role = data.get("role") or "user"
        if role != "user":
            await get_admin_user(request)

        existing_user = db.users.find_one({"email": data["email"]})
        if existing_user:
            raise HTTPException(status_code=409, detail="Cet email est déjà utilisé")

        hashed_password = bcrypt.hash(data["password"])

        user = {
            "prenom": data["prenom"],
            "nom": data["nom"],
            "email": data["email"],
            "password": hashed_password,
            "createdAt": datetime.utcnow(),
            "role": role,
        }

        result = db.users.insert_one(user)

        return {"message": "Utilisateur créé avec succès", "userId": str(result.inserted_id)}

    except HTTPException:
        raise
    except Exception as e:
        import traceback
        print(f"Erreur lors de l'inscription: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}")
@app.post("/api/embed")
async def embed(request: Request):
    """Return one placeholder 3-value vector per entry of ``texts``.

    The vectors are fixed dummy values; no embedding model is called. Errors
    while building them are reported as ``{"error": ...}``.
    """
    payload = await request.json()
    texts = payload.get("texts", [])

    try:
        count = len(texts)
        placeholder_vectors = [[0.1, 0.2, 0.3] for _ in range(count)]
    except Exception as e:
        return {"error": str(e)}
    else:
        return {"embeddings": placeholder_vectors}
|
|
@app.get("/invert")
async def invert(text: str):
    """Return ``text`` together with its character-reversed form."""
    reversed_text = "".join(reversed(text))
    return {"original": text, "inverted": reversed_text}
|
|
# Hugging Face API token. REACT_APP_HF_TOKEN is read first, as before;
# HF_TOKEN (the name the original error message referred to) is accepted as a
# fallback so the message and the lookup agree.
HF_TOKEN = os.getenv('REACT_APP_HF_TOKEN') or os.getenv('HF_TOKEN')
if not HF_TOKEN:
    raise RuntimeError(
        "Le token Hugging Face n'est pas défini: définissez REACT_APP_HF_TOKEN "
        "(ou HF_TOKEN) dans les variables d'environnement."
    )

# Shared inference client used by the chat route.
hf_client = InferenceClient(token=HF_TOKEN)
|
|
@app.post("/api/chat")
async def chat(request: Request):
    """Answer a user message with the hosted Mistral instruct model.

    Args:
        request: Incoming request whose JSON body carries ``message``.

    Returns:
        ``{"response": <generated text>}``.

    Raises:
        HTTPException: 400 when ``message`` is missing, empty or not a string;
            502 when the inference call fails.
    """
    data = await request.json()
    # A null or non-string "message" is answered with the 400 below instead of
    # crashing on .strip().
    raw_message = data.get("message") if isinstance(data, dict) else None
    user_message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not user_message:
        raise HTTPException(status_code=400, detail="Le champ 'message' est requis.")

    try:
        response = hf_client.text_generation(
            model="mistralai/Mistral-7B-Instruct-v0.3",
            prompt=f"<s>[INST] Tu es un assistant médical spécialisé en schizophrénie. Réponds à cette question: {user_message} [/INST]",
            max_new_tokens=512,
            temperature=0.7
        )

        return {"response": response}

    except Exception as e:
        import traceback
        print(f"Erreur détaillée: {traceback.format_exc()}")
        raise HTTPException(status_code=502, detail=f"Erreur d'inférence HF : {str(e)}")
|
|
|
|
|
|
@app.get("/data")
async def get_data():
    """Return 100 random floats from ``np.random.rand`` under the ``data`` key."""
    samples = np.random.rand(100)
    return JSONResponse({"data": samples.tolist()})
|
|
@app.get("/api/conversations")
async def get_conversations(current_user: dict = Depends(get_current_user)):
    """List the current user's conversations, most recently created first.

    Each conversation's ``_id`` is converted to a string. Any failure is
    reported as a 500.
    """
    try:
        owner_id = str(current_user["_id"])
        wanted_fields = {"_id": 1, "title": 1, "date": 1, "time": 1, "last_message": 1, "created_at": 1}
        cursor = db.conversations.find({"user_id": owner_id}, wanted_fields).sort("created_at", -1)

        # Rebuild each document with its ObjectId rendered as a string.
        conversations = [{**conv, "_id": str(conv["_id"])} for conv in cursor]

        return {"conversations": conversations}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}")
|
|
@app.post("/api/conversations")
async def create_conversation(request: Request, current_user: dict = Depends(get_current_user)):
    """Create a conversation for the current user from the JSON body.

    Reads ``title``, ``date``, ``time`` and ``message`` from the body and
    returns the new conversation's id. Any failure is reported as a 500.
    """
    try:
        payload = await request.json()
        owner_id = str(current_user["_id"])

        new_conversation = dict(
            user_id=owner_id,
            title=payload.get("title", "Nouvelle conversation"),
            date=payload.get("date"),
            time=payload.get("time"),
            last_message=payload.get("message", ""),
            created_at=datetime.utcnow(),
        )

        inserted = db.conversations.insert_one(new_conversation)
        return {"conversation_id": str(inserted.inserted_id)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}")
|
|
@app.post("/api/conversations/{conversation_id}/messages")
async def add_message(conversation_id: str, request: Request, current_user: dict = Depends(get_current_user)):
    """Append a message to one of the current user's conversations.

    Args:
        conversation_id: Hex string id of the conversation.
        request: Incoming request whose JSON body carries ``sender`` and
            ``text``.
        current_user: User resolved by ``get_current_user``.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 400 for an invalid id, 404 when the conversation does
            not exist or belongs to another user, 500 on any other error.
    """
    from bson.errors import InvalidId

    try:
        data = await request.json()
        user_id = str(current_user["_id"])
        text = data.get("text", "")

        # str(): the log preview must not fail when "text" is missing or null.
        print(f"Ajout message: conversation_id={conversation_id}, sender={data.get('sender')}, text={str(text)[:20]}...")

        try:
            conversation_oid = ObjectId(conversation_id)
        except (InvalidId, TypeError):
            raise HTTPException(status_code=400, detail="ID de conversation invalide")

        # Filtering on user_id makes another user's conversation look absent.
        conversation = db.conversations.find_one({
            "_id": conversation_oid,
            "user_id": user_id
        })
        if not conversation:
            raise HTTPException(status_code=404, detail="Conversation non trouvée")

        message = {
            "conversation_id": conversation_id,
            "user_id": user_id,
            "sender": data.get("sender", "user"),
            "text": text,
            "timestamp": datetime.utcnow()
        }
        db.messages.insert_one(message)

        db.conversations.update_one(
            {"_id": conversation_oid},
            {"$set": {"last_message": text, "updated_at": datetime.utcnow()}}
        )

        return {"success": True}
    except HTTPException:
        # Keep the 400/404 above instead of rewrapping them as a 500.
        raise
    except Exception as e:
        print(f"Erreur lors de l'ajout d'un message: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}")
|
|
@app.get("/api/conversations/{conversation_id}/messages")
async def get_messages(conversation_id: str, current_user: dict = Depends(get_current_user)):
    """Return the messages of one of the current user's conversations.

    Args:
        conversation_id: Hex string id of the conversation.
        current_user: User resolved by ``get_current_user``.

    Returns:
        ``{"messages": [...]}`` ordered by ascending timestamp, with ``_id``
        as a string and ``timestamp`` as an ISO string.

    Raises:
        HTTPException: 400 for an invalid id, 404 when the conversation does
            not exist or belongs to another user, 500 on any other error.
    """
    from bson.errors import InvalidId

    try:
        user_id = str(current_user["_id"])

        try:
            conversation_oid = ObjectId(conversation_id)
        except (InvalidId, TypeError):
            raise HTTPException(status_code=400, detail="ID de conversation invalide")

        # Filtering on user_id makes another user's conversation look absent.
        conversation = db.conversations.find_one({
            "_id": conversation_oid,
            "user_id": user_id
        })
        if not conversation:
            raise HTTPException(status_code=404, detail="Conversation non trouvée")

        messages = list(db.messages.find(
            {"conversation_id": conversation_id}
        ).sort("timestamp", 1))

        # Convert the values that are not JSON-native.
        for msg in messages:
            msg["_id"] = str(msg["_id"])
            if "timestamp" in msg:
                msg["timestamp"] = msg["timestamp"].isoformat()

        return {"messages": messages}
    except HTTPException:
        # Keep the 400/404 above instead of rewrapping them as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}")
|
|
@app.delete("/api/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str, current_user: dict = Depends(get_current_user)):
    """Delete one of the current user's conversations and its messages.

    Args:
        conversation_id: Hex string id of the conversation.
        current_user: User resolved by ``get_current_user``.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 400 for an invalid id, 404 when no conversation owned
            by the user was deleted, 500 on any other error.
    """
    from bson.errors import InvalidId

    try:
        user_id = str(current_user["_id"])

        try:
            conversation_oid = ObjectId(conversation_id)
        except (InvalidId, TypeError):
            raise HTTPException(status_code=400, detail="ID de conversation invalide")

        result = db.conversations.delete_one({
            "_id": conversation_oid,
            "user_id": user_id
        })
        if result.deleted_count == 0:
            raise HTTPException(status_code=404, detail="Conversation non trouvée")

        # Messages are removed only after the owned conversation was deleted.
        db.messages.delete_many({"conversation_id": conversation_id})

        return {"success": True}
    except HTTPException:
        # Keep the 400/404 above instead of rewrapping them as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur serveur: {str(e)}")
|
|
# Serve the "static" directory at the site root (html=True). This is mounted
# after the API routes above have been declared.
_static_site = StaticFiles(directory="static", html=True)
app.mount("/", _static_site, name="static")
|
|
if __name__ == "__main__":
    import uvicorn

    # Show the effective command-line settings before starting the server.
    print(args)

    server_options = {
        "host": args.host,
        "port": args.port,
        "reload": args.reload,
        "ssl_certfile": args.ssl_certfile,
        "ssl_keyfile": args.ssl_keyfile,
    }
    # The application is passed as the import string "app:app".
    uvicorn.run("app:app", **server_options)
|
|
|
|
|
|