import base64
import json
import mimetypes
import os
from collections import Counter
from datetime import datetime

from dotenv import load_dotenv
from flask import Flask, request, render_template, jsonify, Response
from google import genai
from google.genai import types
from werkzeug.utils import secure_filename
|
|
# Flask application object; request bodies are capped at 16 MiB through the
# MAX_CONTENT_LENGTH setting.
app = Flask(__name__)
app.config.update(MAX_CONTENT_LENGTH=16 * 1024 * 1024)

# Pull variables from a local .env file into the environment before any
# os.getenv() lookup further down the module.
load_dotenv()
|
|
def load_system_instruction():
    """Load the system instruction text from the Markdown file.

    The file ``instructions/system_instruction.md`` is looked up first
    relative to the current working directory (the historical behaviour) and
    then relative to the directory containing this module, so the prompt is
    still found when the server is started from another directory.

    Returns:
        str: the file content with surrounding whitespace stripped, or a
        built-in default persona when the file is missing or unreadable.
    """
    fallback = "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir."

    module_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = (
        'instructions/system_instruction.md',
        os.path.join(module_dir, 'instructions', 'system_instruction.md'),
    )

    for path in candidates:
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except FileNotFoundError:
            # Not at this location; try the next candidate.
            continue
        except Exception as e:
            # Best-effort loader: any other failure (permissions, decoding,
            # ...) is reported and the default persona is used instead.
            print(f"Erreur lors du chargement des instructions système: {e}")
            return fallback

    print("Erreur: Fichier d'instructions système non trouvé.")
    return fallback
|
|
| |
# API key (read from the environment) and system prompt, both resolved once
# at import time.
API_KEY = os.getenv("GOOGLE_API_KEY")
SYSTEM_INSTRUCTION = load_system_instruction()

# Single Gemini client shared by every endpoint.
client = genai.Client(api_key=API_KEY)

# Model name and the base generation parameters copied into each request's
# configuration.
MODEL = "gemini-2.5-flash"
DEFAULT_CONFIG = dict(
    temperature=0.7,
    max_output_tokens=8192,
    top_p=0.9,
    top_k=40,
)

# Tools attached to every generation request.
DEFAULT_TOOLS = [
    types.Tool(code_execution=types.ToolCodeExecution()),
    types.Tool(google_search=types.GoogleSearch()),
    types.Tool(url_context=types.UrlContext()),
]

# In-memory stores keyed by conversation id (module-level dicts, so nothing
# survives a process restart):
#   conversations          -> live chat session objects
#   conversation_metadata  -> message history and status used by the admin API
conversations = {}
conversation_metadata = {}
|
|
def add_message_to_history(conversation_id, role, content, has_file=False, file_data=None):
    """Append one message to a conversation's recorded history.

    A metadata record (id, creation time, last activity, message list and
    status 'active') is created the first time a conversation id is seen.

    Args:
        conversation_id: key of the conversation in ``conversation_metadata``.
        role: author of the message as given by the caller (the endpoints in
            this file pass 'user' or 'assistant').
        content: text stored for the message.
        has_file: flag stored under ``hasFile``.
        file_data: optional payload stored under ``fileData`` when truthy.
    """
    record = conversation_metadata.get(conversation_id)
    if record is None:
        record = {
            'id': conversation_id,
            'created_at': datetime.now().isoformat(),
            'last_activity': datetime.now().isoformat(),
            'messages': [],
            'status': 'active',
        }
        conversation_metadata[conversation_id] = record

    entry = {
        'role': role,
        'content': content,
        'timestamp': datetime.now().isoformat(),
        'hasFile': has_file,
    }
    if file_data:
        entry['fileData'] = file_data

    record['messages'].append(entry)
    # Every new message counts as activity on the conversation.
    record['last_activity'] = datetime.now().isoformat()
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
|
@app.route('/admin1')
def admin():
    """Administration page: render the ``admin.html`` template."""
    page = render_template('admin.html')
    return page
