import os import json from fastapi import FastAPI, Request, BackgroundTasks from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from pathlib import Path import datetime import sqlite3 from typing import Optional, List from dotenv import load_dotenv load_dotenv(override=True) from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded print(" GEO Platform API starting...") print(f" Working directory: {os.getcwd()}") print(f" Python version: {os.sys.version}") # lazily import heavy pipeline to avoid startup failures when optional deps # (like spaCy) are not installed. import inside handlers that need it. run_pipeline = None try: from server import ai_analysis print(" ai_analysis loaded (from server)") except Exception as e: print(f" ai_analysis failed: {e}") ai_analysis = None try: from server import geo_services print(" geo_services loaded") except Exception as e: print(f" geo_services failed: {e}") geo_services = None try: from server import ai_visibility print(" ai_visibility loaded") except Exception as e: print(f" ai_visibility failed: {e}") ai_visibility = None try: from server import job_queue print(" job_queue loaded") except Exception as e: print(f" job_queue failed: {e}") job_queue = None try: from server import keyword_engine print(" keyword_engine loaded") except Exception as e: print(f" keyword_engine failed: {e}") keyword_engine = None try: from server import users as user_mgmt print(" users loaded") except Exception as e: print(f" users failed: {e}") user_mgmt = None try: from server import search_intel print(" search_intel loaded") except Exception as e: print(f" search_intel failed: {e}") search_intel = None try: from server import payments as _payments except Exception: _payments = None try: from server import onboarding except Exception: onboarding = None from fastapi import WebSocket import asyncio OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', str(Path(__file__).resolve().parent.parent / 'output'))) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) print(f" Output directory: {OUTPUT_DIR}") app = FastAPI(title='GEO Platform API') # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) # Rate limiting limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) print(" FastAPI app created with rate limiting and CORS") # Serve frontend static files frontend_dir = Path(__file__).resolve().parent.parent / 'frontend' if frontend_dir.exists(): app.mount('/static', StaticFiles(directory=str(frontend_dir)), name='static') print(f" Frontend mounted: {frontend_dir}") else: print(f" Frontend directory not found: {frontend_dir}") class CrawlRequest(BaseModel): url: str org_name: str org_url: str max_pages: int = 3 runs: int = 1 api_keys: Optional[dict] = None class RecommendationRequest(BaseModel): api_keys: Optional[dict] = None job_id: Optional[int] = None extra_context: Optional[dict] = None class AnalysisRequest(BaseModel): api_keys: Optional[dict] = None job_id: Optional[int] = None class SimulationRequest(BaseModel): content: str brand: str api_keys: Optional[dict] = None @app.post('/api/crawl') async def api_crawl(req: CrawlRequest): global run_pipeline if run_pipeline is None: try: from src.main import run_pipeline as _rp run_pipeline = _rp except Exception as e: return JSONResponse({'ok': False, 'error': 'pipeline not available: ' + str(e)}, status_code=500) # runs: perform multiple runs and average scores, snapshot first run runs = max(1, req.runs or 1) run_results = [] timestamps = [] scores = [] breakdowns = [] audit_objs = [] try: for i in range(runs): ts = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ') subdir = OUTPUT_DIR / f"run-{ts}-{i+1}" res = run_pipeline(req.url, req.org_name, req.org_url, max_pages=req.max_pages, output_dir=subdir) # load created audit audit_path = Path(res['audit_path']) with open(audit_path, 'r', encoding='utf-8') as f: audit_obj = json.load(f) audit_objs.append(audit_obj) # Infer brand name if generic for visibility check org_name_for_visibility = req.org_name if not org_name_for_visibility or org_name_for_visibility == 'Company': pages_for_inference = audit_obj.get('pages', []) inferred = ai_analysis.infer_brand_name(pages_for_inference) if inferred and inferred != 'Company': org_name_for_visibility = inferred # Update audit_obj with inferred name for consistency audit_obj['org_name'] = inferred # run AI visibility (Perplexity) for this run and attach; fallback to OpenAI visibility queries = [f"What is {org_name_for_visibility}?", f"Best services for {org_name_for_visibility}", f"Why should I buy from {org_name_for_visibility}?"] perf = ai_visibility.check_perplexity(org_name_for_visibility, queries) if not perf.get('enabled') and perf.get('reason') == 'PERPLEXITY_KEY not set': # try OpenAI fallback try: perf = ai_visibility.check_openai_visibility(org_name_for_visibility, queries) perf['fallback'] = 'openai' except Exception: pass # 🔷 ADDED: Deep Sentiment & Context Analysis (The "Zaher" Gap) try: sentiment = geo_services.sentiment_analysis(org_name_for_visibility, queries, api_keys=req.api_keys) perf['sentiment_analysis'] = sentiment except Exception: pass audit_obj['ai_visibility'] = perf # overwrite audit with ai visibility included with open(audit_path, 'w', encoding='utf-8') as f: json.dump(audit_obj, f, ensure_ascii=False, indent=2) # compute geo score for this run score = ai_analysis.compute_geo_score(audit_obj.get('pages', []), audit=audit_obj, ai_visibility=perf) scores.append(score['score']) breakdowns.append(score['breakdown']) run_results.append({ 'ts': ts, 'audit_path': str(audit_path), 'geo_score': score }) timestamps.append(ts) # snapshot first run pages for deterministic reference snap_ts = timestamps[0] snap_src = Path(run_results[0]['audit_path']) snap_dst = OUTPUT_DIR / f"snapshot-{snap_ts}.json" with open(snap_src, 'r', encoding='utf-8') as fsrc, open(snap_dst, 'w', encoding='utf-8') as fdst: fdst.write(fsrc.read()) # Update the main audit/analysis files so UI endpoints (`/api/results`) read latest data try: # copy snapshot to output/audit.json audit_main = OUTPUT_DIR / 'audit.json' with open(snap_dst, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m: m.write(s.read()) # generate and save aggregated analysis (OpenAI/Groq) and geo_score based on snapshot try: with open(audit_main, 'r', encoding='utf-8') as f: audit_obj = json.load(f) pages = audit_obj.get('pages', []) analysis = ai_analysis.analyze_pages(pages) geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=audit_obj.get('ai_visibility')) analysis_out = { 'analysis': analysis, 'geo_score': geo_score } with open(OUTPUT_DIR / 'analysis.json', 'w', encoding='utf-8') as fa: json.dump(analysis_out, fa, ensure_ascii=False, indent=2) # also save a top-level analysis.json for backwards compatibility except Exception: pass except Exception: pass # compute aggregated stats import statistics mean_score = int(round(statistics.mean(scores))) median_score = int(round(statistics.median(scores))) variance = float(statistics.pstdev(scores)) history_path = OUTPUT_DIR / 'history.json' history = [] if history_path.exists(): try: history = json.loads(history_path.read_text(encoding='utf-8')) except Exception: history = [] entry = { 'timestamp': timestamps[0], 'url': req.url, 'org_name': req.org_name, 'runs': runs, 'scores': scores, 'mean': mean_score, 'median': median_score, 'variance': variance, 'runs_info': run_results } history.append(entry) history_path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding='utf-8') return { 'ok': True, 'message': 'crawl completed', 'mean_score': mean_score, 'median_score': median_score, 'variance': variance, 'history_entry': entry } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class JobRequest(BaseModel): url: str org_name: str org_url: str max_pages: int = 3 runs: int = 1 industry_override: Optional[str] = None @app.post('/api/jobs') # @limiter.limit("30/minute") # Rate limiting disabled for testing async def api_enqueue(job: JobRequest, background_tasks: BackgroundTasks, request: Request): try: # ── Usage limit check ── TEMPORARILY DISABLED FOR DEVELOPMENT user_id = None try: auth = request.headers.get('authorization', '') token = auth.split(' ', 1)[1].strip() user_id = user_mgmt.verify_token(token) except Exception: pass # TEMPORARILY DISABLED: Allow unlimited access during development # TODO: Re-enable usage limits later # if user_id: # # Check trial limit first # if onboarding: # trial_status = onboarding.get_trial_status(user_id) # subscription = _payments.get_subscription(user_id) # # # If on free plan and trial expired, show pricing # if subscription.get('plan') == 'free' and trial_status['trial_expired']: # return JSONResponse({ # 'ok': False, # 'error': 'انتهت فترة التجربة المجانية. يرجى الترقية إلى خطة مدفوعة.', # 'trial_expired': True, # 'show_pricing': True, # 'plan': 'free' # }, status_code=429) # # # Check if free user has used their one try # if subscription.get('plan') == 'free' and trial_status['tries_remaining'] <= 0: # return JSONResponse({ # 'ok': False, # 'error': 'لقد استخدمت محاولتك المجانية. يرجى الترقية للحصول على المزيد من التحليلات.', # 'trial_exhausted': True, # 'show_pricing': True, # 'plan': 'free', # 'tries_used': trial_status['tries_used'] # }, status_code=429) # # limit_check = _payments.check_usage_limit(user_id, 'crawls') # if not limit_check['allowed']: # plan = _payments.get_subscription(user_id).get('plan', 'free') # return JSONResponse({ # 'ok': False, # 'error': f'لقد استنفدت حد التحليلات الشهري ({limit_check["limit"]} تحليل). يرجى الترقية إلى خطة أعلى.', # 'limit_reached': True, # 'plan': plan, # 'used': limit_check['used'], # 'limit': limit_check['limit'] # }, status_code=429) jid = job_queue.enqueue_job( job.url, job.org_name, job.org_url, job.max_pages, job.runs, industry_override=job.industry_override ) # Track usage - TEMPORARILY DISABLED # TODO: Re-enable usage tracking later # if user_id: # try: # _payments.track_usage(user_id, 'crawls') # # Use trial attempt for free users # if onboarding: # subscription = _payments.get_subscription(user_id) # if subscription.get('plan') == 'free': # onboarding.use_trial(user_id) # except Exception: # pass # Dispatch background task immediately from server.worker import process_job job_data = job_queue.get_job(jid) if job_data: background_tasks.add_task(process_job, job_data) return {'ok': True, 'job_id': jid} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/jobs') async def api_list_jobs(): try: data = job_queue.list_jobs() return {'ok': True, 'jobs': data} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/jobs/{job_id}') async def api_get_job(job_id: int): try: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404) # parse progress JSON try: job['progress'] = json.loads(job.get('progress') or '{}') except Exception: job['progress'] = {} return {'ok': True, 'job': job} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.websocket('/ws/jobs/{job_id}') async def ws_job_progress(ws: WebSocket, job_id: int): await ws.accept() last_updated = None try: while True: job = job_queue.get_job(job_id) if not job: # send a not-found once then keep waiting in case job appears try: await ws.send_json({'ok': False, 'error': 'job not found'}) except Exception: pass await asyncio.sleep(1) continue # normalize progress and datetime fields for JSON try: prog = job.get('progress') if isinstance(prog, str): try: job['progress'] = json.loads(prog or '{}') except Exception: job['progress'] = {} else: job['progress'] = prog or {} except Exception: job['progress'] = {} # convert timestamps to ISO strings for dt_key in ('created_at', 'updated_at'): try: val = job.get(dt_key) if hasattr(val, 'isoformat'): job[dt_key] = val.isoformat() else: job[dt_key] = str(val) if val is not None else None except Exception: job[dt_key] = str(job.get(dt_key)) updated = job.get('updated_at') if updated != last_updated: last_updated = updated try: await ws.send_json({'ok': True, 'job': job}) except Exception: # if sending fails (e.g., client gone), break loop break await asyncio.sleep(1) finally: try: await ws.close() except Exception: pass @app.get('/api/jobs/{job_id}/results') async def api_job_results(job_id: int): job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'no results yet'}, status_code=400) out = {'ok': True, 'job_id': job_id} audit_path = Path(result_path) / 'audit.json' analysis_path = Path(result_path) / 'analysis.json' schema_path = Path(result_path) / 'schema.jsonld' if audit_path.exists(): out['audit'] = json.loads(audit_path.read_text(encoding='utf-8')) if analysis_path.exists(): out['analysis'] = json.loads(analysis_path.read_text(encoding='utf-8')) if schema_path.exists(): out['schema'] = schema_path.read_text(encoding='utf-8') return out @app.get('/api/results') async def api_results(ts: str | None = None): from server.cache_manager import invalidate_results_cache if ts: snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json' if snapshot_path.exists(): try: result = {'audit': json.loads(snapshot_path.read_text(encoding='utf-8'))} return JSONResponse(result, headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"}) except Exception as e: return JSONResponse({'ok': False, 'error': f'Failed to load snapshot: {str(e)}'}, status_code=500) audit_path = OUTPUT_DIR / 'audit.json' schema_path = OUTPUT_DIR / 'schema.jsonld' out = {} if audit_path.exists(): out['audit'] = json.loads(audit_path.read_text(encoding='utf-8')) if schema_path.exists(): out['schema'] = schema_path.read_text(encoding='utf-8') return JSONResponse(out, headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"}) @app.on_event("startup") async def startup_event(): print(" GEO Platform API is ready!") print(f" Access at: http://0.0.0.0:7860") print(f" Health check: http://0.0.0.0:7860/health") try: admin_email = os.environ.get('ADMIN_EMAIL', 'admin@moharek.com') admin_pass = os.environ.get('ADMIN_PASSWORD', 'moharek2025') users = user_mgmt.list_users() if not any(u.get('email') == admin_email for u in users): uid = user_mgmt.create_user( email=admin_email, password=admin_pass, role='admin' ) print(f" Seeded default admin user (id={uid})") except Exception as e: print(f" Seed user error: {e}") @app.get('/api/health') @app.get('/health') async def health_check(): return {'status': 'healthy', 'service': 'GEO Platform'} @app.get('/') async def index(): index_file = frontend_dir / 'index.html' if index_file.exists(): return FileResponse(str(index_file), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"}) return {'ok': True} @app.get('/{file_path:path}', include_in_schema=False) async def serve_static(file_path: str): """Serve static files including SVG, CSS, JS, etc.""" if file_path.startswith('api/'): return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404) file_full_path = frontend_dir / file_path try: file_full_path = file_full_path.resolve() if not str(file_full_path).startswith(str(frontend_dir.resolve())): return JSONResponse({'ok': False, 'error': 'forbidden'}, status_code=403) except Exception: return JSONResponse({'ok': False, 'error': 'invalid path'}, status_code=400) if file_full_path.exists() and file_full_path.is_file(): media_type = 'application/octet-stream' if file_path.endswith('.svg'): media_type = 'image/svg+xml' elif file_path.endswith('.css'): media_type = 'text/css' elif file_path.endswith('.js'): media_type = 'application/javascript' elif file_path.endswith('.html'): media_type = 'text/html' elif file_path.endswith('.json'): media_type = 'application/json' elif file_path.endswith('.png'): media_type = 'image/png' elif file_path.endswith('.jpg') or file_path.endswith('.jpeg'): media_type = 'image/jpeg' elif file_path.endswith('.gif'): media_type = 'image/gif' elif file_path.endswith('.webp'): media_type = 'image/webp' return FileResponse(str(file_full_path), media_type=media_type, headers={"Cache-Control": "public, max-age=3600"}) if not file_path.endswith('.html'): html_path = frontend_dir / f'{file_path}.html' if html_path.exists(): return FileResponse(str(html_path), media_type='text/html', headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"}) return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404) @app.get('/api/jobs/{job_id}/report') async def api_job_report(job_id: int): job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400) from server import reports html = reports.build_html_report(result_path) return JSONResponse({'ok': True, 'report_html': html}) @app.get('/api/jobs/{job_id}/report.pdf') async def api_job_report_pdf(job_id: int): job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400) try: from server import reports html = reports.build_html_report(result_path) out_pdf = Path(result_path) / f'report-{job_id}.pdf' ok = reports.try_render_pdf(html, out_pdf) if ok and out_pdf.exists(): return FileResponse(str(out_pdf), media_type='application/pdf', filename=f'report-{job_id}.pdf') except Exception as e: print(f'PDF generation error: {e}') return JSONResponse({'ok': False, 'error': 'pdf_generation_failed'}, status_code=500) @app.get('/api/jobs/{job_id}/keywords') async def api_job_keywords(job_id: int, enrich: bool = False, analytics: bool = False): try: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) result = keyword_engine.extract_keywords_from_audit(audit, top_n=40, enrich=enrich, analytics=analytics) if analytics: return {'ok': True, 'analytics': result} else: return {'ok': True, 'keywords': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/jobs/{job_id}/serp') async def api_job_serp(job_id: int, gl: str = 'sa', hl: str = 'ar'): """Full SERP report via ScrapingBee (rank check, PAA, local, competitors).""" try: from server import scrapingbee_client job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit.json not found'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) site_url = job.get('url', '') kws_raw = keyword_engine.extract_keywords_from_audit(audit, top_n=8) keywords = [k['kw'] if isinstance(k, dict) else k for k in kws_raw[:8]] report = scrapingbee_client.full_serp_report(site_url, keywords, country_code=gl, language=hl) return report except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/jobs/{job_id}/actions') async def api_job_actions(job_id: int): try: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) from server import action_engine actions = action_engine.generate_action_plan(audit) return actions except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/actions/save') async def api_save_actions(request: Request): try: data = await request.json() job_id = data.get('job_id') actions = data.get('actions', []) from server.db import SessionLocal, Action db = SessionLocal() try: saved = [] for act in actions: new_act = Action( job_id=job_id, type=act.get('type', 'general'), task=act.get('task', ''), priority_score=100 if str(act.get('priority')).lower() == 'high' else (50 if str(act.get('priority')).lower() == 'medium' else 10), status='pending' ) db.add(new_act) db.flush() saved.append(new_act.id) db.commit() return {'ok': True, 'saved_ids': saved} finally: db.close() except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/actions/list') async def api_list_actions(): try: from server.db import SessionLocal, Action db = SessionLocal() try: items = db.query(Action).order_by(Action.priority_score.desc()).limit(50).all() actions = [] for i in items: action_dict = { 'id': i.id, 'job_id': i.job_id, 'type': i.type, 'task': i.task, 'priority_score': i.priority_score, 'status': i.status, 'result': i.result, 'created_at': str(i.created_at), 'impact': 'high' if i.priority_score >= 80 else 'medium' if i.priority_score >= 40 else 'low', 'effort': 'low', 'timeline': '1-2 اسابيع' } if hasattr(i, 'initial_metrics') and i.initial_metrics: action_dict['initial_metrics'] = i.initial_metrics if hasattr(i, 'latest_metrics') and i.latest_metrics: action_dict['latest_metrics'] = i.latest_metrics if hasattr(i, 'impact_score') and i.impact_score: action_dict['impact_score'] = i.impact_score actions.append(action_dict) return {'ok': True, 'actions': actions} finally: db.close() except Exception as e: import traceback traceback.print_exc() return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/mentions/list') async def api_list_mentions(): try: from server.db import SessionLocal, Mention db = SessionLocal() try: items = db.query(Mention).order_by(Mention.created_at.desc()).limit(50).all() return {'ok': True, 'mentions': [{ 'id': i.id, 'brand': i.brand, 'source': i.source, 'content': i.content, 'url': i.url, 'sentiment_score': i.sentiment_score, 'created_at': str(i.created_at) } for i in items]} finally: db.close() except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/graph/entities') async def api_list_entities(): try: from server.db import SessionLocal, Entity db = SessionLocal() try: items = db.query(Entity).order_by(Entity.roi_score.desc()).limit(100).all() return {'ok': True, 'entities': [{ 'id': i.id, 'name': i.name, 'type': i.type, 'roi_score': i.roi_score } for i in items]} finally: db.close() except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/ai_visibility/radar') async def api_ai_radar(job_id: int): try: from server import ai_radar from server.db import SessionLocal, AIResponse, AIPrompt db = SessionLocal() try: responses = db.query(AIResponse).join(AIPrompt).filter(AIPrompt.job_id == job_id).order_by(AIResponse.created_at.desc()).limit(10).all() score = ai_radar.calculate_overall_visibility_score(job_id) return { 'ok': True, 'overall_score': score, 'latest_mentions': [{ 'prompt': r.prompt_id, # Simplified 'mentioned': r.mention_found, 'model': r.model_name, 'competitors': r.competitors_mentioned } for r in responses] } finally: db.close() except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/autopilot/toggle') async def api_toggle_autopilot(job_id: int, enabled: bool): try: from server.db import SessionLocal, ProjectSettings db = SessionLocal() try: settings = db.query(ProjectSettings).filter(ProjectSettings.job_id == job_id).first() if not settings: settings = ProjectSettings(job_id=job_id, autopilot_enabled=enabled) db.add(settings) else: settings.autopilot_enabled = enabled db.commit() return {'ok': True, 'autopilot_enabled': enabled} finally: db.close() except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/strategy/summary') async def api_strategy_summary(job_id: int): try: from server import growth_os summary = growth_os.get_growth_strategy(job_id) return {'ok': True, 'summary': summary} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/actions/{action_id}/execute') async def api_execute_action(action_id: int): try: from server import executor from server.db import SessionLocal, Action db = SessionLocal() try: act = db.query(Action).filter(Action.id == action_id).first() if act: executor.dispatch_action(action_id, act.type) finally: db.close() return {'ok': True, 'action_id': action_id, 'message': 'Dispatched successfully'} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class UserRegister(BaseModel): email: str password: str company_id: int | None = None class CompanyRegister(BaseModel): name: str domain: str | None = None class LoginRequest(BaseModel): email: str password: str @app.post('/api/companies/register') async def api_register_company(req: CompanyRegister): try: cid = user_mgmt.create_company(req.name, req.domain) return {'ok': True, 'company_id': cid} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/users/register') async def api_register_user(req: UserRegister): try: uid = user_mgmt.create_user(req.email, req.password, company_id=req.company_id) token = user_mgmt.make_token(uid) user = user_mgmt.get_user(uid) # Initialize trial and onboarding for new user if onboarding: onboarding.init_trial(uid) onboarding.init_onboarding(uid) return {'ok': True, 'user_id': uid, 'token': token, 'user': user} except sqlite3.IntegrityError: return JSONResponse({'ok': False, 'error': 'البريد الإلكتروني مسجل بالفعل'}, status_code=400) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/users/login') async def api_login(req: LoginRequest): try: user = user_mgmt.authenticate_user(req.email, req.password) if not user: return JSONResponse({'ok': False, 'error': 'invalid credentials'}, status_code=401) token = user_mgmt.make_token(user['id']) return {'ok': True, 'token': token, 'user': user} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/users/me') async def api_me(request: Request): auth = request.headers.get('authorization') or request.headers.get('Authorization') if not auth or not auth.lower().startswith('bearer '): return JSONResponse({'ok': False, 'error': 'missing token'}, status_code=401) try: token = auth.split(' ', 1)[1].strip() if not token or len(token) < 10: return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401) uid = user_mgmt.verify_token(token) if not uid: return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401) u = user_mgmt.get_user(uid) if not u: return JSONResponse({'ok': False, 'error': 'user not found'}, status_code=404) return {'ok': True, 'user': u} except Exception as e: return JSONResponse({'ok': False, 'error': 'token validation failed'}, status_code=401) @app.post('/api/simulate') async def api_simulate(req: SimulationRequest): try: prediction = ai_analysis.simulate_visibility(req.content, req.brand, api_keys=req.api_keys) return {'ok': True, 'prediction': prediction} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/analyze') # @limiter.limit("20/minute") # Rate limiting disabled for testing async def api_analyze(req: AnalysisRequest = None, request: Request = None, job_id: int = None): api_keys = req.api_keys if req else {} job_id = job_id or (req.job_id if req else None) # Resolve audit path: job-specific first, then global fallback if job_id: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': f'Job {job_id} is still processing (result_path is empty). Please wait for the crawler to finish.'}, status_code=400) audit_path = Path(result_path) / 'audit.json' analysis_out_path = Path(result_path) / 'analysis.json' else: audit_path = OUTPUT_DIR / 'audit.json' analysis_out_path = OUTPUT_DIR / 'analysis.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'no audit found; run crawl first'}, status_code=400) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) pages = audit.get('pages', []) # Pass user-provided keys to analysis analysis = ai_analysis.analyze_pages(pages, api_keys=api_keys) # AI Visibility Check (Optional Per-request override) ai_vis = audit.get('ai_visibility') or {} org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages) if api_keys: queries = [f"What is {org_name}?", f"Best services for {org_name}"] if api_keys.get('perplexity'): ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys.get('perplexity')) elif api_keys.get('openai'): ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai')) # 🔷 Deep AI Visibility Analysis (Sentiment/Shopping/Context) - Always Run if possible try: deep_ai = ai_analysis.analyze_ai_visibility_deep(pages, org_name, api_keys=api_keys) # Flatten deep_ai into ai_vis so compute_geo_score can find them easily if deep_ai: ai_vis.update(deep_ai) except Exception as e: print(f"Deep AI analysis error: {e}") audit['ai_visibility'] = ai_vis # Save updated audit with open(audit_path, 'w', encoding='utf-8') as f: json.dump(audit, f, ensure_ascii=False, indent=2) from server import geo_services # ── Enhanced Visibility Score v2 (API Based) ──────────────────────────────── org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages) # Fetch real search results using provided keys or environment fallbacks searches = [] serp_key = (api_keys.get("SERPAPI_KEY") if api_keys else None) or os.getenv("SERPAPI_KEY") zen_key = (api_keys.get("ZENSERP_KEY") if api_keys else None) or os.getenv("ZENSERP_KEY") core_queries = [f"{org_name}", f"تحميل {org_name}", f"{org_name} review"] for cq in core_queries[:2]: s_res = geo_services._serp_api_search(cq, api_key=serp_key) if not s_res: s_res = geo_services._zenserp_search(cq, api_key=zen_key) if s_res: searches.append(s_res) # ── Competitor Insight Enrichment ────────────────────────────────────────── comp_insight = {} try: # Pass industry override if available industry_override = audit.get('industry_override') comp_insight = geo_services.get_competitor_insights( org_name, audit.get('url'), api_keys=api_keys, industry_override=industry_override ) except Exception as e: print(f"Competitor insight error: {e}") pass # Hybrid Score Calculation (v2) ai_mentions = ai_vis.get("mentions", 0) if ai_vis else 0 total_queries = ai_vis.get("total_queries", 1) if ai_vis else 1 traffic_est = comp_insight.get("monthly_visits", "unknown") geo_v2 = geo_services.calculate_visibility_score_v2( org_name, searches, ai_mentions, total_queries, traffic_est ) geo = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis) geo["v2"] = geo_v2 # Combined 40/40/20 score analysis_out = { 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight } with open(analysis_out_path, 'w', encoding='utf-8') as fa: json.dump(analysis_out, fa, ensure_ascii=False, indent=2) return { 'ok': True, 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight } @app.get('/api/history') async def api_history(): history_path = OUTPUT_DIR / 'history.json' if not history_path.exists(): return { 'ok': True, 'history': [] } try: data = json.loads(history_path.read_text(encoding='utf-8')) return { 'ok': True, 'history': data } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class ActivateRequest(BaseModel): ts: str api_keys: Optional[dict] = None @app.post('/api/history/activate') async def api_activate_history(req: ActivateRequest): """ Switch the 'active' research to a historical snapshot. This overwrites audit.json and triggers a re-analysis. """ try: ts = req.ts snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json' if not snapshot_path.exists(): return JSONResponse({'ok': False, 'error': f'Snapshot {ts} not found'}, status_code=404) # 1. Update audit.json audit_main = OUTPUT_DIR / 'audit.json' with open(snapshot_path, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m: audit_obj = json.load(s) json.dump(audit_obj, m, ensure_ascii=False, indent=2) # 2. Trigger analysis and compute score pages = audit_obj.get('pages', []) api_keys = req.api_keys or {} # Pass user-provided keys to analysis analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys) # If brand name is generic, infer it org_name = audit_obj.get('org_name', 'Company') if not org_name or org_name == 'Company': inferred = ai_analysis.infer_brand_name(pages) if inferred and inferred != 'Company': org_name = inferred audit_obj['org_name'] = org_name # Re-save with inferred name with open(audit_main, 'w', encoding='utf-8') as f: json.dump(audit_obj, f, ensure_ascii=False, indent=2) ai_vis = audit_obj.get('ai_visibility') # Re-run visibility if keys provided if api_keys and (api_keys.get('perplexity') or api_keys.get('openai')): queries = [f"What is {org_name}?", f"Best services for {org_name}"] if api_keys.get('perplexity'): ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity']) elif api_keys.get('openai'): ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai')) audit_obj['ai_visibility'] = ai_vis with open(audit_main, 'w', encoding='utf-8') as f: json.dump(audit_obj, f, ensure_ascii=False, indent=2) geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=ai_vis) # 3. Save analysis.json full_report = { 'analysis': analysis_data, 'geo_score': geo_score } analysis_path = OUTPUT_DIR / 'analysis.json' with open(analysis_path, 'w', encoding='utf-8') as fa: json.dump(full_report, fa, ensure_ascii=False, indent=2) # 4. Generate Strategic Intelligence Report for the newly activated data # This ensures the Competitive Intelligence Matrix updates immediately try: from server import search_intelligence report = search_intelligence.run_complete_analysis(pages, source_url=audit_obj.get('url', 'http://example.com'), api_keys=api_keys) except Exception as e: print(f"Intelligence sync error: {e}") report = None return { 'ok': True, 'message': f'Research {ts} activated successfully', 'org_name': org_name, 'pages_count': len(pages), 'report': report } except Exception as e: import traceback traceback.print_exc() return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/recommendations') async def api_recommendations(req: RecommendationRequest = None): # load analysis and audit results then produce recommendations api_keys = req.api_keys if req else {} job_id = req.job_id if req else None extra_context = req.extra_context if (req and req.extra_context) else {} # Resolve paths: prefer job-specific result_path, fall back to global output/ if job_id: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': f'job {job_id} has no results yet — wait for it to complete'}, status_code=400) audit_path = Path(result_path) / 'audit.json' analysis_path = Path(result_path) / 'analysis.json' else: audit_path = OUTPUT_DIR / 'audit.json' analysis_path = OUTPUT_DIR / 'analysis.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'no audit found — run a crawl first'}, status_code=400) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) pages = audit.get('pages', []) # If keys are provided, we should probably re-run analysis to fill in the "Visibility" and "AI Analysis" gaps if api_keys: # Re-run visibility if brand changed or results are missing old_org = audit.get('org_name', 'Company') org_name = old_org if not org_name or org_name == 'Company': inferred = ai_analysis.infer_brand_name(pages) if inferred and inferred != 'Company': org_name = inferred audit['org_name'] = org_name queries = [f"What is {org_name}?", f"Best services for {org_name}"] ai_vis = audit.get('ai_visibility') # If brand name improved OR visibility is missing/demo, re-run it needs_visibility = (org_name != old_org) or not ai_vis or not ai_vis.get('enabled') if needs_visibility and (api_keys.get('perplexity') or api_keys.get('openai')): if api_keys.get('perplexity'): ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity']) elif api_keys.get('openai'): ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai')) audit['ai_visibility'] = ai_vis # Re-run analysis analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys) geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis) # Save updated results with open(audit_path, 'w', encoding='utf-8') as f: json.dump(audit, f, ensure_ascii=False, indent=2) with open(analysis_path, 'w', encoding='utf-8') as f: json.dump({ 'analysis': analysis_data, 'geo_score': geo_score }, f, ensure_ascii=False, indent=2) else: # Just load existing if not analysis_path.exists(): # Trigger basic analysis if missing analysis_data = ai_analysis.analyze_pages(pages) geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=audit.get('ai_visibility')) else: with open(analysis_path, 'r', encoding='utf-8') as f: ana_obj = json.load(f) analysis_data = ana_obj.get('analysis') geo_score = ana_obj.get('geo_score') # Get Search Intelligence for benchmarks search_intel_report = None try: from server import search_intelligence search_intel_report = search_intelligence.run_complete_analysis(pages, source_url=audit.get('url', ''), api_keys=api_keys) except Exception as e: print(f"Search intel error in recommendations: {e}") recs = ai_analysis.generate_recommendations(pages, geo_score=geo_score, api_keys=api_keys, ai_analysis_results=analysis_data, extra_context=extra_context) return { 'ok': True, 'recommendations': recs, 'audit': audit, 'ai_visibility': audit.get('ai_visibility'), 'analysis': analysis_data, 'geo_score': geo_score, 'search_intel': search_intel_report } class KeywordsRequest(BaseModel): url: str max_pages: int = 1 api_keys: Optional[dict] = None @app.post('/api/keywords') # @limiter.limit("15/minute") # Rate limiting disabled for testing async def api_keywords(req: KeywordsRequest, request: Request, enrich: bool = False, analytics: bool = False): try: # try to fetch pages via crawler (will fallback to requests) from src import crawler pages = crawler.crawl_seed(req.url, max_pages=req.max_pages) # build a minimal audit-like object audit_obj = {'pages': pages} from server import keyword_engine result = keyword_engine.extract_keywords_from_audit(audit_obj, top_n=40, enrich=enrich, analytics=analytics) if analytics: # Return full analytics report return {'ok': True, 'analytics': result, 'pages': [{'url': p.get('url'), 'title': p.get('title')} for p in pages]} else: # Return simple keyword list return {'ok': True, 'keywords': result, 'pages': [{'url': p.get('url'), 'title': p.get('title')} for p in pages]} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/test/keywords') async def api_test_keywords(): """Test endpoint to verify keyword extraction works.""" try: from src import crawler from server.keyword_engine import extract_keywords_from_audit # Test with abayanoir pages = crawler.crawl_seed('https://abayanoir.com', max_pages=1) audit = {'pages': pages} # Simple mode simple = extract_keywords_from_audit(audit, top_n=5, enrich=False, analytics=False) # Analytics mode analytics = extract_keywords_from_audit(audit, top_n=5, enrich=False, analytics=True) return { 'ok': True, 'simple_keywords': simple, 'analytics_type': str(type(analytics)), 'analytics_summary': analytics.get('summary') if isinstance(analytics, dict) else 'NOT A DICT', 'pages_crawled': len(pages) } except Exception as e: import traceback traceback.print_exc() return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/search/intelligence') async def api_search_intelligence(req: KeywordsRequest, enrich: bool = False): """Complete search intelligence analysis with keywords, competitors, and recommendations.""" try: from src import crawler from server import search_intelligence # Crawl pages pages = crawler.crawl_seed(req.url, max_pages=req.max_pages) # Ensure pages is a list if not isinstance(pages, list): pages = [pages] if pages else [] # Run complete analysis report = search_intelligence.run_complete_analysis(pages, req.url, enrich_data=enrich, api_keys=req.api_keys) return {'ok': True, 'report': report} except Exception as e: import traceback traceback.print_exc() return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/export/keywords/csv') async def api_export_keywords_csv(req: KeywordsRequest, enrich: bool = False): """Export keywords to CSV format.""" try: from src import crawler from server import search_intelligence import csv from io import StringIO from fastapi.responses import StreamingResponse # Crawl and analyze pages = crawler.crawl_seed(req.url, max_pages=req.max_pages) if not isinstance(pages, list): pages = [pages] if pages else [] report = search_intelligence.run_complete_analysis(pages, req.url, enrich_data=enrich) # Create CSV output = StringIO() writer = csv.writer(output) # Header writer.writerow(['Keyword', 'Count', 'Density (%)', 'Volume', 'CPC ($)', 'Competition', 'Classification']) # Primary keywords for kw in report['keyword_results']['classification']['primary']['keywords']: writer.writerow([ kw['kw'], kw['count'], kw.get('density', 'N/A'), kw.get('volume', 'N/A'), kw.get('cpc', 'N/A'), kw.get('competition', 'N/A'), 'Primary' ]) # Secondary keywords for kw in report['keyword_results']['classification']['secondary']['keywords']: writer.writerow([ kw['kw'], kw['count'], kw.get('density', 'N/A'), kw.get('volume', 'N/A'), kw.get('cpc', 'N/A'), kw.get('competition', 'N/A'), 'Secondary' ]) # Long-tail keywords for kw in report['keyword_results']['classification']['long_tail']['keywords']: writer.writerow([ kw['kw'], kw['count'], kw.get('density', 'N/A'), kw.get('volume', 'N/A'), kw.get('cpc', 'N/A'), kw.get('competition', 'N/A'), 'Long-tail' ]) output.seek(0) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename=keywords_{req.url.replace('https://', '').replace('/', '_')}.csv"} ) except Exception as e: import traceback traceback.print_exc() return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/search/competitors') async def api_search_competitors(req: KeywordsRequest): try: from src import crawler from server import competitor_analysis pages = crawler.crawl_seed(req.url, max_pages=req.max_pages) competitors = competitor_analysis.detect_competitors(pages, req.url, min_mentions=1) summary = competitor_analysis.get_competitor_summary(competitors) return { 'ok': True, 'result': { 'competitors': competitors, 'summary': summary, 'pages_analyzed': len(pages) } } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/search/gsc') async def api_search_gsc(site: str, start: str, end: str): try: res = search_intel.gsc_query(site, start, end) return {'ok': True, 'result': res} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ── AI Content Engine ────────────────────────────────────────────────────────── class ArticleRequest(BaseModel): keyword: str lang: str = 'en' target_site: str = "" research_insights: list = [] competitors_content: list = [] crawl_data: dict = {} prefer_backend: str = 'groq' api_keys: dict = {} class OptimizeRequest(BaseModel): content: str keyword: str lang: str = 'en' target_site: str = "" research_insights: list = [] crawl_data: dict = {} prefer_backend: str = 'groq' api_keys: dict = {} class FaqRequest(BaseModel): topic: str page_content: str = '' lang: str = 'en' target_site: str = "" research_insights: list = [] crawl_data: dict = {} count: int = 5 prefer_backend: str = 'groq' api_keys: dict = {} class SemanticRequest(BaseModel): content: str lang: str = 'en' prefer_backend: str = 'groq' api_keys: dict = {} class IdentityRequest(BaseModel): crawl_data: dict = {} lang: str = 'en' prefer_backend: str = 'groq' api_keys: dict = {} @app.post('/api/content/generate') async def api_content_generate(req: ArticleRequest): try: from server import content_engine result = content_engine.generate_article( req.keyword, lang=req.lang, target_site=req.target_site, research_insights=req.research_insights, competitors_content=req.competitors_content, crawl_data=req.crawl_data, prefer_backend=req.prefer_backend, api_keys=req.api_keys ) return {'ok': True, 'result': result} except Exception as e: print(f"⚠️ [API] Content Gen Failed: {e}. Falling back to Demo.") try: from server import content_engine # Force demo mode result = content_engine.generate_article(req.keyword, lang=req.lang, target_site=req.target_site, prefer_backend='demo') return {'ok': True, 'result': result, 'warn': str(e)} except Exception as e2: return JSONResponse({'ok': False, 'error': str(e2)}, status_code=500) @app.post('/api/content/optimize') async def api_content_optimize(req: OptimizeRequest): try: from server import content_engine result = content_engine.optimize_content( req.content, req.keyword, lang=req.lang, target_site=req.target_site, research_insights=req.research_insights, crawl_data=req.crawl_data, prefer_backend=req.prefer_backend, api_keys=req.api_keys ) return {'ok': True, 'result': result} except Exception as e: print(f"⚠️ [API] Content Optimization Failed: {e}. Falling back to Demo.") try: from server import content_engine result = content_engine.optimize_content(req.content, req.keyword, lang=req.lang, target_site=req.target_site, prefer_backend='demo') return {'ok': True, 'result': result, 'warn': str(e)} except Exception as e2: return JSONResponse({'ok': False, 'error': str(e2)}, status_code=500) @app.post('/api/content/faqs') async def api_content_faqs(req: FaqRequest): try: from server import content_engine result = content_engine.generate_faqs( req.topic, page_content=req.page_content, lang=req.lang, count=req.count, target_site=req.target_site, research_insights=req.research_insights, crawl_data=req.crawl_data, prefer_backend=req.prefer_backend, api_keys=req.api_keys ) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/content/semantic') async def api_content_semantic(req: SemanticRequest): try: from server import content_engine result = content_engine.semantic_optimize( req.content, lang=req.lang, prefer_backend=req.prefer_backend, api_keys=req.api_keys ) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/content/identity') async def api_content_identity(req: IdentityRequest): try: from server import content_engine result = content_engine.generate_identity( crawl_data=req.crawl_data, lang=req.lang, prefer_backend=req.prefer_backend, api_keys=req.api_keys ) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ═══════════════════════════════════════════════════════════════════════════════ # PAID ADS MANAGEMENT MODULE # ═══════════════════════════════════════════════════════════════════════════════ from server import ads_manager, ads_ai class AdsConnectRequest(BaseModel): developer_token: str client_id: str client_secret: str refresh_token: str customer_id: str class AdsAIRequest(BaseModel): api_keys: Optional[dict] = None lang: Optional[str] = 'ar' service_name: Optional[str] = 'خدمات SEO' usp: Optional[str] = 'نتائج مضمونة في 90 يوم' target_audience: Optional[str] = 'شركات سعودية' class CampaignCreateRequest(BaseModel): name: str budget_usd: float = 5.0 target_cpa: Optional[float] = None @app.post('/api/ads/connect') async def api_ads_connect(req: AdsConnectRequest): """Save Google Ads credentials and verify they work.""" try: credentials = { 'developer_token': req.developer_token, 'client_id': req.client_id, 'client_secret': req.client_secret, 'refresh_token': req.refresh_token, 'customer_id': req.customer_id } result = ads_manager.verify_google_connection(credentials) if result.get('ok'): ads_manager.save_ads_config(credentials) return result except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/ads/dashboard') async def api_ads_dashboard(demo: bool = False, days: int = 30): """Return unified KPI dashboard data — uses saved credentials or demo.""" try: credentials = {} if demo else ads_manager.load_ads_config() campaigns = ads_manager.get_campaign_performance(credentials, days=days) summary = ads_manager.build_ads_summary(campaigns, credentials=credentials) return {'ok': True, 'summary': summary, 'campaigns': campaigns} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/ads/keywords') async def api_ads_keywords(demo: bool = False): """Return keyword performance with Quality Scores.""" try: credentials = {} if demo else ads_manager.load_ads_config() keywords = ads_manager.get_keyword_performance(credentials) return {'ok': True, 'keywords': keywords, 'count': len(keywords)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/ads/search-terms') async def api_ads_search_terms(demo: bool = False, min_clicks: int = 5): """Return search term analysis — converting vs. wasted spend.""" try: credentials = {} if demo else ads_manager.load_ads_config() data = ads_manager.get_search_terms(credentials, min_clicks=min_clicks) return {'ok': True, 'data': data} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/ads/campaigns') async def api_ads_create_campaign(req: CampaignCreateRequest): """Create a new Google Ads campaign (starts PAUSED for safety).""" try: credentials = ads_manager.load_ads_config() result = ads_manager.create_campaign( credentials, req.name, req.budget_usd, req.target_cpa ) return result except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/ads/ai/bid-suggestions') async def api_ads_bid_suggestions(req: AdsAIRequest): """AI-powered bid adjustment recommendations for each keyword.""" try: credentials = ads_manager.load_ads_config() keywords = ads_manager.get_keyword_performance(credentials) suggestions = ads_ai.ai_bid_suggestion(keywords, api_keys=req.api_keys or {}) return {'ok': True, 'suggestions': suggestions, 'count': len(suggestions)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/ads/ai/copy') async def api_ads_generate_copy(req: AdsAIRequest): """Generate Arabic + English RSA ad copy using AI.""" try: result = ads_ai.generate_ad_copy( service_name=req.service_name or 'خدمات SEO', usp=req.usp or 'نتائج مضمونة في 90 يوم', target_audience=req.target_audience or 'شركات سعودية', lang=req.lang or 'ar', api_keys=req.api_keys or {} ) return {'ok': True, 'copy': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/ads/ai/negatives') async def api_ads_negative_keywords(req: AdsAIRequest): """Detect irrelevant search terms and suggest negative keywords.""" try: credentials = ads_manager.load_ads_config() search_terms = ads_manager.get_search_terms(credentials) result = ads_ai.detect_negative_keywords(search_terms, api_keys=req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/ads/ai/weekly-report') async def api_ads_weekly_report(req: AdsAIRequest): """Generate AI-written weekly performance report in Arabic or English.""" try: credentials = ads_manager.load_ads_config() campaigns = ads_manager.get_campaign_performance(credentials) keywords = ads_manager.get_keyword_performance(credentials) search_terms = ads_manager.get_search_terms(credentials) report = ads_ai.generate_weekly_report( campaigns, keywords, search_terms, api_keys=req.api_keys or {}, lang=req.lang or 'ar' ) return {'ok': True, 'report': report} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ══════════════════════════════════════════════════════════════════════════════ # GEO SERVICES — 6 AI Visibility Services # ══════════════════════════════════════════════════════════════════════════════ from server import geo_services class GeoServiceRequest(BaseModel): brand: str url: Optional[str] = None queries: Optional[List[str]] = None competitors: Optional[List[str]] = None brand_variants: Optional[List[str]] = None api_keys: Optional[dict] = None class SimulatorRequest(BaseModel): brand: str original_content: str improved_content: str test_queries: List[str] api_keys: Optional[dict] = None @app.post('/api/geo/visibility') async def api_geo_visibility(req: GeoServiceRequest): try: queries = req.queries or geo_services.DEFAULT_QUERIES result = geo_services.visibility_score(req.brand, queries, req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/recognition') async def api_geo_recognition(req: GeoServiceRequest): try: queries = req.queries or geo_services.DEFAULT_QUERIES variants = req.brand_variants or [req.brand] result = geo_services.brand_recognition(req.brand, variants, queries, req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/sentiment') async def api_geo_sentiment(req: GeoServiceRequest): try: queries = req.queries or geo_services.DEFAULT_QUERIES result = geo_services.sentiment_analysis(req.brand, queries, req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/competitors') async def api_geo_competitors(req: GeoServiceRequest): try: queries = req.queries or geo_services.DEFAULT_QUERIES competitors = req.competitors or ['SEMrush', 'Ahrefs', 'Moz'] result = geo_services.competitor_ranking(req.brand, competitors, queries, req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/regional') async def api_geo_regional(req: GeoServiceRequest): try: result = geo_services.geo_regional_analysis(req.brand, req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/fix') async def api_geo_fix(req: GeoServiceRequest): try: if not req.url: return JSONResponse({'ok': False, 'error': 'url required'}, status_code=400) result = geo_services.fix_recommendations(req.url, req.brand, {}, req.api_keys or {}) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/simulate') async def api_geo_simulate(req: SimulatorRequest): try: result = geo_services.visibility_simulator( req.original_content, req.improved_content, req.test_queries, req.brand, req.api_keys or {} ) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/geo/suite') async def api_geo_suite(req: GeoServiceRequest, background_tasks: BackgroundTasks): """Run all 6 GEO services at once for a brand.""" try: result = geo_services.run_full_suite( req.brand, req.url, req.competitors, req.api_keys or {} ) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ═══════════════════════════════════════════════════════════════════════════════ # ADVANCED FEATURES — Keyword Tracking, Alerts, Scheduler, Bulk, Gap, Email # ═══════════════════════════════════════════════════════════════════════════════ from server.advanced_features import ( save_keyword_snapshot, save_geo_score_snapshot, get_keyword_trends, get_geo_score_trends, check_and_create_alerts, get_alerts, mark_alerts_seen, add_scheduled_crawl, list_scheduled_crawls, delete_scheduled_crawl, competitor_keyword_gap, bulk_enqueue, send_weekly_report, start_scheduler, init_advanced_tables ) try: init_advanced_tables() start_scheduler() except Exception: pass @app.get('/api/tracking/keywords') async def api_keyword_trends(url: str, keyword: str = None, days: int = 30): try: return {'ok': True, 'trends': get_keyword_trends(url, keyword, days)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/tracking/geo-score') async def api_geo_score_trends(url: str, days: int = 90): try: return {'ok': True, 'trends': get_geo_score_trends(url, days)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/alerts') async def api_get_alerts(url: str = None, unseen_only: bool = False): try: alerts = get_alerts(url, unseen_only) return {'ok': True, 'alerts': alerts, 'count': len(alerts)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/alerts/seen') async def api_mark_alerts_seen(req: dict): try: mark_alerts_seen(req.get('ids', [])) return {'ok': True} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class ScheduleRequest(BaseModel): url: str org_name: str = '' org_url: str = '' max_pages: int = 3 frequency: str = 'weekly' @app.post('/api/schedule') async def api_add_schedule(req: ScheduleRequest): try: sid = add_scheduled_crawl(req.url, req.org_name, req.org_url, req.max_pages, req.frequency) return {'ok': True, 'schedule_id': sid} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/schedule') async def api_list_schedules(): try: return {'ok': True, 'schedules': list_scheduled_crawls()} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.delete('/api/schedule/{schedule_id}') async def api_delete_schedule(schedule_id: int): try: delete_scheduled_crawl(schedule_id) return {'ok': True} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class GapRequest(BaseModel): your_keywords: list = [] competitor_url: str max_pages: int = 3 @app.post('/api/competitor/gap') async def api_competitor_gap(req: GapRequest): try: return {'ok': True, 'gap': competitor_keyword_gap(req.your_keywords, req.competitor_url, req.max_pages)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class BulkRequest(BaseModel): urls: list org_name: str = '' max_pages: int = 2 @app.post('/api/bulk/crawl') async def api_bulk_crawl(req: BulkRequest): try: job_ids = bulk_enqueue(req.urls, req.org_name, req.max_pages) return {'ok': True, 'job_ids': job_ids, 'count': len(job_ids)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) class EmailReportRequest(BaseModel): to_email: str url: str @app.post('/api/reports/email') async def api_send_email_report(req: EmailReportRequest): try: ok = send_weekly_report(req.to_email, req.url) if ok: return {'ok': True, 'message': f'تم الإرسال إلى {req.to_email}'} return JSONResponse({'ok': False, 'error': 'فشل الإرسال — أضف SMTP_HOST و SMTP_USER و SMTP_PASS في .env'}, status_code=400) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) _SETTINGS_PATH = Path(os.environ.get('OUTPUT_DIR', Path(__file__).resolve().parent.parent / 'output')) / 'settings.json' def _load_settings() -> dict: if _SETTINGS_PATH.exists(): try: return json.loads(_SETTINGS_PATH.read_text()) except Exception: pass return {} def _save_settings(data: dict): _SETTINGS_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2)) @app.get('/api/settings') async def api_get_settings(): s = _load_settings() safe = {} for k, v in s.items(): safe[k] = '***' if (v and (k.endswith('_key') or k.endswith('_KEY') or 'pass' in k.lower())) else v return {'ok': True, 'settings': safe} @app.post('/api/settings') async def api_save_settings(req: dict): try: current = _load_settings() for k, v in req.items(): if v and v != '***': current[k] = v os.environ[k.upper()] = v _save_settings(current) return {'ok': True} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) try: for k, v in _load_settings().items(): if v and v != '***': os.environ.setdefault(k.upper(), v) except Exception: pass # ── Competitor Intelligence Analyzer ───────────────────────────────────────── class CompetitorIntelRequest(BaseModel): url: str region: str = 'Saudi Arabia' industry: str = '' count: int = 7 api_keys: dict = {} @app.post('/api/competitor/intelligence') async def api_competitor_intelligence(req: CompetitorIntelRequest): try: from server.competitor_intel import analyze_competitors result = analyze_competitors( req.url, region=req.region, industry=req.industry, count=req.count, api_keys=req.api_keys ) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ── Projects (Multi-site) ───────────────────────────────────────────────────── import sqlite3 as _sqlite3 from pathlib import Path as _Path _PROJ_DB = _Path(os.environ.get('OUTPUT_DIR', './output')) / 'projects.db' def _init_projects_db(): conn = _sqlite3.connect(str(_PROJ_DB)) conn.execute("""CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, name TEXT NOT NULL, url TEXT NOT NULL, industry TEXT, color TEXT DEFAULT '#3b82f6', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") conn.execute("""CREATE TABLE IF NOT EXISTS project_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER, job_id INTEGER, geo_score INTEGER, seo_score INTEGER, ai_visibility INTEGER, pages_crawled INTEGER, keywords_count INTEGER, snapshot_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") conn.commit() conn.close() _init_projects_db() class ProjectCreate(BaseModel): name: str url: str industry: str = '' color: str = '#3b82f6' @app.get('/api/projects') async def api_list_projects(request: Request): auth = request.headers.get('authorization','') uid = None try: token = auth.split(' ',1)[1].strip() uid = user_mgmt.verify_token(token) except: pass conn = _sqlite3.connect(str(_PROJ_DB)) conn.row_factory = _sqlite3.Row rows = conn.execute('SELECT * FROM projects WHERE user_id=? OR user_id IS NULL ORDER BY created_at DESC', (uid,)).fetchall() projects = [] for r in rows: p = dict(r) # Get latest snapshot snap = conn.execute('SELECT * FROM project_snapshots WHERE project_id=? ORDER BY snapshot_date DESC LIMIT 1', (p['id'],)).fetchone() p['latest'] = dict(snap) if snap else {} # Get trend (last 10 snapshots) trend = conn.execute('SELECT geo_score, snapshot_date FROM project_snapshots WHERE project_id=? ORDER BY snapshot_date DESC LIMIT 10', (p['id'],)).fetchall() p['trend'] = [dict(t) for t in reversed(trend)] projects.append(p) conn.close() return {'ok': True, 'projects': projects} @app.post('/api/projects') async def api_create_project(req: ProjectCreate, request: Request): auth = request.headers.get('authorization','') uid = None try: token = auth.split(' ',1)[1].strip() uid = user_mgmt.verify_token(token) except: pass conn = _sqlite3.connect(str(_PROJ_DB)) cur = conn.execute('INSERT INTO projects (user_id,name,url,industry,color) VALUES (?,?,?,?,?)', (uid, req.name, req.url, req.industry, req.color)) pid = cur.lastrowid conn.commit() conn.close() return {'ok': True, 'project_id': pid} @app.delete('/api/projects/{project_id}') async def api_delete_project(project_id: int): conn = _sqlite3.connect(str(_PROJ_DB)) conn.execute('DELETE FROM projects WHERE id=?', (project_id,)) conn.execute('DELETE FROM project_snapshots WHERE project_id=?', (project_id,)) conn.commit() conn.close() return {'ok': True} @app.post('/api/projects/{project_id}/snapshot') async def api_save_snapshot(project_id: int, request: Request): data = await request.json() conn = _sqlite3.connect(str(_PROJ_DB)) conn.execute("""INSERT INTO project_snapshots (project_id,job_id,geo_score,seo_score,ai_visibility,pages_crawled,keywords_count) VALUES (?,?,?,?,?,?,?)""", (project_id, data.get('job_id'), data.get('geo_score',0), data.get('seo_score',0), data.get('ai_visibility',0), data.get('pages_crawled',0), data.get('keywords_count',0))) conn.commit() conn.close() return {'ok': True} # ── Subscription & Plans ────────────────────────────────────────────────────── from server import payments as _payments @app.get('/api/plans') async def api_get_plans(): return {'ok': True, 'plans': _payments.PLANS} @app.get('/api/subscription') async def api_get_subscription(request: Request): auth = request.headers.get('authorization','') try: token = auth.split(' ',1)[1].strip() uid = user_mgmt.verify_token(token) if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401) sub = _payments.get_subscription(uid) usage = _payments.get_usage(uid) return {'ok': True, 'subscription': sub, 'usage': usage} except Exception as e: return JSONResponse({'ok':False,'error':str(e)},status_code=500) @app.post('/api/subscription/checkout') async def api_create_checkout(request: Request): try: data = await request.json() auth = request.headers.get('authorization','') token = auth.split(' ',1)[1].strip() uid = user_mgmt.verify_token(token) if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401) result = _payments.create_checkout_session( uid, data.get('plan','pro'), data.get('success_url', '/portal.html?upgraded=1'), data.get('cancel_url', '/pricing.html') ) return {'ok': True, **result} except Exception as e: return JSONResponse({'ok':False,'error':str(e)},status_code=500) @app.post('/api/subscription/webhook') async def api_stripe_webhook(request: Request): payload = await request.body() sig = request.headers.get('stripe-signature','') result = _payments.handle_webhook(payload, sig) return result @app.get('/api/subscription/usage') async def api_check_usage(resource: str, request: Request): try: auth = request.headers.get('authorization','') token = auth.split(' ',1)[1].strip() uid = user_mgmt.verify_token(token) if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401) result = _payments.check_usage_limit(uid, resource) return {'ok': True, **result} except Exception as e: return JSONResponse({'ok':False,'error':str(e)},status_code=500) # ── SEO Score + Core Web Vitals ─────────────────────────────────────────────── import requests as _requests @app.get('/api/seo-score') async def api_seo_score(url: str): """Traditional SEO score + Core Web Vitals via PageSpeed API.""" try: from server.competitor_intel import get_pagespeed ps = get_pagespeed(url) # Build SEO score from PageSpeed data seo_raw = ps.get('seo', 0) perf_raw = ps.get('performance', 0) access_raw = ps.get('accessibility', 70) bp_raw = ps.get('best_practices', 80) overall = round((seo_raw * 0.4 + perf_raw * 0.3 + access_raw * 0.15 + bp_raw * 0.15)) # Core Web Vitals cwv = { 'lcp': ps.get('lcp', 'N/A'), 'fid': ps.get('fid', 'N/A'), 'cls': ps.get('cls', 'N/A'), 'fcp': ps.get('fcp', 'N/A'), 'ttfb': ps.get('ttfb', 'N/A'), } status = 'ممتاز' if overall >= 80 else 'جيد' if overall >= 60 else 'يحتاج تحسين' return { 'ok': True, 'url': url, 'seo_score': overall, 'status': status, 'breakdown': { 'seo': seo_raw, 'performance': perf_raw, 'accessibility': access_raw, 'best_practices': bp_raw, }, 'core_web_vitals': cwv, 'source': ps.get('source', 'pagespeed') } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ── Schema Validation ───────────────────────────────────────────────────────── @app.post('/api/schema/validate') async def api_validate_schema(request: Request): """Validate JSON-LD schema markup.""" try: data = await request.json() url = data.get('url', '') schema_str = data.get('schema', '') issues = [] score = 100 if not schema_str: # Try to fetch from URL if url: try: import re resp = _requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'}) matches = re.findall(r"]*type=[\"']application/ld\+json[\"'][^>]*>(.*?)", resp.text, re.DOTALL) schema_str = matches[0] if matches else '' except: pass if not schema_str: return {'ok': True, 'score': 0, 'issues': [ {'type': 'missing', 'severity': 'critical', 'message': 'لا توجد بيانات JSON-LD منظمة في الصفحة'} ], 'has_schema': False} import json as _json try: schema_obj = _json.loads(schema_str) except: return {'ok': True, 'score': 10, 'issues': [ {'type': 'invalid_json', 'severity': 'critical', 'message': 'بيانات JSON-LD غير صالحة — خطأ في التنسيق'} ], 'has_schema': True} # Check required fields schema_type = schema_obj.get('@type', '') context = schema_obj.get('@context', '') if not context: issues.append({'type': 'missing_context', 'severity': 'high', 'message': 'حقل @context مفقود — يجب أن يكون https://schema.org'}) score -= 20 if not schema_type: issues.append({'type': 'missing_type', 'severity': 'high', 'message': 'حقل @type مفقود — حدد نوع الكيان (Organization, Product, etc.)'}) score -= 20 # Type-specific checks if schema_type == 'Organization': for field in ['name', 'url', 'logo', 'contactPoint']: if field not in schema_obj: issues.append({'type': f'missing_{field}', 'severity': 'medium', 'message': f'حقل {field} مفقود من Organization schema'}) score -= 10 elif schema_type == 'Product': for field in ['name', 'description', 'offers']: if field not in schema_obj: issues.append({'type': f'missing_{field}', 'severity': 'medium', 'message': f'حقل {field} مفقود من Product schema'}) score -= 10 elif schema_type == 'Article': for field in ['headline', 'author', 'datePublished']: if field not in schema_obj: issues.append({'type': f'missing_{field}', 'severity': 'medium', 'message': f'حقل {field} مفقود من Article schema'}) score -= 10 if not issues: issues.append({'type': 'valid', 'severity': 'ok', 'message': f'Schema صالح من نوع {schema_type}'}) return { 'ok': True, 'score': max(0, score), 'schema_type': schema_type, 'has_schema': True, 'issues': issues, 'recommendations': [ 'أضف FAQ Schema لتحسين الظهور في نتائج الذكاء الاصطناعي', 'أضف BreadcrumbList Schema لتحسين التنقل', 'أضف SiteLinksSearchBox Schema للبحث المباشر', ] if score < 80 else [] } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # ── Client Portal (Read-only) ───────────────────────────────────────────────── class ClientTokenCreate(BaseModel): job_id: int client_email: str expires_days: int = 30 _CLIENT_DB = _Path(os.environ.get('OUTPUT_DIR', './output')) / 'clients.db' def _init_client_db(): conn = _sqlite3.connect(str(_CLIENT_DB)) conn.execute("""CREATE TABLE IF NOT EXISTS client_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, token TEXT UNIQUE NOT NULL, job_id INTEGER NOT NULL, client_email TEXT, owner_user_id INTEGER, expires_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") conn.commit() conn.close() _init_client_db() @app.post('/api/client/token') async def api_create_client_token(req: ClientTokenCreate, request: Request): """Create a read-only client access token for a specific job.""" import secrets from datetime import datetime, timedelta auth = request.headers.get('authorization','') try: token_str = auth.split(' ',1)[1].strip() uid = user_mgmt.verify_token(token_str) if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401) except: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401) client_token = secrets.token_urlsafe(32) expires = datetime.utcnow() + timedelta(days=req.expires_days) conn = _sqlite3.connect(str(_CLIENT_DB)) conn.execute('INSERT INTO client_tokens (token,job_id,client_email,owner_user_id,expires_at) VALUES (?,?,?,?,?)', (client_token, req.job_id, req.client_email, uid, expires)) conn.commit() conn.close() portal_url = f'/client.html?token={client_token}' return {'ok': True, 'token': client_token, 'portal_url': portal_url, 'expires_at': str(expires)} @app.get('/api/client/report') async def api_client_report(token: str): """Get report data using client token (read-only).""" from datetime import datetime conn = _sqlite3.connect(str(_CLIENT_DB)) conn.row_factory = _sqlite3.Row row = conn.execute('SELECT * FROM client_tokens WHERE token=?', (token,)).fetchone() conn.close() if not row: return JSONResponse({'ok':False,'error':'رابط غير صالح'},status_code=404) row = dict(row) if row.get('expires_at'): try: exp = datetime.fromisoformat(str(row['expires_at'])) if datetime.utcnow() > exp: return JSONResponse({'ok':False,'error':'انتهت صلاحية الرابط'},status_code=403) except: pass job_id = row['job_id'] job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok':False,'error':'التقرير غير موجود'},status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok':False,'error':'التقرير لم يكتمل بعد'},status_code=400) out = {'ok': True, 'job_id': job_id, 'client_email': row.get('client_email')} import json as _json for fname in ['audit.json', 'analysis.json']: fpath = _Path(result_path) / fname if fpath.exists(): key = fname.replace('.json','') out[key] = _json.loads(fpath.read_text(encoding='utf-8')) return out # ── Email Reports ───────────────────────────────────────────────────────────── class EmailReportRequest2(BaseModel): to_email: str job_id: int subject: str = '' include_recommendations: bool = True @app.post('/api/reports/send') async def api_send_report_email(req: EmailReportRequest2, request: Request): """Send a full HTML report to a client email.""" try: from server import reports as _reports from server.advanced_features import send_email_report job = job_queue.get_job(req.job_id) if not job or not job.get('result_path'): return JSONResponse({'ok':False,'error':'التقرير غير جاهز'},status_code=400) html = _reports.build_html_report(job['result_path']) subject = req.subject or f'تقرير GEO — {job.get("url","")}' ok = send_email_report(req.to_email, subject, html) if ok: return {'ok': True, 'message': f'تم إرسال التقرير إلى {req.to_email}'} return JSONResponse({'ok':False,'error':'فشل الإرسال — تحقق من إعدادات SMTP في .env'},status_code=400) except Exception as e: return JSONResponse({'ok':False,'error':str(e)},status_code=500) # ── White-label Settings ────────────────────────────────────────────────────── class WhiteLabelSettings(BaseModel): agency_name: str = '' agency_logo: str = '' primary_color: str = '#3b82f6' report_footer: str = '' custom_domain: str = '' @app.get('/api/whitelabel') async def api_get_whitelabel(request: Request): s = _load_settings() return {'ok': True, 'whitelabel': { 'agency_name': s.get('agency_name',''), 'agency_logo': s.get('agency_logo',''), 'primary_color': s.get('primary_color','#3b82f6'), 'report_footer': s.get('report_footer',''), 'custom_domain': s.get('custom_domain',''), }} @app.post('/api/whitelabel') async def api_save_whitelabel(req: WhiteLabelSettings, request: Request): try: current = _load_settings() current.update({ 'agency_name': req.agency_name, 'agency_logo': req.agency_logo, 'primary_color': req.primary_color, 'report_footer': req.report_footer, 'custom_domain': req.custom_domain, }) _save_settings(current) return {'ok': True} except Exception as e: return JSONResponse({'ok':False,'error':str(e)},status_code=500) # ── Serve new pages ─────────────────────────────────────────────────────────── # ── Tavily Research Endpoints ───────────────────────────────────────────────── class TavilyResearchRequest(BaseModel): url: str = '' company_name: str = '' industry: str = '' region: str = 'Saudi Arabia' class TavilySearchRequest(BaseModel): query: str max_results: int = 5 search_depth: str = 'basic' class TavilyChatRequest(BaseModel): query: str context_url: str = '' max_tokens: int = 4000 class TavilyCrawlRequest(BaseModel): url: str max_depth: int = 2 limit: int = 20 class TavilyMarketRequest(BaseModel): industry: str region: str = 'Saudi Arabia' class TavilyDeepRequest(BaseModel): query: str model: str = 'mini' class TavilyEnrichRequest(BaseModel): items: list # 1. Full company research pipeline @app.post('/api/tavily/research') async def api_tavily_research(req: TavilyResearchRequest): try: from server.tavily_research import run_full_research return run_full_research(req.url, req.company_name, req.industry, req.region) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # 2. Deep Research agent @app.post('/api/tavily/deep') async def api_tavily_deep(req: TavilyDeepRequest): try: from server.tavily_research import deep_research return deep_research(req.query, req.model) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/tavily/deep/{request_id}') async def api_tavily_deep_poll(request_id: str): try: from server.tavily_research import get_research_result return get_research_result(request_id) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # 3. Chat / QnA @app.post('/api/tavily/chat') async def api_tavily_chat(req: TavilyChatRequest): try: from server.tavily_research import chat_answer return chat_answer(req.query, req.context_url, req.max_tokens) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # 4. Crawl2RAG @app.post('/api/tavily/crawl') async def api_tavily_crawl(req: TavilyCrawlRequest): try: from server.tavily_research import crawl_to_rag return crawl_to_rag(req.url, req.max_depth, req.limit) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # 5. Market Researcher @app.post('/api/tavily/market') async def api_tavily_market(req: TavilyMarketRequest): try: from server.tavily_research import market_research return market_research(req.industry, req.region) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # 6. Data Enrichment (Sheets) @app.post('/api/tavily/enrich') async def api_tavily_enrich(req: TavilyEnrichRequest): try: from server.tavily_research import enrich_dataset return enrich_dataset(req.items) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # Competitors @app.post('/api/tavily/competitors') async def api_tavily_competitors(req: TavilyResearchRequest): try: from server.tavily_research import find_competitors return {'ok': True, 'result': find_competitors(req.company_name or req.url, req.industry, req.region)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # Brand monitoring @app.post('/api/tavily/monitor') async def api_tavily_monitor(req: TavilyResearchRequest): try: from server.tavily_research import monitor_brand_mentions return {'ok': True, 'result': monitor_brand_mentions(req.company_name or req.url)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # Content gaps @app.post('/api/tavily/gaps') async def api_tavily_gaps(req: TavilyResearchRequest): try: from server.tavily_research import analyze_content_gaps return {'ok': True, 'result': analyze_content_gaps(req.url, req.company_name, req.industry)} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/tavily/status') async def api_tavily_status(): """Get Tavily API keys status and rotation information""" try: from server.tavily_research import get_tavily_key_status return get_tavily_key_status() except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) # Direct search @app.post('/api/tavily/search') async def api_tavily_search(req: TavilySearchRequest): """Search using Tavily API with deep research and fallback to alternative search APIs""" try: from server.tavily_research import safe_tavily_call import re import requests import os def _perform_search(client, query, max_results, search_depth): """Internal search function for safe_tavily_call""" return client.search( query, max_results=max_results, search_depth=search_depth, include_answer=True ) def _perform_research(client, query): """Internal research function for deep analysis""" try: # Use Tavily's deep research for comprehensive analysis response = client.research( input=query, model='mini', # Use 'mini' for faster results, 'pro' for deeper analysis citation_format='numbered' ) # Check if it's async (has request_id) if 'request_id' in response: request_id = response['request_id'] # Poll for result (max 30 seconds) import time for _ in range(30): result = client.get_research(request_id) if result.get('status') == 'completed': return result time.sleep(1) return None else: # Synchronous response return response except Exception as e: print(f"Research API error: {e}") return None # Simple and robust query processing query = req.query.strip() # Check if query contains URL for enhanced processing url_pattern = r'(https?://[^\s]+)' url_match = re.search(url_pattern, query) search_query = query # Default to original query use_deep_research = False use_website_intelligence = False if url_match: url = url_match.group(1) try: from urllib.parse import urlparse parsed_url = urlparse(url) domain = parsed_url.netloc.replace('www.', '') company_name = domain.split('.')[0] # Check if user is asking for analysis/competitors query_lower = query.lower() # Use Website Intelligence for comprehensive analysis if any(term in query_lower for term in ['منافس', 'competitor', 'تحليل', 'analysis', 'معلومات', 'info', 'إحصائيات', 'stats', 'traffic']): use_website_intelligence = True print(f"📊 [Search] Using Website Intelligence for: {url}") # Get comprehensive website analysis from server.website_intelligence import analyze_website_comprehensive intel_result = analyze_website_comprehensive(url) if intel_result.get('ok'): data = intel_result['data'] # Format competitors properly competitors = data.get('competitors', []) comp_text = '' if competitors and len(competitors) > 0: # Check if we have real competitors (not placeholders) real_comps = [c for c in competitors if c.get('domain') and 'competitor' not in c.get('domain', '').lower()] if real_comps: comp_text = "\n\n🎯 المنافسون الرئيسيون:\n" for comp in real_comps[:5]: comp_name = comp.get('name', comp.get('domain', 'N/A')) comp_domain = comp.get('domain', 'N/A') relevance = comp.get('relevance_score', comp.get('similarity', 0)) if isinstance(relevance, float): relevance = int(relevance * 100) comp_text += f"• {comp_name} ({comp_domain}) - تشابه {relevance}%\n" # Format traffic data traffic_text = data['traffic'].get('monthly_visits', 'N/A') if traffic_text == 'N/A': traffic_text = data['traffic'].get('monthly_visits_estimate', 'N/A') rank_text = data['rankings'].get('global_rank', 'N/A') bounce_text = data['traffic'].get('bounce_rate', 'N/A') # Format as search result answer = f"""📊 تحليل شامل لموقع {company_name.title()} 🌐 النطاق: {data['domain']} 📈 حركة المرور: • الزيارات الشهرية: {traffic_text} • الترتيب العالمي: {rank_text} • معدل الارتداد: {bounce_text} 🔒 الأمان: {'✅ SSL مفعل' if data['seo'].get('ssl_certificate') else '⚠️ لا يوجد SSL'} 💻 التقنية: • CMS: {data['technology'].get('cms', 'Unknown')} • الخادم: {data['technology'].get('server', 'Unknown')} • CDN: {data['technology'].get('cdn', 'Unknown')} {comp_text} """ # Create results from competitors and analysis tools results = [ { 'title': f'🎯 زيارة {company_name.title()} مباشرة', 'url': url, 'content': f'الموقع الرسمي لـ {company_name.title()} - استكشف الخدمات والمنتجات', 'score': 1.0 }, { 'title': f'📊 تحليل مفصل - SimilarWeb', 'url': f'https://www.similarweb.com/website/{domain}', 'content': f'إحصائيات حركة المرور، المنافسين، ومصادر الزوار لـ {company_name.title()}', 'score': 0.95 }, { 'title': f'🔍 تحليل SEO - Ahrefs', 'url': f'https://ahrefs.com/site-explorer/{domain}', 'content': f'الروابط الخلفية، الكلمات المفتاحية، وتحليل المنافسين لـ {company_name.title()}', 'score': 0.9 } ] # Add real competitors if found if competitors: real_comps = [c for c in competitors if c.get('domain') and 'competitor' not in c.get('domain', '').lower()] for comp in real_comps[:3]: comp_name = comp.get('name', comp.get('domain', 'Competitor')) comp_domain = comp.get('domain', '') relevance = comp.get('relevance_score', comp.get('similarity', 0)) if isinstance(relevance, float): relevance = int(relevance * 100) else: relevance = int(relevance) if relevance else 85 results.append({ 'title': f"🔗 {comp_name}", 'url': f"https://{comp_domain}" if comp_domain else '#', 'content': f"منافس مباشر لـ {company_name.title()} - تشابه {relevance}%", 'score': 0.85 }) # Add social media if found social = data.get('social', {}) social_links = [s for s in social.values() if s] if social_links: answer += f"\n📱 وسائل التواصل:\n" for link in social_links[:3]: answer += f"• {link}\n" return { 'ok': True, 'result': { 'answer': answer, 'results': results, 'query': req.query }, 'source': 'website_intelligence' } # If intelligence fails, fall through to regular search use_deep_research = True search_query = f"Analyze {company_name} website at {domain}: competitors, market position, services, and business intelligence" else: # Regular search with enhanced query search_query = f"{company_name} {domain} website analysis competitors business" except Exception as e: print(f"URL processing error: {e}") pass # Keep original query if URL parsing fails # Try Tavily Research API first for deep queries if use_deep_research: print(f"🔬 [Search] Using Tavily Research API for: {search_query}") research_result = safe_tavily_call(_perform_research, search_query) if research_result and research_result.get('ok', True) and 'content' in research_result: # Format research result as search result content = research_result.get('content', research_result.get('report', '')) sources = research_result.get('sources', []) # Convert sources to results format results = [] for source in sources[:req.max_results]: if isinstance(source, dict): results.append({ 'title': source.get('title', source.get('url', 'Source')), 'url': source.get('url', ''), 'content': source.get('snippet', source.get('content', ''))[:300], 'score': 0.95 }) elif isinstance(source, str): results.append({ 'title': 'Research Source', 'url': source, 'content': 'Source used in research analysis', 'score': 0.9 }) return { 'ok': True, 'result': { 'answer': content[:1000] if len(content) > 1000 else content, # Limit answer length 'results': results, 'query': req.query }, 'source': 'tavily_research' } # Try regular Tavily search search_result = safe_tavily_call( _perform_search, search_query, min(req.max_results, 10), req.search_depth ) # If Tavily fails due to quota/limits, try alternative APIs if not search_result.get('ok', True): error_msg = search_result.get('error', '') # Check if it's a quota/limit issue that warrants trying alternatives if any(term in error_msg.lower() for term in ['usage limit', 'plan', 'quota', 'exceeded', 'rate limit']): print(f"🔄 [Search] Tavily quota exceeded, trying alternative APIs...") # Try SerpAPI fallback serpapi_result = _try_serpapi_search(search_query, min(req.max_results, 10)) if serpapi_result: return { 'ok': True, 'result': serpapi_result, 'source': 'serpapi_fallback' } # Try DuckDuckGo fallback ddg_result = _try_duckduckgo_search(query, min(req.max_results, 10)) # Use original query for better context if ddg_result: return { 'ok': True, 'result': ddg_result, 'source': 'duckduckgo_fallback' } # Try Google Custom Search fallback google_result = _try_google_search(search_query, min(req.max_results, 10)) if google_result: return { 'ok': True, 'result': google_result, 'source': 'google_fallback' } # If all fallbacks fail, return quota exceeded error return JSONResponse({ 'ok': False, 'error': 'quota_exceeded', 'message': 'تم استنفاد حصة البحث اليومية لجميع مصادر البحث المتاحة. يرجى المحاولة لاحقاً أو ترقية الخطة.' }, status_code=429) # Handle other types of errors if any(term in error_msg.lower() for term in ['key', 'unauthorized', '401', '403', 'invalid']): return JSONResponse({ 'ok': False, 'error': 'invalid_key', 'message': 'مفتاح Tavily API غير صالح. يرجى التحقق من الإعدادات.' }, status_code=400) return JSONResponse({ 'ok': False, 'error': 'search_failed', 'message': f'فشل البحث: {error_msg}' }, status_code=500) # Extract results safely from Tavily answer = search_result.get('answer', '') results = search_result.get('results', []) # Provide fallback content if no results if not answer and not results: return { 'ok': True, 'result': { 'answer': 'لم أتمكن من العثور على نتائج محددة لهذا الاستعلام. يرجى تجربة كلمات مختلفة أو أكثر تحديداً.', 'results': [], 'query': req.query } } # Generate helpful answer if missing if not answer and results: answer = f"وجدت {len(results)} نتيجة ذات صلة بالاستعلام المطلوب." return { 'ok': True, 'result': { 'answer': answer, 'results': results, 'query': req.query }, 'source': 'tavily' } except ImportError: return JSONResponse({ 'ok': False, 'error': 'module_missing', 'message': 'وحدة Tavily غير مثبتة. يرجى تثبيت tavily-python.' }, status_code=500) except Exception as e: error_msg = str(e) print(f"Tavily Search Error: {error_msg}") return JSONResponse({ 'ok': False, 'error': 'unexpected_error', 'message': 'حدث خطأ غير متوقع أثناء البحث' }, status_code=500) def _try_serpapi_search(query: str, max_results: int = 10): """Fallback search using SerpAPI""" try: import os import requests api_key = os.getenv('SERPAPI_KEY') if not api_key: return None params = { 'q': query, 'api_key': api_key, 'engine': 'google', 'num': min(max_results, 10), 'hl': 'ar' } response = requests.get('https://serpapi.com/search', params=params, timeout=10) data = response.json() if 'organic_results' not in data: return None results = [] for item in data['organic_results'][:max_results]: results.append({ 'title': item.get('title', ''), 'url': item.get('link', ''), 'content': item.get('snippet', ''), 'score': 0.8 # Default score for SerpAPI results }) # Generate AI-like answer from results answer = f"وجدت {len(results)} نتيجة من Google حول '{query}'. " if results: answer += f"أهم النتائج تشير إلى: {results[0]['content'][:100]}..." return { 'answer': answer, 'results': results, 'query': query } except Exception as e: print(f"SerpAPI fallback failed: {e}") return None def _try_duckduckgo_search(query: str, max_results: int = 10): """Fallback search using DuckDuckGo - provides intelligent mock results""" try: import re from urllib.parse import urlparse results = [] answer = '' # Check if query contains URL for website analysis url_pattern = r'(https?://[^\s]+)' url_match = re.search(url_pattern, query) if url_match: url = url_match.group(1) try: parsed_url = urlparse(url) domain = parsed_url.netloc.replace('www.', '') company_name = domain.split('.')[0].title() # Extract additional context from query (like "المنافسين" or "competitors") query_lower = query.lower() is_competitor_query = any(term in query_lower for term in ['منافس', 'competitor', 'المنافسين', 'competition']) is_analysis_query = any(term in query_lower for term in ['تحليل', 'analysis', 'معلومات', 'info']) # Create intelligent results based on query intent results = [ { 'title': f'{company_name} - الموقع الرسمي', 'url': url, 'content': f'زيارة الموقع الرسمي لـ {company_name} على {domain} للحصول على معلومات مباشرة عن الخدمات والمنتجات والعروض الحالية.', 'score': 0.95 } ] if is_competitor_query: # Add competitor-focused results results.extend([ { 'title': f'تحليل المنافسين لـ {company_name} في منطقة الخليج', 'url': f'https://www.similarweb.com/website/{domain}', 'content': f'استخدم أدوات مثل SimilarWeb أو SEMrush لتحليل منافسي {company_name} في السوق الخليجي ومعرفة حصتهم السوقية واستراتيجياتهم.', 'score': 0.9 }, { 'title': f'أدوات تحليل المنافسة الرقمية لـ {company_name}', 'url': f'https://ahrefs.com/site-explorer/{domain}', 'content': f'Ahrefs و Moz يوفران تحليلاً شاملاً للمنافسين، الكلمات المفتاحية، والروابط الخلفية لموقع {company_name}.', 'score': 0.85 }, { 'title': f'البحث عن منافسي {company_name} في Google', 'url': f'https://www.google.com/search?q={company_name}+competitors+gulf+region', 'content': f'ابحث في Google عن المنافسين المباشرين لـ {company_name} في منطقة الخليج للحصول على قائمة شاملة بالشركات المنافسة.', 'score': 0.8 } ]) answer = f'لتحليل منافسي {company_name} في منطقة الخليج، يمكنك استخدام أدوات تحليل المواقع المتخصصة مثل SimilarWeb و SEMrush و Ahrefs. هذه الأدوات توفر معلومات عن حركة المرور، الكلمات المفتاحية، والمنافسين المباشرين. يمكنك أيضاً البحث في Google عن "{company_name} competitors Gulf" للحصول على قوائم ومقالات تحليلية.' elif is_analysis_query: # Add analysis-focused results results.extend([ { 'title': f'تحليل SEO لموقع {company_name}', 'url': f'https://www.seobility.net/en/seocheck/{domain}', 'content': f'احصل على تحليل مجاني لتحسين محركات البحث (SEO) لموقع {company_name} باستخدام أدوات مثل Seobility أو GTmetrix.', 'score': 0.9 }, { 'title': f'تقرير أداء موقع {company_name}', 'url': f'https://pagespeed.web.dev/analysis?url={url}', 'content': f'استخدم Google PageSpeed Insights لتحليل سرعة وأداء موقع {company_name} والحصول على توصيات للتحسين.', 'score': 0.85 }, { 'title': f'معلومات عن {company_name} والخدمات', 'url': f'https://www.google.com/search?q={company_name}+{domain}+services', 'content': f'ابحث عن معلومات شاملة حول خدمات ومنتجات {company_name} ومراجعات العملاء في Google.', 'score': 0.8 } ]) answer = f'لتحليل موقع {company_name} ({domain})، يمكنك استخدام أدوات مجانية مثل Google PageSpeed Insights لتحليل الأداء، Seobility لتحليل SEO، و SimilarWeb لتحليل حركة المرور. هذه الأدوات توفر رؤى قيمة حول نقاط القوة والضعف في الموقع.' else: # General website info results results.extend([ { 'title': f'معلومات عن {company_name} - {domain}', 'url': f'https://www.google.com/search?q={company_name}+{domain}', 'content': f'ابحث في Google عن معلومات شاملة حول {company_name}، بما في ذلك الخدمات، المنتجات، ومراجعات العملاء.', 'score': 0.85 }, { 'title': f'تحليل موقع {company_name} - SimilarWeb', 'url': f'https://www.similarweb.com/website/{domain}', 'content': f'احصل على إحصائيات حركة المرور، مصادر الزوار، والمواقع المنافسة لـ {company_name} من خلال SimilarWeb.', 'score': 0.8 } ]) answer = f'موقع {company_name} ({domain}) - للحصول على معلومات مفصلة، يمكنك زيارة الموقع مباشرة أو استخدام أدوات تحليل المواقع مثل SimilarWeb للحصول على إحصائيات حركة المرور والمنافسين.' except Exception as e: print(f"URL parsing error: {e}") answer = f'تم تحديد رابط في استعلامك: {url}. يرجى زيارة الموقع مباشرة للحصول على المعلومات المطلوبة.' results = [ { 'title': 'زيارة الموقع', 'url': url, 'content': 'انقر هنا لزيارة الموقع مباشرة.', 'score': 0.9 } ] else: # General search query (no URL) answer = f'للحصول على معلومات حول "{query}"، يمكنك استخدام محركات البحث التالية للحصول على نتائج شاملة ومحدثة.' results = [ { 'title': f'بحث Google: {query}', 'url': f'https://www.google.com/search?q={query.replace(" ", "+")}', 'content': f'ابحث في Google عن "{query}" للحصول على نتائج شاملة من مصادر متعددة حول العالم.', 'score': 0.95 }, { 'title': f'بحث Bing: {query}', 'url': f'https://www.bing.com/search?q={query.replace(" ", "+")}', 'content': f'استخدم Bing للبحث عن "{query}" والحصول على نتائج بديلة ووجهات نظر مختلفة.', 'score': 0.85 }, { 'title': f'بحث DuckDuckGo: {query}', 'url': f'https://duckduckgo.com/?q={query.replace(" ", "+")}', 'content': f'DuckDuckGo يوفر نتائج بحث خاصة وآمنة حول "{query}" بدون تتبع.', 'score': 0.8 } ] return { 'answer': answer, 'results': results[:max_results], 'query': query } except Exception as e: print(f"DuckDuckGo fallback failed: {e}") return None def _try_google_search(query: str, max_results: int = 10): """Fallback search using Google Custom Search API""" try: import os import requests api_key = os.getenv('GOOGLE_API_KEY') search_engine_id = os.getenv('GOOGLE_SEARCH_ENGINE_ID') if not api_key or not search_engine_id: return None url = 'https://www.googleapis.com/customsearch/v1' params = { 'key': api_key, 'cx': search_engine_id, 'q': query, 'num': min(max_results, 10), 'lr': 'lang_ar' } response = requests.get(url, params=params, timeout=10) data = response.json() if 'items' not in data: return None results = [] for item in data['items']: results.append({ 'title': item.get('title', ''), 'url': item.get('link', ''), 'content': item.get('snippet', ''), 'score': 0.9 # High score for Google results }) # Generate answer from search info search_info = data.get('searchInformation', {}) total_results = search_info.get('totalResults', '0') answer = f"وجدت حوالي {total_results} نتيجة من Google حول '{query}'. " if results: answer += f"أهم النتائج: {results[0]['content'][:100]}..." return { 'answer': answer, 'results': results, 'query': query } except Exception as e: print(f"Google Custom Search fallback failed: {e}") return None # Research for a job @app.get('/api/tavily/job/{job_id}') async def api_tavily_job_research(job_id: int): try: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) url = job.get('url', '') org_name = job.get('org_name', '') industry = '' result_path = job.get('result_path') if result_path: import json as _json from pathlib import Path as _Path p = _Path(result_path) / 'analysis.json' if p.exists(): try: industry = _json.loads(p.read_text(encoding='utf-8')).get('competitor_insight', {}).get('industry', '') except Exception: pass from server.tavily_research import run_full_research return run_full_research(url, org_name, industry) except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) async def api_onboarding_status(request: Request): """Get user's onboarding and trial status""" try: auth = request.headers.get('authorization', '') token = auth.split(' ', 1)[1].strip() user_id = user_mgmt.verify_token(token) if not user_id: return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401) if not onboarding: return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500) trial_status = onboarding.get_trial_status(user_id) progress = onboarding.get_onboarding_progress(user_id) subscription = _payments.get_subscription(user_id) return { 'ok': True, 'trial': trial_status, 'progress': progress, 'subscription': { 'plan': subscription.get('plan'), 'status': subscription.get('status') } } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/onboarding/tour/step') async def api_update_tour_step(request: Request): """Update tour step""" try: auth = request.headers.get('authorization', '') token = auth.split(' ', 1)[1].strip() user_id = user_mgmt.verify_token(token) if not user_id: return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401) data = await request.json() step = data.get('step', 0) if not onboarding: return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500) progress = onboarding.update_tour_step(user_id, step) return {'ok': True, 'progress': progress} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/onboarding/tour/complete') async def api_complete_tour(request: Request): """Mark tour as completed""" try: auth = request.headers.get('authorization', '') token = auth.split(' ', 1)[1].strip() user_id = user_mgmt.verify_token(token) if not user_id: return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401) if not onboarding: return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500) progress = onboarding.complete_tour(user_id) return {'ok': True, 'progress': progress} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/onboarding/pricing/shown') async def api_pricing_shown(request: Request): """Mark pricing cards as shown""" try: auth = request.headers.get('authorization', '') token = auth.split(' ', 1)[1].strip() user_id = user_mgmt.verify_token(token) if not user_id: return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401) if not onboarding: return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500) progress = onboarding.mark_pricing_shown(user_id) return {'ok': True, 'progress': progress} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/pricing/plans') async def api_pricing_plans(): """Get pricing plans in Arabic""" plans = { 'free': { 'name': 'مجاني', 'name_en': 'Free', 'price': 0, 'price_display': 'مجاني', 'description': 'ابدأ مع محاولة واحدة مجانية', 'description_en': 'Start with one free try', 'features': [ '✓ محاولة واحدة مجانية', '✓ تحليل 3 صفحات', '✓ درجة GEO الأساسية', '✗ تحليل المنافسين', '✗ توليد المحتوى بالذكاء الاصطناعي', '✗ إدارة الإعلانات المدفوعة' ], 'features_en': [ '✓ One free try', '✓ Analyze 3 pages', '✓ Basic GEO score', '✗ Competitor analysis', '✗ AI content generation', '✗ Paid ads management' ], 'cta': 'ابدأ الآن', 'cta_en': 'Get Started', 'color': '#6B7280' }, 'pro': { 'name': 'احترافي', 'name_en': 'Professional', 'price': 29, 'price_display': '$29/شهر', 'description': 'للمتخصصين والوكالات الصغيرة', 'description_en': 'For professionals and small agencies', 'features': [ '✓ 100 تحليل شهري', '✓ تحليل 10 صفحات', '✓ درجة GEO متقدمة', '✓ تحليل المنافسين', '✓ توليد المحتوى بالذكاء الاصطناعي', '✓ إدارة الإعلانات المدفوعة', '✓ 5 بوابات عملاء' ], 'features_en': [ '✓ 100 analyses/month', '✓ Analyze 10 pages', '✓ Advanced GEO score', '✓ Competitor analysis', '✓ AI content generation', '✓ Paid ads management', '✓ 5 client portals' ], 'cta': 'ترقية الآن', 'cta_en': 'Upgrade Now', 'color': '#3B82F6', 'popular': True }, 'agency': { 'name': 'وكالة', 'name_en': 'Agency', 'price': 79, 'price_display': '$79/شهر', 'description': 'للوكالات المتوسطة والكبيرة', 'description_en': 'For medium and large agencies', 'features': [ '✓ 500 تحليل شهري', '✓ تحليل 25 صفحة', '✓ درجة GEO احترافية', '✓ تحليل المنافسين المتقدم', '✓ توليد محتوى غير محدود', '✓ إدارة إعلانات متقدمة', '✓ 50 بوابة عميل', '✓ دعم أولوي', '✓ تسمية بيضاء' ], 'features_en': [ '✓ 500 analyses/month', '✓ Analyze 25 pages', '✓ Professional GEO score', '✓ Advanced competitor analysis', '✓ Unlimited content generation', '✓ Advanced ads management', '✓ 50 client portals', '✓ Priority support', '✓ White label' ], 'cta': 'ترقية الآن', 'cta_en': 'Upgrade Now', 'color': '#8B5CF6' }, 'enterprise': { 'name': 'مؤسسي', 'name_en': 'Enterprise', 'price': 199, 'price_display': '$199/شهر', 'description': 'حل مخصص للمؤسسات الكبيرة', 'description_en': 'Custom solution for large enterprises', 'features': [ '✓ تحليلات غير محدودة', '✓ تحليل 50 صفحة', '✓ درجة GEO مخصصة', '✓ تحليل منافسين مخصص', '✓ توليد محتوى غير محدود', '✓ إدارة إعلانات مخصصة', '✓ بوابات عملاء غير محدودة', '✓ دعم 24/7', '✓ تسمية بيضاء كاملة', '✓ API مخصص' ], 'features_en': [ '✓ Unlimited analyses', '✓ Analyze 50 pages', '✓ Custom GEO score', '✓ Custom competitor analysis', '✓ Unlimited content generation', '✓ Custom ads management', '✓ Unlimited client portals', '✓ 24/7 support', '✓ Full white label', '✓ Custom API' ], 'cta': 'تواصل معنا', 'cta_en': 'Contact Us', 'color': '#EC4899' } } return {'ok': True, 'plans': plans} # ══════════════════════════════════════════════════════════════════════════════ # NEW ENHANCED FEATURES - Mobile, Schema, Meta, Actions, SERP # ══════════════════════════════════════════════════════════════════════════════ @app.get('/api/mobile-check') async def api_mobile_check(url: str): """Check if website is mobile-friendly""" try: from server import mobile_checker result = mobile_checker.get_mobile_score_from_pagespeed(url) return {'ok': True, 'result': result} except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/schema/generate') async def api_generate_schema(request: Request): """Generate JSON-LD schema for a website""" try: from server import schema_generator data = await request.json() job_id = data.get('job_id') if job_id: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) else: audit = data.get('audit', {}) # Generate all schemas schemas = schema_generator.generate_all_schemas(audit) html_code = schema_generator.format_schema_for_html(schemas) recommendations = schema_generator.get_schema_recommendations(audit) return { 'ok': True, 'schemas': schemas, 'html_code': html_code, 'recommendations': recommendations } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/meta/generate') async def api_generate_meta(request: Request): """Generate meta descriptions for all pages""" try: from server import meta_generator data = await request.json() job_id = data.get('job_id') api_keys = data.get('api_keys', {}) if job_id: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) else: audit = data.get('audit', {}) # Generate meta descriptions results = meta_generator.generate_meta_descriptions_for_audit(audit, api_keys) summary = meta_generator.get_meta_description_recommendations(audit) return { 'ok': True, 'results': results, 'summary': summary } except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.post('/api/action-plan/generate') async def api_generate_action_plan(request: Request): """Generate comprehensive action plan from audit""" try: from server import action_plan_generator data = await request.json() job_id = data.get('job_id') if job_id: job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' analysis_path = Path(result_path) / 'analysis.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) # Add analysis data if available if analysis_path.exists(): with open(analysis_path, 'r', encoding='utf-8') as f: analysis = json.load(f) audit['geo_score'] = analysis.get('geo_score', {}).get('score', 0) audit['ai_visibility'] = audit.get('ai_visibility', {}) else: audit = data.get('audit', {}) # Generate action plan result = action_plan_generator.generate_action_plan_from_audit(audit) return result except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/serp/analyze') async def api_serp_analyze(url: str, keywords: str = None, country: str = 'sa'): """Analyze SERP rankings for a URL""" try: from server import serp_analyzer keyword_list = keywords.split(',') if keywords else None result = serp_analyzer.get_serp_data(url, keyword_list, country) return result except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/serp/features') async def api_serp_features(keyword: str, country: str = 'sa'): """Analyze SERP features for a keyword""" try: from server import serp_analyzer result = serp_analyzer.analyze_serp_features(keyword, country) return result except Exception as e: return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) @app.get('/api/enhanced/full-audit') async def api_enhanced_full_audit(job_id: int): """ Get comprehensive audit with all new features: - Mobile-friendly check - Schema analysis - Meta descriptions - Action plan - SERP rankings """ try: from server import mobile_checker, schema_generator, meta_generator, action_plan_generator, serp_analyzer job = job_queue.get_job(job_id) if not job: return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404) result_path = job.get('result_path') if not result_path: return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400) audit_path = Path(result_path) / 'audit.json' if not audit_path.exists(): return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404) with open(audit_path, 'r', encoding='utf-8') as f: audit = json.load(f) url = job.get('url', audit.get('url', '')) # Run all checks mobile_result = mobile_checker.get_mobile_score_from_pagespeed(url) schema_recommendations = schema_generator.get_schema_recommendations(audit) meta_summary = meta_generator.get_meta_description_recommendations(audit) action_plan = action_plan_generator.generate_action_plan_from_audit(audit) # Extract keywords for SERP check from server import keyword_engine keywords_data = keyword_engine.extract_keywords_from_audit(audit, top_n=5) keywords = [k['kw'] if isinstance(k, dict) else k for k in keywords_data[:3]] serp_result = serp_analyzer.get_serp_data(url, keywords) return { 'ok': True, 'job_id': job_id, 'url': url, 'mobile': mobile_result, 'schema': { 'recommendations': schema_recommendations, 'score': len([r for r in schema_recommendations if r['priority'] == 'high']) }, 'meta_descriptions': meta_summary, 'action_plan': action_plan, 'serp': serp_result, 'summary': { 'mobile_score': mobile_result.get('score', 0), 'meta_score': meta_summary.get('score', 0), 'total_actions': action_plan.get('summary', {}).get('total', 0), 'critical_actions': action_plan.get('summary', {}).get('critical', 0), 'serp_found': serp_result.get('summary', {}).get('found_in_serp', 0) if serp_result.get('ok') else 0 } } except Exception as e: import traceback traceback.print_exc() return JSONResponse({'ok': False, 'error': str(e)}, status_code=500) print("✅ Enhanced features API endpoints loaded successfully")