# (Hugging Face Spaces status banner — extraction artifact, not part of the program)
# app.py - Hugging Face Spaces entry point
import json
import threading
import uuid
from queue import Empty, Queue
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import uvicorn
import torch
import os
from models import (
    AnalysisRequest,
    AnalysisResponse,
    SemanticAnalyzeRequest,
    SemanticAnalyzeResponse,
    SemanticSearchRequest,
    SemanticSearchResponse,
    UrlFetchRequest,
    UrlFetchResponse,
    UserAgentsResponse,
    OptimizerRequest,
    OptimizerResponse,
    OptimizerCancelRequest,
)
import logic
import nlp_processor
import semantic_graph
import highlighter
import summarizer
import search
import url_fetcher
import optimizer

app = FastAPI(title="SEO AI Editor MVP")

# Serve ./static (resolved relative to this file) only when the directory
# actually exists, so the app still boots on deployments without assets.
_static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
if os.path.isdir(_static_dir):
    app.mount("/static", StaticFiles(directory=_static_dir), name="static")

# Guards _OPTIMIZER_CANCEL_EVENTS, which is touched from request handlers
# and from the SSE streaming generator/worker thread.
_OPTIMIZER_JOBS_LOCK = threading.Lock()
# job_id (str) -> threading.Event used to signal cancellation of a running
# optimizer job.  TODO confirm key type against optimizer callers.
_OPTIMIZER_CANCEL_EVENTS: dict = {}

# Attach the templates folder (translated from Russian comment).
templates = Jinja2Templates(directory="templates")
async def startup_event():
    """Report which ML device is available when the application boots.

    NOTE(review): no @app.on_event("startup") decorator is visible here —
    it appears to have been lost in extraction; confirm against the
    original source, otherwise this hook never runs.
    """
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"🚀 Application starting. ML Device: {device}")
    # Important fix for HF cpu-basic:
    # do not load all spaCy models at startup, to avoid OOM/Restarting.
    # Models are loaded lazily in logic.get_doc() on first request.
