import asyncio
import json
import logging
import os
import re
import time
import urllib.parse
import uuid

import uvicorn
from fastapi import FastAPI, Form, File, UploadFile, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from playwright.async_api import async_playwright
|
|
# Directory that receives uploaded images; created at import time if missing.
UPLOAD_DIR = "temp_uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# ASGI application object that the route decorators in this module attach to.
app = FastAPI()
|
|
# Zero-width and bidirectional control characters that are removed before
# any comparison is made.
_INVISIBLE_CHARS_RE = re.compile(r'[\u200b-\u200d\u200f\u202a-\u202e]')
_QUESTION_MARKS_RE = re.compile(r'[؟\?]')
_BARE_COUNTER_RE = re.compile(r'^\d+\+?$')
_BARE_DOMAIN_RE = re.compile(r'^[a-zA-Z0-9-]+\.[a-zA-Z]{2,4}$', re.IGNORECASE)
_SITE_COUNT_FA_RE = re.compile(r'^\d+\s+سایت$')
_SITE_COUNT_EN_RE = re.compile(r'^\d+\s+site', re.IGNORECASE)
_ARABIC_SCRIPT_RE = re.compile(r'[\u0600-\u06FF]')

_CSS_MARKERS = ("var(--", "-webkit-", "display:")

_SPAM_DOMAINS = (
    "etsy", "pinterest", "dreamstime", "amazon", "ebay",
    "shutterstock", "istock", "redbubble", "aliexpress",
)


def _normalize_for_match(text: str) -> str:
    """Strip invisible characters and surrounding whitespace, then lower-case."""
    return _INVISIBLE_CHARS_RE.sub('', text).strip().lower()


# Substrings that end extraction: the first line containing one of them, and
# everything after it, is discarded. They are normalized exactly like the
# lines they are compared with, so mixed-case entries can actually match.
_STOP_TRIGGERS = tuple(_normalize_for_match(trigger) for trigger in (
    # disclaimer triggers
    "هوشواره", "اشتباه کند", "دوبارهسنجی", "دوباره سنجی",
    "پیشنمایش", "Generative AI", "experimental",
    # cutoff triggers
    "visual matches", "نتایج مشابه", "تصاویر مشابه", "جستجوهای مرتبط",
    "جستجوی مرتبط", "یافتن منبع تصویر", "مطابقت دقیق", "نمایش همه",
    "همچنین ببینید", "search", "google lens",
))

# Lines dropped when the whole (normalized) line equals one of these.
_SKIP_LINES = frozenset(_normalize_for_match(keyword) for keyword in (
    "تصاویر", "ویدیوها", "ویدئوها", "اخبار", "نقشهها", "خرید کردن", "کتابها", "مالی",
    "حالت موضوعمحور", "ورود", "جستجو", "تنظیمات", "ابزارها", "صفحه اصلی", "همه", "خرید",
    "پروازها", "بیشتر بدانید", "در پاسخهای", "بازخورد", "درباره این نتیجه",
    "گزارش این", "Feedback", "About this result", "حالت هوشوارهای", "اگر امکان دارد آن را به فارسی ایمیل کنید",
    "متن پیدا شد", "ترجمه", "مشاهده موارد مشابه", "جستجوی تصویر",
))


def clean_extracted_text(raw_text: str, user_message: str) -> str:
    """Filter scraped page text down to the lines worth showing to the user.

    Lines are processed in order. Processing stops at the first line that
    contains a stop trigger (case-insensitive substring match); that line
    and everything after it are discarded. Before that point a line is
    dropped when it is empty, contains CSS residue or starts with ``.``,
    is a bare counter such as ``12+``, is a bare domain name or mentions a
    listed shop/stock-photo domain, is a ``N site`` counter, equals one of
    the skip keywords, or ends with ``...`` without containing any
    character in the U+0600-U+06FF range.

    If the first surviving line repeats ``user_message`` (ignoring case,
    question marks and invisible characters) it is removed as well.

    Args:
        raw_text: Text scraped from the page; may be empty.
        user_message: The query the user sent, used to detect its echo.

    Returns:
        The surviving lines joined with newlines, or ``""`` when nothing
        survives.
    """
    if not raw_text:
        return ""

    cleaned_lines = []
    for line in raw_text.split('\n'):
        line_str = _INVISIBLE_CHARS_RE.sub('', line).strip()
        if not line_str:
            continue
        line_lower = line_str.lower()

        if any(trigger in line_lower for trigger in _STOP_TRIGGERS):
            break

        if line_str.startswith(".") or any(marker in line_str for marker in _CSS_MARKERS):
            continue

        if _BARE_COUNTER_RE.search(line_str):
            continue

        if _BARE_DOMAIN_RE.search(line_str) or any(domain in line_lower for domain in _SPAM_DOMAINS):
            continue

        if _SITE_COUNT_FA_RE.search(line_str) or _SITE_COUNT_EN_RE.search(line_str):
            continue

        if line_lower in _SKIP_LINES:
            continue

        # Truncated snippets with no Persian/Arabic-script text.
        if line_str.endswith("...") and not _ARABIC_SCRIPT_RE.search(line_str):
            continue

        cleaned_lines.append(line_str)

    if cleaned_lines:
        first_line = _QUESTION_MARKS_RE.sub('', cleaned_lines[0].strip().lower())
        # Normalize the message the same way the lines were normalized so an
        # echoed query containing zero-width characters is still recognised.
        msg_clean = _QUESTION_MARKS_RE.sub('', _normalize_for_match(user_message or ""))
        # An empty message is a substring of every line; without this guard
        # the first real line would always be thrown away.
        if msg_clean and (msg_clean in first_line or first_line in msg_clean):
            del cleaned_lines[0]

    return "\n".join(cleaned_lines).strip()
