|
|
import os |
|
|
import cv2 |
|
|
import gradio as gr |
|
|
import google.generativeai as genai |
|
|
from ultralytics import YOLO |
|
|
import tempfile |
|
|
import torch |
|
|
import spaces |
|
|
|
|
|
import numpy as np |
|
|
from PIL import Image, ImageDraw, ImageFont |
|
|
import arabic_reshaper |
|
|
from bidi.algorithm import get_display |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gemini API key. Read from the environment instead of hard-coding the
# secret in source control (the previous literal key was exposed and should
# be revoked). Set GEMINI_API_KEY in the deployment environment, e.g. as a
# Hugging Face Space secret. With an empty value, Gemini calls will fail
# inside fix_with_gemini and surface as its error string.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Configure the Gemini SDK once at import time with the module-level key.
genai.configure(api_key=GEMINI_API_KEY)

# Arabic instruction prompt sent to Gemini. It explains that the input is a
# raw run of Arabic letters (no spaces, with light repetition, produced by
# the sign-language detector) and asks the model to (1) drop unneeded
# repetition, (2) insert word spaces, (3) output the closest-meaning
# sentence, returning only the text with no explanation.
SYSTEM_PROMPT = (
    "لدي نص خام عبارة عن حروف عربية متتابعة بدون مسافات "
    "ومع وجود تكرار بسيط لأنه ناتج من مترجم لغة الإشارة.\n"
    "مهمتك:\n"
    "1) إزالة التكرار غير الضروري.\n"
    "2) إضافة المسافات بين الكلمات.\n"
    "3) إخراج الجملة الأقرب للمعنى.\n"
    "أعد النص فقط بدون شرح."
)
|
|
|
|
|
def fix_with_gemini(raw_text: str) -> str:
    """Clean up a raw letter stream with Gemini.

    Sends SYSTEM_PROMPT plus the raw text to the Gemini model and returns
    the model's stripped reply. Returns "" for empty input; on any SDK or
    network failure the error is reported as a user-visible Arabic string
    instead of raising (best-effort by design).
    """
    if not raw_text:
        return ""

    try:
        gemini = genai.GenerativeModel("models/gemini-2.5-flash")
        full_prompt = f"{SYSTEM_PROMPT}\n\nالنص الخام:\n«{raw_text}»"
        response = gemini.generate_content(full_prompt)
        reply = response.text or ""
        return reply.strip()
    except Exception as exc:
        return f"خطأ في Gemini: {exc}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- YOLO inference settings ---
WEIGHTS_PATH = "best.pt"  # trained detection weights, expected next to this script
IMG_SIZE = 1080  # square inference / center-crop size in pixels
CONF_THRESHOLD = 0.15  # minimum detection confidence kept by predict()

# --- letter/word aggregation settings (used by extract_and_render) ---
MIN_STABLE_FRAMES = 1  # consecutive frames a label must persist before it is appended
FRAME_SKIP = 1  # process every Nth frame (1 = process all frames)
MAX_FRAMES = 1000  # hard cap on frames processed per video
WORD_GAP_FRAMES = 10  # detection-free frames that terminate the current word

