import os, io, re, json, requests, urllib.parse, hashlib, html

from functools import lru_cache
from typing import List, Optional, Tuple

import torch, torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel

from bs4 import BeautifulSoup
import tldextract
import trafilatura
from langdetect import detect, LangDetectException

try:
    import cloudscraper
    HAS_CLOUDSCRAPER = True
except Exception:
    HAS_CLOUDSCRAPER = False

try:
    from pdfminer.high_level import extract_text as pdf_extract_text
    HAS_PDFMINER = True
except Exception:
    HAS_PDFMINER = False

import gradio as gr


MODEL = "michiyasunaga/LinkBERT-base"
UA = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
    )
}

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PREFERRED_OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5")
FALLBACK_OPENAI_MODEL = "gpt-4o-mini"
OPENAI_CHAT_URL = "https://api.openai.com/v1/chat/completions"

EMBEDDING_CACHE = {}
API_RESPONSE_CACHE = {}

tok = AutoTokenizer.from_pretrained(MODEL)
enc = AutoModel.from_pretrained(MODEL)


def detect_language(text: str) -> str:
    try:
        return detect(text)
    except LangDetectException:
        return 'en'


def get_language_name(lang_code: str) -> str:
    lang_map = {
        'en': 'English', 'es': 'Spanish', 'fr': 'French', 'de': 'German',
        'it': 'Italian', 'pt': 'Portuguese', 'ru': 'Russian', 'ja': 'Japanese',
        'ko': 'Korean', 'zh': 'Chinese', 'ar': 'Arabic', 'hi': 'Hindi',
        'sr': 'Serbian', 'hr': 'Croatian', 'bs': 'Bosnian', 'sl': 'Slovenian',
        'mk': 'Macedonian', 'bg': 'Bulgarian', 'cs': 'Czech', 'sk': 'Slovak',
        'pl': 'Polish', 'uk': 'Ukrainian', 'ro': 'Romanian', 'hu': 'Hungarian'
    }
    return lang_map.get(lang_code, 'English')


def looks_like_url(text: str) -> bool:
    if not text:
        return False
    text = text.strip()
    if re.match(r'^(https?://)', text, flags=re.I):
        return True
    parts = urllib.parse.urlparse("http://" + text if "://" not in text else text)
    return bool(parts.netloc and "." in parts.netloc)


def normalize_url(url: str) -> str:
    if not url:
        return url
    if not re.match(r'^https?://', url, flags=re.I):
        return "https://" + url
    return url
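
# A couple of illustrative calls (hypothetical inputs):
#   looks_like_url("example.com/post")   -> True   (bare domain with a dot)
#   looks_like_url("best running shoes") -> False  (no dot in the netloc)
#   normalize_url("example.com/post")    -> "https://example.com/post"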


def _norm(s: str) -> str:
    return re.sub(r'\s+', ' ', re.sub(r'[^a-z0-9 ]', ' ', s.lower())).strip()


def _contains_anchor(text: str, anchor: str) -> bool:
    if not text or not anchor:
        return False
    t = _norm(text)
    a = _norm(anchor)
    return a in t
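
# Matching is case- and punctuation-insensitive, e.g. (illustrative):
#   _contains_anchor("Try our Best-Running Shoes today!", "best running shoes") -> True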


def _fetch_bytes(url: str, timeout: int = 25) -> Optional[requests.Response]:
    sess = requests.Session()
    sess.headers.update({
        "User-Agent": UA["User-Agent"],
        "Accept-Language": "en-US,en;q=0.9",
        "Cache-Control": "no-cache",
    })
    try:
        r = sess.get(url, timeout=timeout, allow_redirects=True)
        print(f"[fetch] requests: {r.status_code} {len(r.content)} bytes from {r.url}")
        if r.ok and r.content:
            return r
    except Exception as e:
        print(f"[fetch] requests error: {e}")

    # Retry with cloudscraper when available (handles some Cloudflare challenges).
    if HAS_CLOUDSCRAPER:
        try:
            scraper = cloudscraper.create_scraper(browser={'custom': UA["User-Agent"]})
            r = scraper.get(url, timeout=timeout, allow_redirects=True)
            print(f"[fetch] cloudscraper: {r.status_code} {len(r.content)} bytes from {r.url}")
            if r.ok and r.content:
                return r
        except Exception as e:
            print(f"[fetch] cloudscraper error: {e}")

    return None


def _split_to_blocks(raw: str, max_paragraphs: int) -> List[str]:
    raw = re.sub(r'\r', '\n', raw)
    raw = re.sub(r'\n{3,}', '\n\n', raw)
    chunks = [c.strip() for c in re.split(r'\n\s*\n', raw) if c.strip()]
    blocks: List[str] = []
    for c in chunks:
        merged = re.sub(r'\s*\n\s*', ' ', c)
        if len(merged) >= 40:
            blocks.append(merged)
        if len(blocks) >= max_paragraphs:
            break
    return blocks
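
# Illustrative behavior: paragraphs split on blank lines, soft line breaks are
# merged, and fragments shorter than 40 characters are dropped:
#   _split_to_blocks("Short.\n\nA paragraph long enough to pass the length filter.", 8)
#   -> ["A paragraph long enough to pass the length filter."]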


def get_text_blocks(url: str, max_paragraphs: int = 8) -> List[str]:
    try:
        if re.search(r'\.pdf($|\?)', url, flags=re.I):
            if HAS_PDFMINER:
                try:
                    r = _fetch_bytes(url)
                    if not r:
                        print("PDF fetch failed.")
                        return []
                    # pdfminer expects a file-like object, not raw bytes.
                    txt = pdf_extract_text(io.BytesIO(r.content))
                    blocks = _split_to_blocks(txt or "", max_paragraphs)
                    print(f"PDF extracted {len(blocks)} blocks")
                    return blocks
                except Exception as pe:
                    print(f"PDF extract error: {pe}")
                    return []
            else:
                print("PDF detected but pdfminer.six not installed.")
                return []

        r = _fetch_bytes(url)
        if not r:
            print("No response fetched (blocked or network).")
            return []

        try:
            txt = trafilatura.extract(
                r.content,
                url=r.url,  # trafilatura takes the source URL via `url=`
                include_comments=False,
                include_tables=False,
                deduplicate=True,
                output_format="txt",
                favor_precision=False
            )
        except Exception as te:
            print(f"Trafilatura extract error: {te}")
            txt = None

        if txt:
            blocks = _split_to_blocks(txt, max_paragraphs)
            if blocks:
                print(f"Trafilatura extracted {len(blocks)} blocks")
                return blocks

        # Fallback: strip page chrome and collect <p>/<li> text with BeautifulSoup.
        soup = BeautifulSoup(r.text, "html.parser")
        for tag in soup(["script", "style", "noscript", "header", "nav", "aside", "form", "footer"]):
            tag.decompose()

        paras = [p.get_text(" ", strip=True) for p in soup.find_all(["p", "li"]) if p.get_text(strip=True)]
        combined: List[str] = []
        buf: List[str] = []
        for p in paras:
            buf.append(p)
            if len(" ".join(buf)) >= 120:
                combined.append(" ".join(buf))
                buf = []
            if len(combined) >= max_paragraphs:
                break
        if buf and len(combined) < max_paragraphs:
            if len(" ".join(buf)) >= 40:
                combined.append(" ".join(buf))

        if combined:
            print(f"BeautifulSoup fallback collected {len(combined)} blocks")
            return combined

        print("No usable text extracted after all fallbacks.")
        return []

    except Exception as e:
        print(f"get_text_blocks fatal: {e}")
        return []
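
# Typical use (illustrative): get_text_blocks("https://blog.example/post") returns
# up to `max_paragraphs` merged paragraph strings, or [] when fetching/extraction fails.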


def get_target_context(url: str) -> Tuple[str, str, str, List[str]]:
    """Return (title, meta_description, h1, content_blocks)."""
    title, meta, h1 = "", "", ""
    blocks: List[str] = []
    try:
        r = _fetch_bytes(url)
        if not r:
            return title, meta, h1, blocks
        soup = BeautifulSoup(r.text, "html.parser")
        if soup.title and soup.title.get_text():
            title = soup.title.get_text().strip()
        md = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", attrs={"property": "og:description"})
        if md and md.get("content"):
            meta = md["content"].strip()
        h1_tag = soup.find("h1")
        if h1_tag:
            h1 = h1_tag.get_text(" ", strip=True)
    except Exception as e:
        print(f"[target] soup err: {e}")

    tb = get_text_blocks(url, max_paragraphs=6)
    if tb:
        blocks = tb
    return title, meta, h1, blocks


def keyword_fallback_from_title_domain(title: str, url: str) -> List[str]:
    ext = tldextract.extract(url)
    brand = (ext.domain or "").replace("-", " ").strip()
    base = []
    if title:
        t = _norm(title)
        tokens = [w for w in t.split() if len(w) >= 4]
        base.extend(tokens[:6])
    if brand:
        base.extend([brand, f"{brand} reviews", f"{brand} guide"])
    seen = set()
    out = []
    for k in base:
        k2 = k.strip()
        if k2 and k2 not in seen:
            out.append(k2)
            seen.add(k2)
    if not out:
        out = ["learn more", "full guide", "product details"]
    return out[:8]
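
# Illustrative fallback (hypothetical input):
#   keyword_fallback_from_title_domain("Best Trail Shoes 2024", "https://run-gear.com/shoes")
#   -> ['best', 'trail', 'shoes', '2024', 'run gear', 'run gear reviews', 'run gear guide']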


def _paragraph_sentences_from_html(url: str) -> List[str]:
    """
    Return a flat list of sentences taken only from <p> tags of the source page.
    Excludes headings/lists to avoid proposing H tags.
    """
    sents: List[str] = []
    try:
        r = _fetch_bytes(url)
        if not r:
            return sents
        soup = BeautifulSoup(r.text, "html.parser")
        paras = [p.get_text(" ", strip=True) for p in soup.find_all("p") if p.get_text(strip=True)]
        for p in paras:
            split = re.split(r'(?<=[.!?])\s+|\n+', p)
            for s in split:
                s = s.strip()
                if len(s) >= 10:
                    sents.append(s)
    except Exception as e:
        print(f"[p-sents] error: {e}")
    return sents


def _sentence_contains_anchor(s: str, anchor: str) -> bool:
    return _contains_anchor(s, anchor)


def mean_pool(last_hidden_state, mask):
    x = last_hidden_state
    mask = mask.unsqueeze(-1)
    return (x * mask).sum(1) / mask.sum(1)


@lru_cache(maxsize=1000)
def embed_cached(text_tuple):
    texts = list(text_tuple)
    batch = tok(texts, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        out = enc(**batch)
    return mean_pool(out.last_hidden_state, batch["attention_mask"])


def embed(texts: List[str]):
    return embed_cached(tuple(texts))
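
# Minimal sketch (not called by the app) of how the embeddings above rank candidate
# sentences; suggest_insertions() below performs the same cosine ranking inline.
def _demo_rank_by_similarity(query: str, candidates: List[str]) -> List[Tuple[float, str]]:
    q = embed([query])[0]
    c = embed(candidates)
    sims = F.cosine_similarity(c, q.repeat(len(candidates), 1))
    # Highest-scoring candidate first.
    return sorted(zip(sims.tolist(), candidates), reverse=True)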


def inject_anchor_into_sentence(sentence, anchor_text, target_url):
    """
    If the sentence already contains the anchor text, wrap it in a link;
    otherwise append a short linked clause.
    (Used only when the anchor exists in the article.)
    """
    if not sentence or not anchor_text:
        return sentence, False
    try:
        pattern = re.compile(r'\b' + re.escape(anchor_text) + r'\b', re.IGNORECASE)
        if pattern.search(sentence):
            result = pattern.sub(f'<a href="{target_url}">{anchor_text}</a>', sentence)
            return result, True
    except Exception:
        pass
    if len(sentence) > 0 and sentence[-1] in '.!?':
        base, punct = sentence[:-1], sentence[-1]
    else:
        base, punct = sentence, '.'
    rewritten = f'{base} <a href="{target_url}">{anchor_text}</a>{punct}'
    return rewritten, False
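
# Both paths, illustrated (hypothetical inputs):
#   inject_anchor_into_sentence("We compared trail shoes.", "trail shoes", "https://t.example")
#   -> ('We compared <a href="https://t.example">trail shoes</a>.', True)
#   inject_anchor_into_sentence("We compared gear.", "trail shoes", "https://t.example")
#   -> ('We compared gear <a href="https://t.example">trail shoes</a>.', False)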


def _openai_chat_simple(model_name: str, system: str, user_json: dict):
    """
    Minimal body: model + messages only (no response_format/max_tokens/etc.).
    """
    if not OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY not set")

    headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}
    body = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": json.dumps(user_json, ensure_ascii=False)}
        ]
    }
    r = requests.post(OPENAI_CHAT_URL, headers=headers, json=body, timeout=60)
    print(f"[GPT] Model={model_name} HTTP {r.status_code}")
    r.raise_for_status()
    txt = r.json()["choices"][0]["message"]["content"]
    try:
        return json.loads(txt)
    except Exception:
        return {"text": txt}


def _openai_chat_cached(cache_key: str, model_name: str, system: str, user_json: dict):
    if cache_key in API_RESPONSE_CACHE:
        print(f"[GPT] Using cached response for {cache_key[:8]}...")
        return API_RESPONSE_CACHE[cache_key]
    try:
        result = _openai_chat_simple(model_name, system, user_json)
    except Exception as e:
        print(f"[GPT] Preferred model failed: {e}. Falling back to {FALLBACK_OPENAI_MODEL}.")
        result = _openai_chat_simple(FALLBACK_OPENAI_MODEL, system, user_json)
    API_RESPONSE_CACHE[cache_key] = result
    return result


def build_target_context_string(target_url: str) -> str:
    title, meta, h1, blocks = get_target_context(target_url)
    ctx_parts = []
    if title:
        ctx_parts.append(f"Title: {title}")
    if meta:
        ctx_parts.append(f"Meta: {meta}")
    if h1:
        ctx_parts.append(f"H1: {h1}")
    if blocks:
        ctx_parts.append("Body: " + " ".join(blocks[:3]))
    return "\n".join(ctx_parts)[:2000]


def gpt_generate_insert_paragraph(anchor_text: str, target_url: str, language: str,
                                  insert_after_sentence: str, article_context: List[str],
                                  target_context: str) -> str:
    """
    Generate a 1-3 sentence paragraph (HTML) that includes the exact anchor as a link,
    written to fit right after the given sentence.
    """
    if not OPENAI_API_KEY:
        return f'<p>For more details, see <a href="{target_url}">{anchor_text}</a>.</p>'

    cache_key = hashlib.md5(
        f"para_{anchor_text}_{target_url}_{language}_{insert_after_sentence}_{' '.join(article_context)[:400]}_{target_context[:400]}".encode()
    ).hexdigest()

    system = (
        f"You are a precise copywriter in {language}. "
        "Write a short paragraph (1-3 sentences) that fits naturally into the article context, "
        "goes immediately AFTER the given sentence, and includes an <a href> with the EXACT provided anchor text "
        "pointing to the target URL. No em dashes. Output JSON with key 'paragraph_html'."
    )
    user = {
        "insert_after_sentence": insert_after_sentence,
        "article_context": article_context[:8],
        "target_context": target_context,
        "anchor_text": anchor_text,
        "target_url": target_url
    }
    obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
    return obj.get("paragraph_html", obj.get("text", f'<p><a href="{target_url}">{anchor_text}</a></p>'))


def gpt_get_search_keywords_from_context(ctx_text: str, target_url: str) -> List[str]:
    if not OPENAI_API_KEY:
        return []
    cache_key = hashlib.md5(f"kw_{target_url}_{ctx_text[:600]}".encode()).hexdigest()
    system = (
        "You are an SEO assistant. From the provided target page context, return 5-10 realistic keyword phrases "
        "users would search for to find it. Return JSON {'keywords': [...]} only."
    )
    user = {"url": target_url, "context": ctx_text}
    obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
    return obj.get("keywords", [])


def gpt_generate_content_with_keyword(source_blocks, keywords, target_url, language="English"):
    if not OPENAI_API_KEY or not keywords:
        return None
    source_preview = " ".join(source_blocks[:3])[:500]
    cache_key = hashlib.md5(f"gen_{source_preview}_{str(keywords)}_{target_url}_{language}".encode()).hexdigest()
    system = (
        f"You are a skilled content writer in {language}. Given article paragraphs and keyword candidates "
        "for a target link, do: 1) choose ONE best keyword; 2) write 1-2 natural sentences that include it "
        "as an <a href> to target_url; 3) provide the exact source sentence AFTER WHICH to insert. "
        "Return JSON keys: chosen_keyword, new_content, insert_after_sentence."
    )
    user = {
        "article_paragraphs": source_blocks[:7],
        "available_keywords": keywords,
        "target_url": target_url,
        "language": language
    }
    obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
    return obj
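
# run_tool() below calls gpt_rewrite(), which is not defined anywhere in this file.
# A minimal sketch following the same pattern as the helpers above; the assumed
# contract is a dict with key 'sentence_html', falling back to the draft unchanged
# when GPT is unavailable.
def gpt_rewrite(sentence_html: str, anchor_text: str, target_url: str,
                language: str = "English", target_context: str = "") -> dict:
    if not OPENAI_API_KEY:
        return {"sentence_html": sentence_html}
    cache_key = hashlib.md5(f"rw_{sentence_html}_{anchor_text}_{target_url}_{language}".encode()).hexdigest()
    system = (
        f"You are a precise copywriter in {language}. Lightly rewrite the given HTML sentence so it reads "
        "naturally while KEEPING the exact <a href> link and anchor text unchanged. No em dashes. "
        "Output JSON with key 'sentence_html'."
    )
    user = {
        "sentence_html": sentence_html,
        "anchor_text": anchor_text,
        "target_url": target_url,
        "target_context": target_context
    }
    obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
    return {"sentence_html": obj.get("sentence_html", obj.get("text", sentence_html))}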


def find_alternative_anchor(blocks, target_url, original_anchor):
    try:
        ctx = build_target_context_string(target_url)
        print(f"[Alt] Target context len={len(ctx)}")
        keywords = gpt_get_search_keywords_from_context(ctx, target_url)
        if not keywords:
            title, _, _, _ = get_target_context(target_url)
            keywords = keyword_fallback_from_title_domain(title, target_url)

        if not keywords:
            return None, None

        source_text = " ".join(blocks[:2])
        language_name = get_language_name(detect_language(source_text))

        result = gpt_generate_content_with_keyword(
            source_blocks=blocks,
            keywords=keywords,
            target_url=target_url,
            language=language_name
        )
        if not result:
            return None, None

        chosen_keyword = result.get("chosen_keyword", keywords[0])
        new_content = result.get("new_content", "")
        insert_after_sentence = result.get("insert_after_sentence", "")

        if insert_after_sentence:
            if len(insert_after_sentence) > 100:
                position_text = f"[Insert after: ...{insert_after_sentence[-80:]}]"
            else:
                position_text = f"[Insert after: {insert_after_sentence}]"
        else:
            position_text = ""

        return chosen_keyword, f"{position_text}\n\n{new_content}" if position_text else new_content

    except Exception as e:
        print(f"[Alt] Critical error: {e}")
        return None, None


def suggest_insertions(source_url, target_url, anchor_text, top_k=1, suggest_alternative=False):
    """
    - Pull sentences only from <p> tags.
    - If the anchor exists, return the exact sentence with the anchor injected.
    - If the anchor doesn't exist, propose an ADDITIONAL PARAGRAPH with an [Insert after: ...] marker.
    """
    try:
        para_sents = _paragraph_sentences_from_html(source_url)
        if not para_sents:
            blocks = get_text_blocks(source_url)
            if not blocks:
                return [{"error": f"No text blocks found on the page: {source_url}"}]
            para_sents = []
            for blk in blocks:
                for s in re.split(r'(?<=[.!?])\s+|\n+', blk):
                    s = s.strip()
                    if len(s) >= 10:
                        para_sents.append(s)
        if not para_sents:
            return [{"error": f"No sentences found on the page: {source_url}"}]

        keyword_present = any(_sentence_contains_anchor(s, anchor_text) for s in para_sents)

        t_title, t_meta, t_h1, _ = get_target_context(target_url)
        ext = tldextract.extract(target_url)
        tgt_domain = ".".join([p for p in [ext.domain, ext.suffix] if p])
        sent_query = f"{anchor_text} - relevant to: {t_title or t_h1} | {t_meta} ({tgt_domain})"

        article_blocks_ctx = get_text_blocks(source_url) or []

        results = []

        if keyword_present:
            best_sent = next(s for s in para_sents if _sentence_contains_anchor(s, anchor_text))
            rewritten_sent, _ = inject_anchor_into_sentence(best_sent, anchor_text, target_url)

            results.append({
                "anchor_was_present": True,
                "best_sentence_original": best_sent,
                "best_sentence_with_anchor": rewritten_sent,
                "keyword_in_article": True
            })

        else:
            # Rank source sentences against a target-aware query; insert after the best match.
            try:
                q = embed([sent_query])[0]
                s_embs = embed(para_sents)
                sims = F.cosine_similarity(s_embs, q.repeat(len(para_sents), 1))
                si = int(torch.argmax(sims).item())
                insert_after_sentence = para_sents[si]
            except Exception as e:
                print(f"Sentence similarity error: {e}")
                insert_after_sentence = para_sents[0]

            language_name = get_language_name(detect_language(" ".join(para_sents[:2])))
            target_ctx = build_target_context_string(target_url)
            paragraph_html = gpt_generate_insert_paragraph(
                anchor_text=anchor_text,
                target_url=target_url,
                language=language_name,
                insert_after_sentence=insert_after_sentence,
                article_context=article_blocks_ctx,
                target_context=target_ctx
            )

            position_text = insert_after_sentence
            results.append({
                "anchor_was_present": False,
                "best_sentence_original": position_text,
                "best_sentence_with_anchor": paragraph_html,
                "keyword_in_article": False,
                "is_new_paragraph": True
            })

        if suggest_alternative:
            alt_anchor, alt_content = find_alternative_anchor(article_blocks_ctx, target_url, anchor_text)
            if alt_anchor and alt_content:
                results[-1]["alternative_anchor"] = alt_anchor
                results[-1]["alternative_sentence_original"] = ""
                results[-1]["alternative_sentence"] = alt_content
                results[-1]["alternative_exact_match"] = True

        return results

    except Exception as e:
        print(f"Critical error in suggest_insertions: {e}")
        return [{
            "error": f"Error processing the page: {str(e)}",
            "anchor_was_present": False,
            "best_sentence_original": "Error occurred",
            "best_sentence_with_anchor": f"Error occurred. Try manually: <a href='{target_url}'>{anchor_text}</a>",
            "keyword_in_article": False
        }]
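
# Example call (illustrative URLs):
#   suggest_insertions("https://blog.example/post", "https://shop.example/product",
#                      "trail shoes", suggest_alternative=True)
# returns a list with one result dict (keys like 'anchor_was_present',
# 'best_sentence_original', 'best_sentence_with_anchor').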


def run_tool(source_url, target_url, anchor_text, smart_rewrite, plain_text, suggest_alternative_anchor):
    if not source_url or not target_url or not anchor_text:
        return "❌ Please provide Source URL, Target URL, and Anchor Text."

    warn = ""
    if looks_like_url(anchor_text) and not looks_like_url(target_url):
        anchor_text, target_url = target_url, anchor_text
        warn = "ℹ️ Detected swapped inputs. I used the URL as Target URL and the text as Anchor.\n\n"

    source_url = normalize_url(source_url)
    target_url = normalize_url(target_url)

    try:
        results = suggest_insertions(source_url, target_url, anchor_text, top_k=1, suggest_alternative=suggest_alternative_anchor)
        res = results[0]
    except Exception as e:
        return f"❌ Error processing the page: {str(e)}"

    if "error" in res:
        return f"❌ {res['error']}"

    original_sentence = res["best_sentence_original"]
    draft_html = res["best_sentence_with_anchor"]
    anchor_was_present = res.get("anchor_was_present", False)
    keyword_in_article = res.get("keyword_in_article", False)
    is_new_paragraph = res.get("is_new_paragraph", False)

    final_html = draft_html
    if smart_rewrite and not is_new_paragraph and anchor_was_present:
        language_name = get_language_name(detect_language(original_sentence))
        g = gpt_rewrite(final_html, anchor_text, target_url, language=language_name, target_context=build_target_context_string(target_url))
        final_html = g["sentence_html"]

    final_output = to_plain_text(final_html) if plain_text else final_html

    if keyword_in_article and not is_new_paragraph:
        result = warn + f"✅ **Anchor text '{anchor_text}' found in article!**\n\n"
        result += "👉 Add link here:\n\n"
        result += f"{final_output}"
    else:
        result = warn + f"⚠️ **Anchor text '{anchor_text}' not found in article**\n\n"
        result += "👉 Result 1 - **Add this NEW paragraph** after the sentence below:\n\n"
        result += f"📍 [Insert after:] {original_sentence}\n\n"
        result += f"{final_output}"

    if suggest_alternative_anchor and res.get("alternative_anchor"):
        alt_anchor = res["alternative_anchor"]
        alt_content = res.get("alternative_sentence", "")
        if alt_content:
            if "[Insert after:" in alt_content:
                parts = alt_content.split("\n\n", 1)
                position_info = parts[0] if len(parts) > 0 else ""
                actual_content = parts[1] if len(parts) > 1 else alt_content
            else:
                position_info = ""
                actual_content = alt_content
            alt_output = to_plain_text(actual_content) if plain_text else actual_content
            result += f"\n\n{'='*50}\n\n"
            result += "👉 Result 2 - **Suggested new anchor & paragraph**:\n"
            result += f"💡 Using keyword: '{alt_anchor}'\n"
            if position_info and "[Insert after:" in position_info:
                result += f"📍 {position_info}\n"
            result += f"\n{alt_output}"

    return result


def to_plain_text(html_or_text: str) -> str:
    text = BeautifulSoup(html_or_text, "html.parser").get_text(separator=" ", strip=True)
    return html.unescape(text)
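
# Illustrative round-trip:
#   to_plain_text('See <a href="https://t.example">trail shoes</a> &amp; more')
#   -> 'See trail shoes & more'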


def clear_cache():
    global EMBEDDING_CACHE, API_RESPONSE_CACHE
    EMBEDDING_CACHE.clear()
    API_RESPONSE_CACHE.clear()
    embed_cached.cache_clear()
    return "✅ Cache cleared successfully!"


gpt_status = "ON" if OPENAI_API_KEY else "OFF"
title_model = PREFERRED_OPENAI_MODEL if OPENAI_API_KEY else "OFF"

with gr.Blocks(title=f"Link Insertion Helper • GPT: {gpt_status}") as demo:
    gr.Markdown(f"# Link Insertion Helper • GPT: {gpt_status} • Model: {title_model}")
    gr.Markdown("Suggests the best place to add your link with intelligent language detection and caching.")

    with gr.Row():
        with gr.Column():
            source_url = gr.Textbox(label="Source URL", placeholder="https://example.com/article")
            target_url = gr.Textbox(label="Target URL", placeholder="https://example.com/target")
            anchor_text = gr.Textbox(label="Anchor Text", placeholder="your anchor text")

            with gr.Row():
                smart_rewrite = gr.Checkbox(label="Smart rewrite (GPT)", value=True)
                plain_text = gr.Checkbox(label="Plain text (no URL)", value=True)
                suggest_alternative_anchor = gr.Checkbox(
                    label="Suggest alternative anchor",
                    value=True,
                    info="Also propose a second option with a different anchor and its own paragraph"
                )

            with gr.Row():
                submit_btn = gr.Button("Process", variant="primary")
                clear_cache_btn = gr.Button("Clear Cache", variant="secondary")

        with gr.Column():
            output = gr.Textbox(label="Result", lines=14)
            cache_status = gr.Textbox(label="Cache Status", interactive=False)

    submit_btn.click(
        fn=run_tool,
        inputs=[source_url, target_url, anchor_text, smart_rewrite, plain_text, suggest_alternative_anchor],
        outputs=output
    )

    clear_cache_btn.click(
        fn=clear_cache,
        outputs=cache_status
    )

    gr.Markdown("""
    ### Features:
    - **Paragraph-Only Selection**: never proposes headings; picks sentences from <p> tags only
    - **Anchor-First**: if the anchor exists, returns the exact sentence containing it
    - **No Anchor? Add a Paragraph**: Result 1 always gives a new paragraph with an [Insert after:] marker
    - **Target-Aware**: uses title/meta/H1/body of the target URL for relevance
    - **Alternative Anchor**: optional Result 2 with a different anchor + ready paragraph
    - **Robust Extraction**: Trafilatura + BS4; optional Cloudflare/PDF handling
    """)


if __name__ == "__main__":
    demo.launch()