|
|
@app.get("/")
async def chat_interface():
    """Return ``index.html`` as the response for ``GET /``."""
    index_page = FileResponse("index.html")
    return index_page
|
|
_logger = logging.getLogger(__name__)

_DESKTOP_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
_MOBILE_USER_AGENT = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36"

# Upper bound on polls of the result page, and the number of consecutive
# identical polls after which the answer is considered finished.
_MAX_POLLS = 300
_STABLE_POLLS_TO_FINISH = 30

# Evaluated in the page: returns the text of the first candidate answer
# container that holds at least 25 characters and no CSS residue, else "".
_EXTRACT_AI_TEXT_JS = """() => {
    function isGoodText(txt) {
        if (!txt || txt.length < 25) return false;
        if (txt.includes('var(--') || txt.includes('-webkit-')) return false;
        return true;
    }

    // اولویت با خواندن دقیق کادر هوش مصنوعی
    const aiSelectors = [
        'div[aria-label*="AI Overview"]',
        'div[aria-label*="AI-generated"]',
        'div[data-subtree="aimc"]',
        'div[data-subtree="mfc"]',
        '.aah4tc',
        'div[data-m="0"] div[dir="rtl"]'
    ];

    for (const sel of aiSelectors) {
        try {
            const els = document.querySelectorAll(sel);
            for (const el of els) {
                if (isGoodText(el.innerText)) return el.innerText.trim();
            }
        } catch(e) {}
    }

    // کادر اصلی اگر کادرهای بالا پیدا نشد
    const mainLens = document.querySelector('div[data-m="0"]');
    if (mainLens && isGoodText(mainLens.innerText)) return mainLens.innerText.trim();

    return "";
}"""


def _build_upload_path(filename):
    """Return a unique path inside UPLOAD_DIR for a client-supplied file name."""
    # Keep only the last path component (clients may send either separator)
    # and replace anything that is not a word character, dot or dash.
    base_name = os.path.basename((filename or "").replace("\\", "/"))
    safe_name = re.sub(r'[^\w.\-]', '_', base_name)[-100:] or "upload"
    # The random component keeps two uploads in the same second apart.
    return os.path.join(UPLOAD_DIR, f"{int(time.time())}_{uuid.uuid4().hex}_{safe_name}")


