import os
import re
import logging
import requests
import PyPDF2
import numpy as np
import pandas as pd
from io import BytesIO
from typing import List, Dict, Tuple, Optional, Callable
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import torch
import spacy
import matplotlib.pyplot as plt
from utils import sanitize_filename

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Marketing claims to flag in page content; the terms stay in Spanish because
# the analyzer targets Spanish-language sites.
PROHIBITED_TERMS = [
    "gratis", "garantizado", "rentabilidad asegurada", "sin compromiso",
    "resultados inmediatos", "cero riesgo", "sin letra pequeña"
]

class SEOSpaceAnalyzer:
    """Crawls the URLs of a sitemap, runs NLP analysis on each page, and builds an SEO report."""

    def __init__(self, max_urls: int = 20, max_workers: int = 4):
        self.max_urls = max_urls
        self.max_workers = max_workers
        self.session = self._configure_session()
        self.models = self._load_models()  # all NLP models are loaded up front
        self.base_dir = Path("content_storage")
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.current_analysis: Dict = {}

    def _configure_session(self) -> requests.Session:
        # Retry GETs up to 3 times on transient server errors, with backoff.
        session = requests.Session()
        retry = Retry(total=3, backoff_factor=1,
                      status_forcelist=[500, 502, 503, 504],
                      allowed_methods=["GET"])
        session.mount("http://", HTTPAdapter(max_retries=retry))
        session.mount("https://", HTTPAdapter(max_retries=retry))
        session.headers.update({
            "User-Agent": "SEOAnalyzer/1.0",
            "Accept-Language": "es-ES,es;q=0.9"
        })
        return session

    def _load_models(self) -> Dict:
        device = 0 if torch.cuda.is_available() else -1
        # Note: bart-large-cnn, bert-base-NER, and bart-large-mnli are English
        # models, so their quality on Spanish text may be degraded; the spaCy and
        # sentence-transformers models are Spanish-capable.
        return {
            "spacy": spacy.load("es_core_news_lg"),
            "summarizer": pipeline("summarization", model="facebook/bart-large-cnn", device=device),
            "ner": pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device),
            "semantic": SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2"),
            "zeroshot": pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
        }

    def analyze_sitemap(
        self,
        sitemap_url: str,
        progress_callback: Optional[Callable] = None,
        status_callback: Optional[Callable] = None
    ) -> Tuple:
        urls = self._parse_sitemap(sitemap_url)
        if not urls:
            return {"error": "No se pudieron extraer URLs"}, [], {}, {}, {}, {}, {}
        urls = urls[:self.max_urls]  # cap the crawl before batching
        results = []
        batch_size = 5
        total_batches = (len(urls) + batch_size - 1) // batch_size

        for batch_index in range(total_batches):
            start = batch_index * batch_size
            batch_urls = urls[start:start + batch_size]
            if status_callback:
                status_callback(f"Procesando batch {batch_index+1}/{total_batches}: {batch_urls}")
            # Size the pool to the smaller of the batch and the configured worker limit.
            workers = min(len(batch_urls), self.max_workers)
            with ThreadPoolExecutor(max_workers=workers) as executor:
                futures = {executor.submit(self._process_url, url): url for url in batch_urls}
                for future in as_completed(futures):
                    try:
                        results.append(future.result())
                    except Exception as e:
                        results.append({"url": futures[future], "status": "error", "error": str(e)})
            if progress_callback:
                progress_callback(batch_index + 1, total_batches)

        summaries, entities = self._apply_nlp(results)
        similarities = self._compute_similarity(results)
        flags = self._flag_prohibited_terms(results)
        topics = self._classify_topics(results)
        seo_tags = self._generate_seo_tags(results, summaries, topics, flags)

        self.current_analysis = {
            "stats": self._calculate_stats(results),
            "content_analysis": self._analyze_content(results),
            "links": self._analyze_links(results),
            "recommendations": self._generate_recommendations(results),
            "details": results,
            "summaries": summaries,
            "entities": entities,
            "similarities": similarities,
            "flags": flags,
            "topics": topics,
            "seo_tags": seo_tags,
            "timestamp": datetime.now().isoformat()
        }

        a = self.current_analysis
        return (
            a["stats"], a["recommendations"], a["content_analysis"],
            a["links"], a["details"], a["similarities"],
            a["seo_tags"]
        )

    def _process_url(self, url: str) -> Dict:
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()  # treat 4xx/5xx as errors instead of parsing error pages
            content_type = response.headers.get("Content-Type", "")
            if "application/pdf" in content_type:
                return self._process_pdf(url, response.content)
            return self._process_html(url, response.text)
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}

    def _process_html(self, url: str, html: str) -> Dict:
        soup = BeautifulSoup(html, "html.parser")
        # Use a space separator so text from adjacent tags does not fuse into one word.
        text = re.sub(r"\s+", " ", soup.get_text(" ")).strip()
        return {
            "url": url,
            "type": "html",
            "status": "success",
            "content": text,
            "word_count": len(text.split()),
            "metadata": self._extract_metadata(soup),
            "links": self._extract_links(soup, url)
        }

    def _process_pdf(self, url: str, content: bytes) -> Dict:
        try:
            reader = PyPDF2.PdfReader(BytesIO(content))
            # Join pages with a newline so words at page boundaries are not fused.
            text = "\n".join(p.extract_text() or "" for p in reader.pages)
            return {
                "url": url,
                "type": "pdf",
                "status": "success",
                "content": text,
                "word_count": len(text.split()),
                "page_count": len(reader.pages)
            }
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}

    def _extract_metadata(self, soup: BeautifulSoup) -> Dict:
        meta = {"title": "", "description": ""}
        # title.string is None for empty or nested <title> tags, so guard before strip().
        if soup.title and soup.title.string:
            meta["title"] = soup.title.string.strip()
        for tag in soup.find_all("meta"):
            if tag.get("name") == "description":
                meta["description"] = tag.get("content", "")
        return meta

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]:
        links = []
        base_domain = urlparse(base_url).netloc
        for tag in soup.find_all("a", href=True):
            href = tag["href"]
            full_url = urljoin(base_url, href)
            netloc = urlparse(full_url).netloc
            links.append({
                "url": full_url,
                "type": "internal" if netloc == base_domain else "external",
                "anchor": tag.get_text(strip=True)
            })
        return links

    def _parse_sitemap(self, sitemap_url: str) -> List[str]:
        # Note: sitemap index files are not followed recursively; only the <loc>
        # entries of the given document are returned.
        try:
            r = self.session.get(sitemap_url, timeout=10)
            r.raise_for_status()
            soup = BeautifulSoup(r.text, "lxml-xml")
            return [loc.text for loc in soup.find_all("loc")]
        except Exception as e:
            logger.error(f"Failed to parse sitemap {sitemap_url}: {e}")
            return []

    def _save_content(self, url: str, content: bytes) -> None:
        try:
            parsed = urlparse(url)
            domain_dir = self.base_dir / parsed.netloc
            path = parsed.path.lstrip("/")
            if not path or path.endswith("/"):
                path = os.path.join(path, "index.html")
            safe_path = sanitize_filename(path)
            save_path = domain_dir / safe_path
            save_path.parent.mkdir(parents=True, exist_ok=True)
            if save_path.exists():
                # Compare bytes directly; the builtin hash() of bytes is salted
                # per process, so it is not a stable fingerprint across runs.
                with open(save_path, "rb") as f:
                    if f.read() == content:
                        logger.debug(f"Content for {url} is already saved.")
                        return
            with open(save_path, "wb") as f:
                f.write(content)
            logger.info(f"Saved content to: {save_path}")
        except Exception as e:
            logger.error(f"Error saving content for {url}: {e}")

    def _calculate_stats(self, results: List[Dict]) -> Dict:
        success = [r for r in results if r.get("status") == "success"]
        return {
            "total": len(results),
            "success": len(success),
            "failed": len(results) - len(success),
            "avg_words": round(np.mean([r.get("word_count", 0) for r in success]) if success else 0, 1)
        }

    def _analyze_content(self, results: List[Dict]) -> Dict:
        texts = [r["content"] for r in results if r.get("status") == "success" and r.get("content")]
        if not texts:
            return {}
        tfidf = TfidfVectorizer(max_features=20, stop_words=list(self.models["spacy"].Defaults.stop_words))
        tfidf.fit(texts)
        top = tfidf.get_feature_names_out().tolist()
        return {"top_keywords": top, "samples": texts[:3]}

    def _analyze_links(self, results: List[Dict]) -> Dict:
        all_links = []
        for r in results:
            all_links.extend(r.get("links", []))
        if not all_links:
            return {}
        df = pd.DataFrame(all_links)
        return {
            "internal_links": df[df["type"] == "internal"]["url"].value_counts().head(10).to_dict(),
            "external_links": df[df["type"] == "external"]["url"].value_counts().head(10).to_dict()
        }

    def _apply_nlp(self, results: List[Dict]) -> Tuple[Dict, Dict]:
        summaries, entities = {}, {}
        for r in results:
            if r.get("status") != "success" or not r.get("content"):
                continue
            text = r["content"][:1024]  # truncate to stay within model input limits
            try:
                summaries[r["url"]] = self.models["summarizer"](text, max_length=100, min_length=30)[0]["summary_text"]
                ents = self.models["ner"](text)
                entities[r["url"]] = list({e["word"] for e in ents if e["score"] > 0.8})
            except Exception as e:
                logger.warning(f"NLP processing failed for {r['url']}: {e}")
                continue
        return summaries, entities

    def _compute_similarity(self, results: List[Dict]) -> Dict[str, List[Dict]]:
        docs = [(r["url"], r["content"]) for r in results if r.get("status") == "success" and r.get("content")]
        if len(docs) < 2:
            return {}
        urls, texts = zip(*docs)
        emb = self.models["semantic"].encode(texts, convert_to_tensor=True)
        # Move the similarity matrix to CPU/NumPy: np.argsort cannot operate on a
        # CUDA tensor when the model runs on GPU.
        sim = util.pytorch_cos_sim(emb, emb).cpu().numpy()
        return {
            urls[i]: [{"url": urls[j], "score": float(sim[i][j])}
                      for j in np.argsort(-sim[i]) if i != j][:3]
            for i in range(len(urls))
        }

    def _flag_prohibited_terms(self, results: List[Dict]) -> Dict[str, List[str]]:
        flags = {}
        for r in results:
            found = [term for term in PROHIBITED_TERMS if term in r.get("content", "").lower()]
            if found:
                flags[r["url"]] = found
        return flags

    def _classify_topics(self, results: List[Dict]) -> Dict[str, List[str]]:
        # Candidate labels stay in Spanish to match the analyzed content.
        labels = [
            "hipotecas", "préstamos", "cuentas", "tarjetas",
            "seguros", "inversión", "educación financiera"
        ]
        topics = {}
        for r in results:
            if r.get("status") != "success":
                continue
            try:
                res = self.models["zeroshot"](r["content"][:1000], candidate_labels=labels, multi_label=True)
                topics[r["url"]] = [l for l, s in zip(res["labels"], res["scores"]) if s > 0.5]
            except Exception as e:
                logger.warning(f"Topic classification failed for {r['url']}: {e}")
                continue
        return topics

    def _generate_seo_tags(self, results: List[Dict], summaries: Dict, topics: Dict, flags: Dict) -> Dict[str, Dict]:
        seo_tags = {}
        for r in results:
            url = r["url"]
            base = summaries.get(url, r.get("content", "")[:300])
            topic = (topics.get(url) or ["contenido"])[0]
            try:
                # The summarization model is repurposed as a rough generator here;
                # it condenses the prompt rather than truly following the instruction.
                prompt = f"Genera un título SEO formal y una meta descripción para contenido sobre {topic}: {base}"
                output = self.models["summarizer"](prompt, max_length=60, min_length=20)[0]["summary_text"]
                title, desc = output.split(".")[0], output
            except Exception as e:
                logger.warning(f"SEO tag generation failed for {url}: {e}")
                title, desc = "", ""
            seo_tags[url] = {
                "title": title,
                "meta_description": desc,
                "flags": flags.get(url, [])
            }
        return seo_tags

    def _generate_recommendations(self, results: List[Dict]) -> List[str]:
        recs = []
        if any(r.get("word_count", 0) < 300 for r in results):
            recs.append("✍️ Algunos contenidos son demasiado breves (<300 palabras)")
        # Check the full prohibited-terms list, not just a single term.
        if any(term in r.get("content", "").lower() for r in results for term in PROHIBITED_TERMS):
            recs.append("⚠️ Detectado uso de lenguaje no permitido")
        return recs or ["✅ Todo parece correcto"]

    def plot_internal_links(self, links: Dict) -> plt.Figure:
        if not links or not links.get("internal_links"):
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, "No hay enlaces internos", ha="center")
            return fig
        top = links["internal_links"]
        fig, ax = plt.subplots()
        ax.barh(list(top.keys()), list(top.values()))
        ax.set_title("Top Enlaces Internos")
        plt.tight_layout()
        return fig
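
# Minimal usage sketch, assuming a reachable sitemap. The URL below is a
# placeholder and the print callbacks are illustrative, not part of the API;
# note that the NLP models are downloaded on first instantiation.
if __name__ == "__main__":
    analyzer = SEOSpaceAnalyzer(max_urls=10, max_workers=4)
    stats, recs, content, links, details, similarities, seo_tags = analyzer.analyze_sitemap(
        "https://example.com/sitemap.xml",  # placeholder sitemap URL
        progress_callback=lambda done, total: print(f"batch {done}/{total}"),
        status_callback=print,
    )
    print(stats)
    print(recs)
    fig = analyzer.plot_internal_links(links)
    fig.savefig("internal_links.png")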