# When True, preprocess_video center-crops every frame to IMG_SIZE x IMG_SIZE.
CENTER_CROP = True
|
|
|
|
|
# Mapping from YOLO class names (English transliterations of Arabic
# sign-language letters) to the Arabic characters they represent.
# Unmapped class names pass through unchanged in detect_frame via
# arabic_map.get(eng, eng). Note: "ta"/"taa" and "ya"/"yaa" are aliases
# for the same letter, and "al"/"la" map to two-character sequences.
arabic_map = {
    "aleff": "ا",
    "bb": "ب",
    "ta": "ت",
    "taa": "ت",
    "thaa": "ث",
    "jeem": "ج",
    "haa": "ح",
    "khaa": "خ",
    "dal": "د",
    "dha": "ظ",
    "dhad": "ض",
    "fa": "ف",
    "gaaf": "ق",
    "ghain": "غ",
    "ha": "ه",
    "kaaf": "ك",
    "laam": "ل",
    "meem": "م",
    "nun": "ن",
    "ra": "ر",
    "saad": "ص",
    "seen": "س",
    "sheen": "ش",
    "thal": "ذ",
    "toot": "ة",
    "waw": "و",
    "ya": "ي",
    "yaa": "ي",
    "zay": "ز",
    "ain": "ع",
    "al": "ال",
    "la": "لا",
}
|
|
|
|
|
# Lazily-initialized YOLO model (loaded on first get_model() call) and the
# device it currently runs on; both globals are mutated by get_model().
yolo_model = None
DEVICE = "cpu"
|
|
|
|
|
def get_model():
    """Return the shared YOLO model, loading it and picking a device lazily.

    On the first call the weights are loaded from WEIGHTS_PATH. On every
    call the device choice is re-evaluated: if CUDA is available and the
    model is not already on it, the model is moved to "cuda" (best-effort);
    otherwise the global DEVICE falls back to "cpu". Mutates the module
    globals `yolo_model` and `DEVICE`.
    """
    global yolo_model, DEVICE

    if yolo_model is None:
        print("🔹 Loading YOLO model...")
        yolo_model = YOLO(WEIGHTS_PATH)
        print("📚 Classes:", yolo_model.names)

    if torch.cuda.is_available():
        # Only migrate once; subsequent calls are no-ops.
        if DEVICE != "cuda":
            DEVICE = "cuda"
            try:
                yolo_model.to(DEVICE)
                print("✅ YOLO model moved to cuda")
            except Exception as err:
                print("⚠️ تعذر نقل الموديل إلى cuda:", err)
    elif DEVICE != "cpu":
        # CUDA disappeared after a previous cuda selection; warn and revert.
        print("⚠️ CUDA غير متوفر، سيتم استخدام CPU.")
        DEVICE = "cpu"

    return yolo_model
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Arabic-capable TTF shipped alongside this script; used by draw_arabic_text.
FONT_PATH = os.path.join(os.path.dirname(__file__), "NotoNaskhArabic-VariableFont_wght.ttf")
|
|
|
|
|
def draw_arabic_text(frame_bgr, text, x, y, font_size=36, bgr_color=(0, 255, 0)):
    """Overlay RTL-shaped Arabic `text` at (x, y) on a BGR frame.

    Renders via Pillow (OpenCV cannot shape Arabic glyphs), reshaping and
    reordering the string for right-to-left display first. Falls back to
    Pillow's default font if the bundled TTF fails to load. Returns a new
    BGR frame with the text drawn.
    """
    pil_img = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    canvas = ImageDraw.Draw(pil_img)

    try:
        font = ImageFont.truetype(FONT_PATH, font_size)
    except Exception as e:
        print("⚠️ خطأ تحميل الخط العربي:", e)
        font = ImageFont.load_default()

    # Shape contextual letter forms, then reorder for RTL display.
    display_text = get_display(arabic_reshaper.reshape(text))

    # Pillow expects RGB while callers pass OpenCV-style BGR.
    fill_rgb = (bgr_color[2], bgr_color[1], bgr_color[0])
    canvas.text((x, y), display_text, font=font, fill=fill_rgb)

    return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def resize_and_center_crop(frame, target: int = 640):
    """Scale `frame` so its short side equals `target`, then center-crop.

    Returns a `target` x `target` crop; degenerate frames (zero-sized) are
    returned unchanged. A final resize guards against one-pixel rounding
    drift from the scale factor.
    """
    h, w = frame.shape[:2]
    short = min(w, h)
    if short <= 0:
        return frame

    ratio = target / short
    scaled = cv2.resize(
        frame,
        (int(w * ratio), int(h * ratio)),
        interpolation=cv2.INTER_AREA,
    )

    sh, sw = scaled.shape[:2]
    left = max(0, (sw - target) // 2)
    top = max(0, (sh - target) // 2)
    right = min(left + target, sw)
    bottom = min(top + target, sh)

    crop = scaled[top:bottom, left:right]

    # Rounding in the scale step can leave the crop a pixel short; snap it.
    if crop.shape[0] != target or crop.shape[1] != target:
        crop = cv2.resize(crop, (target, target), interpolation=cv2.INTER_AREA)

    return crop
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def preprocess_video(input_path: str, target_short_side: int = 1080, target_fps: int = 8) -> str:
    """Downsample a video's frame rate (and size) into a temp .mp4.

    Keeps every Nth frame so the output approximates `target_fps`, and
    either center-crops each kept frame to IMG_SIZE (when CENTER_CROP) or
    scales it so the short side equals `target_short_side`. Returns the
    temp file path, or `input_path` unchanged if the video cannot be opened.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("[preprocess] تعذر فتح الفيديو، سنستخدم الملف الأصلي كما هو.")
        return input_path

    orig_fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Frame-keep stride: unknown source fps -> keep everything and stamp
    # the output with target_fps; otherwise keep every `frame_step`th frame.
    if orig_fps <= 0:
        frame_step = 1
        out_fps = float(target_fps)
    else:
        frame_step = max(1, int(round(orig_fps / target_fps)))
        out_fps = orig_fps / frame_step

    # Scaled dimensions for the non-crop path.
    # NOTE(review): with CENTER_CROP=True these are unused for the output
    # size — frames go through resize_and_center_crop(target=IMG_SIZE), so
    # `target_short_side` has no effect in that mode; confirm intended.
    short_side = min(w, h)
    scale = 1.0 if short_side <= 0 else (target_short_side / short_side)
    new_w = int(w * scale)
    new_h = int(h * scale)

    # mkstemp returns an open fd we don't need; close it so VideoWriter owns the file.
    fd, tmp_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)

    out_w, out_h = (IMG_SIZE, IMG_SIZE) if CENTER_CROP else (new_w, new_h)

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(tmp_path, fourcc, out_fps, (out_w, out_h))

    frame_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Write only every `frame_step`th frame to hit the target fps.
        if frame_idx % frame_step == 0:
            if CENTER_CROP:
                processed = resize_and_center_crop(frame, target=IMG_SIZE)
            else:
                processed = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
            out.write(processed)

        frame_idx += 1

    cap.release()
    out.release()
    print(f"[preprocess] orig=({w}x{h}), new=({out_w}x{out_h}), saved={tmp_path}")
    return tmp_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_frame(frame_bgr):
    """Run YOLO on one BGR frame and annotate it.

    Returns (labels, frame): `labels` is the list of Arabic letters mapped
    from the detected class names (empty when nothing is found), and
    `frame` is the input with boxes and Arabic captions drawn on it.
    The input frame is annotated in place for the rectangles.
    """
    model = get_model()

    # Ultralytics expects RGB; OpenCV frames arrive as BGR.
    rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    result = model.predict(
        rgb,
        conf=CONF_THRESHOLD,
        imgsz=IMG_SIZE,
        verbose=False,
        device=DEVICE,
    )[0]

    boxes = result.boxes
    box_count = len(boxes) if boxes is not None else 0
    print(f"[detect_frame] boxes={box_count}")

    if not box_count:
        return [], frame_bgr

    names = model.names
    letters = []
    for det in boxes:
        x1, y1, x2, y2 = (int(v) for v in det.xyxy[0])
        cls_id = int(det.cls[0])

        # `names` may be a dict (id -> name) or a plain sequence.
        if isinstance(names, dict):
            eng = names.get(cls_id, str(cls_id))
        else:
            eng = names[cls_id] if cls_id < len(names) else str(cls_id)

        # Translate to Arabic; unknown class names pass through unchanged.
        letter = arabic_map.get(eng, eng)
        letters.append(letter)

        cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
        frame_bgr = draw_arabic_text(frame_bgr, letter, x1, max(0, y1 - 45), font_size=36)

    return letters, frame_bgr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_and_render(video_path: str):
    """Decode letters from a video and write an annotated copy.

    Walks the video frame by frame, runs detect_frame on each (mirrored)
    frame, and accumulates the first detected letter per frame into words:
    a letter is appended once it has been seen for MIN_STABLE_FRAMES
    consecutive frames (skipping immediate duplicates), and a gap of
    WORD_GAP_FRAMES detection-free frames closes the current word.

    Returns (raw_text, out_path, debug_info): the space-joined words, the
    path of the annotated video, and a human-readable debug summary.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return "", None, "تعذر فتح الفيديو في extract_and_render"

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out_path = "processed_output.mp4"

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Some containers report 0 fps; fall back to the preprocess target rate.
    if fps <= 0:
        fps = 8.0

    out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))

    # Letter-accumulation state:
    word = ""          # letters of the word currently being built
    words = []         # finished words
    last_label = None  # label seen on the previous detection frame
    last_added = None  # last letter actually appended (dedup guard)
    stable = 0         # consecutive frames last_label has persisted
    last_seen = None   # frame index of the most recent detection
    frame_index = 0

    frames_with_dets = 0
    debug_lines = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_index += 1
        if frame_index > MAX_FRAMES:
            break

        if FRAME_SKIP > 1 and frame_index % FRAME_SKIP != 0:
            continue

        # Mirror horizontally (selfie-style video) before detection.
        frame = cv2.flip(frame, 1)
        labels, rendered = detect_frame(frame)
        out.write(rendered)

        if labels:
            frames_with_dets += 1
            debug_lines.append(f"frame {frame_index}: {labels}")

            # Only the first detection per frame is used for the text.
            label = labels[0]
            last_seen = frame_index

            if label == last_label:
                stable += 1
            else:
                last_label = label
                stable = 1

            # Append once the label is stable, skipping immediate repeats.
            if stable >= MIN_STABLE_FRAMES:
                if label != last_added:
                    word += label
                    last_added = label
                stable = 0
        else:
            # Detection-free stretch long enough -> close the current word.
            if word and last_seen and (frame_index - last_seen >= WORD_GAP_FRAMES):
                words.append(word)
                word = ""
                last_label = None
                last_added = None
                stable = 0
                last_seen = None

    cap.release()
    out.release()

    # Flush any word still in progress when the video ends.
    if word:
        words.append(word)

    raw_text = " ".join(words).strip()

    if not debug_lines:
        debug_info = (
            f"total_frames={frame_index}, frames_with_detections=0\n"
            "لم يتم رصد أي صناديق (boxes) من YOLO في أي فريم.\n"
            "تحقق من:\n"
            "- أن best.pt هو موديل detection وتدريبه سليم.\n"
            "- أن الفيديو مشابه لتدريب الموديل من ناحية وضعية اليد والكاميرا."
        )
    else:
        # Cap the sample at 30 lines to keep the debug box readable.
        sample = "\n".join(debug_lines[:30])
        debug_info = (
            f"total_frames={frame_index}, frames_with_detections={frames_with_dets}\n"
            "أمثلة من الفريمات اللي فيها حروف:\n"
            f"{sample}"
        )

    return raw_text, out_path, debug_info
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@spaces.GPU
def run(file):
    """Gradio callback: full pipeline for one uploaded video.

    Preprocesses the upload (fps/size reduction), extracts letters and the
    annotated video, then asks Gemini to polish the raw text. Returns
    (raw_text, pretty_text, processed_video_path, debug_info).
    """
    if file is None:
        missing = "لم يتم رفع فيديو"
        return missing, "", None, missing

    light_path = preprocess_video(file.name, target_short_side=640, target_fps=8)
    raw, processed_path, debug_info = extract_and_render(light_path)

    # Only call Gemini when something was actually recognized.
    pretty = fix_with_gemini(raw) if raw else ""
    if not raw:
        raw = "لم يتم التعرف على أي نص من الإشارات."

    return raw, pretty, processed_path, debug_info
|
|
|
|
|
|
|
|
# --- Gradio UI wiring (built at import time, launched by the main guard) ---
with gr.Blocks() as demo:
    gr.Markdown("## 🤟 ASL → Arabic (YOLO + Gemini) — إصلاح ظهور الحروف العربية داخل الفيديو")

    # One file input plus the four outputs produced by run(), in order.
    inp = gr.File(label="ارفع فيديو الإشارة")
    raw = gr.Textbox(label="النص الخام", lines=3)
    pretty = gr.Textbox(label="النص المحسن (Gemini)", lines=3)
    video_out = gr.Video(label="الفيديو بعد البروسيس")
    debug_box = gr.Textbox(label="Debug info", lines=10)

    btn = gr.Button("ابدأ المعالجة")
    btn.click(run, inputs=[inp], outputs=[raw, pretty, video_out, debug_box])

if __name__ == "__main__":
    # Bind to all interfaces on the standard Hugging Face Spaces port.
    demo.launch(server_name="0.0.0.0", server_port=7860)
|
|
|
|
|
|