# app.py - Hugging Face Spaces entry point
import json
import os
import threading
import uuid
from queue import Empty, Queue

import torch
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from models import (
    AnalysisRequest,
    AnalysisResponse,
    SemanticAnalyzeRequest,
    SemanticAnalyzeResponse,
    SemanticSearchRequest,
    SemanticSearchResponse,
    UrlFetchRequest,
    UrlFetchResponse,
    UserAgentsResponse,
    OptimizerRequest,
    OptimizerResponse,
    OptimizerCancelRequest,
)
import logic
import nlp_processor
import semantic_graph
import highlighter
import summarizer
import search
import url_fetcher
import optimizer

app = FastAPI(title="SEO AI Editor MVP")

_static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
if os.path.isdir(_static_dir):
    app.mount("/static", StaticFiles(directory=_static_dir), name="static")

_OPTIMIZER_JOBS_LOCK = threading.Lock()
_OPTIMIZER_CANCEL_EVENTS: dict = {}

# Mount the templates directory
templates = Jinja2Templates(directory="templates")


@app.on_event("startup")
async def startup_event():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🚀 Application starting. ML Device: {device}")
    # Important fix for the HF cpu-basic tier:
    # do not load all spaCy models at startup, to avoid OOM/Restarting loops.
    # Models are loaded lazily in logic.get_doc() when a request actually needs them.


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_text(request: AnalysisRequest):
    # 1. Count words (DOUBLE count: total + significant)
    wc_target = logic.count_words(request.target_text, request.language)
    wc_competitors_list = [logic.count_words(t, request.language) for t in request.competitors]
    if wc_competitors_list:
        avg_total = sum(c["total"] for c in wc_competitors_list) / len(wc_competitors_list)
        avg_sig = sum(c["significant"] for c in wc_competitors_list) / len(wc_competitors_list)
    else:
        avg_total = 0
        avg_sig = 0
    word_counts_data = {
        "target": wc_target,
        "competitors": wc_competitors_list,
        "avg": {
            "total": round(avg_total),
            "significant": round(avg_sig)
        }
    }

    # 2. N-gram stats
    ngram_stats_result = logic.calculate_ngram_stats(
        request.target_text, request.competitors, request.language
    )

    # 3. BM25
    key_phrases, key_words_unigrams = logic.parse_keywords(request.keywords, request.language)
    bm25_recs = logic.calculate_bm25_recommendations(
        request.target_text, request.competitors, request.keywords, request.language
    )

    # 4. BERT
    bert_results = logic.perform_bert_analysis(
        request.target_text, request.competitors, key_phrases, request.language
    )
    # 5. Title analysis
    title_data = {}
    if request.target_title.strip():
        title_data = logic.analyze_title(
            request.target_title, request.competitor_titles, request.keywords, request.language
        )

    return AnalysisResponse(
        ngram_stats=ngram_stats_result,
        bm25_recommendations=bm25_recs,
        bert_analysis=bert_results,
        word_counts=word_counts_data,
        title_analysis=title_data
    )


@app.post("/api/v1/semantic/analyze", response_model=SemanticAnalyzeResponse)
async def semantic_analyze(request: SemanticAnalyzeRequest):
    def _build_doc_semantic(text: str, doc_name: str, doc_id: int):
        sentences_data = nlp_processor.preprocess_text(text, request.language)
        graph, word_weights = semantic_graph.build_semantic_graph(
            sentences_data,
            lang=request.language,
        )
        graph_data = semantic_graph.get_graph_data_for_frontend(graph)
        markup_text = highlighter.generate_markup_for_frontend(
            sentences_data, word_weights, threshold=request.threshold
        )
        summary_data = summarizer.generate_summary(
            sentences_data, word_weights, compression_ratio=request.compression_ratio
        )
        top_keywords = semantic_graph.get_top_keywords(word_weights, top_n=20)
        return {
            "id": doc_id,
            "name": doc_name,
            "text": text,
            "graph": graph_data,
            "markup_text": markup_text,
            "summary": summary_data,
            "top_keywords": top_keywords,
            "word_weights": word_weights,
            "stats": {
                "nodes": len(graph_data.get("nodes", [])),
                "links": len(graph_data.get("links", [])),
                "summary_sentences": len(summary_data),
            },
        }

    target_doc = _build_doc_semantic(request.text, "Мой текст", 0)

    competitor_docs = []
    valid_competitors = [c for c in request.competitors if c.strip()]
    for idx, comp_text in enumerate(valid_competitors):
        competitor_docs.append(_build_doc_semantic(comp_text, f"Конкурент #{idx + 1}", idx + 1))

    if competitor_docs:
        avg_nodes = round(sum(c["stats"]["nodes"] for c in competitor_docs) / len(competitor_docs), 2)
        avg_links = round(sum(c["stats"]["links"] for c in competitor_docs) / len(competitor_docs), 2)
    else:
        avg_nodes = 0
        avg_links = 0

    # Comparison table of "power terms" (words + phrases).
    num_competitors = len(competitor_docs)
    target_weights = target_doc.get("word_weights", {})
    all_terms = set(target_weights.keys())
    for comp in competitor_docs:
        all_terms.update(comp.get("word_weights", {}).keys())

    term_power_table = []
    for term in all_terms:
        target_weight = int(target_weights.get(term, 0))
        comp_weights = [int(comp.get("word_weights", {}).get(term, 0)) for comp in competitor_docs]
        comp_avg_weight = round(sum(comp_weights) / max(num_competitors, 1), 2)
        comp_occurrence = sum(1 for w in comp_weights if w > 0)
        term_power_table.append(
            {
                "term": term,
                "term_type": "phrase" if " " in term else "word",
                "target_weight": target_weight,
                "competitor_avg_weight": comp_avg_weight,
                "competitor_weights": comp_weights,
                "comp_occurrence": comp_occurrence,
                "comp_total": num_competitors,
            }
        )
    term_power_table.sort(
        key=lambda x: (
            max([x["target_weight"]] + x["competitor_weights"]),
            x["comp_occurrence"],
            x["term"],
        ),
        reverse=True,
    )

    comparison = {
        "target_nodes": target_doc["stats"]["nodes"],
        "target_links": target_doc["stats"]["links"],
        "avg_comp_nodes": avg_nodes,
        "avg_comp_links": avg_links,
        "num_competitors": num_competitors,
        "term_power_table": term_power_table,
    }

    return SemanticAnalyzeResponse(
        target=target_doc,
        competitors=competitor_docs,
        comparison=comparison,
    )


@app.post("/api/v1/semantic/search", response_model=SemanticSearchResponse)
async def semantic_search_endpoint(request: SemanticSearchRequest):
    sentences_data = nlp_processor.preprocess_text(request.text, request.language)
    graph, word_weights = semantic_graph.build_semantic_graph(
        sentences_data,
        lang=request.language,
    )
    results = search.semantic_search(
        request.query_text,
        graph,
        word_weights,
        request.language,
        top_n=request.top_n,
    )
    return SemanticSearchResponse(results=results)


@app.get("/api/v1/url/user-agents", response_model=UserAgentsResponse)
async def get_user_agents():
    return UserAgentsResponse(user_agents=url_fetcher.get_user_agent_presets())


@app.post("/api/v1/url/fetch", response_model=UrlFetchResponse)
async def fetch_url_endpoint(request: UrlFetchRequest):
    try:
        parsed = url_fetcher.fetch_url_content(
            url=request.url,
            user_agent_key=request.user_agent,
            timeout_seconds=request.timeout_seconds,
        )
        return UrlFetchResponse(**parsed)
    except Exception as e:
        return UrlFetchResponse(
            ok=False,
            url=request.url or "",
            user_agent_key=request.user_agent or "",
            error=str(e),
        )


@app.post("/api/v1/optimizer/run", response_model=OptimizerResponse)
async def run_optimizer(request: OptimizerRequest):
    try:
        result = optimizer.optimize_text(request.model_dump())
        return OptimizerResponse(**result)
    except Exception as e:
        return OptimizerResponse(ok=False, error=str(e))


@app.post("/api/v1/optimizer/cancel")
async def optimizer_cancel(body: OptimizerCancelRequest):
    with _OPTIMIZER_JOBS_LOCK:
        ev = _OPTIMIZER_CANCEL_EVENTS.get(body.job_id)
        if ev is not None:
            ev.set()
    return {"ok": True}
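# Illustration of the event stream a client receives from /api/v1/optimizer/run-stream,
# based on gen() below (progress payload shapes depend on optimizer.optimize_text's
# progress callback and are not fixed here):
#
#   data: {"event": "job", "job_id": "<uuid4>"}          <- emitted first
#   : ping                                                <- keep-alive while the queue is empty (~0.3s poll)
#   data: {...progress payload...}                        <- zero or more progress events
#   data: {"event": "complete", "result": {...}}          <- or {"event": "error", "error": "..."}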
@app.post("/api/v1/optimizer/run-stream")
async def run_optimizer_stream(request: OptimizerRequest):
    """SSE: progress events + a final JSON result.

    The client keeps its own local log; no global loader is used."""
    job_id = str(uuid.uuid4())
    cancel_ev = threading.Event()
    payload = request.model_dump()
    q: Queue = Queue()

    with _OPTIMIZER_JOBS_LOCK:
        _OPTIMIZER_CANCEL_EVENTS[job_id] = cancel_ev

    def worker():
        try:
            def progress_cb(data):
                q.put(("progress", data))

            result = optimizer.optimize_text(
                payload,
                progress_callback=progress_cb,
                cancel_event=cancel_ev,
            )
            q.put(("done", result))
        except Exception as e:
            q.put(("error", str(e)))

    # Run the optimizer in a background thread; results flow back through the queue.
    threading.Thread(target=worker, daemon=True).start()

    def gen():
        try:
            yield f"data: {json.dumps({'event': 'job', 'job_id': job_id})}\n\n"
            while True:
                try:
                    kind, data = q.get(timeout=0.3)
                except Empty:
                    # SSE comment line as a heartbeat while waiting for the worker.
                    yield ": ping\n\n"
                    continue
                if kind == "progress":
                    yield f"data: {json.dumps(data)}\n\n"
                elif kind == "done":
                    yield f"data: {json.dumps({'event': 'complete', 'result': data})}\n\n"
                    break
                elif kind == "error":
                    yield f"data: {json.dumps({'event': 'error', 'error': data})}\n\n"
                    break
        finally:
            with _OPTIMIZER_JOBS_LOCK:
                _OPTIMIZER_CANCEL_EVENTS.pop(job_id, None)

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


# Hugging Face Spaces uses port 7860
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)