Nex / app.py
Riy777's picture
Update app.py
f719794 verified
import gradio as gr
import os
import time
import json
import re
import asyncio
import smtplib
import requests
import praw
import random
import threading
import nest_asyncio
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from openai import OpenAI
from playwright.async_api import async_playwright
from apscheduler.schedulers.background import BackgroundScheduler
from upstash_redis import Redis as UpstashRedis
os.environ["TZ"] = "UTC"
nest_asyncio.apply()
# ==========================================
# 1. إعدادات البيئة والمفاتيح
# ==========================================
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY")
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID")
REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET")
REDDIT_USERNAME = os.getenv("REDDIT_USERNAME")
REDDIT_PASSWORD = os.getenv("REDDIT_PASSWORD")
SMTP_EMAIL = os.getenv("SMTP_EMAIL")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
DEVTO_API_KEY = os.getenv("DEVTO_API_KEY") # اختياري
DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN") # اختياري
DISCORD_GUILD_IDS = os.getenv("DISCORD_GUILD_IDS", "") # معرفات السيرفرات (مفصولة بفاصلة)
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587
SITE_URL = "https://www.orgteh.com"
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
# حدود يومية للحماية من الحظر
LIMITS = {
"reddit_daily": 30,
"github_daily": 80,
"email_daily": 200,
"devto_daily": 25,
"hn_daily": 40,
"discord_daily": 20,
}
# ==========================================
# 2. نظام الذاكرة الحديدية (Iron Memory)
# ==========================================
class IronMemory:
"""
يحفظ كل URL / معرف مستخدم تم التعامل معه بشكل دائم.
يعمل على طبقتين: ذاكرة محلية (سريعة) + Upstash Redis (سحابية / دائمة).
لن نعود لأي شخص أو رابط تم حفظه أبداً حتى بعد إعادة التشغيل.
"""
SET_KEY = "orgteh:processed"
LOGS_KEY = "orgteh:logs"
USERS_KEY = "orgteh:users" # مجموعة معرفات المستخدمين تحديداً
def __init__(self):
self._local: set = set()
self._users: set = set()
self.redis = None
self.enabled = False
if UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN:
try:
self.redis = UpstashRedis(url=UPSTASH_REDIS_REST_URL, token=UPSTASH_REDIS_REST_TOKEN)
self.redis.ping()
self.enabled = True
print("✅ Iron Memory: Upstash Redis متصل — الذاكرة الدائمة نشطة")
except Exception as e:
print(f"❌ Upstash خطأ: {e} — سيعمل النظام بذاكرة مؤقتة فقط")
else:
print("⚠️ بيانات Upstash غير موجودة — الذاكرة مؤقتة فقط")
# --- فحص ---
def seen(self, key: str) -> bool:
if key in self._local:
return True
if self.enabled:
try:
return bool(self.redis.sismember(self.SET_KEY, key))
except:
pass
return False
def user_seen(self, user_id: str) -> bool:
if user_id in self._users:
return True
if self.enabled:
try:
return bool(self.redis.sismember(self.USERS_KEY, user_id))
except:
pass
return False
# --- تسجيل ---
def mark(self, key: str):
self._local.add(key)
if self.enabled:
try:
self.redis.sadd(self.SET_KEY, key)
except:
pass
def mark_user(self, user_id: str):
self._users.add(user_id)
if self.enabled:
try:
self.redis.sadd(self.USERS_KEY, user_id)
except:
pass
# --- السجلات ---
def log(self, platform, title, url, snippet, reply):
entry = {
"ts": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"platform": platform,
"title": title,
"url": url,
"snippet": (snippet or "")[:400],
"reply": reply,
}
LOCAL_LOGS.insert(0, entry)
if len(LOCAL_LOGS) > 200:
LOCAL_LOGS.pop()
if self.enabled:
try:
self.redis.lpush(self.LOGS_KEY, json.dumps(entry))
self.redis.ltrim(self.LOGS_KEY, 0, 999)
except:
pass
return entry
def load_logs(self):
if self.enabled:
try:
raw = self.redis.lrange(self.LOGS_KEY, 0, 199)
result = []
for item in raw:
try:
result.append(json.loads(item) if isinstance(item, str) else item)
except:
pass
return result
except:
pass
return LOCAL_LOGS[:]
memory = IronMemory()
LOCAL_LOGS = []
LOCAL_STATS = {"reddit": 0, "github": 0, "email": 0, "devto": 0, "hn": 0, "discord": 0}
# ==========================================
# 3. محرك الذكاء الاصطناعي
# ==========================================
client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=NVIDIA_API_KEY)
def ask_ai(prompt: str, system: str = "You are a helpful assistant.", temp=0.5) -> str | None:
try:
resp = client.chat.completions.create(
model="meta/llama-3.3-70b-instruct",
messages=[{"role": "system", "content": system}, {"role": "user", "content": prompt}],
temperature=temp, max_tokens=900
)
return resp.choices[0].message.content.strip()
except Exception as e:
print(f"AI Error: {e}")
return None
# ==========================================
# 4. نظام كلمات البحث الذكي — تلقائي 100%
# ==========================================
# طبقتان:
# 1. FALLBACK_KEYWORDS — كلمات ثابتة لضمان العمل دائماً
# 2. كل KEYWORD_REFRESH_MINUTES يطلب النموذج توليد كلمات جديدة بنفسه
# بكل اللغات الممكنة بناءً على ما يعرفه عن Orgteh وسوق AI API
# ── كلمات ثابتة (Fallback) — تُستخدم عند بدء التشغيل أو فشل التوليد ──
FALLBACK_KEYWORDS: list[str] = [
# EN
"llm api", "ai api", "chatbot api", "language model api",
"integrate llm", "openai api", "deepseek api", "mistral api",
"llama api", "ai api developer", "chat completions api",
# AR
"api ذكاء اصطناعي", "دمج نموذج لغوي", "api شات بوت",
# TR
"yapay zeka api", "llm api entegrasyonu",
# FR
"api intelligence artificielle", "intégrer llm api",
# ES
"api inteligencia artificial", "integrar llm api",
# DE
"ki api", "sprachmodell api",
# PT
"api inteligência artificial", "chatbot api integração",
# RU
"языковая модель api", "llm api интеграция",
# ZH
"大语言模型 api", "ai api 接入",
# JA
"llm api 統合", "言語モデル api",
# KO
"llm api 통합", "챗봇 api 연동",
# HI
"ai api इंटीग्रेशन", "chatbot api बनाना",
# ID
"api kecerdasan buatan", "chatbot api developer",
]
KEYWORD_REFRESH_MINUTES = 90 # كل 90 دقيقة يطلب كلمات جديدة
_keyword_cache: list[str] = list(FALLBACK_KEYWORDS)
_last_keyword_refresh: float = 0.0
_keyword_lock = threading.Lock()
def _ai_generate_keywords() -> list[str]:
"""
يطلب من النموذج توليد كلمات بحث جديدة بنفسه —
بكل اللغات والزوايا الممكنة لإيجاد المطورين المحتاجين لـ AI API.
"""
prompt = """You are helping market Orgteh (orgteh.com) — an AI API service for developers.
Orgteh provides: OpenAI-compatible LLM API, multiple models (DeepSeek, Mistral, Llama, Kimi, Gemma), cheap pricing.
Target: developers and startups who want to USE or ACCESS an AI language model API for their projects.
Generate 40 short search keywords (2-5 words each) to find these people on GitHub, Reddit, forums, and the web.
- Cover MANY languages: English, Arabic, Turkish, French, Spanish, German, Portuguese, Russian, Chinese, Japanese, Korean, Hindi, Indonesian, Italian, Polish, Dutch, Vietnamese, Thai, and others you know.
- Include different angles: building chatbots, integrating LLMs, looking for API access, seeking cheaper alternatives, starting AI projects.
- Keep each keyword SHORT and natural (as someone would actually search or post).
- Output ONLY a JSON array of strings, no explanation:
["keyword1", "keyword2", ...]"""
resp = ask_ai(prompt, temp=0.8)
if resp:
try:
m = re.search(r'\[.*\]', resp, re.DOTALL)
if m:
keywords = json.loads(m.group(0))
valid = [k.strip() for k in keywords if isinstance(k, str) and 2 < len(k.strip()) < 60]
if len(valid) >= 10:
print(f"🔄 Keywords refreshed: {len(valid)} new keywords generated by AI")
return valid
except Exception as e:
print(f"Keyword generation parse error: {e}")
return []
def refresh_keywords_if_needed():
"""يجدد كلمات البحث تلقائياً كل KEYWORD_REFRESH_MINUTES."""
global _keyword_cache, _last_keyword_refresh
now = time.time()
with _keyword_lock:
if now - _last_keyword_refresh > KEYWORD_REFRESH_MINUTES * 60:
new_kws = _ai_generate_keywords()
if new_kws:
# نمزج الكلمات الجديدة مع الثابتة لضمان التغطية
_keyword_cache = list(set(FALLBACK_KEYWORDS + new_kws))
_last_keyword_refresh = now
def _keyword_refresh_loop():
"""خيط مستقل يجدد الكلمات بشكل دوري."""
# توليد فوري عند البدء
refresh_keywords_if_needed()
while True:
time.sleep(KEYWORD_REFRESH_MINUTES * 60)
refresh_keywords_if_needed()
def get_random_keywords(n: int = 3) -> list[str]:
"""يختار n كلمات عشوائية من الكاش الحالي (ثابتة + مولّدة)."""
with _keyword_lock:
pool = _keyword_cache if _keyword_cache else FALLBACK_KEYWORDS
return random.sample(pool, min(n, len(pool)))
# ==========================================
# 5. مولّد الردود الذكي
# ==========================================
SITE_CONTEXT = """
Orgteh (orgteh.com) is an AI API service offering:
- OpenAI-compatible endpoint: https://orgteh.com/v1/chat/completions
- Multiple models: DeepSeek-R1, Kimi (256K context), Mistral, Llama-3B, Gemma
- API keys start with "Orgteh_"
- Features: GPU-accelerated, sub-second response, enterprise security, streaming support
- Extra tools: web scraper, OCR, embeddings, finance/news aggregator, AI Code Builder
- Cheaper than OpenAI with the same integration format (drop-in replacement, zero code changes)
- Registration: orgteh.com/register
"""
REPLY_STYLES = [
"casual_developer", # مطور يشارك تجربته
"helpful_peer", # زميل يساعد
"been_there", # شخص مر بنفس المشكلة
]
def ai_judge(text: str) -> bool:
"""
النموذج نفسه يحكم: هل هذا الشخص يحتاج فعلاً لـ API نماذج ذكاء اصطناعي؟
لا كلمات مفتاحية — فهم النية الحقيقية فقط.
"""
prompt = f"""Read this post/issue carefully:
\"\"\"
{text[:600]}
\"\"\"
Answer with ONE word only — YES or NO:
Is this person actively trying to USE or ACCESS an AI language model API
(to build something, integrate it, or get API access for a real project)?
Rules:
- YES: they are building/integrating/seeking API access to an AI model for a concrete use case.
- NO: discussing AI concepts/news/opinions, reporting unrelated bugs, academic questions, or anything not about actually using a model API.
- If unsure → NO.
Answer:"""
resp = ask_ai(prompt, temp=0.0)
if resp:
return resp.strip().upper().startswith("YES")
return False
def generate_reply(content: str, platform: str) -> str | None:
"""يولد رد يبدو كمستخدم طبيعي اكتشف orgteh وأراد مشاركته."""
style = random.choice(REPLY_STYLES)
prompt = f"""
You are a real developer who uses Orgteh (orgteh.com) for AI API access.
You are browsing {platform} and found this post/issue that matches your past experience.
You want to genuinely help by sharing what worked for you.
CONTEXT ABOUT ORGTEH:
{SITE_CONTEXT}
POST/ISSUE CONTENT:
\"\"\"
{content[:700]}
\"\"\"
YOUR PERSONA STYLE TODAY: {style}
STRICT RULES:
1. Detect the language of the post and reply in THE SAME LANGUAGE.
2. Write as a REAL PERSON, NOT a company. Use "I" not "we".
3. Sound natural and conversational. NOT salesy or promotional.
4. Keep it SHORT (3-6 sentences max).
5. Mention orgteh.com naturally as "I've been using orgteh.com" or "stumbled on orgteh.com" etc.
6. Only reference features that are actually relevant to their specific problem.
7. DO NOT start with "Hi" or "Hello" or generic openers.
8. DO NOT use bullet points or formatting.
9. Output ONLY the reply text, nothing else.
Examples of good natural openers (adapt to language/situation):
- "Been using orgteh.com's API for this exact use case, works well with Python..."
- "I built something similar, ended up using orgteh.com — they have an OpenAI-compatible endpoint so..."
- "For LLM API access orgteh.com has been solid for me, multiple models under one endpoint..."
- "جربت orgteh.com لنفس الغرض، API متوافق مع OpenAI ويشتغل مباشرة..."
- "عندي تجربة مع هذا، استخدمت orgteh.com وفيه عدة نماذج بـ endpoint موحد..."
"""
return ask_ai(prompt, temp=0.7)
def generate_email(company: str, snippet: str) -> dict | None:
"""يولد إيميل بارد احترافي وشخصي لاستهداف الشركات والمطورين."""
prompt = f"""
You are a developer who uses Orgteh API and wants to recommend it to a company/developer who seems to need AI API services.
COMPANY/DEV NAME: {company}
WHAT THEY DO (from their website): {snippet[:500]}
CONTEXT ABOUT ORGTEH:
{SITE_CONTEXT}
Write a cold email as a fellow developer (not a company rep).
- Detect language from the snippet and write in the SAME LANGUAGE.
- Subject: short, specific to their use case.
- Body: personal, 4-6 sentences. Reference what they do specifically.
- Sound like you're sharing a tool that helped you, not selling.
- Include orgteh.com naturally.
- Use "I" not "we".
Output STRICT JSON only:
{{"subject": "...", "body": "..."}}
"""
resp = ask_ai(prompt, temp=0.6)
if resp:
try:
m = re.search(r'\{.*\}', resp, re.DOTALL)
if m:
return json.loads(m.group(0))
except:
pass
return None
# ==========================================
# 6. نظام مانع الحظر (Anti-Ban Engine)
# ==========================================
class AntiBan:
"""
يتحكم في توقيت الإرسال لكل منصة لتجنب الحظر.
يستخدم تأخيرات عشوائية وحدوداً يومية صارمة.
"""
def __init__(self):
self._day = datetime.utcnow().date()
self._counts = {k: 0 for k in LIMITS}
self._lock = threading.Lock()
def _reset_if_new_day(self):
today = datetime.utcnow().date()
if today != self._day:
self._day = today
self._counts = {k: 0 for k in LIMITS}
def can_act(self, platform: str) -> bool:
key = f"{platform}_daily"
with self._lock:
self._reset_if_new_day()
return self._counts.get(key, 0) < LIMITS.get(key, 999)
def record(self, platform: str):
key = f"{platform}_daily"
with self._lock:
self._counts[key] = self._counts.get(key, 0) + 1
LOCAL_STATS[platform] = LOCAL_STATS.get(platform, 0) + 1
@staticmethod
def human_delay(base=15, jitter=20):
"""تأخير عشوائي يشبه السلوك البشري."""
delay = base + random.uniform(0, jitter)
time.sleep(delay)
@staticmethod
def micro_delay():
time.sleep(random.uniform(2, 6))
antiban = AntiBan()
# ==========================================
# 7. منصات البث - GitHub
# ==========================================
def github_loop():
headers = {"Authorization": f"token {GITHUB_TOKEN}", "Accept": "application/vnd.github.v3+json"}
while True:
try:
if not GITHUB_TOKEN:
time.sleep(600)
continue
keywords = get_random_keywords(2)
for kw in keywords:
if not antiban.can_act("github"):
break
# البحث في Issues
resp = requests.get(
f"https://api.github.com/search/issues"
f"?q={requests.utils.quote(kw)}+state:open+type:issue"
f"&sort=updated&per_page=5",
headers=headers, timeout=15
)
if resp.status_code != 200:
if resp.status_code == 403:
time.sleep(300)
break
for item in resp.json().get("items", []):
url = item["html_url"]
body_text = (item.get("body") or "")
full_text = item["title"] + " " + body_text
if memory.seen(url):
continue
user_id = f"gh:{item.get('user', {}).get('login', '')}"
if memory.user_seen(user_id):
continue
# النموذج يحكم على النية — لا شروط كلمية
if not ai_judge(full_text):
print(f" ⏭ GitHub skip (not relevant): {item['title'][:40]}")
continue
reply = generate_reply(full_text, "GitHub")
if not reply:
continue
post = requests.post(
item["url"] + "/comments",
headers=headers,
json={"body": reply},
timeout=10
)
if post.status_code == 201:
memory.mark(url)
memory.mark_user(user_id)
memory.log("GitHub", item["title"], url, body_text, reply)
antiban.record("github")
print(f"✅ GitHub: {item['title'][:50]}")
antiban.human_delay(60, 60)
time.sleep(random.randint(90, 180))
except Exception as e:
print(f"GitHub Error: {e}")
time.sleep(300)
# ==========================================
# 8. منصات البث - Reddit
# ==========================================
# سابريدتات مناسبة فقط
TARGET_SUBREDDITS = [
"MachineLearning", "LocalLLaMA", "learnmachinelearning",
"artificial", "ChatGPT", "OpenAI", "SideProject",
"startups", "webdev", "learnprogramming", "Python",
"programming", "ArtificialIntelligence",
]
def reddit_loop():
while True:
try:
if not REDDIT_CLIENT_ID or not antiban.can_act("reddit"):
time.sleep(600)
continue
reddit = praw.Reddit(
client_id=REDDIT_CLIENT_ID, client_secret=REDDIT_CLIENT_SECRET,
username=REDDIT_USERNAME, password=REDDIT_PASSWORD,
user_agent="orgteh-community-bot/1.0"
)
kw = random.choice(get_random_keywords(1))
sub = random.choice(TARGET_SUBREDDITS)
for post in reddit.subreddit(sub).search(kw, limit=5, sort="new"):
if not antiban.can_act("reddit"):
break
full_text = post.title + " " + (post.selftext or "")
if not ai_judge(full_text):
continue
url = post.url
user_id = f"rd:{post.author}"
if memory.seen(url) or memory.user_seen(user_id):
continue
reply = generate_reply(full_text, "Reddit")
if not reply:
continue
post.reply(reply)
memory.mark(url)
memory.mark_user(user_id)
memory.log("Reddit", post.title, url, post.selftext or "", reply)
antiban.record("reddit")
print(f"✅ Reddit: {post.title[:50]}")
antiban.human_delay(120, 180)
break # رد واحد لكل دورة
except Exception as e:
print(f"Reddit Error: {e}")
time.sleep(random.randint(900, 1500))
# ==========================================
# 9. منصات البث - Hacker News
# ==========================================
def hn_loop():
"""Hacker News Ask HN / Show HN عبر Algolia API المجاني."""
base = "https://hn.algolia.com/api/v1/search"
while True:
try:
if not antiban.can_act("hn"):
time.sleep(600)
continue
kw = random.choice(SEED_KEYWORDS_EN[:8])
resp = requests.get(base, params={
"query": kw, "tags": "ask_hn,story",
"hitsPerPage": 5, "numericFilters": "points>5"
}, timeout=15)
if resp.status_code != 200:
time.sleep(300)
continue
for hit in resp.json().get("hits", []):
story_id = str(hit.get("objectID"))
full_text = (hit.get("title","") + " " + (hit.get("story_text") or ""))
if not ai_judge(full_text):
continue
url = f"https://news.ycombinator.com/item?id={story_id}"
user_id = f"hn:{hit.get('author','')}"
if memory.seen(url) or memory.user_seen(user_id):
continue
# HN لا يسمح بالتعليق عبر API — نسجل فقط للمراجعة اليدوية
# لكن نولد الرد للاطلاع
reply = generate_reply(full_text, "HackerNews")
if reply:
memory.mark(url)
memory.mark_user(user_id)
memory.log("HackerNews (Manual)", hit.get("title",""), url, full_text[:300], reply)
antiban.record("hn")
print(f"📝 HN (للمراجعة): {hit.get('title','')[:50]}")
antiban.micro_delay()
except Exception as e:
print(f"HN Error: {e}")
time.sleep(random.randint(600, 900))
# ==========================================
# 10. منصات البث - Dev.to
# ==========================================
def devto_loop():
"""Dev.to API - التعليق على مقالات المطورين."""
headers = {"api-key": DEVTO_API_KEY or "", "Content-Type": "application/json"}
while True:
try:
if not DEVTO_API_KEY or not antiban.can_act("devto"):
time.sleep(1800)
continue
for tag in ["api", "llm", "openai", "chatbot", "artificialintelligence"]:
resp = requests.get(
f"https://dev.to/api/articles?tag={tag}&top=1&per_page=5",
timeout=10
)
if resp.status_code != 200:
continue
for article in resp.json():
full_text = article.get("title","") + " " + (article.get("description") or "")
if not ai_judge(full_text):
continue
url = article.get("url","")
user_id = f"devto:{article.get('user',{}).get('username','')}"
if not url or memory.seen(url) or memory.user_seen(user_id):
continue
reply = generate_reply(full_text, "Dev.to")
if not reply:
continue
post = requests.post(
f"https://dev.to/api/comments",
headers=headers,
json={"body_markdown": reply, "article_id": article["id"]},
timeout=10
)
if post.status_code in (200, 201):
memory.mark(url)
memory.mark_user(user_id)
memory.log("Dev.to", article["title"], url, full_text, reply)
antiban.record("devto")
print(f"✅ Dev.to: {article['title'][:50]}")
antiban.human_delay(90, 90)
break
except Exception as e:
print(f"Dev.to Error: {e}")
time.sleep(random.randint(1800, 2700))
# ==========================================
# 11. منصات البث - Discord (قراءة فقط → رد عبر DM)
# ==========================================
def discord_monitor_loop():
"""
يراقب سيرفرات Discord المحددة ويبحث عن رسائل عن API.
يرسل DM للأشخاص المحتاجين (ليس رد عام للسيرفر).
يتطلب: DISCORD_BOT_TOKEN + DISCORD_GUILD_IDS
"""
if not DISCORD_BOT_TOKEN:
return
headers = {
"Authorization": f"Bot {DISCORD_BOT_TOKEN}",
"Content-Type": "application/json"
}
guild_ids = [g.strip() for g in DISCORD_GUILD_IDS.split(",") if g.strip()]
while True:
try:
if not antiban.can_act("discord"):
time.sleep(3600)
continue
for guild_id in guild_ids:
# جلب قنوات السيرفر
ch_resp = requests.get(
f"https://discord.com/api/v10/guilds/{guild_id}/channels",
headers=headers, timeout=10
)
if ch_resp.status_code != 200:
continue
text_channels = [c for c in ch_resp.json() if c.get("type") == 0]
for channel in text_channels[:5]:
# جلب آخر الرسائل
msg_resp = requests.get(
f"https://discord.com/api/v10/channels/{channel['id']}/messages?limit=20",
headers=headers, timeout=10
)
if msg_resp.status_code != 200:
continue
for msg in msg_resp.json():
content = msg.get("content", "")
msg_id = msg.get("id", "")
user = msg.get("author", {})
user_id = f"dc:{user.get('id','')}"
if user.get("bot"):
continue
if not ai_judge(content):
continue
if memory.seen(msg_id) or memory.user_seen(user_id):
continue
# إرسال DM
dm_resp = requests.post(
"https://discord.com/api/v10/users/@me/channels",
headers=headers,
json={"recipient_id": user["id"]},
timeout=10
)
if dm_resp.status_code != 200:
continue
dm_channel = dm_resp.json().get("id")
reply = generate_reply(content, "Discord")
if not reply:
continue
send = requests.post(
f"https://discord.com/api/v10/channels/{dm_channel}/messages",
headers=headers,
json={"content": reply},
timeout=10
)
if send.status_code in (200, 201):
memory.mark(msg_id)
memory.mark_user(user_id)
memory.log("Discord DM", channel.get("name",""), msg_id, content, reply)
antiban.record("discord")
print(f"✅ Discord DM → {user.get('username','')}")
antiban.human_delay(120, 120)
break
except Exception as e:
print(f"Discord Error: {e}")
time.sleep(random.randint(3600, 5400))
# ==========================================
# 12. نظام الإيميل المتقدم
# ==========================================
def send_email(to: str, subject: str, body: str) -> tuple[bool, str]:
if not SMTP_EMAIL or not SMTP_PASSWORD:
return False, "SMTP غير مهيأ"
try:
msg = MIMEMultipart()
msg["From"] = SMTP_EMAIL
msg["To"] = to
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain", "utf-8"))
srv = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
srv.starttls()
srv.login(SMTP_EMAIL, SMTP_PASSWORD)
srv.send_message(msg)
srv.quit()
return True, "تم الإرسال"
except Exception as e:
return False, str(e)
# استراتيجيات البحث عن الإيميلات والشركات الناشئة
EMAIL_SEARCH_QUERIES = [
# يبني تطبيق على AI API
"site:github.io ai api project contact email",
"site:github.com llm api integration project contact",
'"llm api" developer project contact email',
'"ai api" developer "contact" OR "email" site:github.com',
# يبحث عن كيفية الاستخدام
'"how to use llm api" developer email',
'"integrate ai model" project developer contact',
# شركات ناشئة تبني على AI
"site:producthunt.com ai chatbot api developer 2024",
"site:producthunt.com llm powered app developer",
"startup ai api integration developer email contact",
"site:crunchbase.com ai startup llm api developer",
# مطورون مستقلون
"site:indie.hackers.com ai api llm project",
'"building with llm" developer email contact',
'"ai powered" app developer "reach me" OR "contact"',
# بحث عام
'"llm api" OR "ai api" developer startup email contact 2024',
'"chat completions" developer project contact email',
]
async def email_hunter():
"""يبحث عن مطورين وشركات ناشئة تحتاج فعلاً لـ API ذكاء اصطناعي."""
if not antiban.can_act("email"):
return
query = random.choice(EMAIL_SEARCH_QUERIES)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True, args=[
"--no-sandbox", "--disable-setuid-sandbox",
"--disable-dev-shm-usage", "--disable-gpu"
])
page = await browser.new_page()
await page.set_extra_http_headers({"Accept-Language": "en-US,en;q=0.9"})
try:
# البحث في DuckDuckGo
await page.goto(
f"https://duckduckgo.com/?q={requests.utils.quote(query)}&t=h_&ia=web",
timeout=25000
)
await asyncio.sleep(3)
links = await page.query_selector_all("a.result__a")
targets = []
for l in links[:8]:
href = await l.get_attribute("href")
text = await l.inner_text()
if href and all(x not in href for x in ["facebook", "linkedin", "twitter", "youtube"]):
targets.append((href, text.strip()))
for url, title in targets:
if not antiban.can_act("email"):
break
if memory.seen(url):
continue
try:
await page.goto(url, timeout=20000)
await asyncio.sleep(2)
content = await page.content()
# استخراج الإيميلات
emails = list(set(re.findall(
r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}", content
)))
emails = [e for e in emails if not any(
e.endswith(x) for x in [".png", ".jpg", ".js", ".css", ".svg", ".gif"]
) and "example" not in e and "noreply" not in e]
if not emails:
continue
# التحقق من الصلة بالموضوع
page_text = re.sub(r"<[^>]+>", " ", content)
if not ai_judge(page_text[:2000]):
continue
target_email = emails[0]
email_id = f"email:{target_email}"
if memory.user_seen(email_id):
continue
# توليد الإيميل
company_name = title or url.split("/")[2] if "/" in url else url
data = generate_email(company_name, page_text[:600])
if not data:
continue
ok, msg = send_email(target_email, data["subject"], data["body"])
if ok:
memory.mark(url)
memory.mark_user(email_id)
memory.log("Email", title, url, f"→ {target_email}", data["body"])
antiban.record("email")
print(f"✅ Email → {target_email} [{title[:30]}]")
await asyncio.sleep(random.uniform(30, 60))
except Exception as e:
print(f" Email sub-error: {e}")
except Exception as e:
print(f"Email search error: {e}")
finally:
await browser.close()
def email_loop_sync():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
try:
loop.run_until_complete(email_hunter())
except Exception as e:
print(f"Email Loop Error: {e}")
time.sleep(random.randint(300, 600))
# ==========================================
# 13. تشغيل الخيوط
# ==========================================
def start_all():
threads = [
threading.Thread(target=_keyword_refresh_loop, daemon=True), # مولّد الكلمات التلقائي
threading.Thread(target=github_loop, daemon=True),
threading.Thread(target=reddit_loop, daemon=True),
threading.Thread(target=hn_loop, daemon=True),
threading.Thread(target=devto_loop, daemon=True),
threading.Thread(target=discord_monitor_loop, daemon=True),
threading.Thread(target=email_loop_sync, daemon=True),
]
for t in threads:
t.start()
print("🚀 Orgteh Marketing Engine — جميع الأنظمة تعمل")
start_all()
# ==========================================
# 14. واجهة Gradio
# ==========================================
def refresh_stats():
with _keyword_lock:
kw_count = len(_keyword_cache)
last_refresh = datetime.utcfromtimestamp(_last_keyword_refresh).strftime("%H:%M UTC") if _last_keyword_refresh else "لم يتم بعد"
return (
LOCAL_STATS.get("reddit", 0),
LOCAL_STATS.get("github", 0),
LOCAL_STATS.get("email", 0),
LOCAL_STATS.get("devto", 0),
LOCAL_STATS.get("hn", 0),
LOCAL_STATS.get("discord", 0),
f"{kw_count} كلمة | آخر تجديد: {last_refresh}",
)
def refresh_logs():
logs = memory.load_logs()
chat = []
for log in logs:
user_msg = (
f"**[{log['platform']}]** {log['title']}\n\n"
f"{log.get('snippet','')}\n\n"
f"🔗 {log['url']}"
)
bot_msg = f"🕐 {log['ts']}\n\n{log['reply']}"
chat.append({"role": "user", "content": user_msg})
chat.append({"role": "assistant", "content": bot_msg})
return chat
with gr.Blocks(title="Orgteh Marketing Engine", theme=gr.themes.Base()) as demo:
gr.Markdown("""
# 🚀 Orgteh Marketing Engine
**موقع:** [orgteh.com](https://www.orgteh.com) | **المنصات:** GitHub · Reddit · HackerNews · Dev.to · Discord · Email
""")
with gr.Row():
r_reddit = gr.Number(label="Reddit اليوم", min_width=100)
r_github = gr.Number(label="GitHub اليوم", min_width=100)
r_email = gr.Number(label="Emails اليوم", min_width=100)
r_devto = gr.Number(label="Dev.to اليوم", min_width=100)
r_hn = gr.Number(label="HN (للمراجعة)", min_width=100)
r_discord = gr.Number(label="Discord DMs", min_width=100)
r_keywords = gr.Textbox(label="🔑 كلمات البحث الحالية (AI مولّدة + ثابتة)", interactive=False)
refresh_btn = gr.Button("🔄 تحديث البيانات")
gr.Markdown("### 💬 سجل التفاعلات الحية")
log_view = gr.Chatbot(label="السجل", height=650, type="messages")
refresh_btn.click(refresh_stats, outputs=[r_reddit, r_github, r_email, r_devto, r_hn, r_discord, r_keywords])
refresh_btn.click(refresh_logs, outputs=[log_view])
demo.load(refresh_stats, outputs=[r_reddit, r_github, r_email, r_devto, r_hn, r_discord, r_keywords])
demo.load(refresh_logs, outputs=[log_view])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)