def _remove_quietly(path):
    """Delete *path* if it exists; log instead of raising when deletion fails."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        _logger.warning("Could not delete temporary upload %s", path, exc_info=True)


def _sse_event(text):
    """Format *text* as one server-sent event carrying a JSON ``{"text": ...}``."""
    return f"data: {json.dumps({'text': text}, ensure_ascii=False)}\n\n"


async def _submit_image_query(page, image_path, message):
    """Upload *image_path* on images.google.com and type *message* as the query.

    Navigation errors propagate to the caller. Failures while locating the
    upload control or the query box are logged and otherwise ignored, so the
    caller still goes on to poll whatever page is showing.
    """
    await page.goto("https://images.google.com/", wait_until="networkidle", timeout=45000)
    await page.wait_for_timeout(2000)

    try:
        camera_btn = page.locator('div[aria-label="Search by image"], div[aria-label="جستجو با تصویر"], [aria-label*="camera" i], [aria-label*="دوربین"]').first
        await camera_btn.click(timeout=10000)
        await page.wait_for_timeout(1500)

        file_input = page.locator("input[type='file']").first
        await file_input.wait_for(state="attached", timeout=5000)
        await file_input.set_input_files(image_path)
    except Exception:
        _logger.warning("Image upload step failed", exc_info=True)
        return

    await page.wait_for_timeout(6000)
    try:
        candidates = await page.locator("textarea, input[type='text'], [contenteditable='true']").all()
        for candidate in candidates:
            # Use the first input that is visible and enabled.
            if await candidate.is_visible() and not await candidate.is_disabled():
                await candidate.focus()
                await candidate.fill(message)
                await page.wait_for_timeout(500)
                await page.keyboard.press("Enter")
                await page.wait_for_timeout(4500)
                break
    except Exception:
        _logger.warning("Could not submit the text query after image upload", exc_info=True)


@app.post("/api/chat")
async def chat_endpoint(message: str = Form(...), image: UploadFile = File(None)):
    """Answer a chat message by scraping a Google results page and streaming it.

    Without an image, the message is sent to a Google search URL with
    ``udm=50``. With an image, the image is uploaded through
    images.google.com and the message is typed as the follow-up query. The
    page is then polled and the cleaned answer text is streamed as
    server-sent events whose payload is ``{"text": <text so far>}``.

    Args:
        message: The user's query (form field).
        image: Optional uploaded image (form file).

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.

    Raises:
        HTTPException: Status 500 when the uploaded image cannot be saved.
    """
    saved_image_path = None

    if image:
        saved_image_path = _build_upload_path(image.filename)
        try:
            with open(saved_image_path, "wb") as buffer:
                buffer.write(await image.read())
        except Exception as e:
            # Do not leave a partially written file behind.
            _remove_quietly(saved_image_path)
            raise HTTPException(status_code=500, detail=f"خطا در ذخیرهسازی تصویر: {str(e)}") from e

    async def generate_response():
        # The outer try/finally wraps Playwright start-up too, so the
        # temporary file is removed even when the browser never launches.
        try:
            async with async_playwright() as p:
                is_multimodal = bool(saved_image_path and os.path.exists(saved_image_path))

                if is_multimodal:
                    user_agent = _DESKTOP_USER_AGENT
                    viewport = {"width": 1280, "height": 720}
                else:
                    user_agent = _MOBILE_USER_AGENT
                    viewport = {"width": 412, "height": 915}

                browser = await p.chromium.launch(
                    headless=True,
                    args=[
                        "--no-sandbox",
                        "--disable-setuid-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-blink-features=AutomationControlled",
                    ],
                )
                try:
                    context = await browser.new_context(
                        user_agent=user_agent,
                        viewport=viewport,
                        locale="fa-IR",
                        timezone_id="Asia/Tehran",
                    )
                    page = await context.new_page()

                    if is_multimodal:
                        await _submit_image_query(page, saved_image_path, message)
                    else:
                        encoded_query = urllib.parse.quote_plus(message)
                        url = f"https://www.google.com/search?q={encoded_query}&udm=50"
                        await page.goto(url, wait_until="domcontentloaded", timeout=45000)

                    previous_text = ""   # last text sent to the client
                    last_seen = ""       # last non-empty text read from the page
                    stable_polls = 0     # consecutive polls equal to last_seen

                    for _ in range(_MAX_POLLS):
                        raw_ai_text = await page.evaluate(_EXTRACT_AI_TEXT_JS)
                        cleaned_text = clean_extracted_text(raw_ai_text, message)

                        if cleaned_text:
                            if cleaned_text == last_seen:
                                stable_polls += 1
                            else:
                                last_seen = cleaned_text
                                stable_polls = 0
                            settled = stable_polls >= _STABLE_POLLS_TO_FINISH

                            # A long answer that suddenly reads as a very short
                            # one is not forwarded to the client.
                            transient_shrink = (
                                bool(previous_text)
                                and len(cleaned_text) < 50
                                and len(previous_text) > 100
                            )

                            if cleaned_text != previous_text and not transient_shrink:
                                is_replacement = not previous_text or not cleaned_text.startswith(
                                    previous_text[:len(previous_text) // 2 + 1]
                                )
                                grew_enough = len(cleaned_text) > len(previous_text) + 20

                                # Small changes are held back while the text is
                                # still moving, but flushed once it has settled
                                # so the tail of the answer is not lost.
                                if is_replacement or grew_enough or settled:
                                    start_idx = 0 if is_replacement else len(previous_text)
                                    step = max(3, (len(cleaned_text) - start_idx) // 15)

                                    for i in range(start_idx + step, len(cleaned_text), step):
                                        yield _sse_event(cleaned_text[:i])
                                        await asyncio.sleep(0.04)

                                    yield _sse_event(cleaned_text)
                                    previous_text = cleaned_text

                            if settled:
                                break

                        await asyncio.sleep(0.1)

                    if not previous_text:
                        # Nothing matched the answer selectors: fall back to
                        # the cleaned text of the whole page, truncated.
                        fallback_text = await page.evaluate("() => document.body.innerText")
                        clean_fb = clean_extracted_text(fallback_text, message)
                        if clean_fb:
                            yield _sse_event(clean_fb[:2000] + '...')
                finally:
                    # Close the browser while Playwright is still running.
                    await browser.close()
        except Exception as e:
            yield _sse_event(f'خطا: {str(e)}')
        finally:
            _remove_quietly(saved_image_path)

    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }

    return StreamingResponse(generate_response(), media_type="text/event-stream", headers=headers)
|
|
if __name__ == "__main__":
    # Run directly: serve the app with uvicorn on all interfaces, port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")