|
|
@app.route('/admin/conversations')
def get_conversations():
    """Admin API: list every recorded conversation with summary statistics.

    Returns:
        JSON with ``conversations`` (most recently active first) and
        ``stats`` (total conversations, total messages, conversations whose
        status is 'active', conversations containing at least one message
        flagged ``hasFile``). Any failure is reported as a 500 JSON error.
    """
    try:
        records = list(conversation_metadata.values())

        stats = {
            'total': len(conversation_metadata),
            'totalMessages': sum(len(record['messages']) for record in records),
            'active': sum(1 for record in records if record.get('status') == 'active'),
            'withFiles': sum(
                1 for record in records
                if any(entry.get('hasFile') for entry in record['messages'])
            ),
        }

        # Reshape each stored record into the camelCase form returned here.
        summaries = [
            {
                'id': conv_id,
                'createdAt': info.get('created_at'),
                'lastActivity': info.get('last_activity'),
                'status': info.get('status', 'active'),
                'messages': info.get('messages', []),
            }
            for conv_id, info in conversation_metadata.items()
        ]

        # Newest activity first.
        summaries = sorted(
            summaries,
            key=lambda item: item.get('lastActivity', ''),
            reverse=True,
        )

        return jsonify({'conversations': summaries, 'stats': stats})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/chat', methods=['POST'])
