download
raw
12.2 kB
import time
import random
import math
from celery import shared_task
import json
import numpy as np
import matplotlib.pyplot as plt
def clean_for_json(obj):
"""Convertit les objets numpy en types Python valides pour JSON."""
if isinstance(obj, np.ndarray):
return [clean_for_json(x) for x in obj.tolist()]
elif isinstance(obj, (np.integer,)):
return int(obj)
elif isinstance(obj, (np.floating,)):
val = float(obj)
if math.isnan(val) or math.isinf(val):
return 0.0
return val
elif isinstance(obj, dict):
return {k: clean_for_json(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [clean_for_json(x) for x in obj]
elif isinstance(obj, (int, float, str, bool, type(None))):
return obj
else:
return obj
def run_monte_carlo_simulation(params):
"""Simulation Monte Carlo pour estimation de Pi."""
n_points = params.get('n_points', 10000)
seed = params.get('seed', 42)
random.seed(seed)
inside_circle = 0
for i in range(n_points):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
inside_circle += 1
if (i + 1) % (n_points // 10) == 0:
progress = int((i + 1) / n_points * 100)
yield progress
pi_estimate = 4 * inside_circle / n_points
return {
'pi_estimate': clean_for_json(pi_estimate),
'exact_pi': math.pi,
'error': clean_for_json(abs(pi_estimate - math.pi)),
'n_points': n_points,
'inside_circle': inside_circle
}
def run_diffusion_1d_simulation(params):
"""Simulation de diffusion 1D explicite."""
L = params.get('length', 1.0)
T = params.get('time', 0.1)
nx = params.get('nx', 50)
nt = params.get('nt', 100)
D = params.get('diffusion_coef', 1.0)
dx = L / (nx - 1)
dt = T / nt
u = [0.0] * nx
u[0] = 1.0
u[-1] = 0.0
for j in range(nt):
u_old = u.copy()
for i in range(1, nx - 1):
u[i] = u_old[i] + D * dt / (dx * dx) * (
u_old[i+1] - 2*u_old[i] + u_old[i-1]
)
if (j + 1) % (nt // 10) == 0:
progress = int((j + 1) / nt * 100)
yield progress
return {
'final_state': clean_for_json(u),
'dx': clean_for_json(dx),
'dt': clean_for_json(dt),
'nx': nx,
'nt': nt,
'max_value': clean_for_json(max(u)),
'min_value': clean_for_json(min(u))
}
def run_linear_solve_simulation(params):
"""Résolution de système linéaire Ax = b."""
size = params.get('size', 100)
np.random.seed(params.get('seed', 42))
A = np.random.rand(size, size) + size * np.eye(size)
b = np.random.rand(size)
for i in range(5):
time.sleep(0.5)
yield (i + 1) * 20
x = np.linalg.solve(A, b)
residual = np.linalg.norm(A.dot(x) - b)
return {
'solution_norm': clean_for_json(np.linalg.norm(x)),
'residual': clean_for_json(residual),
'condition_number': clean_for_json(np.linalg.cond(A)),
'size': size
}
def run_heat_conduction_simulation(params):
"""Simulation de conduction thermique Newton (finite_difference)."""
T_0 = params.get('T_initial', 100.0)
T_s = params.get('T_surface', 20.0)
k = params.get('conductivity', 0.1)
dt = params.get('dt', 0.001)
N = params.get('N', 1000)
theta = params.get('theta', 0.5)
T = np.zeros(N)
T[0] = T_0
for i in range(1, N):
T[i] = (1 - (1 - theta) * dt * k) * T[i-1] / (1 + theta * dt * k) - k * dt * T_s / (1 + theta * dt * k)
if (i + 1) % (N // 10) == 0:
progress = int((i + 1) / N * 100)
yield progress
T_analytical = (T_0 - T_s) * np.exp(-k * np.arange(N) * dt) + T_s
error = np.max(np.abs(T - T_analytical))
return {
'final_T': clean_for_json(T.tolist()),
'T_analytical': clean_for_json(T_analytical.tolist()),
'error_max': clean_for_json(error),
'T_initial': T_0,
'T_surface': T_s,
'k': k,
'dt': dt,
'theta': theta,
'N': N,
'time_final': clean_for_json(N * dt)
}
def run_traffic_flow_simulation(params):
"""Modèle de trafic 1D (numerical-mooc)."""
nx = params.get('nx', 100)
nt = params.get('nt', 200)
L = params.get('length', 10.0)
T = params.get('time', 2.0)
u_max = params.get('u_max', 1.0)
rho_max = params.get('rho_max', 1.0)
dx = L / nx
dt = T / nt
x = np.linspace(0, L, nx)
t = np.linspace(0, T, nt)
rho = np.zeros((nt, nx))
rho[0] = rho_red_light(x, rho_max)
for n in range(nt - 1):
for i in range(nx):
F_ip = flux(rho[n, (i+1) % nx], u_max, rho_max)
F_i = flux(rho[n, i], u_max, rho_max)
rho[n+1, i] = rho[n, i] + dt / dx * (F_i - F_ip)
if (n + 1) % (nt // 10) == 0:
progress = int((n + 1) / nt * 100)
yield progress
return {
'density_matrix': clean_for_json(rho.tolist()),
'x_grid': clean_for_json(x.tolist()),
't_grid': clean_for_json(t.tolist()),
'u_max': u_max,
'rho_max': rho_max,
'max_density': clean_for_json(float(np.max(rho))),
'avg_density': clean_for_json(float(np.mean(rho)))
}
def rho_red_light(x, rho_max):
"""Condition initiale 'red light' avec choc."""
rho = rho_max * np.ones_like(x)
mask = np.where(x < 3.0)
rho[mask] = 0.5 * rho_max
return rho
def flux(rho, u_max, rho_max):
"""Flux de trafic F = V * rho."""
return rho * u_max * (1.0 - rho / rho_max)
def run_phugoid_simulation(params):
"""Trajectoire de vol phugoid (numerical-mooc)."""
zt = params.get('zt', 1.0)
z0 = params.get('z0', 2.0)
theta0 = params.get('theta0', 5.0)
N = params.get('N', 1000)
theta0_rad = np.radians(theta0)
if z0 <= 0:
z0 = 0.1
try:
C = (np.cos(theta0_rad) - 1/3 * z0 / zt) * (z0 / zt)**0.5
except (ZeroDivisionError, RuntimeWarning):
C = 0.0
x = np.zeros(N)
z = np.zeros(N)
theta = theta0_rad
for i in range(1, N):
try:
normal = np.array([+ np.cos(theta + np.pi/2.0), - np.sin(theta + np.pi/2.0)])
R = radius_of_curvature(z[i-1], zt, C)
if abs(R) < 0.001:
R = 0.001
center = np.array([x[i-1], z[i-1]]) + R * normal
ds = 1.0
dtheta = ds / R
x[i], z[i] = rotate((x[i-1], z[i-1]), center=center, angle=dtheta, mode='radians')
theta += dtheta
except (RuntimeWarning, FloatingPointError):
break
if i % (N // 10) == 0:
yield int(i / N * 100)
return {
'x': clean_for_json(x.tolist()),
'z': clean_for_json(z.tolist()),
'theta_final': clean_for_json(float(np.degrees(theta))),
'C': clean_for_json(float(C)),
'zt': zt,
'z0': z0,
'theta0': theta0
}
def radius_of_curvature(z, zt, C):
"""Rayon de courbure de la trajectoire."""
if z <= 0 or abs(1/3 - C / 2 * (zt / z)**1.5) < 1e-10:
return zt * 1000
return zt / (1 / 3 - C / 2 * (zt / z)**1.5)
def rotate(coords, center=(0.0, 0.0), angle=0.0, mode='degrees'):
"""Rotation d'un point autour d'un centre."""
x, z = coords
xc, zc = center
if mode == 'degrees':
angle = np.radians(angle)
x_new = xc + (x - xc) * np.cos(angle) + (z - zc) * np.sin(angle)
z_new = zc - (x - xc) * np.sin(angle) + (z - zc) * np.cos(angle)
return x_new, z_new
def run_elasticity_dangvan_simulation(params):
"""Tenseur deviatoire et critère de Tresca (DangVan)."""
tensor_input = params.get('tensor', [100, 0, 0, 0, 50, 0, 0, 0, 30])
sigma = np.array(tensor_input).reshape(3, 3)
trace = np.trace(sigma)
hydrostatic = (trace / 3) * np.eye(3)
deviator = sigma - hydrostatic
eigenvals = np.linalg.eigvals(sigma)
yield 30
tresca = np.max(eigenvals) - np.min(eigenvals)
von_mises = np.sqrt(3/2 * np.sum(deviator**2))
yield 60
yield 100
return {
'tensor_matrix': clean_for_json(sigma.tolist()),
'deviator_tensor': clean_for_json(deviator.tolist()),
'principal_stresses': clean_for_json(eigenvals.tolist()),
'trace': clean_for_json(float(trace)),
'hydrostatic_pressure': clean_for_json(float(trace / 3)),
'tresca': clean_for_json(float(tresca)),
'von_mises': clean_for_json(float(von_mises))
}
def run_wave_equation_simulation(params):
"""Résolution de l'équation d'onde 1D."""
L = params.get('length', 1.0)
T = params.get('time', 1.0)
nx = params.get('nx', 100)
nt = params.get('nt', 500)
c = params.get('wave_speed', 1.0)
dx = L / nx
dt = T / nt
r = c * dt / dx
if r > 1.0:
r = 0.9
x = np.linspace(0, L, nx)
t = np.linspace(0, T, nt)
u = np.zeros((nt, nx))
u[0] = np.sin(np.pi * x / L)
u[1] = 0.5 * u[0].copy()
for n in range(1, nt - 1):
for i in range(1, nx - 1):
u[n+1, i] = 2*(1 - r**2) * u[n, i] - u[n-1, i] + r**2 * (u[n, i+1] + u[n, i-1])
u[n+1, 0] = 0
u[n+1, -1] = 0
if (n + 1) % (nt // 10) == 0:
progress = int((n + 1) / nt * 100)
yield progress
max_displacement = clean_for_json(float(np.max(np.abs(u))))
return {
'displacement_matrix': clean_for_json(u.tolist()),
'x_grid': clean_for_json(x.tolist()),
't_grid': clean_for_json(t.tolist()),
'wave_speed': c,
'dx': clean_for_json(dx),
'dt': clean_for_json(dt),
'CFL': clean_for_json(r),
'max_displacement': max_displacement
}
SIMULATION_METHODS = {
'monte-carlo-pi': run_monte_carlo_simulation,
'diffusion-1d': run_diffusion_1d_simulation,
'linear-solve': run_linear_solve_simulation,
'heat-conduction': run_heat_conduction_simulation,
'traffic-flow': run_traffic_flow_simulation,
'phugoid': run_phugoid_simulation,
'elasticity-dangvan': run_elasticity_dangvan_simulation,
'wave-equation': run_wave_equation_simulation,
}
@shared_task(bind=True)
def run_simulation(self, run_id):
"""Tâche Celery pour exécuter une simulation."""
from simulations.models import SimulationRun
try:
run = SimulationRun.objects.get(pk=run_id)
run.set_running()
run.add_log(f"Début de la simulation {run_id}")
run.add_log(f"Paramètres: {json.dumps(run.parameters)}")
method_func = SIMULATION_METHODS.get(run.method.slug)
if not method_func:
raise ValueError(f"Méthode inconnue: {run.method.slug}")
gen = method_func(run.parameters)
result = None
try:
while True:
progress = next(gen)
run.progress = progress
run.save(update_fields=['progress'])
except StopIteration as e:
result = e.value
if result is None:
result = {}
result = clean_for_json(result)
run.set_success(result)
run.add_log(f"Simulation terminée avec succès")
run.add_log(f"Résultat: {json.dumps(result)}")
except Exception as e:
if 'run' in locals():
run.set_failure(str(e))
run.add_log(f"Erreur: {str(e)}")
raise
@shared_task
def cleanup_old_runs(days=7):
"""Nettoie les simulations anciennes."""
from simulations.models import SimulationRun
from django.utils import timezone
from datetime import timedelta
cutoff = timezone.now() - timedelta(days=days)
old_runs = SimulationRun.objects.filter(
created_at__lt=cutoff,
status__in=['SUCCESS', 'FAILURE', 'CANCELLED']
)
count = old_runs.count()
old_runs.delete()
return f"Supprimé {count} simulations anciennes"

Xet Storage Details

Size:
12.2 kB
·
Xet hash:
278ca3392891d707d8e391f784644a2fbf140a88e12d7b17664cfa91016486c3

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.