Spaces:
Running
Running
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import JSONResponse | |
| import torch | |
| import whisper | |
| import cv2 | |
| import imagehash | |
| from PIL import Image | |
| from transformers import pipeline | |
| import numpy as np | |
| import io | |
| import shutil | |
| import os | |
| import easyocr | |
| import acoustid | |
| import subprocess | |
| import xgboost as xgb | |
| import time | |
| from thefuzz import fuzz | |
| import re | |
| from collections import Counter | |
| from ultralytics import YOLO | |
| app = FastAPI(title="Copyright & Deepfake Guard API (XGBoost Enhanced)") | |
| # --- MODELLER (CPU İÇİN EN HAFİF SEÇİMLER) --- | |
| print("Modeller yükleniyor...") | |
| # 1. Deepfake & Image Classifier | |
| deepfake_model = pipeline("image-classification", model="umm-maybe/AI-image-detector") | |
| # 2. Whisper (Ses -> Metin) | |
| whisper_model = whisper.load_model("tiny") | |
| # 3. EasyOCR (Görsel -> Metin / Filigran Tespiti) İngilizce ve Türkçe destekli | |
| reader = easyocr.Reader(['en', 'tr'], gpu=False) | |
| # 4. Nesne & Logo Algılama Modeli (Global ve Hafif Sürüm) | |
| # 'yolov8n.pt' (nano) CPU için çok hafiftir ve stok logolarını tanımaya hazırdır. | |
| logo_model = YOLO("yolov8n.pt") | |
| # AcoustID API Anahtarı (Ücretsiz olarak acoustid.org'dan alabilirsin) | |
| ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "").strip() | |
| # Telif İpuçları (OCR ve Whisper bu kelimeleri arayacak) | |
| COPYRIGHT_KEYWORDS = [ | |
| "shutterstock", "shutter", "stock music", "preview", | |
| "shut up suck", "shut up", "suck music", # Whisper hataları için | |
| "©", "copyright", "getty", "adobe stock", | |
| # Semboller ve Standart Terimler | |
| "©", "copyright", "all rights reserved", "tüm hakları saklıdır", "rights reserved", | |
| "trademark", "registered", "license", "licensable", "exclusive use", | |
| # Global Stok Ajansları (Kök Kelimeler - OCR hataları için kısa tutulmuştur) | |
| "shutter", "stock", "getty", "istock", "depositphoto", "adobe", "dreamstime", | |
| "pond5", "envato", "storyblock", "alamy", "vectorstock", "canva", "unsplash", | |
| # Fransızca (French) | |
| "tous droits réservés", "droit d'auteur", "propriété intellectuelle", | |
| # İspanyolca & Portekizce (Spanish & Portuguese) | |
| "todos los derechos reservados", "derechos de autor", "todos os direitos reservados", | |
| # Almanca (German) | |
| "alle rechte vorbehalten", "urheberrechtlich geschützt", "urheberrecht", | |
| # İtalyanca (Italian) | |
| "tutti i diritti riservati", "diritto d'autore", | |
| # Rusça (Russian) | |
| "все права защищены", "авторское право", | |
| # Uzak Doğu (Çince, Japonca, Korece - Yaygın kullanılanlar) | |
| "版权所有", "著作権", "©", "모든 권리 보유" | |
| ] | |
| # --- XGBOOST ÖZNİTELİK ÇIKARMA FONKSİYONU --- | |
| def get_statistical_features(frame): | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| sharpness = cv2.Laplacian(gray, cv2.CV_64F).var() | |
| color_std = np.std(frame) | |
| brightness = np.mean(gray) | |
| return { | |
| "sharpness": round(float(sharpness), 2), | |
| "color_consistency": round(float(color_std), 2), | |
| "brightness": round(float(brightness), 2) | |
| } | |
| # --- STOK MÜZİK AVCI FONKSİYONU --- | |
| def detect_audio_watermarks(text): | |
| text_lower = text.lower() | |
| # Whisper'ın meşhur hatalarını da içeren geniş liste | |
| STOCK_KEYWORDS = [ | |
| # Stok Devleri | |
| "shutterstock", "audiojungle", "pond5", "envato", "getty", "istock", | |
| "adobe stock", "storyblocks", "depositphotos", "123rf", "premiumbeat", | |
| "artlist", "epidemic sound", "soundstripe", "audio network", | |
| # Whisper Hataları ve Varyasyonlar | |
| "shutter", "suck music", "shut up", "stock music", "preview", "watermark", | |
| "audio jungle", "show them", "shutter sack", "shutter soccer", "shut down", | |
| # Evrensel Telif Terimleri | |
| "copyright", "all rights reserved", "rights reserved", "licensable", | |
| "audio preview", "purchase license", "tüm hakları saklıdır" | |
| ] | |
| found_flags = [] | |
| clean_text = re.sub(r'[^\w\s]', '', text_lower) | |
| words = clean_text.split() | |
| # Kelime ve öbek sayacı | |
| word_counts = {} | |
| # Tekliler | |
| for word in words: | |
| if len(word) > 2: | |
| word_counts[word] = word_counts.get(word, 0) + 1 | |
| # İkililer (bigram) | |
| for i in range(len(words) - 1): | |
| bigram = f"{words[i]} {words[i+1]}" | |
| word_counts[bigram] = word_counts.get(bigram, 0) + 1 | |
| # Analiz | |
| for token, count in word_counts.items(): | |
| for kw in STOCK_KEYWORDS: | |
| if token == kw or fuzz.ratio(token, kw) > 85: | |
| if count >= 2: # 2 veya daha fazla tekrar = Kesin Filigran | |
| found_flags.append(f"Döngü Tespit Edildi: '{kw}' ({count} kez)") | |
| elif token in ["shutterstock", "audiojungle", "preview"]: # Kritik kelimeler | |
| found_flags.append(f"Marka Tespit Edildi: '{kw}'") | |
| return list(set(found_flags)) | |
| def enhance_frame_for_ocr(frame): | |
| """Transparan filigranları belirginleştirmek için kontrastı artırır.""" | |
| lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB) | |
| l, a, b = cv2.split(lab) | |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) | |
| cl = clahe.apply(l) | |
| enhanced_img = cv2.merge((cl,a,b)) | |
| return cv2.cvtColor(enhanced_img, cv2.COLOR_LAB2BGR) | |
| # 1. ANA SAYFA | |
| def home(): | |
| return {"status": "Running", "features": ["XGBoost-ready Stats", "3-Point Video Scan", "OCR"]} | |
| # 2. GÖRSEL ANALİZ (OCR + DEEPFAKE) | |
| async def analyze_image(file: UploadFile = File(...)): | |
| try: | |
| contents = await file.read() | |
| img = Image.open(io.BytesIO(contents)).convert("RGB") | |
| img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) | |
| # A) OCR ile Filigran / Telif Yazısı Taraması | |
| ocr_results = reader.readtext(img_cv, detail=0) # Sadece metinleri al | |
| found_keywords = [] | |
| for text in ocr_results: | |
| text_lower = text.lower() | |
| if any(kw in text_lower for kw in COPYRIGHT_KEYWORDS): | |
| found_keywords.append(text) | |
| # B) Deepfake Analizi | |
| df_results = deepfake_model(img)[0] | |
| # C) Deepfake - XGBoost İstatistiksel Analiz (YENİ) | |
| stats = get_statistical_features(img_cv) | |
| # Hibrit Karar: Eğer istatistikler çok düşükse ve model kararsızsa 'AI' diyebiliriz | |
| # Örn: Sharpness < 100 genelde yapay bir yumuşaklığa işarettir | |
| ai_risk = "HIGH" if stats['sharpness'] < 100 and df_results['score'] < 0.95 else "LOW" | |
| return { | |
| "copyright_report": { | |
| "status": "🚨 SUSPICIOUS" if found_keywords else "✅ CLEAN", | |
| "flags": found_keywords | |
| }, | |
| "deepfake_report": { | |
| "transformer_label": df_results["label"], | |
| "transformer_score": f"%{df_results['score']*100:.2f}", | |
| "statistical_risk": ai_risk, | |
| "math_features": stats | |
| } | |
| } | |
| except Exception as e: | |
| return JSONResponse(status_code=500, content={"error": str(e)}) | |
| # 3. SES ANALİZİ (WHISPER + ACOUSTID + TEKRAR SAYACI) | |
| async def analyze_audio(file: UploadFile = File(...)): | |
| clean_filename = os.path.basename(file.filename) | |
| unique_name = f"{int(time.time())}_{clean_filename}" | |
| temp_path = f"/tmp/aud_{unique_name}" | |
| try: | |
| with open(temp_path, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| except Exception as e: | |
| return JSONResponse(status_code=500, content={"error": f"Dosya kaydetme hatası: {str(e)}"}) | |
| result_data = { | |
| "filename": clean_filename, | |
| "speech_copyright_status": "✅ CLEAN", | |
| "music_copyright_status": "✅ CLEAN" | |
| } | |
| try: | |
| # --- A) Whisper ile Metin ve Filigran (Watermark) Analizi --- | |
| transcription = whisper_model.transcribe(temp_path) | |
| text = transcription.get('text', '') | |
| found_flags = detect_audio_watermarks(text) | |
| result_data["whisper_transcript"] = text[:300] + "..." | |
| result_data["suspicious_words_found"] = found_flags | |
| if found_flags: | |
| result_data["speech_copyright_status"] = "🚨 WATERMARK DETECTED" | |
| # --- B) AcoustID ile Müzik Parmak İzi Analizi (Eski kodunla aynı bırakıyoruz) --- | |
| if ACOUSTID_API_KEY: | |
| try: | |
| # API'ye ham parmak izini göndererek hatayı detaylandıralım | |
| # acoustid.match bazen wrapper hataları verebilir, bu yüzden manuel kontrol ekliyoruz | |
| matches = acoustid.match(ACOUSTID_API_KEY, temp_path, parse=True) | |
| match_list = list(matches) | |
| if match_list: | |
| score, recording_id, title, artist = match_list[0] | |
| result_data["music_fingerprint"] = { | |
| "title": title or "Bilinmeyen Şarkı", | |
| "artist": artist or "Bilinmeyen Sanatçı", | |
| "score": round(score, 2) | |
| } | |
| result_data["music_copyright_status"] = "🚨 COPYRIGHTED MUSIC" | |
| else: | |
| result_data["music_fingerprint_info"] = "AcoustID veritabanında bu ses izine rastlanmadı." | |
| except acoustid.WebServiceError as we: | |
| # EĞER BURAYA DÜŞERSE: API Key yanlıştır veya servis meşguldür. | |
| result_data["music_fingerprint_error"] = f"API Reddi: {str(we)}" | |
| # Küçük bir ipucu: API anahtarın tırnak içinde mi kalıyor kontrol et | |
| except Exception as ae: | |
| # EĞER BURAYA DÜŞERSE: fpcalc düzgün çalışmıyor olabilir. | |
| result_data["music_fingerprint_error"] = f"Sistem Hatası (fpcalc): {str(ae)}" | |
| else: | |
| result_data["music_fingerprint_info"] = "HF Secrets üzerinde ACOUSTID_API_KEY bulunamadı." | |
| except Exception as e: | |
| return JSONResponse(status_code=500, content={"error": f"Analiz sırasında genel hata: {str(e)}"}) | |
| finally: | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return result_data | |
| # 4. VİDEO ANALİZ (GÖRSEL + SES BİRLEŞİMİ + 3 NOKTA + XGBoost) | |
| async def analyze_video(file: UploadFile = File(...)): | |
| clean_filename = os.path.basename(file.filename) | |
| unique_name = f"{int(time.time())}_{clean_filename}" | |
| video_path = f"/tmp/vid_{unique_name}" | |
| audio_path = f"/tmp/aud_{unique_name}.wav" | |
| try: | |
| with open(video_path, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| # 1. SES ANALİZİ | |
| audio_report = "Sessiz video veya ses işlenemedi" | |
| try: | |
| result = subprocess.run([ | |
| "ffmpeg", "-y", "-i", video_path, | |
| "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", | |
| audio_path | |
| ], capture_output=True, text=True, check=False) | |
| if result.returncode == 0 and os.path.exists(audio_path): | |
| audio_res = whisper_model.transcribe(audio_path) | |
| audio_text = audio_res.get('text', '') | |
| video_audio_flags = detect_audio_watermarks(audio_text) | |
| if video_audio_flags: | |
| audio_report = f"🚨 STOCK WATERMARK: {', '.join(video_audio_flags)}" | |
| else: | |
| audio_report = "✅ CLEAN" | |
| os.remove(audio_path) | |
| except Exception: | |
| audio_report = "Sessiz video veya ses işlenemedi" | |
| # 2. VİDEO ANALİZ | |
| cap = cv2.VideoCapture(video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| duration = total_frames / fps if fps > 0 else 0 | |
| # Daha fazla noktadan örnekleme yaparak yakalama şansını artıralım | |
| scan_points = [1000, 3000, (duration / 2) * 1000, (duration - 1) * 1000] | |
| video_stats = [] | |
| is_visual_copyright = False | |
| detected_texts = [] | |
| detected_logos = [] | |
| for point in scan_points: | |
| if is_visual_copyright: break # Eğer önceki saniyede telif bulunduysa tamamen dur | |
| if point < 0 or point > (duration * 1000): continue | |
| cap.set(cv2.CAP_PROP_POS_MSEC, point) | |
| ret, frame = cap.read() | |
| if ret: | |
| # 1. Görüntüyü transparan yazılar için optimize et | |
| processed_frame = enhance_frame_for_ocr(frame) | |
| # OCR Çalıştır | |
| ocr_res = reader.readtext(processed_frame, detail=0) | |
| detected_texts.extend(ocr_res) | |
| # ESNEK (FUZZY) KONTROL MANTIĞI | |
| current_frame_text = " ".join(ocr_res).lower() | |
| for kw in COPYRIGHT_KEYWORDS: | |
| # 1. Tam eşleşme veya kelime içinde geçme | |
| if kw in current_frame_text: | |
| is_visual_copyright = True | |
| break | |
| # 2. OCR bozulmalarına karşı: Yakalanan her bir kelime parçasını kontrol et | |
| # Örn: 'atsck' yakalandıysa ve 'stock' keyword listesinde varsa | |
| for word in ocr_res: | |
| clean_word = word.lower() | |
| if len(clean_word) < 3: continue # Çok kısa çöpleri ele | |
| for kw in COPYRIGHT_KEYWORDS: | |
| # 1. Doğrudan veya iç içe geçme kontrolü | |
| if kw in clean_word or clean_word in kw: | |
| is_visual_copyright = True | |
| break | |
| # 2. Bulanık mantık ile benzerlik kontrolü | |
| # 'asckk' ile 'shutterstock' arasındaki benzerliği ölçer | |
| similarity = fuzz.partial_ratio(clean_word, kw) | |
| if similarity > 65: # %65 ve üzeri benzerlikleri ihlal say | |
| is_visual_copyright = True | |
| break | |
| if is_visual_copyright: break | |
| video_stats.append(get_statistical_features(frame)) | |
| # Sadece OCR telif bulamadıysa logo taraması yaparak sistemi yormayalım. | |
| if not is_visual_copyright: | |
| # Görüntüyü YOLO'ya gönder (conf=0.5 güven eşiğidir, ihtiyaca göre ayarlanabilir) | |
| logo_results = logo_model(frame, conf=0.5, verbose=False) | |
| for result in logo_results: | |
| # YOLO'nun bulduğu nesneleri tara | |
| for box in result.boxes: | |
| cls_id = int(box.cls[0]) # Sınıf ID'si | |
| # YOLOv8'in genel modelinde marka logoları 'brand_logo' veya benzeri sınıflarda olabilir. | |
| # Eğer bulduğu nesne bir logo ise (bu kısım kullanılan modele göre ayarlanır) | |
| if logo_model.names[cls_id] in ["brand_logo", "logo", "watermark"]: | |
| detected_logos.append(f"Grafiksel Logo Tespit Edildi (Güven: %{box.conf[0]*100:.1f})") | |
| is_visual_copyright = True | |
| break # Bu kareden telif çıktıysa diğer nesnelere bakmaya gerek yok | |
| if is_visual_copyright: break # Bu noktadan telif çıktıysa OCR döngüsüne geri dön | |
| cap.release() | |
| if os.path.exists(video_path): os.remove(video_path) | |
| # 3. İSTATİSTİKSEL KARAR | |
| avg_sharpness = np.mean([s['sharpness'] for s in video_stats]) if video_stats else 0 | |
| # EĞER OCR bir şeyler yakaladıysa (asckk gibi), Deepfake riskini LOW yapalım. | |
| # Çünkü piksellerdeki bozulmanın sebebi AI değil, üzerindeki yazılardır. | |
| final_deepfake_risk = "LOW" | |
| if avg_sharpness < 100: | |
| if is_visual_copyright: | |
| final_deepfake_risk = "LOW (Pixel noise caused by watermark)" | |
| else: | |
| final_deepfake_risk = "HIGH" | |
| return { | |
| "video_duration": f"{duration:.2f} sn", | |
| "copyright_status": "🚨 VIOLATION" if is_visual_copyright else "✅ CLEAN", | |
| "audio_report": audio_report, | |
| "detected_logos_report": detected_logos, | |
| "deepfake_statistical_risk": final_deepfake_risk, | |
| "metrics": { | |
| "avg_sharpness": round(float(avg_sharpness), 2), | |
| "scan_count": len(video_stats) | |
| }, | |
| "detected_ocr_sample": list(set(detected_texts))[:10] | |
| } | |
| except Exception as e: | |
| if os.path.exists(video_path): os.remove(video_path) | |
| return JSONResponse(status_code=500, content={"error": str(e)}) | |
| async def full_analysis(file: UploadFile = File(...)): | |
| start_time = time.time() | |
| clean_filename = os.path.basename(file.filename) | |
| extension = clean_filename.split('.')[-1].lower() | |
| report = { | |
| "filename": clean_filename, | |
| "type": "unknown", | |
| "overall_status": "✅ CLEAN", | |
| "detailed_results": {}, | |
| "analysis_duration": 0 | |
| } | |
| try: | |
| # --- GÖRSEL ANALİZ --- | |
| if extension in ['jpg', 'jpeg', 'png', 'webp']: | |
| report["type"] = "Image" | |
| contents = await file.read() | |
| img = Image.open(io.BytesIO(contents)).convert("RGB") | |
| img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) | |
| # 1. Modelleri Çalıştır | |
| ocr_res = reader.readtext(img_cv, detail=0) | |
| df_res = deepfake_model(img)[0] | |
| logo_res = logo_model(img_cv, conf=0.4, verbose=False) | |
| # 2. İstatistiksel Analiz (XGBoost Mantığı) - AI tespiti için kritik | |
| stats = get_statistical_features(img_cv) | |
| # 3. Hibrit AI Karar Mekanizması (Senin hatayı düzelten kısım) | |
| ai_label = df_res["label"] | |
| # Eğer model 'human' dese bile keskinlik 80'in altındaysa şüphelen (AI genelde aşırı pürüzsüzdür) | |
| if ai_label == "human" and stats['sharpness'] < 80: | |
| ai_label = "suspicious_ai" | |
| report["ai_warning"] = "Düşük piksellik tutarlılığı nedeniyle AI şüphesi tespit edildi." | |
| report["detailed_results"] = { | |
| "ocr_found": [txt for txt in ocr_res if any(kw in txt.lower() for kw in COPYRIGHT_KEYWORDS)], | |
| "ai_content_score": f"%{df_res['score']*100:.2f}", | |
| "ai_label": ai_label, | |
| "math_stats": stats, | |
| "detected_objects": [logo_model.names[int(b.cls[0])] for r in logo_res for b in r.boxes] | |
| } | |
| # --- VİDEO ANALİZ --- | |
| elif extension in ['mp4', 'mov', 'avi', 'mkv']: | |
| report["type"] = "Video" | |
| report["detailed_results"] = await analyze_video(file) | |
| # --- SES ANALİZİ --- | |
| elif extension in ['mp3', 'wav', 'flac', 'm4a', 'preview']: | |
| report["type"] = "Audio" | |
| report["detailed_results"] = await analyze_audio(file) | |
| # 2. OVERALL STATUS GÜNCELLEME | |
| res_str = str(report["detailed_results"]).lower() | |
| has_ocr_findings = len(report["detailed_results"].get("ocr_found", [])) > 0 | |
| # Eğer OCR bulgusu varsa VEYA metinde ihlal kelimeleri geçiyorsa VEYA AI etiketi şüpheliyse | |
| if has_ocr_findings or \ | |
| any(x in res_str for x in ["violation", "watermark", "copyrighted", "suspicious", "fake", "suspicious_ai"]): | |
| report["overall_status"] = "🚨 VIOLATION / SUSPICIOUS" | |
| except Exception as e: | |
| report["error"] = f"Full Analysis Hatası: {str(e)}" | |
| report["analysis_duration"] = f"{time.time() - start_time:.2f} sn" | |
| return report | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |