import gradio as gr import os import time import json import re import asyncio import smtplib import requests import praw import random import threading import nest_asyncio from datetime import datetime, timedelta from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from openai import OpenAI from playwright.async_api import async_playwright from apscheduler.schedulers.background import BackgroundScheduler from upstash_redis import Redis as UpstashRedis os.environ["TZ"] = "UTC" nest_asyncio.apply() # ========================================== # 1. إعدادات البيئة والمفاتيح # ========================================== NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY") GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID") REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET") REDDIT_USERNAME = os.getenv("REDDIT_USERNAME") REDDIT_PASSWORD = os.getenv("REDDIT_PASSWORD") SMTP_EMAIL = os.getenv("SMTP_EMAIL") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") DEVTO_API_KEY = os.getenv("DEVTO_API_KEY") # اختياري DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN") # اختياري DISCORD_GUILD_IDS = os.getenv("DISCORD_GUILD_IDS", "") # معرفات السيرفرات (مفصولة بفاصلة) SMTP_SERVER = "smtp.gmail.com" SMTP_PORT = 587 SITE_URL = "https://www.orgteh.com" UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL") UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN") # حدود يومية للحماية من الحظر LIMITS = { "reddit_daily": 30, "github_daily": 80, "email_daily": 200, "devto_daily": 25, "hn_daily": 40, "discord_daily": 20, } # ========================================== # 2. نظام الذاكرة الحديدية (Iron Memory) # ========================================== class IronMemory: """ يحفظ كل URL / معرف مستخدم تم التعامل معه بشكل دائم. يعمل على طبقتين: ذاكرة محلية (سريعة) + Upstash Redis (سحابية / دائمة). لن نعود لأي شخص أو رابط تم حفظه أبداً حتى بعد إعادة التشغيل. """ SET_KEY = "orgteh:processed" LOGS_KEY = "orgteh:logs" USERS_KEY = "orgteh:users" # مجموعة معرفات المستخدمين تحديداً def __init__(self): self._local: set = set() self._users: set = set() self.redis = None self.enabled = False if UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN: try: self.redis = UpstashRedis(url=UPSTASH_REDIS_REST_URL, token=UPSTASH_REDIS_REST_TOKEN) self.redis.ping() self.enabled = True print("✅ Iron Memory: Upstash Redis متصل — الذاكرة الدائمة نشطة") except Exception as e: print(f"❌ Upstash خطأ: {e} — سيعمل النظام بذاكرة مؤقتة فقط") else: print("⚠️ بيانات Upstash غير موجودة — الذاكرة مؤقتة فقط") # --- فحص --- def seen(self, key: str) -> bool: if key in self._local: return True if self.enabled: try: return bool(self.redis.sismember(self.SET_KEY, key)) except: pass return False def user_seen(self, user_id: str) -> bool: if user_id in self._users: return True if self.enabled: try: return bool(self.redis.sismember(self.USERS_KEY, user_id)) except: pass return False # --- تسجيل --- def mark(self, key: str): self._local.add(key) if self.enabled: try: self.redis.sadd(self.SET_KEY, key) except: pass def mark_user(self, user_id: str): self._users.add(user_id) if self.enabled: try: self.redis.sadd(self.USERS_KEY, user_id) except: pass # --- السجلات --- def log(self, platform, title, url, snippet, reply): entry = { "ts": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "platform": platform, "title": title, "url": url, "snippet": (snippet or "")[:400], "reply": reply, } LOCAL_LOGS.insert(0, entry) if len(LOCAL_LOGS) > 200: LOCAL_LOGS.pop() if self.enabled: try: self.redis.lpush(self.LOGS_KEY, json.dumps(entry)) self.redis.ltrim(self.LOGS_KEY, 0, 999) except: pass return entry def load_logs(self): if self.enabled: try: raw = self.redis.lrange(self.LOGS_KEY, 0, 199) result = [] for item in raw: try: result.append(json.loads(item) if isinstance(item, str) else item) except: pass return result except: pass return LOCAL_LOGS[:] memory = IronMemory() LOCAL_LOGS = [] LOCAL_STATS = {"reddit": 0, "github": 0, "email": 0, "devto": 0, "hn": 0, "discord": 0} # ========================================== # 3. محرك الذكاء الاصطناعي # ========================================== client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=NVIDIA_API_KEY) def ask_ai(prompt: str, system: str = "You are a helpful assistant.", temp=0.5) -> str | None: try: resp = client.chat.completions.create( model="meta/llama-3.3-70b-instruct", messages=[{"role": "system", "content": system}, {"role": "user", "content": prompt}], temperature=temp, max_tokens=900 ) return resp.choices[0].message.content.strip() except Exception as e: print(f"AI Error: {e}") return None # ========================================== # 4. نظام كلمات البحث الذكي — تلقائي 100% # ========================================== # طبقتان: # 1. FALLBACK_KEYWORDS — كلمات ثابتة لضمان العمل دائماً # 2. كل KEYWORD_REFRESH_MINUTES يطلب النموذج توليد كلمات جديدة بنفسه # بكل اللغات الممكنة بناءً على ما يعرفه عن Orgteh وسوق AI API # ── كلمات ثابتة (Fallback) — تُستخدم عند بدء التشغيل أو فشل التوليد ── FALLBACK_KEYWORDS: list[str] = [ # EN "llm api", "ai api", "chatbot api", "language model api", "integrate llm", "openai api", "deepseek api", "mistral api", "llama api", "ai api developer", "chat completions api", # AR "api ذكاء اصطناعي", "دمج نموذج لغوي", "api شات بوت", # TR "yapay zeka api", "llm api entegrasyonu", # FR "api intelligence artificielle", "intégrer llm api", # ES "api inteligencia artificial", "integrar llm api", # DE "ki api", "sprachmodell api", # PT "api inteligência artificial", "chatbot api integração", # RU "языковая модель api", "llm api интеграция", # ZH "大语言模型 api", "ai api 接入", # JA "llm api 統合", "言語モデル api", # KO "llm api 통합", "챗봇 api 연동", # HI "ai api इंटीग्रेशन", "chatbot api बनाना", # ID "api kecerdasan buatan", "chatbot api developer", ] KEYWORD_REFRESH_MINUTES = 90 # كل 90 دقيقة يطلب كلمات جديدة _keyword_cache: list[str] = list(FALLBACK_KEYWORDS) _last_keyword_refresh: float = 0.0 _keyword_lock = threading.Lock() def _ai_generate_keywords() -> list[str]: """ يطلب من النموذج توليد كلمات بحث جديدة بنفسه — بكل اللغات والزوايا الممكنة لإيجاد المطورين المحتاجين لـ AI API. """ prompt = """You are helping market Orgteh (orgteh.com) — an AI API service for developers. Orgteh provides: OpenAI-compatible LLM API, multiple models (DeepSeek, Mistral, Llama, Kimi, Gemma), cheap pricing. Target: developers and startups who want to USE or ACCESS an AI language model API for their projects. Generate 40 short search keywords (2-5 words each) to find these people on GitHub, Reddit, forums, and the web. - Cover MANY languages: English, Arabic, Turkish, French, Spanish, German, Portuguese, Russian, Chinese, Japanese, Korean, Hindi, Indonesian, Italian, Polish, Dutch, Vietnamese, Thai, and others you know. - Include different angles: building chatbots, integrating LLMs, looking for API access, seeking cheaper alternatives, starting AI projects. - Keep each keyword SHORT and natural (as someone would actually search or post). - Output ONLY a JSON array of strings, no explanation: ["keyword1", "keyword2", ...]""" resp = ask_ai(prompt, temp=0.8) if resp: try: m = re.search(r'\[.*\]', resp, re.DOTALL) if m: keywords = json.loads(m.group(0)) valid = [k.strip() for k in keywords if isinstance(k, str) and 2 < len(k.strip()) < 60] if len(valid) >= 10: print(f"🔄 Keywords refreshed: {len(valid)} new keywords generated by AI") return valid except Exception as e: print(f"Keyword generation parse error: {e}") return [] def refresh_keywords_if_needed(): """يجدد كلمات البحث تلقائياً كل KEYWORD_REFRESH_MINUTES.""" global _keyword_cache, _last_keyword_refresh now = time.time() with _keyword_lock: if now - _last_keyword_refresh > KEYWORD_REFRESH_MINUTES * 60: new_kws = _ai_generate_keywords() if new_kws: # نمزج الكلمات الجديدة مع الثابتة لضمان التغطية _keyword_cache = list(set(FALLBACK_KEYWORDS + new_kws)) _last_keyword_refresh = now def _keyword_refresh_loop(): """خيط مستقل يجدد الكلمات بشكل دوري.""" # توليد فوري عند البدء refresh_keywords_if_needed() while True: time.sleep(KEYWORD_REFRESH_MINUTES * 60) refresh_keywords_if_needed() def get_random_keywords(n: int = 3) -> list[str]: """يختار n كلمات عشوائية من الكاش الحالي (ثابتة + مولّدة).""" with _keyword_lock: pool = _keyword_cache if _keyword_cache else FALLBACK_KEYWORDS return random.sample(pool, min(n, len(pool))) # ========================================== # 5. مولّد الردود الذكي # ========================================== SITE_CONTEXT = """ Orgteh (orgteh.com) is an AI API service offering: - OpenAI-compatible endpoint: https://orgteh.com/v1/chat/completions - Multiple models: DeepSeek-R1, Kimi (256K context), Mistral, Llama-3B, Gemma - API keys start with "Orgteh_" - Features: GPU-accelerated, sub-second response, enterprise security, streaming support - Extra tools: web scraper, OCR, embeddings, finance/news aggregator, AI Code Builder - Cheaper than OpenAI with the same integration format (drop-in replacement, zero code changes) - Registration: orgteh.com/register """ REPLY_STYLES = [ "casual_developer", # مطور يشارك تجربته "helpful_peer", # زميل يساعد "been_there", # شخص مر بنفس المشكلة ] def ai_judge(text: str) -> bool: """ النموذج نفسه يحكم: هل هذا الشخص يحتاج فعلاً لـ API نماذج ذكاء اصطناعي؟ لا كلمات مفتاحية — فهم النية الحقيقية فقط. """ prompt = f"""Read this post/issue carefully: \"\"\" {text[:600]} \"\"\" Answer with ONE word only — YES or NO: Is this person actively trying to USE or ACCESS an AI language model API (to build something, integrate it, or get API access for a real project)? Rules: - YES: they are building/integrating/seeking API access to an AI model for a concrete use case. - NO: discussing AI concepts/news/opinions, reporting unrelated bugs, academic questions, or anything not about actually using a model API. - If unsure → NO. Answer:""" resp = ask_ai(prompt, temp=0.0) if resp: return resp.strip().upper().startswith("YES") return False def generate_reply(content: str, platform: str) -> str | None: """يولد رد يبدو كمستخدم طبيعي اكتشف orgteh وأراد مشاركته.""" style = random.choice(REPLY_STYLES) prompt = f""" You are a real developer who uses Orgteh (orgteh.com) for AI API access. You are browsing {platform} and found this post/issue that matches your past experience. You want to genuinely help by sharing what worked for you. CONTEXT ABOUT ORGTEH: {SITE_CONTEXT} POST/ISSUE CONTENT: \"\"\" {content[:700]} \"\"\" YOUR PERSONA STYLE TODAY: {style} STRICT RULES: 1. Detect the language of the post and reply in THE SAME LANGUAGE. 2. Write as a REAL PERSON, NOT a company. Use "I" not "we". 3. Sound natural and conversational. NOT salesy or promotional. 4. Keep it SHORT (3-6 sentences max). 5. Mention orgteh.com naturally as "I've been using orgteh.com" or "stumbled on orgteh.com" etc. 6. Only reference features that are actually relevant to their specific problem. 7. DO NOT start with "Hi" or "Hello" or generic openers. 8. DO NOT use bullet points or formatting. 9. Output ONLY the reply text, nothing else. Examples of good natural openers (adapt to language/situation): - "Been using orgteh.com's API for this exact use case, works well with Python..." - "I built something similar, ended up using orgteh.com — they have an OpenAI-compatible endpoint so..." - "For LLM API access orgteh.com has been solid for me, multiple models under one endpoint..." - "جربت orgteh.com لنفس الغرض، API متوافق مع OpenAI ويشتغل مباشرة..." - "عندي تجربة مع هذا، استخدمت orgteh.com وفيه عدة نماذج بـ endpoint موحد..." """ return ask_ai(prompt, temp=0.7) def generate_email(company: str, snippet: str) -> dict | None: """يولد إيميل بارد احترافي وشخصي لاستهداف الشركات والمطورين.""" prompt = f""" You are a developer who uses Orgteh API and wants to recommend it to a company/developer who seems to need AI API services. COMPANY/DEV NAME: {company} WHAT THEY DO (from their website): {snippet[:500]} CONTEXT ABOUT ORGTEH: {SITE_CONTEXT} Write a cold email as a fellow developer (not a company rep). - Detect language from the snippet and write in the SAME LANGUAGE. - Subject: short, specific to their use case. - Body: personal, 4-6 sentences. Reference what they do specifically. - Sound like you're sharing a tool that helped you, not selling. - Include orgteh.com naturally. - Use "I" not "we". Output STRICT JSON only: {{"subject": "...", "body": "..."}} """ resp = ask_ai(prompt, temp=0.6) if resp: try: m = re.search(r'\{.*\}', resp, re.DOTALL) if m: return json.loads(m.group(0)) except: pass return None # ========================================== # 6. نظام مانع الحظر (Anti-Ban Engine) # ========================================== class AntiBan: """ يتحكم في توقيت الإرسال لكل منصة لتجنب الحظر. يستخدم تأخيرات عشوائية وحدوداً يومية صارمة. """ def __init__(self): self._day = datetime.utcnow().date() self._counts = {k: 0 for k in LIMITS} self._lock = threading.Lock() def _reset_if_new_day(self): today = datetime.utcnow().date() if today != self._day: self._day = today self._counts = {k: 0 for k in LIMITS} def can_act(self, platform: str) -> bool: key = f"{platform}_daily" with self._lock: self._reset_if_new_day() return self._counts.get(key, 0) < LIMITS.get(key, 999) def record(self, platform: str): key = f"{platform}_daily" with self._lock: self._counts[key] = self._counts.get(key, 0) + 1 LOCAL_STATS[platform] = LOCAL_STATS.get(platform, 0) + 1 @staticmethod def human_delay(base=15, jitter=20): """تأخير عشوائي يشبه السلوك البشري.""" delay = base + random.uniform(0, jitter) time.sleep(delay) @staticmethod def micro_delay(): time.sleep(random.uniform(2, 6)) antiban = AntiBan() # ========================================== # 7. منصات البث - GitHub # ========================================== def github_loop(): headers = {"Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json"} while True: try: if not GITHUB_TOKEN: time.sleep(600) continue keywords = get_random_keywords(2) for kw in keywords: if not antiban.can_act("github"): break # البحث في Issues resp = requests.get( f"https://api.github.com/search/issues" f"?q={requests.utils.quote(kw)}+state:open+type:issue" f"&sort=updated&per_page=5", headers=headers, timeout=15 ) if resp.status_code != 200: if resp.status_code == 403: time.sleep(300) break for item in resp.json().get("items", []): url = item["html_url"] body_text = (item.get("body") or "") full_text = item["title"] + " " + body_text if memory.seen(url): continue user_id = f"gh:{item.get('user', {}).get('login', '')}" if memory.user_seen(user_id): continue # النموذج يحكم على النية — لا شروط كلمية if not ai_judge(full_text): print(f" ⏭ GitHub skip (not relevant): {item['title'][:40]}") continue reply = generate_reply(full_text, "GitHub") if not reply: continue post = requests.post( item["url"] + "/comments", headers=headers, json={"body": reply}, timeout=10 ) if post.status_code == 201: memory.mark(url) memory.mark_user(user_id) memory.log("GitHub", item["title"], url, body_text, reply) antiban.record("github") print(f"✅ GitHub: {item['title'][:50]}") antiban.human_delay(60, 60) time.sleep(random.randint(90, 180)) except Exception as e: print(f"GitHub Error: {e}") time.sleep(300) # ========================================== # 8. منصات البث - Reddit # ========================================== # سابريدتات مناسبة فقط TARGET_SUBREDDITS = [ "MachineLearning", "LocalLLaMA", "learnmachinelearning", "artificial", "ChatGPT", "OpenAI", "SideProject", "startups", "webdev", "learnprogramming", "Python", "programming", "ArtificialIntelligence", ] def reddit_loop(): while True: try: if not REDDIT_CLIENT_ID or not antiban.can_act("reddit"): time.sleep(600) continue reddit = praw.Reddit( client_id=REDDIT_CLIENT_ID, client_secret=REDDIT_CLIENT_SECRET, username=REDDIT_USERNAME, password=REDDIT_PASSWORD, user_agent="orgteh-community-bot/1.0" ) kw = random.choice(get_random_keywords(1)) sub = random.choice(TARGET_SUBREDDITS) for post in reddit.subreddit(sub).search(kw, limit=5, sort="new"): if not antiban.can_act("reddit"): break full_text = post.title + " " + (post.selftext or "") if not ai_judge(full_text): continue url = post.url user_id = f"rd:{post.author}" if memory.seen(url) or memory.user_seen(user_id): continue reply = generate_reply(full_text, "Reddit") if not reply: continue post.reply(reply) memory.mark(url) memory.mark_user(user_id) memory.log("Reddit", post.title, url, post.selftext or "", reply) antiban.record("reddit") print(f"✅ Reddit: {post.title[:50]}") antiban.human_delay(120, 180) break # رد واحد لكل دورة except Exception as e: print(f"Reddit Error: {e}") time.sleep(random.randint(900, 1500)) # ========================================== # 9. منصات البث - Hacker News # ========================================== def hn_loop(): """Hacker News Ask HN / Show HN عبر Algolia API المجاني.""" base = "https://hn.algolia.com/api/v1/search" while True: try: if not antiban.can_act("hn"): time.sleep(600) continue kw = random.choice(SEED_KEYWORDS_EN[:8]) resp = requests.get(base, params={ "query": kw, "tags": "ask_hn,story", "hitsPerPage": 5, "numericFilters": "points>5" }, timeout=15) if resp.status_code != 200: time.sleep(300) continue for hit in resp.json().get("hits", []): story_id = str(hit.get("objectID")) full_text = (hit.get("title","") + " " + (hit.get("story_text") or "")) if not ai_judge(full_text): continue url = f"https://news.ycombinator.com/item?id={story_id}" user_id = f"hn:{hit.get('author','')}" if memory.seen(url) or memory.user_seen(user_id): continue # HN لا يسمح بالتعليق عبر API — نسجل فقط للمراجعة اليدوية # لكن نولد الرد للاطلاع reply = generate_reply(full_text, "HackerNews") if reply: memory.mark(url) memory.mark_user(user_id) memory.log("HackerNews (Manual)", hit.get("title",""), url, full_text[:300], reply) antiban.record("hn") print(f"📝 HN (للمراجعة): {hit.get('title','')[:50]}") antiban.micro_delay() except Exception as e: print(f"HN Error: {e}") time.sleep(random.randint(600, 900)) # ========================================== # 10. منصات البث - Dev.to # ========================================== def devto_loop(): """Dev.to API - التعليق على مقالات المطورين.""" headers = {"api-key": DEVTO_API_KEY or "", "Content-Type": "application/json"} while True: try: if not DEVTO_API_KEY or not antiban.can_act("devto"): time.sleep(1800) continue for tag in ["api", "llm", "openai", "chatbot", "artificialintelligence"]: resp = requests.get( f"https://dev.to/api/articles?tag={tag}&top=1&per_page=5", timeout=10 ) if resp.status_code != 200: continue for article in resp.json(): full_text = article.get("title","") + " " + (article.get("description") or "") if not ai_judge(full_text): continue url = article.get("url","") user_id = f"devto:{article.get('user',{}).get('username','')}" if not url or memory.seen(url) or memory.user_seen(user_id): continue reply = generate_reply(full_text, "Dev.to") if not reply: continue post = requests.post( f"https://dev.to/api/comments", headers=headers, json={"body_markdown": reply, "article_id": article["id"]}, timeout=10 ) if post.status_code in (200, 201): memory.mark(url) memory.mark_user(user_id) memory.log("Dev.to", article["title"], url, full_text, reply) antiban.record("devto") print(f"✅ Dev.to: {article['title'][:50]}") antiban.human_delay(90, 90) break except Exception as e: print(f"Dev.to Error: {e}") time.sleep(random.randint(1800, 2700)) # ========================================== # 11. منصات البث - Discord (قراءة فقط → رد عبر DM) # ========================================== def discord_monitor_loop(): """ يراقب سيرفرات Discord المحددة ويبحث عن رسائل عن API. يرسل DM للأشخاص المحتاجين (ليس رد عام للسيرفر). يتطلب: DISCORD_BOT_TOKEN + DISCORD_GUILD_IDS """ if not DISCORD_BOT_TOKEN: return headers = { "Authorization": f"Bot {DISCORD_BOT_TOKEN}", "Content-Type": "application/json" } guild_ids = [g.strip() for g in DISCORD_GUILD_IDS.split(",") if g.strip()] while True: try: if not antiban.can_act("discord"): time.sleep(3600) continue for guild_id in guild_ids: # جلب قنوات السيرفر ch_resp = requests.get( f"https://discord.com/api/v10/guilds/{guild_id}/channels", headers=headers, timeout=10 ) if ch_resp.status_code != 200: continue text_channels = [c for c in ch_resp.json() if c.get("type") == 0] for channel in text_channels[:5]: # جلب آخر الرسائل msg_resp = requests.get( f"https://discord.com/api/v10/channels/{channel['id']}/messages?limit=20", headers=headers, timeout=10 ) if msg_resp.status_code != 200: continue for msg in msg_resp.json(): content = msg.get("content", "") msg_id = msg.get("id", "") user = msg.get("author", {}) user_id = f"dc:{user.get('id','')}" if user.get("bot"): continue if not ai_judge(content): continue if memory.seen(msg_id) or memory.user_seen(user_id): continue # إرسال DM dm_resp = requests.post( "https://discord.com/api/v10/users/@me/channels", headers=headers, json={"recipient_id": user["id"]}, timeout=10 ) if dm_resp.status_code != 200: continue dm_channel = dm_resp.json().get("id") reply = generate_reply(content, "Discord") if not reply: continue send = requests.post( f"https://discord.com/api/v10/channels/{dm_channel}/messages", headers=headers, json={"content": reply}, timeout=10 ) if send.status_code in (200, 201): memory.mark(msg_id) memory.mark_user(user_id) memory.log("Discord DM", channel.get("name",""), msg_id, content, reply) antiban.record("discord") print(f"✅ Discord DM → {user.get('username','')}") antiban.human_delay(120, 120) break except Exception as e: print(f"Discord Error: {e}") time.sleep(random.randint(3600, 5400)) # ========================================== # 12. نظام الإيميل المتقدم # ========================================== def send_email(to: str, subject: str, body: str) -> tuple[bool, str]: if not SMTP_EMAIL or not SMTP_PASSWORD: return False, "SMTP غير مهيأ" try: msg = MIMEMultipart() msg["From"] = SMTP_EMAIL msg["To"] = to msg["Subject"] = subject msg.attach(MIMEText(body, "plain", "utf-8")) srv = smtplib.SMTP(SMTP_SERVER, SMTP_PORT) srv.starttls() srv.login(SMTP_EMAIL, SMTP_PASSWORD) srv.send_message(msg) srv.quit() return True, "تم الإرسال" except Exception as e: return False, str(e) # استراتيجيات البحث عن الإيميلات والشركات الناشئة EMAIL_SEARCH_QUERIES = [ # يبني تطبيق على AI API "site:github.io ai api project contact email", "site:github.com llm api integration project contact", '"llm api" developer project contact email', '"ai api" developer "contact" OR "email" site:github.com', # يبحث عن كيفية الاستخدام '"how to use llm api" developer email', '"integrate ai model" project developer contact', # شركات ناشئة تبني على AI "site:producthunt.com ai chatbot api developer 2024", "site:producthunt.com llm powered app developer", "startup ai api integration developer email contact", "site:crunchbase.com ai startup llm api developer", # مطورون مستقلون "site:indie.hackers.com ai api llm project", '"building with llm" developer email contact', '"ai powered" app developer "reach me" OR "contact"', # بحث عام '"llm api" OR "ai api" developer startup email contact 2024', '"chat completions" developer project contact email', ] async def email_hunter(): """يبحث عن مطورين وشركات ناشئة تحتاج فعلاً لـ API ذكاء اصطناعي.""" if not antiban.can_act("email"): return query = random.choice(EMAIL_SEARCH_QUERIES) async with async_playwright() as p: browser = await p.chromium.launch(headless=True, args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu" ]) page = await browser.new_page() await page.set_extra_http_headers({"Accept-Language": "en-US,en;q=0.9"}) try: # البحث في DuckDuckGo await page.goto( f"https://duckduckgo.com/?q={requests.utils.quote(query)}&t=h_&ia=web", timeout=25000 ) await asyncio.sleep(3) links = await page.query_selector_all("a.result__a") targets = [] for l in links[:8]: href = await l.get_attribute("href") text = await l.inner_text() if href and all(x not in href for x in ["facebook", "linkedin", "twitter", "youtube"]): targets.append((href, text.strip())) for url, title in targets: if not antiban.can_act("email"): break if memory.seen(url): continue try: await page.goto(url, timeout=20000) await asyncio.sleep(2) content = await page.content() # استخراج الإيميلات emails = list(set(re.findall( r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}", content ))) emails = [e for e in emails if not any( e.endswith(x) for x in [".png", ".jpg", ".js", ".css", ".svg", ".gif"] ) and "example" not in e and "noreply" not in e] if not emails: continue # التحقق من الصلة بالموضوع page_text = re.sub(r"<[^>]+>", " ", content) if not ai_judge(page_text[:2000]): continue target_email = emails[0] email_id = f"email:{target_email}" if memory.user_seen(email_id): continue # توليد الإيميل company_name = title or url.split("/")[2] if "/" in url else url data = generate_email(company_name, page_text[:600]) if not data: continue ok, msg = send_email(target_email, data["subject"], data["body"]) if ok: memory.mark(url) memory.mark_user(email_id) memory.log("Email", title, url, f"→ {target_email}", data["body"]) antiban.record("email") print(f"✅ Email → {target_email} [{title[:30]}]") await asyncio.sleep(random.uniform(30, 60)) except Exception as e: print(f" Email sub-error: {e}") except Exception as e: print(f"Email search error: {e}") finally: await browser.close() def email_loop_sync(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) while True: try: loop.run_until_complete(email_hunter()) except Exception as e: print(f"Email Loop Error: {e}") time.sleep(random.randint(300, 600)) # ========================================== # 13. تشغيل الخيوط # ========================================== def start_all(): threads = [ threading.Thread(target=_keyword_refresh_loop, daemon=True), # مولّد الكلمات التلقائي threading.Thread(target=github_loop, daemon=True), threading.Thread(target=reddit_loop, daemon=True), threading.Thread(target=hn_loop, daemon=True), threading.Thread(target=devto_loop, daemon=True), threading.Thread(target=discord_monitor_loop, daemon=True), threading.Thread(target=email_loop_sync, daemon=True), ] for t in threads: t.start() print("🚀 Orgteh Marketing Engine — جميع الأنظمة تعمل") start_all() # ========================================== # 14. واجهة Gradio # ========================================== def refresh_stats(): with _keyword_lock: kw_count = len(_keyword_cache) last_refresh = datetime.utcfromtimestamp(_last_keyword_refresh).strftime("%H:%M UTC") if _last_keyword_refresh else "لم يتم بعد" return ( LOCAL_STATS.get("reddit", 0), LOCAL_STATS.get("github", 0), LOCAL_STATS.get("email", 0), LOCAL_STATS.get("devto", 0), LOCAL_STATS.get("hn", 0), LOCAL_STATS.get("discord", 0), f"{kw_count} كلمة | آخر تجديد: {last_refresh}", ) def refresh_logs(): logs = memory.load_logs() chat = [] for log in logs: user_msg = ( f"**[{log['platform']}]** {log['title']}\n\n" f"{log.get('snippet','')}\n\n" f"🔗 {log['url']}" ) bot_msg = f"🕐 {log['ts']}\n\n{log['reply']}" chat.append({"role": "user", "content": user_msg}) chat.append({"role": "assistant", "content": bot_msg}) return chat with gr.Blocks(title="Orgteh Marketing Engine", theme=gr.themes.Base()) as demo: gr.Markdown(""" # 🚀 Orgteh Marketing Engine **موقع:** [orgteh.com](https://www.orgteh.com) | **المنصات:** GitHub · Reddit · HackerNews · Dev.to · Discord · Email """) with gr.Row(): r_reddit = gr.Number(label="Reddit اليوم", min_width=100) r_github = gr.Number(label="GitHub اليوم", min_width=100) r_email = gr.Number(label="Emails اليوم", min_width=100) r_devto = gr.Number(label="Dev.to اليوم", min_width=100) r_hn = gr.Number(label="HN (للمراجعة)", min_width=100) r_discord = gr.Number(label="Discord DMs", min_width=100) r_keywords = gr.Textbox(label="🔑 كلمات البحث الحالية (AI مولّدة + ثابتة)", interactive=False) refresh_btn = gr.Button("🔄 تحديث البيانات") gr.Markdown("### 💬 سجل التفاعلات الحية") log_view = gr.Chatbot(label="السجل", height=650, type="messages") refresh_btn.click(refresh_stats, outputs=[r_reddit, r_github, r_email, r_devto, r_hn, r_discord, r_keywords]) refresh_btn.click(refresh_logs, outputs=[log_view]) demo.load(refresh_stats, outputs=[r_reddit, r_github, r_email, r_devto, r_hn, r_discord, r_keywords]) demo.load(refresh_logs, outputs=[log_view]) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)