def chat():
    """Stream the model's reply to a text-only chat message.

    Request JSON:
        message (str): user text; defaults to ''.
        thinking_enabled (bool): attach a thinking config when true;
            defaults to True.
        conversation_id (str): key of the chat session; defaults to
            'default'.

    Returns:
        A streamed response made of ``data: <json>`` frames whose ``type``
        is 'text', 'end' or 'error'; a 400 JSON error when the body is not
        a JSON object; a 500 JSON error when setup fails before streaming
        starts.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Previously a missing/invalid body crashed on ``data.get`` and
            # surfaced as a 500.
            return jsonify({'error': 'Invalid or missing JSON body'}), 400

        message = data.get('message', '')
        thinking_enabled = data.get('thinking_enabled', True)
        conversation_id = data.get('conversation_id', 'default')

        # Record the user's message in the admin-facing history.
        add_message_to_history(conversation_id, 'user', message)

        # Per-request generation configuration.
        config_dict = DEFAULT_CONFIG.copy()
        config_dict["system_instruction"] = SYSTEM_INSTRUCTION
        if thinking_enabled:
            config_dict["thinking_config"] = types.ThinkingConfig(
                thinking_budget=-1,
                include_thoughts=True
            )
        config_dict["tools"] = DEFAULT_TOOLS
        generation_config = types.GenerateContentConfig(**config_dict)

        # Reuse the chat session of this conversation, creating it on first use.
        if conversation_id not in conversations:
            conversations[conversation_id] = client.chats.create(
                model=MODEL,
                config=generation_config
            )
        chat_session = conversations[conversation_id]

        def generate():
            try:
                response_stream = chat_session.send_message_stream(
                    message,
                    config=generation_config
                )

                reply_parts = []
                for chunk in response_stream:
                    # A streamed chunk may carry no candidate, no content or
                    # no parts; skip it instead of raising.
                    candidates = chunk.candidates
                    if not candidates:
                        continue
                    content = candidates[0].content
                    parts = content.parts if content is not None else None
                    for part in parts or ():
                        # NOTE(review): parts flagged as thoughts are still
                        # streamed as ordinary text, exactly as before; the
                        # old code only copied them into an unused variable.
                        # Confirm whether the client expects them separately.
                        if part.text:
                            reply_parts.append(part.text)
                            yield f"data: {json.dumps({'type': 'text', 'content': part.text})}\n\n"

                # Record the assistant's complete reply in the history.
                full_response = "".join(reply_parts)
                if full_response:
                    add_message_to_history(conversation_id, 'assistant', full_response)

                yield f"data: {json.dumps({'type': 'end'})}\n\n"

            except Exception as e:
                # Headers are already sent once streaming starts, so errors
                # are reported in-band.
                yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"

        return Response(generate(), mimetype='text/plain')

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/upload', methods=['POST'])
def upload_file():
    """Return an uploaded file to the client as base64-encoded JSON.

    Expects a multipart form with a ``file`` field.

    Returns:
        JSON with ``success``, ``filename``, ``mime_type`` and ``data``
        (base64 text); a 400 JSON error when no file is provided; a 500
        JSON error on any other failure.
    """
    try:
        upload = request.files.get('file')
        if upload is None:
            return jsonify({'error': 'No file uploaded'}), 400

        # Also covers a filename of None, not only the empty string.
        if not upload.filename:
            return jsonify({'error': 'No file selected'}), 400

        file_bytes = upload.read()

        # Prefer the type declared by the client, then guess from the
        # extension; never hand back a null MIME type.
        mime_type = (
            upload.content_type
            or mimetypes.guess_type(upload.filename)[0]
            or 'application/octet-stream'
        )

        file_b64 = base64.b64encode(file_bytes).decode()

        return jsonify({
            'success': True,
            'filename': upload.filename,
            'mime_type': mime_type,
            'data': file_b64
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/chat_with_file', methods=['POST'])
def chat_with_file():
    """Stream the model's reply to a message with an optional attached file.

    Request JSON:
        message (str): user text; defaults to ''.
        file_data (dict, optional): ``data`` (base64 text), ``mime_type``
            and ``filename`` of the attachment.
        thinking_enabled (bool): attach a thinking config when true;
            defaults to True.
        conversation_id (str): key of the chat session; defaults to
            'default'.

    Returns:
        A streamed response made of ``data: <json>`` frames whose ``type``
        is 'text', 'end' or 'error'; a 400 JSON error when the body or
        ``file_data`` is malformed; a 500 JSON error when setup fails before
        streaming starts.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid or missing JSON body'}), 400

        message = data.get('message', '')
        file_data = data.get('file_data')
        thinking_enabled = data.get('thinking_enabled', True)
        conversation_id = data.get('conversation_id', 'default')

        # Decode the attachment before touching any state, so a malformed
        # payload is rejected with a 400 and leaves no trace in the history.
        file_part = None
        if file_data:
            if not isinstance(file_data, dict):
                return jsonify({'error': 'Invalid file_data'}), 400
            try:
                # binascii.Error (bad base64) is a ValueError subclass.
                file_bytes = base64.b64decode(file_data['data'])
            except (KeyError, TypeError, ValueError):
                return jsonify({'error': 'Invalid file_data'}), 400
            file_part = types.Part.from_bytes(
                data=file_bytes,
                mime_type=file_data.get('mime_type') or 'application/octet-stream'
            )

        # Record the user's message (with a file marker) in the history.
        # NOTE(review): the whole file_data payload, base64 content included,
        # is kept in the in-memory history — confirm this is intended.
        display_message = message if message else 'Analyse ce fichier'
        if file_data:
            display_message += f" [Fichier: {file_data.get('filename', 'inconnu')}]"
        add_message_to_history(
            conversation_id,
            'user',
            display_message,
            has_file=bool(file_data),
            file_data=file_data
        )

        # Per-request generation configuration.
        config_dict = DEFAULT_CONFIG.copy()
        if thinking_enabled:
            config_dict["thinking_config"] = types.ThinkingConfig(
                thinking_budget=-1,
                include_thoughts=True
            )
        config_dict["tools"] = DEFAULT_TOOLS
        config_dict["system_instruction"] = SYSTEM_INSTRUCTION
        generation_config = types.GenerateContentConfig(**config_dict)

        # Reuse the chat session of this conversation, creating it on first use.
        if conversation_id not in conversations:
            conversations[conversation_id] = client.chats.create(
                model=MODEL,
                config=generation_config
            )
        chat_session = conversations[conversation_id]

        contents = [message]
        if file_part is not None:
            contents.append(file_part)

        def generate():
            try:
                response_stream = chat_session.send_message_stream(
                    contents,
                    config=generation_config
                )

                reply_parts = []
                for chunk in response_stream:
                    # A streamed chunk may carry no candidate, no content or
                    # no parts; skip it instead of raising.
                    candidates = chunk.candidates
                    if not candidates:
                        continue
                    content = candidates[0].content
                    parts = content.parts if content is not None else None
                    for part in parts or ():
                        # The previous code appended thought parts to a
                        # never-initialised ``thoughts`` local, which raised
                        # UnboundLocalError and aborted the stream; that
                        # unused accumulator is gone.
                        # NOTE(review): thought parts are still streamed as
                        # ordinary text, as before — confirm with the client.
                        if part.text:
                            reply_parts.append(part.text)
                            yield f"data: {json.dumps({'type': 'text', 'content': part.text})}\n\n"

                # Record the assistant's complete reply in the history.
                full_response = "".join(reply_parts)
                if full_response:
                    add_message_to_history(conversation_id, 'assistant', full_response)

                yield f"data: {json.dumps({'type': 'end'})}\n\n"

            except Exception as e:
                # Headers are already sent once streaming starts, so errors
                # are reported in-band.
                yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"

        return Response(generate(), mimetype='text/plain')

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/reset_conversation', methods=['POST'])
def reset_conversation():
    """Drop a conversation's chat session and mark its history as reset.

    Request JSON (optional):
        conversation_id (str): conversation to reset; defaults to 'default',
            including when the body is missing or is not a JSON object.

    Returns:
        ``{'success': True}``, or a 500 JSON error on unexpected failure.
    """
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            # A missing body used to crash on ``None.get`` and return a 500.
            payload = {}
        conversation_id = payload.get('conversation_id', 'default')

        # Forget the live chat session; the next message starts a new one.
        conversations.pop(conversation_id, None)

        # Keep the recorded history but flag it as reset.
        record = conversation_metadata.get(conversation_id)
        if record is not None:
            record['status'] = 'reset'
            record['last_activity'] = datetime.now().isoformat()

        return jsonify({'success': True})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/admin/conversations/<conversation_id>', methods=['DELETE'])