async def read_root(request: Request):
    """Render the single-page UI from templates/index.html."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def analyze_text(request: AnalysisRequest):
    """Run the full SEO analysis pipeline for one target text vs. competitors.

    Steps: word counts (total + significant), n-gram statistics, BM25
    keyword recommendations, BERT-based analysis, and (optionally) title
    analysis when a non-blank target title is supplied.

    NOTE(review): no @app.post decorator is visible on this handler —
    presumably lost in extraction; confirm the route path upstream.
    """
    # 1. Word counts (DOUBLE count: total + significant)
    wc_target = logic.count_words(request.target_text, request.language)
    wc_competitors_list = [logic.count_words(t, request.language) for t in request.competitors]
    if wc_competitors_list:
        n_comp = len(wc_competitors_list)
        avg_total = sum(c["total"] for c in wc_competitors_list) / n_comp
        avg_sig = sum(c["significant"] for c in wc_competitors_list) / n_comp
    else:
        # No competitors: averages default to 0 rather than dividing by zero.
        avg_total = 0
        avg_sig = 0
    word_counts_data = {
        "target": wc_target,
        "competitors": wc_competitors_list,
        "avg": {
            "total": round(avg_total),
            "significant": round(avg_sig)
        }
    }
    # 2. N-gram stats
    ngram_stats_result = logic.calculate_ngram_stats(
        request.target_text,
        request.competitors,
        request.language
    )
    # 3. BM25.  Only the parsed phrases are consumed downstream (by the
    # BERT step); the unigram list from parse_keywords is unused here.
    key_phrases, _ = logic.parse_keywords(request.keywords, request.language)
    bm25_recs = logic.calculate_bm25_recommendations(
        request.target_text,
        request.competitors,
        request.keywords,
        request.language
    )
    # 4. BERT
    bert_results = logic.perform_bert_analysis(
        request.target_text,
        request.competitors,
        key_phrases,
        request.language
    )
    # 5. Title Analysis — skipped entirely for a blank/whitespace title.
    title_data = {}
    if request.target_title.strip():
        title_data = logic.analyze_title(
            request.target_title,
            request.competitor_titles,
            request.keywords,
            request.language
        )
    return AnalysisResponse(
        ngram_stats=ngram_stats_result,
        bm25_recommendations=bm25_recs,
        bert_analysis=bert_results,
        word_counts=word_counts_data,
        title_analysis=title_data
    )
async def semantic_analyze(request: SemanticAnalyzeRequest):
    """Build a semantic graph for the target text and each competitor,
    then produce a comparison (graph sizes + a "term power" table).

    NOTE(review): no @app.post decorator is visible on this handler —
    presumably lost in extraction; confirm the route path upstream.
    """
    def _build_doc_semantic(text: str, doc_name: str, doc_id: int):
        # Per-document pipeline: sentences -> graph -> frontend graph data,
        # highlighted markup, extractive summary, and top keywords.
        sentences_data = nlp_processor.preprocess_text(text, request.language)
        graph, word_weights = semantic_graph.build_semantic_graph(
            sentences_data,
            lang=request.language,
        )
        graph_data = semantic_graph.get_graph_data_for_frontend(graph)
        markup_text = highlighter.generate_markup_for_frontend(
            sentences_data, word_weights, threshold=request.threshold
        )
        summary_data = summarizer.generate_summary(
            sentences_data, word_weights, compression_ratio=request.compression_ratio
        )
        top_keywords = semantic_graph.get_top_keywords(word_weights, top_n=20)
        return {
            "id": doc_id,
            "name": doc_name,
            "text": text,
            "graph": graph_data,
            "markup_text": markup_text,
            "summary": summary_data,
            "top_keywords": top_keywords,
            "word_weights": word_weights,
            "stats": {
                "nodes": len(graph_data.get("nodes", [])),
                "links": len(graph_data.get("links", [])),
                "summary_sentences": len(summary_data),
            },
        }
    # Target document is always id 0; competitors get ids 1..N.
    target_doc = _build_doc_semantic(request.text, "Мой текст", 0)
    competitor_docs = []
    # Skip blank competitor slots the UI may submit.
    valid_competitors = [c for c in request.competitors if c.strip()]
    for idx, comp_text in enumerate(valid_competitors):
        competitor_docs.append(_build_doc_semantic(comp_text, f"Конкурент #{idx + 1}", idx + 1))
    if competitor_docs:
        avg_nodes = round(sum(c["stats"]["nodes"] for c in competitor_docs) / len(competitor_docs), 2)
        avg_links = round(sum(c["stats"]["links"] for c in competitor_docs) / len(competitor_docs), 2)
    else:
        avg_nodes = 0
        avg_links = 0
    # Comparison table of "powerful terms" (words + phrases).
    num_competitors = len(competitor_docs)
    target_weights = target_doc.get("word_weights", {})
    # Union of every term seen in the target or any competitor.
    all_terms = set(target_weights.keys())
    for comp in competitor_docs:
        all_terms.update(comp.get("word_weights", {}).keys())
    term_power_table = []
    for term in all_terms:
        target_weight = int(target_weights.get(term, 0))
        comp_weights = [int(comp.get("word_weights", {}).get(term, 0)) for comp in competitor_docs]
        # max(num_competitors, 1) guards the zero-competitor division.
        comp_avg_weight = round(sum(comp_weights) / max(num_competitors, 1), 2)
        # How many competitors use this term at all.
        comp_occurrence = sum(1 for w in comp_weights if w > 0)
        term_power_table.append(
            {
                "term": term,
                "term_type": "phrase" if " " in term else "word",
                "target_weight": target_weight,
                "competitor_avg_weight": comp_avg_weight,
                "competitor_weights": comp_weights,
                "comp_occurrence": comp_occurrence,
                "comp_total": num_competitors,
            }
        )
    # Strongest terms first: peak weight anywhere, then competitor
    # coverage, then the term itself as a deterministic tie-breaker.
    term_power_table.sort(
        key=lambda x: (
            max([x["target_weight"]] + x["competitor_weights"]),
            x["comp_occurrence"],
            x["term"],
        ),
        reverse=True,
    )
    comparison = {
        "target_nodes": target_doc["stats"]["nodes"],
        "target_links": target_doc["stats"]["links"],
        "avg_comp_nodes": avg_nodes,
        "avg_comp_links": avg_links,
        "num_competitors": num_competitors,
        "term_power_table": term_power_table,
    }
    return SemanticAnalyzeResponse(
        target=target_doc,
        competitors=competitor_docs,
        comparison=comparison,
    )
async def semantic_search_endpoint(request: SemanticSearchRequest):
    """Search the semantic graph of *text* for matches to *query_text*."""
    lang = request.language
    prepared = nlp_processor.preprocess_text(request.text, lang)
    graph, weights = semantic_graph.build_semantic_graph(prepared, lang=lang)
    hits = search.semantic_search(
        request.query_text,
        graph,
        weights,
        lang,
        top_n=request.top_n,
    )
    return SemanticSearchResponse(results=hits)
async def get_user_agents():
    """Expose the server-side user-agent presets to the frontend."""
    presets = url_fetcher.get_user_agent_presets()
    return UserAgentsResponse(user_agents=presets)
async def fetch_url_endpoint(request: UrlFetchRequest):
    """Fetch and parse a URL's content.

    Never raises to the client: any failure (including response-model
    construction) is converted into an ok=False payload with the error text.
    """
    try:
        parsed = url_fetcher.fetch_url_content(
            url=request.url,
            user_agent_key=request.user_agent,
            timeout_seconds=request.timeout_seconds,
        )
        return UrlFetchResponse(**parsed)
    except Exception as exc:  # boundary handler: report instead of 500
        return UrlFetchResponse(
            ok=False,
            url=request.url or "",
            user_agent_key=request.user_agent or "",
            error=str(exc),
        )
async def run_optimizer(request: OptimizerRequest):
    """Run the text optimizer once; any failure maps to ok=False."""
    try:
        outcome = optimizer.optimize_text(request.model_dump())
        return OptimizerResponse(**outcome)
    except Exception as exc:  # boundary handler: report instead of 500
        return OptimizerResponse(ok=False, error=str(exc))
async def optimizer_cancel(body: OptimizerCancelRequest):
    """Signal cancellation for a running optimizer job, if one exists.

    Always answers {"ok": True}; an unknown job_id is silently ignored.
    """
    with _OPTIMIZER_JOBS_LOCK:
        event = _OPTIMIZER_CANCEL_EVENTS.get(body.job_id)
        if event is not None:
            event.set()
    return {"ok": True}
async def run_optimizer_stream(request: OptimizerRequest):
    """SSE: progress events + a final JSON payload. The client keeps a local
    log, without a global loader."""
    job_id = str(uuid.uuid4())
    cancel_ev = threading.Event()
    # Snapshot the request before handing it to the worker thread.
    payload = request.model_dump()
    # Thread-safe channel from the worker thread to the SSE generator.
    q: Queue = Queue()
    with _OPTIMIZER_JOBS_LOCK:
        _OPTIMIZER_CANCEL_EVENTS[job_id] = cancel_ev
    def worker():
        # Runs the blocking optimizer off the request path; every outcome
        # (progress / done / error) is forwarded through the queue.
        try:
            def progress_cb(data):
                q.put(("progress", data))
            result = optimizer.optimize_text(
                payload,
                progress_callback=progress_cb,
                cancel_event=cancel_ev,
            )
            q.put(("done", result))
        except Exception as e:
            q.put(("error", str(e)))
    threading.Thread(target=worker, daemon=True).start()
    def gen():
        # Yields SSE frames; ": ping" comment lines keep the connection
        # alive while the queue is idle.
        try:
            yield f"data: {json.dumps({'event': 'job', 'job_id': job_id})}\n\n"
            while True:
                try:
                    kind, data = q.get(timeout=0.3)
                except Empty:
                    yield ": ping\n\n"
                    continue
                if kind == "progress":
                    yield f"data: {json.dumps(data)}\n\n"
                elif kind == "done":
                    yield f"data: {json.dumps({'event': 'complete', 'result': data})}\n\n"
                    break
                elif kind == "error":
                    yield f"data: {json.dumps({'event': 'error', 'error': data})}\n\n"
                    break
        finally:
            # Always drop the cancel handle, even if the client disconnects
            # mid-stream and the generator is closed early.
            with _OPTIMIZER_JOBS_LOCK:
                _OPTIMIZER_CANCEL_EVENTS.pop(job_id, None)
    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Disable reverse-proxy buffering so events flush immediately.
            "X-Accel-Buffering": "no",
        },
    )
# Hugging Face Spaces uses port 7860 (translated from Russian comment)
if __name__ == "__main__":
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)