# app.py - Hugging Face Spaces entry point
import json
import threading
import uuid
from queue import Empty, Queue
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import uvicorn
import torch
import os
from models import (
AnalysisRequest,
AnalysisResponse,
SemanticAnalyzeRequest,
SemanticAnalyzeResponse,
SemanticSearchRequest,
SemanticSearchResponse,
UrlFetchRequest,
UrlFetchResponse,
UserAgentsResponse,
OptimizerRequest,
OptimizerResponse,
OptimizerCancelRequest,
)
import logic
import nlp_processor
import semantic_graph
import highlighter
import summarizer
import search
import url_fetcher
import optimizer
app = FastAPI(title="SEO AI Editor MVP")
_static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
if os.path.isdir(_static_dir):
app.mount("/static", StaticFiles(directory=_static_dir), name="static")
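
# Registry of in-flight optimizer jobs: job_id -> threading.Event. The /cancel
# endpoint sets the event, and the streaming worker passes it to
# optimizer.optimize_text() as cancel_event so a running job can stop early.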
_OPTIMIZER_JOBS_LOCK = threading.Lock()
_OPTIMIZER_CANCEL_EVENTS: dict[str, threading.Event] = {}
# Mount the templates directory
templates = Jinja2Templates(directory="templates")
@app.on_event("startup")
async def startup_event():
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 Application starting. ML Device: {device}")
    # Important fix for HF cpu-basic:
    # do not load every spaCy model at startup, to avoid OOM / restart loops.
    # Models are loaded lazily in logic.get_doc() when a request first needs them.
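
# A minimal sketch of the lazy-loading pattern assumed above (illustrative only;
# the real logic.get_doc() may differ, and _NLP_CACHE / _MODEL_BY_LANG are made-up names):
#
#     import spacy
#     _NLP_CACHE: dict = {}
#     _MODEL_BY_LANG = {"ru": "ru_core_news_sm", "en": "en_core_web_sm"}
#
#     def get_doc(text: str, language: str):
#         nlp = _NLP_CACHE.get(language)
#         if nlp is None:
#             nlp = _NLP_CACHE[language] = spacy.load(_MODEL_BY_LANG[language])
#         return nlp(text)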
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_text(request: AnalysisRequest):
    # 1. Count words (DUAL counting: total + significant)
wc_target = logic.count_words(request.target_text, request.language)
wc_competitors_list = [logic.count_words(t, request.language) for t in request.competitors]
if wc_competitors_list:
avg_total = sum(c["total"] for c in wc_competitors_list) / len(wc_competitors_list)
avg_sig = sum(c["significant"] for c in wc_competitors_list) / len(wc_competitors_list)
else:
avg_total = 0
avg_sig = 0
word_counts_data = {
"target": wc_target,
"competitors": wc_competitors_list,
"avg": {
"total": round(avg_total),
"significant": round(avg_sig)
}
}
# 2. N-gram stats
ngram_stats_result = logic.calculate_ngram_stats(
request.target_text,
request.competitors,
request.language
)
# 3. BM25
key_phrases, key_words_unigrams = logic.parse_keywords(request.keywords, request.language)
bm25_recs = logic.calculate_bm25_recommendations(
request.target_text,
request.competitors,
request.keywords,
request.language
)
# 4. BERT
bert_results = logic.perform_bert_analysis(
request.target_text,
request.competitors,
key_phrases,
request.language
)
# 5. Title Analysis
title_data = {}
if request.target_title.strip():
title_data = logic.analyze_title(
request.target_title,
request.competitor_titles,
request.keywords,
request.language
)
return AnalysisResponse(
ngram_stats=ngram_stats_result,
bm25_recommendations=bm25_recs,
bert_analysis=bert_results,
word_counts=word_counts_data,
title_analysis=title_data
)
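
# Usage sketch for /analyze (hypothetical client code; BASE and the literal values
# are assumptions, the field names mirror how AnalysisRequest is used above, and
# the exact field types are defined in models.py):
#
#     import requests
#     BASE = "http://localhost:7860"  # or the public Space URL
#     resp = requests.post(f"{BASE}/analyze", json={
#         "target_text": "Text of my page...",
#         "competitors": ["Competitor page 1...", "Competitor page 2..."],
#         "keywords": "seo, text optimization",
#         "language": "ru",
#         "target_title": "My page title",
#         "competitor_titles": ["Competitor title 1"],
#     })
#     print(resp.json()["word_counts"]["avg"])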
@app.post("/api/v1/semantic/analyze", response_model=SemanticAnalyzeResponse)
async def semantic_analyze(request: SemanticAnalyzeRequest):
def _build_doc_semantic(text: str, doc_name: str, doc_id: int):
sentences_data = nlp_processor.preprocess_text(text, request.language)
graph, word_weights = semantic_graph.build_semantic_graph(
sentences_data,
lang=request.language,
)
graph_data = semantic_graph.get_graph_data_for_frontend(graph)
markup_text = highlighter.generate_markup_for_frontend(
sentences_data, word_weights, threshold=request.threshold
)
summary_data = summarizer.generate_summary(
sentences_data, word_weights, compression_ratio=request.compression_ratio
)
top_keywords = semantic_graph.get_top_keywords(word_weights, top_n=20)
return {
"id": doc_id,
"name": doc_name,
"text": text,
"graph": graph_data,
"markup_text": markup_text,
"summary": summary_data,
"top_keywords": top_keywords,
"word_weights": word_weights,
"stats": {
"nodes": len(graph_data.get("nodes", [])),
"links": len(graph_data.get("links", [])),
"summary_sentences": len(summary_data),
},
}
target_doc = _build_doc_semantic(request.text, "Мой текст", 0)
competitor_docs = []
valid_competitors = [c for c in request.competitors if c.strip()]
for idx, comp_text in enumerate(valid_competitors):
competitor_docs.append(_build_doc_semantic(comp_text, f"Конкурент #{idx + 1}", idx + 1))
if competitor_docs:
avg_nodes = round(sum(c["stats"]["nodes"] for c in competitor_docs) / len(competitor_docs), 2)
avg_links = round(sum(c["stats"]["links"] for c in competitor_docs) / len(competitor_docs), 2)
else:
avg_nodes = 0
avg_links = 0
    # Comparison table of "power terms" (words + phrases).
num_competitors = len(competitor_docs)
target_weights = target_doc.get("word_weights", {})
all_terms = set(target_weights.keys())
for comp in competitor_docs:
all_terms.update(comp.get("word_weights", {}).keys())
term_power_table = []
for term in all_terms:
target_weight = int(target_weights.get(term, 0))
comp_weights = [int(comp.get("word_weights", {}).get(term, 0)) for comp in competitor_docs]
comp_avg_weight = round(sum(comp_weights) / max(num_competitors, 1), 2)
comp_occurrence = sum(1 for w in comp_weights if w > 0)
term_power_table.append(
{
"term": term,
"term_type": "phrase" if " " in term else "word",
"target_weight": target_weight,
"competitor_avg_weight": comp_avg_weight,
"competitor_weights": comp_weights,
"comp_occurrence": comp_occurrence,
"comp_total": num_competitors,
}
)
term_power_table.sort(
key=lambda x: (
max([x["target_weight"]] + x["competitor_weights"]),
x["comp_occurrence"],
x["term"],
),
reverse=True,
)
comparison = {
"target_nodes": target_doc["stats"]["nodes"],
"target_links": target_doc["stats"]["links"],
"avg_comp_nodes": avg_nodes,
"avg_comp_links": avg_links,
"num_competitors": num_competitors,
"term_power_table": term_power_table,
}
return SemanticAnalyzeResponse(
target=target_doc,
competitors=competitor_docs,
comparison=comparison,
)
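
# Usage sketch for /api/v1/semantic/analyze (hypothetical client; imports and BASE
# as in the /analyze sketch above; the threshold / compression_ratio values are
# illustrative):
#
#     resp = requests.post(f"{BASE}/api/v1/semantic/analyze", json={
#         "text": "Text of my page...",
#         "competitors": ["Competitor text..."],
#         "language": "ru",
#         "threshold": 0.5,
#         "compression_ratio": 0.3,
#     })
#     print(resp.json()["comparison"]["term_power_table"][:5])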
@app.post("/api/v1/semantic/search", response_model=SemanticSearchResponse)
async def semantic_search_endpoint(request: SemanticSearchRequest):
sentences_data = nlp_processor.preprocess_text(request.text, request.language)
graph, word_weights = semantic_graph.build_semantic_graph(
sentences_data,
lang=request.language,
)
results = search.semantic_search(
request.query_text,
graph,
word_weights,
request.language,
top_n=request.top_n,
)
return SemanticSearchResponse(results=results)
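
# Usage sketch for /api/v1/semantic/search (hypothetical client; imports and BASE
# as in the /analyze sketch above):
#
#     resp = requests.post(f"{BASE}/api/v1/semantic/search", json={
#         "text": "Document to search within...",
#         "query_text": "semantic graph",
#         "language": "ru",
#         "top_n": 5,
#     })
#     for hit in resp.json()["results"]:
#         print(hit)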
@app.get("/api/v1/url/user-agents", response_model=UserAgentsResponse)
async def get_user_agents():
return UserAgentsResponse(user_agents=url_fetcher.get_user_agent_presets())
@app.post("/api/v1/url/fetch", response_model=UrlFetchResponse)
async def fetch_url_endpoint(request: UrlFetchRequest):
try:
parsed = url_fetcher.fetch_url_content(
url=request.url,
user_agent_key=request.user_agent,
timeout_seconds=request.timeout_seconds,
)
return UrlFetchResponse(**parsed)
except Exception as e:
return UrlFetchResponse(
ok=False,
url=request.url or "",
user_agent_key=request.user_agent or "",
error=str(e),
)
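
# Usage sketch for the URL-fetch endpoints (hypothetical client; imports and BASE
# as in the /analyze sketch above; the shape of the presets is defined by
# url_fetcher.get_user_agent_presets()):
#
#     presets = requests.get(f"{BASE}/api/v1/url/user-agents").json()["user_agents"]
#     resp = requests.post(f"{BASE}/api/v1/url/fetch", json={
#         "url": "https://example.com",
#         "user_agent": "...",  # one of the preset keys from /user-agents
#         "timeout_seconds": 15,
#     })
#     print(resp.json()["ok"])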
@app.post("/api/v1/optimizer/run", response_model=OptimizerResponse)
async def run_optimizer(request: OptimizerRequest):
try:
result = optimizer.optimize_text(request.model_dump())
return OptimizerResponse(**result)
except Exception as e:
return OptimizerResponse(ok=False, error=str(e))
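
# Usage sketch for the blocking optimizer run (hypothetical client; imports and BASE
# as in the /analyze sketch above; PAYLOAD's fields are whatever OptimizerRequest in
# models.py declares, so they are not guessed here):
#
#     result = requests.post(f"{BASE}/api/v1/optimizer/run", json=PAYLOAD).json()
#     if not result.get("ok"):
#         print("optimizer failed:", result.get("error"))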
@app.post("/api/v1/optimizer/cancel")
async def optimizer_cancel(body: OptimizerCancelRequest):
with _OPTIMIZER_JOBS_LOCK:
ev = _OPTIMIZER_CANCEL_EVENTS.get(body.job_id)
if ev is not None:
ev.set()
return {"ok": True}
@app.post("/api/v1/optimizer/run-stream")
async def run_optimizer_stream(request: OptimizerRequest):
"""SSE: события прогресса + финальный JSON. Клиент ведёт локальный лог, без глобального лоадера."""
job_id = str(uuid.uuid4())
cancel_ev = threading.Event()
payload = request.model_dump()
q: Queue = Queue()
with _OPTIMIZER_JOBS_LOCK:
_OPTIMIZER_CANCEL_EVENTS[job_id] = cancel_ev
def worker():
try:
def progress_cb(data):
q.put(("progress", data))
result = optimizer.optimize_text(
payload,
progress_callback=progress_cb,
cancel_event=cancel_ev,
)
q.put(("done", result))
except Exception as e:
q.put(("error", str(e)))
threading.Thread(target=worker, daemon=True).start()
def gen():
try:
yield f"data: {json.dumps({'event': 'job', 'job_id': job_id})}\n\n"
while True:
try:
kind, data = q.get(timeout=0.3)
except Empty:
yield ": ping\n\n"
continue
if kind == "progress":
yield f"data: {json.dumps(data)}\n\n"
elif kind == "done":
yield f"data: {json.dumps({'event': 'complete', 'result': data})}\n\n"
break
elif kind == "error":
yield f"data: {json.dumps({'event': 'error', 'error': data})}\n\n"
break
finally:
with _OPTIMIZER_JOBS_LOCK:
_OPTIMIZER_CANCEL_EVENTS.pop(job_id, None)
return StreamingResponse(
gen(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
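
# Client-side sketch for the SSE stream (hypothetical code; imports, BASE and PAYLOAD
# as in the sketches above). The first event carries the job_id, which can be POSTed
# to /api/v1/optimizer/cancel to stop the job; ": ping" comment lines are keep-alives:
#
#     import json
#     with requests.post(f"{BASE}/api/v1/optimizer/run-stream", json=PAYLOAD, stream=True) as r:
#         for line in r.iter_lines(decode_unicode=True):
#             if not line or not line.startswith("data: "):
#                 continue  # skip keep-alive pings and blank event separators
#             event = json.loads(line[len("data: "):])
#             if event.get("event") == "job":
#                 job_id = event["job_id"]  # save for /api/v1/optimizer/cancel
#             elif event.get("event") == "complete":
#                 result = event["result"]
#                 break
#             elif event.get("event") == "error":
#                 raise RuntimeError(event["error"])
#             else:
#                 print("progress:", event)  # shape set by optimizer's progress callback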
# Hugging Face Spaces uses port 7860
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)