def delete_conversation(conversation_id):
    """Admin API: delete a conversation.

    Removes both the chat session and the recorded history for
    ``conversation_id``; unknown ids are ignored. Always answers
    ``{'success': True}`` unless an unexpected error occurs (500).
    """
    try:
        for store in (conversations, conversation_metadata):
            store.pop(conversation_id, None)
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/admin/conversations/<conversation_id>/export')
def export_conversation(conversation_id):
    """Admin API: export one conversation as JSON.

    Returns:
        JSON with ``conversation_id``, ``export_date`` (current time in ISO
        format) and ``data`` (the stored record); a 404 JSON error when the
        id is unknown; a 500 JSON error on unexpected failure.
    """
    try:
        try:
            record = conversation_metadata[conversation_id]
        except KeyError:
            return jsonify({'error': 'Conversation non trouvée'}), 404

        export = {
            'conversation_id': conversation_id,
            'export_date': datetime.now().isoformat(),
            'data': record,
        }
        return jsonify(export)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/admin/stats')
def get_admin_stats():
    """Admin API: detailed statistics over all recorded conversations.

    Returns:
        JSON with ``total_conversations``, ``total_messages``,
        ``status_distribution`` (count per status), ``conversations_with_files``
        and ``daily_activity`` (message count per ISO date); a 500 JSON error
        on unexpected failure.
    """
    try:
        records = list(conversation_metadata.values())

        # Overall totals.
        total_conversations = len(records)
        total_messages = sum(len(conv['messages']) for conv in records)

        # Number of conversations per status.
        status_stats = Counter(conv.get('status', 'active') for conv in records)

        # Conversations containing at least one message flagged with a file.
        conversations_with_files = sum(
            1 for conv in records
            if any(msg.get('hasFile') for msg in conv['messages'])
        )

        # Messages per calendar day.
        daily_activity = Counter()
        for conv in records:
            for message in conv['messages']:
                stamp = message.get('timestamp')
                if not stamp:
                    continue
                try:
                    day = datetime.fromisoformat(stamp).date()
                except (TypeError, ValueError):
                    # Skip messages whose timestamp cannot be parsed rather
                    # than failing the whole report.
                    continue
                daily_activity[day.isoformat()] += 1

        return jsonify({
            'total_conversations': total_conversations,
            'total_messages': total_messages,
            'status_distribution': dict(status_stats),
            'conversations_with_files': conversations_with_files,
            'daily_activity': dict(daily_activity)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
if __name__ == '__main__':
    # The server listens on every interface, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary code. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (in the environment or .env) for development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)