import os import time import asyncio import urllib.parse import re import json import uvicorn from fastapi import FastAPI, Form, File, UploadFile, HTTPException from fastapi.responses import FileResponse, StreamingResponse from playwright.async_api import async_playwright app = FastAPI() UPLOAD_DIR = "temp_uploads" os.makedirs(UPLOAD_DIR, exist_ok=True) def clean_extracted_text(raw_text: str, user_message: str) -> str: if not raw_text: return "" lines = raw_text.split('\n') cleaned_lines = [] disclaimer_triggers = [ "هوشواره", "اشتباه کند", "دوباره‌سنجی", "دوباره سنجی", "پیش‌نمایش", "Generative AI", "experimental" ] # قیچی‌های قطع کامل: اگر به این کلمات برسیم، کلاً خواندن را متوقف می‌کنیم تا سایت‌های فروشگاهی نشان داده نشوند cutoff_triggers = [ "visual matches", "نتایج مشابه", "تصاویر مشابه", "جستجوهای مرتبط", "جستجوی مرتبط", "یافتن منبع تصویر", "مطابقت دقیق", "نمایش همه", "همچنین ببینید", "search", "google lens" ] skip_keywords = [ "تصاویر", "ویدیوها", "ویدئوها", "اخبار", "نقشه‌ها", "خرید کردن", "کتاب‌ها", "مالی", "حالت موضوع‌محور", "ورود", "جستجو", "تنظیمات", "ابزارها", "صفحه اصلی", "همه", "خرید", "پروازها", "بیشتر بدانید", "در پاسخ‌های", "بازخورد", "درباره این نتیجه", "گزارش این", "Feedback", "About this result", "حالت هوشواره‌ای", "اگر امکان دارد آن را به فارسی ایمیل کنید", "متن پیدا شد", "ترجمه", "مشاهده موارد مشابه", "جستجوی تصویر" ] spam_domains = ["etsy", "pinterest", "dreamstime", "amazon", "ebay", "shutterstock", "istock", "redbubble", "aliexpress"] for line in lines: line_str = re.sub(r'[\u200b-\u200d\u200f\u202a-\u202e]', '', line).strip() line_lower = line_str.lower() if not line_str: continue # ۱. اگر به بخش سایت‌های مشابه رسیدیم، کلاً بقیه متن را حذف کن if any(trigger in line_lower for trigger in cutoff_triggers) or any(trigger in line_lower for trigger in disclaimer_triggers): break # ۲. فیلتر کردن کدهای اضافی CSS if "var(--" in line_str or "-webkit-" in line_str or "display:" in line_str or line_str.startswith("."): continue if re.search(r'^\d+\+?$', line_str): continue # ۳. فیلتر کردن نام سایت‌های مزاحم مثل Etsy if re.search(r'^[a-zA-Z0-9-]+\.[a-zA-Z]{2,4}$', line_str, re.IGNORECASE) or any(domain in line_lower for domain in spam_domains): continue if re.search(r'^\d+\s+سایت$', line_str) or re.search(r'^\d+\s+site', line_str, re.IGNORECASE): continue if any(keyword == line_lower for keyword in skip_keywords): continue # ۴. حذف خطوط انگلیسی که مربوط به عنوان محصولات فروشگاهی هستند if line_str.endswith("...") and not re.search(r'[\u0600-\u06FF]', line_str): continue cleaned_lines.append(line_str) if cleaned_lines: first_line = re.sub(r'[؟\?]', '', cleaned_lines[0].strip().lower()) msg_clean = re.sub(r'[؟\?]', '', user_message.strip().lower()) if first_line == msg_clean or msg_clean in first_line or first_line in msg_clean: cleaned_lines.pop(0) return "\n".join(cleaned_lines).strip() @app.get("/") async def chat_interface(): return FileResponse("index.html") @app.post("/api/chat") async def chat_endpoint(message: str = Form(...), image: UploadFile = File(None)): saved_image_path = None if image: try: saved_image_path = os.path.join(UPLOAD_DIR, f"{int(time.time())}_{image.filename}") with open(saved_image_path, "wb") as buffer: buffer.write(await image.read()) except Exception as e: raise HTTPException(status_code=500, detail=f"خطا در ذخیره‌سازی تصویر: {str(e)}") async def generate_response(): async with async_playwright() as p: try: is_multimodal = bool(saved_image_path and os.path.exists(saved_image_path)) if is_multimodal: user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" viewport = {"width": 1280, "height": 720} else: user_agent = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36" viewport = {"width": 412, "height": 915} browser = await p.chromium.launch( headless=True, args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled" ] ) context = await browser.new_context( user_agent=user_agent, viewport=viewport, locale="fa-IR", timezone_id="Asia/Tehran" ) page = await context.new_page() if is_multimodal: await page.goto("https://images.google.com/", wait_until="networkidle", timeout=45000) await page.wait_for_timeout(2000) uploaded = False try: camera_btn = page.locator('div[aria-label="Search by image"], div[aria-label="جستجو با تصویر"], [aria-label*="camera" i], [aria-label*="دوربین"]').first await camera_btn.click(timeout=10000) await page.wait_for_timeout(1500) file_input = page.locator("input[type='file']").first await file_input.wait_for(state="attached", timeout=5000) await file_input.set_input_files(saved_image_path) uploaded = True except Exception: pass if uploaded: await page.wait_for_timeout(6000) try: inputs = await page.locator("textarea, input[type='text'], [contenteditable='true']").all() for inp in inputs: if await inp.is_visible() and not await inp.is_disabled(): await inp.focus() await inp.fill(message) await page.wait_for_timeout(500) await page.keyboard.press("Enter") # ---> مکث طلایی (Golden Delay) <--- # اینجا 4.5 ثانیه صبر می‌کنیم تا گوگل لنز از سایت‌های فروشگاهی گذر کند # و هوش مصنوعی جوابش را کامل تولید کند. با این کار دیگر پیام اولیه ناقص نشان داده نمی‌شود. await page.wait_for_timeout(4500) break except Exception: pass else: encoded_query = urllib.parse.quote_plus(message) url = f"https://www.google.com/search?q={encoded_query}&udm=50" await page.goto(url, wait_until="domcontentloaded", timeout=45000) previous_text = "" unchanged_count = 0 max_iterations = 300 for _ in range(max_iterations): raw_ai_text = await page.evaluate("""() => { function isGoodText(txt) { if (!txt || txt.length < 25) return false; if (txt.includes('var(--') || txt.includes('-webkit-')) return false; return true; } // اولویت با خواندن دقیق کادر هوش مصنوعی const aiSelectors = [ 'div[aria-label*="AI Overview"]', 'div[aria-label*="AI-generated"]', 'div[data-subtree="aimc"]', 'div[data-subtree="mfc"]', '.aah4tc', 'div[data-m="0"] div[dir="rtl"]' ]; for (const sel of aiSelectors) { try { const els = document.querySelectorAll(sel); for (const el of els) { if (isGoodText(el.innerText)) return el.innerText.trim(); } } catch(e) {} } // کادر اصلی اگر کادرهای بالا پیدا نشد const mainLens = document.querySelector('div[data-m="0"]'); if (mainLens && isGoodText(mainLens.innerText)) return mainLens.innerText.trim(); return ""; }""") cleaned_text = clean_extracted_text(raw_ai_text, message) if cleaned_text and cleaned_text != previous_text: # جلوگیری از پرش متن: اگر متن قبلی خیلی کامل بود و این متن جدید کوتاه است، نادیده‌اش بگیر if previous_text and len(cleaned_text) < 50 and len(previous_text) > 100: continue is_replacement = not previous_text or not cleaned_text.startswith(previous_text[:len(previous_text)//2 + 1]) if is_replacement or len(cleaned_text) > len(previous_text) + 20: start_idx = 0 if is_replacement else len(previous_text) step = max(3, (len(cleaned_text) - start_idx) // 15) for i in range(start_idx + step, len(cleaned_text), step): yield f"data: {json.dumps({'text': cleaned_text[:i]}, ensure_ascii=False)}\n\n" await asyncio.sleep(0.04) yield f"data: {json.dumps({'text': cleaned_text}, ensure_ascii=False)}\n\n" previous_text = cleaned_text unchanged_count = 0 elif cleaned_text and cleaned_text == previous_text: unchanged_count += 1 if unchanged_count >= 30: break await asyncio.sleep(0.1) if not previous_text: fallback_text = await page.evaluate("() => document.body.innerText") clean_fb = clean_extracted_text(fallback_text, message) if clean_fb: yield f"data: {json.dumps({'text': clean_fb[:2000] + '...'}, ensure_ascii=False)}\n\n" except Exception as e: yield f"data: {json.dumps({'text': f'خطا: {str(e)}'}, ensure_ascii=False)}\n\n" finally: if 'browser' in locals(): await browser.close() if saved_image_path and os.path.exists(saved_image_path): try: os.remove(saved_image_path) except: pass headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } return StreamingResponse(generate_response(), media_type="text/event-stream", headers=headers) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)