Hooshvare / app.py
Opera8's picture
Update app.py
63ad1a9 verified
Raw
History Blame Contribute Delete
13.2 kB
import os
import time
import asyncio
import urllib.parse
import re
import json
import uvicorn
from fastapi import FastAPI, Form, File, UploadFile, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from playwright.async_api import async_playwright
app = FastAPI()
UPLOAD_DIR = "temp_uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
def clean_extracted_text(raw_text: str, user_message: str) -> str:
if not raw_text:
return ""
lines = raw_text.split('\n')
cleaned_lines = []
disclaimer_triggers = [
"هوشواره", "اشتباه کند", "دوباره‌سنجی", "دوباره سنجی",
"پیش‌نمایش", "Generative AI", "experimental"
]
# قیچی‌های قطع کامل: اگر به این کلمات برسیم، کلاً خواندن را متوقف می‌کنیم تا سایت‌های فروشگاهی نشان داده نشوند
cutoff_triggers = [
"visual matches", "نتایج مشابه", "تصاویر مشابه", "جستجوهای مرتبط",
"جستجوی مرتبط", "یافتن منبع تصویر", "مطابقت دقیق", "نمایش همه",
"همچنین ببینید", "search", "google lens"
]
skip_keywords = [
"تصاویر", "ویدیوها", "ویدئوها", "اخبار", "نقشه‌ها", "خرید کردن", "کتاب‌ها", "مالی",
"حالت موضوع‌محور", "ورود", "جستجو", "تنظیمات", "ابزارها", "صفحه اصلی", "همه", "خرید",
"پروازها", "بیشتر بدانید", "در پاسخ‌های", "بازخورد", "درباره این نتیجه",
"گزارش این", "Feedback", "About this result", "حالت هوشواره‌ای", "اگر امکان دارد آن را به فارسی ایمیل کنید",
"متن پیدا شد", "ترجمه", "مشاهده موارد مشابه", "جستجوی تصویر"
]
spam_domains = ["etsy", "pinterest", "dreamstime", "amazon", "ebay", "shutterstock", "istock", "redbubble", "aliexpress"]
for line in lines:
line_str = re.sub(r'[\u200b-\u200d\u200f\u202a-\u202e]', '', line).strip()
line_lower = line_str.lower()
if not line_str:
continue
# ۱. اگر به بخش سایت‌های مشابه رسیدیم، کلاً بقیه متن را حذف کن
if any(trigger in line_lower for trigger in cutoff_triggers) or any(trigger in line_lower for trigger in disclaimer_triggers):
break
# ۲. فیلتر کردن کدهای اضافی CSS
if "var(--" in line_str or "-webkit-" in line_str or "display:" in line_str or line_str.startswith("."):
continue
if re.search(r'^\d+\+?$', line_str):
continue
# ۳. فیلتر کردن نام سایت‌های مزاحم مثل Etsy
if re.search(r'^[a-zA-Z0-9-]+\.[a-zA-Z]{2,4}$', line_str, re.IGNORECASE) or any(domain in line_lower for domain in spam_domains):
continue
if re.search(r'^\d+\s+سایت$', line_str) or re.search(r'^\d+\s+site', line_str, re.IGNORECASE):
continue
if any(keyword == line_lower for keyword in skip_keywords):
continue
# ۴. حذف خطوط انگلیسی که مربوط به عنوان محصولات فروشگاهی هستند
if line_str.endswith("...") and not re.search(r'[\u0600-\u06FF]', line_str):
continue
cleaned_lines.append(line_str)
if cleaned_lines:
first_line = re.sub(r'[؟\?]', '', cleaned_lines[0].strip().lower())
msg_clean = re.sub(r'[؟\?]', '', user_message.strip().lower())
if first_line == msg_clean or msg_clean in first_line or first_line in msg_clean:
cleaned_lines.pop(0)
return "\n".join(cleaned_lines).strip()
@app.get("/")
async def chat_interface():
return FileResponse("index.html")
@app.post("/api/chat")
async def chat_endpoint(message: str = Form(...), image: UploadFile = File(None)):
saved_image_path = None
if image:
try:
saved_image_path = os.path.join(UPLOAD_DIR, f"{int(time.time())}_{image.filename}")
with open(saved_image_path, "wb") as buffer:
buffer.write(await image.read())
except Exception as e:
raise HTTPException(status_code=500, detail=f"خطا در ذخیره‌سازی تصویر: {str(e)}")
async def generate_response():
async with async_playwright() as p:
try:
is_multimodal = bool(saved_image_path and os.path.exists(saved_image_path))
if is_multimodal:
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
viewport = {"width": 1280, "height": 720}
else:
user_agent = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36"
viewport = {"width": 412, "height": 915}
browser = await p.chromium.launch(
headless=True,
args=[
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled"
]
)
context = await browser.new_context(
user_agent=user_agent,
viewport=viewport,
locale="fa-IR",
timezone_id="Asia/Tehran"
)
page = await context.new_page()
if is_multimodal:
await page.goto("https://images.google.com/", wait_until="networkidle", timeout=45000)
await page.wait_for_timeout(2000)
uploaded = False
try:
camera_btn = page.locator('div[aria-label="Search by image"], div[aria-label="جستجو با تصویر"], [aria-label*="camera" i], [aria-label*="دوربین"]').first
await camera_btn.click(timeout=10000)
await page.wait_for_timeout(1500)
file_input = page.locator("input[type='file']").first
await file_input.wait_for(state="attached", timeout=5000)
await file_input.set_input_files(saved_image_path)
uploaded = True
except Exception:
pass
if uploaded:
await page.wait_for_timeout(6000)
try:
inputs = await page.locator("textarea, input[type='text'], [contenteditable='true']").all()
for inp in inputs:
if await inp.is_visible() and not await inp.is_disabled():
await inp.focus()
await inp.fill(message)
await page.wait_for_timeout(500)
await page.keyboard.press("Enter")
# ---> مکث طلایی (Golden Delay) <---
# اینجا 4.5 ثانیه صبر می‌کنیم تا گوگل لنز از سایت‌های فروشگاهی گذر کند
# و هوش مصنوعی جوابش را کامل تولید کند. با این کار دیگر پیام اولیه ناقص نشان داده نمی‌شود.
await page.wait_for_timeout(4500)
break
except Exception:
pass
else:
encoded_query = urllib.parse.quote_plus(message)
url = f"https://www.google.com/search?q={encoded_query}&udm=50"
await page.goto(url, wait_until="domcontentloaded", timeout=45000)
previous_text = ""
unchanged_count = 0
max_iterations = 300
for _ in range(max_iterations):
raw_ai_text = await page.evaluate("""() => {
function isGoodText(txt) {
if (!txt || txt.length < 25) return false;
if (txt.includes('var(--') || txt.includes('-webkit-')) return false;
return true;
}
// اولویت با خواندن دقیق کادر هوش مصنوعی
const aiSelectors = [
'div[aria-label*="AI Overview"]',
'div[aria-label*="AI-generated"]',
'div[data-subtree="aimc"]',
'div[data-subtree="mfc"]',
'.aah4tc',
'div[data-m="0"] div[dir="rtl"]'
];
for (const sel of aiSelectors) {
try {
const els = document.querySelectorAll(sel);
for (const el of els) {
if (isGoodText(el.innerText)) return el.innerText.trim();
}
} catch(e) {}
}
// کادر اصلی اگر کادرهای بالا پیدا نشد
const mainLens = document.querySelector('div[data-m="0"]');
if (mainLens && isGoodText(mainLens.innerText)) return mainLens.innerText.trim();
return "";
}""")
cleaned_text = clean_extracted_text(raw_ai_text, message)
if cleaned_text and cleaned_text != previous_text:
# جلوگیری از پرش متن: اگر متن قبلی خیلی کامل بود و این متن جدید کوتاه است، نادیده‌اش بگیر
if previous_text and len(cleaned_text) < 50 and len(previous_text) > 100:
continue
is_replacement = not previous_text or not cleaned_text.startswith(previous_text[:len(previous_text)//2 + 1])
if is_replacement or len(cleaned_text) > len(previous_text) + 20:
start_idx = 0 if is_replacement else len(previous_text)
step = max(3, (len(cleaned_text) - start_idx) // 15)
for i in range(start_idx + step, len(cleaned_text), step):
yield f"data: {json.dumps({'text': cleaned_text[:i]}, ensure_ascii=False)}\n\n"
await asyncio.sleep(0.04)
yield f"data: {json.dumps({'text': cleaned_text}, ensure_ascii=False)}\n\n"
previous_text = cleaned_text
unchanged_count = 0
elif cleaned_text and cleaned_text == previous_text:
unchanged_count += 1
if unchanged_count >= 30:
break
await asyncio.sleep(0.1)
if not previous_text:
fallback_text = await page.evaluate("() => document.body.innerText")
clean_fb = clean_extracted_text(fallback_text, message)
if clean_fb:
yield f"data: {json.dumps({'text': clean_fb[:2000] + '...'}, ensure_ascii=False)}\n\n"
except Exception as e:
yield f"data: {json.dumps({'text': f'خطا: {str(e)}'}, ensure_ascii=False)}\n\n"
finally:
if 'browser' in locals():
await browser.close()
if saved_image_path and os.path.exists(saved_image_path):
try:
os.remove(saved_image_path)
except:
pass
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
return StreamingResponse(generate_response(), media_type="text/event-stream", headers=headers)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)