Titan / app.py
Riy777's picture
Rename app (38).py to app.py
80c39c8 verified
# titan_service.py
# (احفظ هذا الملف باسم app.py في مساحة Titan الجديدة)
import os
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from typing import Dict, List, Any
# استيراد محرك Titan الأصلي
from titan_engine import TitanEngine
# (نفترض أن نماذج Titan موجودة في "ml_models/layer2" داخل هذه المساحة)
MODEL_PATH = os.getenv("TITAN_MODEL_PATH", "layer2")
titan_engine: TitanEngine = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global titan_engine
print(f"🧠 [Titan Service] بدء التشغيل...")
print(f"🧠 [Titan Service] تحميل النموذج من {MODEL_PATH}...")
titan_engine = TitanEngine(model_dir=MODEL_PATH)
await titan_engine.initialize()
if titan_engine.initialized:
print("✅ [Titan Service] النموذج جاهز.")
else:
print("❌ [Titan Service] فشل تحميل النموذج!")
yield
# (كود إيقاف التشغيل)
print("🛑 [Titan Service] إيقاف التشغيل.")
titan_engine = None
app = FastAPI(lifespan=lifespan, title="Titan Model Service")
@app.get("/")
def root():
return {
"service": "Titan Model Service V1.1",
"model_initialized": titan_engine.initialized if titan_engine else False
}
@app.post("/analyze")
async def analyze_data(ohlcv_data: Dict[str, List[List[Any]]]):
"""
نقطة النهاية الرئيسية لتحليل بيانات الشموع.
"""
if not titan_engine or not titan_engine.initialized:
raise HTTPException(status_code=503, detail="Model is not ready")
try:
# (دالة predict في Titan V1.1 ليست async، لذا نستخدم to_thread)
result = await asyncio.to_thread(titan_engine.predict, ohlcv_data)
return result
except Exception as e:
print(f"❌ [Titan Service] خطأ أثناء التحليل: {e}")
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Internal error: {e}")
if __name__ == "__main__":
import uvicorn
# (يجب تعديل هذا إذا كان ملف titan_engine.py في مجلد فرعي)
# (نفترض أن الهيكل مسطح أو أن PYTHONPATH معدل)
uvicorn.run(app, host="0.0.0.0", port=8001)