Buckets:
| import time | |
| import random | |
| import math | |
| from celery import shared_task | |
| import json | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| def clean_for_json(obj): | |
| """Convertit les objets numpy en types Python valides pour JSON.""" | |
| if isinstance(obj, np.ndarray): | |
| return [clean_for_json(x) for x in obj.tolist()] | |
| elif isinstance(obj, (np.integer,)): | |
| return int(obj) | |
| elif isinstance(obj, (np.floating,)): | |
| val = float(obj) | |
| if math.isnan(val) or math.isinf(val): | |
| return 0.0 | |
| return val | |
| elif isinstance(obj, dict): | |
| return {k: clean_for_json(v) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [clean_for_json(x) for x in obj] | |
| elif isinstance(obj, (int, float, str, bool, type(None))): | |
| return obj | |
| else: | |
| return obj | |
| def run_monte_carlo_simulation(params): | |
| """Simulation Monte Carlo pour estimation de Pi.""" | |
| n_points = params.get('n_points', 10000) | |
| seed = params.get('seed', 42) | |
| random.seed(seed) | |
| inside_circle = 0 | |
| for i in range(n_points): | |
| x = random.random() | |
| y = random.random() | |
| if x*x + y*y <= 1: | |
| inside_circle += 1 | |
| if (i + 1) % (n_points // 10) == 0: | |
| progress = int((i + 1) / n_points * 100) | |
| yield progress | |
| pi_estimate = 4 * inside_circle / n_points | |
| return { | |
| 'pi_estimate': clean_for_json(pi_estimate), | |
| 'exact_pi': math.pi, | |
| 'error': clean_for_json(abs(pi_estimate - math.pi)), | |
| 'n_points': n_points, | |
| 'inside_circle': inside_circle | |
| } | |
| def run_diffusion_1d_simulation(params): | |
| """Simulation de diffusion 1D explicite.""" | |
| L = params.get('length', 1.0) | |
| T = params.get('time', 0.1) | |
| nx = params.get('nx', 50) | |
| nt = params.get('nt', 100) | |
| D = params.get('diffusion_coef', 1.0) | |
| dx = L / (nx - 1) | |
| dt = T / nt | |
| u = [0.0] * nx | |
| u[0] = 1.0 | |
| u[-1] = 0.0 | |
| for j in range(nt): | |
| u_old = u.copy() | |
| for i in range(1, nx - 1): | |
| u[i] = u_old[i] + D * dt / (dx * dx) * ( | |
| u_old[i+1] - 2*u_old[i] + u_old[i-1] | |
| ) | |
| if (j + 1) % (nt // 10) == 0: | |
| progress = int((j + 1) / nt * 100) | |
| yield progress | |
| return { | |
| 'final_state': clean_for_json(u), | |
| 'dx': clean_for_json(dx), | |
| 'dt': clean_for_json(dt), | |
| 'nx': nx, | |
| 'nt': nt, | |
| 'max_value': clean_for_json(max(u)), | |
| 'min_value': clean_for_json(min(u)) | |
| } | |
| def run_linear_solve_simulation(params): | |
| """Résolution de système linéaire Ax = b.""" | |
| size = params.get('size', 100) | |
| np.random.seed(params.get('seed', 42)) | |
| A = np.random.rand(size, size) + size * np.eye(size) | |
| b = np.random.rand(size) | |
| for i in range(5): | |
| time.sleep(0.5) | |
| yield (i + 1) * 20 | |
| x = np.linalg.solve(A, b) | |
| residual = np.linalg.norm(A.dot(x) - b) | |
| return { | |
| 'solution_norm': clean_for_json(np.linalg.norm(x)), | |
| 'residual': clean_for_json(residual), | |
| 'condition_number': clean_for_json(np.linalg.cond(A)), | |
| 'size': size | |
| } | |
| def run_heat_conduction_simulation(params): | |
| """Simulation de conduction thermique Newton (finite_difference).""" | |
| T_0 = params.get('T_initial', 100.0) | |
| T_s = params.get('T_surface', 20.0) | |
| k = params.get('conductivity', 0.1) | |
| dt = params.get('dt', 0.001) | |
| N = params.get('N', 1000) | |
| theta = params.get('theta', 0.5) | |
| T = np.zeros(N) | |
| T[0] = T_0 | |
| for i in range(1, N): | |
| T[i] = (1 - (1 - theta) * dt * k) * T[i-1] / (1 + theta * dt * k) - k * dt * T_s / (1 + theta * dt * k) | |
| if (i + 1) % (N // 10) == 0: | |
| progress = int((i + 1) / N * 100) | |
| yield progress | |
| T_analytical = (T_0 - T_s) * np.exp(-k * np.arange(N) * dt) + T_s | |
| error = np.max(np.abs(T - T_analytical)) | |
| return { | |
| 'final_T': clean_for_json(T.tolist()), | |
| 'T_analytical': clean_for_json(T_analytical.tolist()), | |
| 'error_max': clean_for_json(error), | |
| 'T_initial': T_0, | |
| 'T_surface': T_s, | |
| 'k': k, | |
| 'dt': dt, | |
| 'theta': theta, | |
| 'N': N, | |
| 'time_final': clean_for_json(N * dt) | |
| } | |
| def run_traffic_flow_simulation(params): | |
| """Modèle de trafic 1D (numerical-mooc).""" | |
| nx = params.get('nx', 100) | |
| nt = params.get('nt', 200) | |
| L = params.get('length', 10.0) | |
| T = params.get('time', 2.0) | |
| u_max = params.get('u_max', 1.0) | |
| rho_max = params.get('rho_max', 1.0) | |
| dx = L / nx | |
| dt = T / nt | |
| x = np.linspace(0, L, nx) | |
| t = np.linspace(0, T, nt) | |
| rho = np.zeros((nt, nx)) | |
| rho[0] = rho_red_light(x, rho_max) | |
| for n in range(nt - 1): | |
| for i in range(nx): | |
| F_ip = flux(rho[n, (i+1) % nx], u_max, rho_max) | |
| F_i = flux(rho[n, i], u_max, rho_max) | |
| rho[n+1, i] = rho[n, i] + dt / dx * (F_i - F_ip) | |
| if (n + 1) % (nt // 10) == 0: | |
| progress = int((n + 1) / nt * 100) | |
| yield progress | |
| return { | |
| 'density_matrix': clean_for_json(rho.tolist()), | |
| 'x_grid': clean_for_json(x.tolist()), | |
| 't_grid': clean_for_json(t.tolist()), | |
| 'u_max': u_max, | |
| 'rho_max': rho_max, | |
| 'max_density': clean_for_json(float(np.max(rho))), | |
| 'avg_density': clean_for_json(float(np.mean(rho))) | |
| } | |
| def rho_red_light(x, rho_max): | |
| """Condition initiale 'red light' avec choc.""" | |
| rho = rho_max * np.ones_like(x) | |
| mask = np.where(x < 3.0) | |
| rho[mask] = 0.5 * rho_max | |
| return rho | |
| def flux(rho, u_max, rho_max): | |
| """Flux de trafic F = V * rho.""" | |
| return rho * u_max * (1.0 - rho / rho_max) | |
| def run_phugoid_simulation(params): | |
| """Trajectoire de vol phugoid (numerical-mooc).""" | |
| zt = params.get('zt', 1.0) | |
| z0 = params.get('z0', 2.0) | |
| theta0 = params.get('theta0', 5.0) | |
| N = params.get('N', 1000) | |
| theta0_rad = np.radians(theta0) | |
| if z0 <= 0: | |
| z0 = 0.1 | |
| try: | |
| C = (np.cos(theta0_rad) - 1/3 * z0 / zt) * (z0 / zt)**0.5 | |
| except (ZeroDivisionError, RuntimeWarning): | |
| C = 0.0 | |
| x = np.zeros(N) | |
| z = np.zeros(N) | |
| theta = theta0_rad | |
| for i in range(1, N): | |
| try: | |
| normal = np.array([+ np.cos(theta + np.pi/2.0), - np.sin(theta + np.pi/2.0)]) | |
| R = radius_of_curvature(z[i-1], zt, C) | |
| if abs(R) < 0.001: | |
| R = 0.001 | |
| center = np.array([x[i-1], z[i-1]]) + R * normal | |
| ds = 1.0 | |
| dtheta = ds / R | |
| x[i], z[i] = rotate((x[i-1], z[i-1]), center=center, angle=dtheta, mode='radians') | |
| theta += dtheta | |
| except (RuntimeWarning, FloatingPointError): | |
| break | |
| if i % (N // 10) == 0: | |
| yield int(i / N * 100) | |
| return { | |
| 'x': clean_for_json(x.tolist()), | |
| 'z': clean_for_json(z.tolist()), | |
| 'theta_final': clean_for_json(float(np.degrees(theta))), | |
| 'C': clean_for_json(float(C)), | |
| 'zt': zt, | |
| 'z0': z0, | |
| 'theta0': theta0 | |
| } | |
| def radius_of_curvature(z, zt, C): | |
| """Rayon de courbure de la trajectoire.""" | |
| if z <= 0 or abs(1/3 - C / 2 * (zt / z)**1.5) < 1e-10: | |
| return zt * 1000 | |
| return zt / (1 / 3 - C / 2 * (zt / z)**1.5) | |
| def rotate(coords, center=(0.0, 0.0), angle=0.0, mode='degrees'): | |
| """Rotation d'un point autour d'un centre.""" | |
| x, z = coords | |
| xc, zc = center | |
| if mode == 'degrees': | |
| angle = np.radians(angle) | |
| x_new = xc + (x - xc) * np.cos(angle) + (z - zc) * np.sin(angle) | |
| z_new = zc - (x - xc) * np.sin(angle) + (z - zc) * np.cos(angle) | |
| return x_new, z_new | |
| def run_elasticity_dangvan_simulation(params): | |
| """Tenseur deviatoire et critère de Tresca (DangVan).""" | |
| tensor_input = params.get('tensor', [100, 0, 0, 0, 50, 0, 0, 0, 30]) | |
| sigma = np.array(tensor_input).reshape(3, 3) | |
| trace = np.trace(sigma) | |
| hydrostatic = (trace / 3) * np.eye(3) | |
| deviator = sigma - hydrostatic | |
| eigenvals = np.linalg.eigvals(sigma) | |
| yield 30 | |
| tresca = np.max(eigenvals) - np.min(eigenvals) | |
| von_mises = np.sqrt(3/2 * np.sum(deviator**2)) | |
| yield 60 | |
| yield 100 | |
| return { | |
| 'tensor_matrix': clean_for_json(sigma.tolist()), | |
| 'deviator_tensor': clean_for_json(deviator.tolist()), | |
| 'principal_stresses': clean_for_json(eigenvals.tolist()), | |
| 'trace': clean_for_json(float(trace)), | |
| 'hydrostatic_pressure': clean_for_json(float(trace / 3)), | |
| 'tresca': clean_for_json(float(tresca)), | |
| 'von_mises': clean_for_json(float(von_mises)) | |
| } | |
| def run_wave_equation_simulation(params): | |
| """Résolution de l'équation d'onde 1D.""" | |
| L = params.get('length', 1.0) | |
| T = params.get('time', 1.0) | |
| nx = params.get('nx', 100) | |
| nt = params.get('nt', 500) | |
| c = params.get('wave_speed', 1.0) | |
| dx = L / nx | |
| dt = T / nt | |
| r = c * dt / dx | |
| if r > 1.0: | |
| r = 0.9 | |
| x = np.linspace(0, L, nx) | |
| t = np.linspace(0, T, nt) | |
| u = np.zeros((nt, nx)) | |
| u[0] = np.sin(np.pi * x / L) | |
| u[1] = 0.5 * u[0].copy() | |
| for n in range(1, nt - 1): | |
| for i in range(1, nx - 1): | |
| u[n+1, i] = 2*(1 - r**2) * u[n, i] - u[n-1, i] + r**2 * (u[n, i+1] + u[n, i-1]) | |
| u[n+1, 0] = 0 | |
| u[n+1, -1] = 0 | |
| if (n + 1) % (nt // 10) == 0: | |
| progress = int((n + 1) / nt * 100) | |
| yield progress | |
| max_displacement = clean_for_json(float(np.max(np.abs(u)))) | |
| return { | |
| 'displacement_matrix': clean_for_json(u.tolist()), | |
| 'x_grid': clean_for_json(x.tolist()), | |
| 't_grid': clean_for_json(t.tolist()), | |
| 'wave_speed': c, | |
| 'dx': clean_for_json(dx), | |
| 'dt': clean_for_json(dt), | |
| 'CFL': clean_for_json(r), | |
| 'max_displacement': max_displacement | |
| } | |
| SIMULATION_METHODS = { | |
| 'monte-carlo-pi': run_monte_carlo_simulation, | |
| 'diffusion-1d': run_diffusion_1d_simulation, | |
| 'linear-solve': run_linear_solve_simulation, | |
| 'heat-conduction': run_heat_conduction_simulation, | |
| 'traffic-flow': run_traffic_flow_simulation, | |
| 'phugoid': run_phugoid_simulation, | |
| 'elasticity-dangvan': run_elasticity_dangvan_simulation, | |
| 'wave-equation': run_wave_equation_simulation, | |
| } | |
| def run_simulation(self, run_id): | |
| """Tâche Celery pour exécuter une simulation.""" | |
| from simulations.models import SimulationRun | |
| try: | |
| run = SimulationRun.objects.get(pk=run_id) | |
| run.set_running() | |
| run.add_log(f"Début de la simulation {run_id}") | |
| run.add_log(f"Paramètres: {json.dumps(run.parameters)}") | |
| method_func = SIMULATION_METHODS.get(run.method.slug) | |
| if not method_func: | |
| raise ValueError(f"Méthode inconnue: {run.method.slug}") | |
| gen = method_func(run.parameters) | |
| result = None | |
| try: | |
| while True: | |
| progress = next(gen) | |
| run.progress = progress | |
| run.save(update_fields=['progress']) | |
| except StopIteration as e: | |
| result = e.value | |
| if result is None: | |
| result = {} | |
| result = clean_for_json(result) | |
| run.set_success(result) | |
| run.add_log(f"Simulation terminée avec succès") | |
| run.add_log(f"Résultat: {json.dumps(result)}") | |
| except Exception as e: | |
| if 'run' in locals(): | |
| run.set_failure(str(e)) | |
| run.add_log(f"Erreur: {str(e)}") | |
| raise | |
| def cleanup_old_runs(days=7): | |
| """Nettoie les simulations anciennes.""" | |
| from simulations.models import SimulationRun | |
| from django.utils import timezone | |
| from datetime import timedelta | |
| cutoff = timezone.now() - timedelta(days=days) | |
| old_runs = SimulationRun.objects.filter( | |
| created_at__lt=cutoff, | |
| status__in=['SUCCESS', 'FAILURE', 'CANCELLED'] | |
| ) | |
| count = old_runs.count() | |
| old_runs.delete() | |
| return f"Supprimé {count} simulations anciennes" | |
Xet Storage Details
- Size:
- 12.2 kB
- Xet hash:
- 278ca3392891d707d8e391f784644a2fbf140a88e12d7b17664cfa91016486c3
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.