import os, io, re, json, requests, urllib.parse, hashlib, html
from functools import lru_cache
from typing import List, Optional, Tuple
# Torch / Transformers
import torch, torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
# Parsing / Extraction
from bs4 import BeautifulSoup
import tldextract
import trafilatura
# Optional fallbacks
try:
import cloudscraper
HAS_CLOUDSCRAPER = True
except Exception:
HAS_CLOUDSCRAPER = False
try:
from pdfminer.high_level import extract_text as pdf_extract_text
HAS_PDFMINER = True
except Exception:
HAS_PDFMINER = False
# UI
import gradio as gr
# =========================
# Config
# =========================
MODEL = "michiyasunaga/LinkBERT-base"
UA = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
)
}
# --- OpenAI settings (simplified for GPT-5) ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PREFERRED_OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5")  # overridable via OPENAI_MODEL env var
FALLBACK_OPENAI_MODEL = "gpt-4o-mini"
OPENAI_CHAT_URL = "https://api.openai.com/v1/chat/completions"
# Caches
EMBEDDING_CACHE = {}
API_RESPONSE_CACHE = {}
# =========================
# Load LinkBERT (sentence-level embeddings)
# =========================
tok = AutoTokenizer.from_pretrained(MODEL)
enc = AutoModel.from_pretrained(MODEL)
# =========================
# Language Detection
# =========================
from langdetect import detect, LangDetectException
def detect_language(text: str) -> str:
try:
return detect(text)
except LangDetectException:
return 'en'
def get_language_name(lang_code: str) -> str:
lang_map = {
'en': 'English', 'es': 'Spanish', 'fr': 'French', 'de': 'German',
'it': 'Italian', 'pt': 'Portuguese', 'ru': 'Russian', 'ja': 'Japanese',
'ko': 'Korean', 'zh': 'Chinese', 'ar': 'Arabic', 'hi': 'Hindi',
'sr': 'Serbian', 'hr': 'Croatian', 'bs': 'Bosnian', 'sl': 'Slovenian',
'mk': 'Macedonian', 'bg': 'Bulgarian', 'cs': 'Czech', 'sk': 'Slovak',
'pl': 'Polish', 'uk': 'Ukrainian', 'ro': 'Romanian', 'hu': 'Hungarian'
}
return lang_map.get(lang_code, 'English')
# =========================
# Helpers
# =========================
def looks_like_url(text: str) -> bool:
if not text:
return False
text = text.strip()
if re.match(r'^(https?://)', text, flags=re.I):
return True
parts = urllib.parse.urlparse("http://" + text if "://" not in text else text)
return bool(parts.netloc and "." in parts.netloc)
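# Illustrative behavior (hypothetical inputs; a bare domain counts as a URL,
# plain text without a dot in the netloc does not):
#   >>> looks_like_url("https://example.com/post")
#   True
#   >>> looks_like_url("example.com/post")
#   True
#   >>> looks_like_url("best running shoes")
#   False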
def normalize_url(url: str) -> str:
if not url:
return url
if not re.match(r'^https?://', url, flags=re.I):
return "https://" + url
return url
def _norm(s: str) -> str:
return re.sub(r'\s+', ' ', re.sub(r'[^a-z0-9 ]', ' ', s.lower())).strip()
def _contains_anchor(text: str, anchor: str) -> bool:
if not text or not anchor:
return False
t = _norm(text)
a = _norm(anchor)
return a in t
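# Matching is case- and punctuation-insensitive: _norm lowercases both sides,
# turns non-alphanumerics into spaces, and collapses whitespace. For example:
#   >>> _contains_anchor("Try our Best Running-Shoes today!", "best running shoes")
#   True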
# =========================
# Robust fetching + text extraction
# =========================
def _fetch_bytes(url: str, timeout: int = 25) -> Optional[requests.Response]:
sess = requests.Session()
sess.headers.update({
"User-Agent": UA["User-Agent"],
"Accept-Language": "en-US,en;q=0.9",
"Cache-Control": "no-cache",
})
try:
r = sess.get(url, timeout=timeout, allow_redirects=True)
print(f"[fetch] requests: {r.status_code} {len(r.content)} bytes from {r.url}")
if r.ok and r.content:
return r
except Exception as e:
print(f"[fetch] requests error: {e}")
if HAS_CLOUDSCRAPER:
try:
scraper = cloudscraper.create_scraper(browser={'custom': UA["User-Agent"]})
r = scraper.get(url, timeout=timeout, allow_redirects=True)
print(f"[fetch] cloudscraper: {r.status_code} {len(r.content)} bytes from {r.url}")
if r.ok and r.content:
return r
except Exception as e:
print(f"[fetch] cloudscraper error: {e}")
return None
def _split_to_blocks(raw: str, max_paragraphs: int) -> List[str]:
raw = re.sub(r'\r', '\n', raw)
raw = re.sub(r'\n{3,}', '\n\n', raw)
chunks = [c.strip() for c in re.split(r'\n\s*\n', raw) if c.strip()]
blocks: List[str] = []
for c in chunks:
merged = re.sub(r'\s*\n\s*', ' ', c)
if len(merged) >= 40:
blocks.append(merged)
if len(blocks) >= max_paragraphs:
break
return blocks
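# Paragraphs are split on blank lines, inner newlines merged to spaces, and
# fragments shorter than 40 characters dropped. Illustrative example:
#   >>> _split_to_blocks("First paragraph line one\nline two of same paragraph here.\n\nok\n\nAnother sufficiently long paragraph goes right here.", 8)
#   ['First paragraph line one line two of same paragraph here.', 'Another sufficiently long paragraph goes right here.']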
def get_text_blocks(url: str, max_paragraphs: int = 8) -> List[str]:
try:
if re.search(r'\.pdf($|\?)', url, flags=re.I):
if HAS_PDFMINER:
try:
r = _fetch_bytes(url)
if not r:
print("PDF fetch failed.")
return []
                    # pdfminer's extract_text expects a path or file-like object
                    txt = pdf_extract_text(io.BytesIO(r.content))
blocks = _split_to_blocks(txt or "", max_paragraphs)
print(f"PDF extracted {len(blocks)} blocks")
return blocks
except Exception as pe:
print(f"PDF extract error: {pe}")
return []
else:
print("PDF detected but pdfminer.six not installed.")
return []
r = _fetch_bytes(url)
if not r:
print("No response fetched (blocked or network).")
return []
try:
txt = trafilatura.extract(
r.content,
                url=r.url,
include_comments=False,
include_tables=False,
deduplicate=True,
output_format="txt",
favor_precision=False
)
except Exception as te:
print(f"Trafilatura extract error: {te}")
txt = None
if txt:
blocks = _split_to_blocks(txt, max_paragraphs)
if blocks:
print(f"Trafilatura extracted {len(blocks)} blocks")
return blocks
soup = BeautifulSoup(r.text, "html.parser")
for tag in soup(["script", "style", "noscript", "header", "nav", "aside", "form", "footer"]):
tag.decompose()
paras = [p.get_text(" ", strip=True) for p in soup.find_all(["p", "li"]) if p.get_text(strip=True)]
combined: List[str] = []
buf: List[str] = []
for p in paras:
buf.append(p)
if len(" ".join(buf)) >= 120:
combined.append(" ".join(buf))
buf = []
if len(combined) >= max_paragraphs:
break
if buf and len(combined) < max_paragraphs:
if len(" ".join(buf)) >= 40:
combined.append(" ".join(buf))
if combined:
print(f"BeautifulSoup fallback collected {len(combined)} blocks")
return combined
print("No usable text extracted after all fallbacks.")
return []
except Exception as e:
print(f"get_text_blocks fatal: {e}")
return []
# -------- target context helpers --------
def get_target_context(url: str) -> Tuple[str, str, str, List[str]]:
"""
Return (title, meta_description, h1, content_blocks)
"""
title = ""; meta = ""; h1 = ""; blocks: List[str] = []
try:
r = _fetch_bytes(url)
if not r:
return title, meta, h1, blocks
soup = BeautifulSoup(r.text, "html.parser")
if soup.title and soup.title.get_text():
title = soup.title.get_text().strip()
md = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", attrs={"property":"og:description"})
if md and md.get("content"):
meta = md["content"].strip()
h1_tag = soup.find("h1")
if h1_tag:
h1 = h1_tag.get_text(" ", strip=True)
except Exception as e:
print(f"[target] soup err: {e}")
tb = get_text_blocks(url, max_paragraphs=6)
if tb:
blocks = tb
return title, meta, h1, blocks
def keyword_fallback_from_title_domain(title: str, url: str) -> List[str]:
ext = tldextract.extract(url)
brand = (ext.domain or "").replace("-", " ").strip()
base = []
if title:
t = _norm(title)
tokens = [w for w in t.split() if len(w) >= 4]
base.extend(tokens[:6])
if brand:
base.extend([brand, f"{brand} reviews", f"{brand} guide"])
seen = set(); out=[]
for k in base:
k2 = k.strip()
if k2 and k2 not in seen:
out.append(k2); seen.add(k2)
if not out:
out = ["learn more", "full guide", "product details"]
return out[:8]
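# Example of the heuristic with hypothetical inputs (title tokens of 4+ chars,
# then brand-derived phrases, deduplicated and capped at 8):
#   >>> keyword_fallback_from_title_domain("Trail Running Shoes Guide", "https://shoe-review.com/best")
#   ['trail', 'running', 'shoes', 'guide', 'shoe review', 'shoe review reviews', 'shoe review guide']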
# =========================
# Extract paragraph sentences ONLY (no headings)
# =========================
def _paragraph_sentences_from_html(url: str) -> List[str]:
"""
Return a flat list of sentences taken only from <p> tags of the source page.
Excludes headings/lists to avoid proposing H tags.
"""
sents: List[str] = []
try:
r = _fetch_bytes(url)
if not r:
return sents
soup = BeautifulSoup(r.text, "html.parser")
paras = [p.get_text(" ", strip=True) for p in soup.find_all("p") if p.get_text(strip=True)]
for p in paras:
split = re.split(r'(?<=[.!?])\s+|\n+', p)
for s in split:
s = s.strip()
if len(s) >= 10:
sents.append(s)
except Exception as e:
print(f"[p-sents] error: {e}")
return sents
def _sentence_contains_anchor(s: str, anchor: str) -> bool:
return _contains_anchor(s, anchor)
# =========================
# Embedding helpers
# =========================
def mean_pool(last_hidden_state, mask):
x = last_hidden_state
mask = mask.unsqueeze(-1)
return (x * mask).sum(1) / mask.sum(1)
@lru_cache(maxsize=1000)
def embed_cached(text_tuple):
texts = list(text_tuple)
batch = tok(texts, padding=True, truncation=True, return_tensors="pt")
with torch.no_grad():
out = enc(**batch)
return mean_pool(out.last_hidden_state, batch["attention_mask"])
def embed(texts: List[str]):
return embed_cached(tuple(texts))
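# Typical usage, mirroring the ranking step in suggest_insertions below:
# embed a query and candidate sentences, then rank candidates by cosine
# similarity (illustrative snippet):
#   q = embed(["running shoes buying guide"])[0]
#   cands = ["Shoes should fit well.", "The weather was pleasant."]
#   sims = F.cosine_similarity(embed(cands), q.repeat(len(cands), 1))
#   best_idx = int(torch.argmax(sims).item())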
# =========================
# Anchor injection helper
# =========================
def inject_anchor_into_sentence(sentence, anchor_text, target_url):
"""
    If the sentence already has the anchor text → wrap it; else append a short clause.
(Used only when anchor exists in article.)
"""
if not sentence or not anchor_text:
return sentence, False
try:
pattern = re.compile(r'\b' + re.escape(anchor_text) + r'\b', re.IGNORECASE)
if pattern.search(sentence):
result = pattern.sub(f'<a href="{target_url}">{anchor_text}</a>', sentence)
return result, True
except Exception:
pass
if len(sentence) > 0 and sentence[-1] in '.!?':
base, punct = sentence[:-1], sentence[-1]
else:
base, punct = sentence, '.'
rewritten = f'{base} <a href="{target_url}">{anchor_text}</a>{punct}'
return rewritten, False
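# Both code paths, illustrated with hypothetical inputs:
#   >>> inject_anchor_into_sentence("We review running shoes.", "running shoes", "https://t.example")
#   ('We review <a href="https://t.example">running shoes</a>.', True)
#   >>> inject_anchor_into_sentence("The race starts soon.", "trail guide", "https://t.example")
#   ('The race starts soon <a href="https://t.example">trail guide</a>.', False)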
# =========================
# OpenAI helpers (SIMPLE BODY for GPT-5)
# =========================
def _openai_chat_simple(model_name: str, system: str, user_json: dict):
"""
Minimal body: model + messages only (no response_format/max_tokens/etc.).
"""
if not OPENAI_API_KEY:
raise RuntimeError("OPENAI_API_KEY not set")
headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}
body = {
"model": model_name,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": json.dumps(user_json, ensure_ascii=False)}
]
}
r = requests.post(OPENAI_CHAT_URL, headers=headers, json=body, timeout=60)
print(f"[GPT] Model={model_name} HTTP {r.status_code}")
r.raise_for_status()
txt = r.json()["choices"][0]["message"]["content"]
try:
return json.loads(txt)
except Exception:
return {"text": txt}
def _openai_chat_cached(cache_key: str, model_name: str, system: str, user_json: dict):
if cache_key in API_RESPONSE_CACHE:
print(f"[GPT] Using cached response for {cache_key[:8]}...")
return API_RESPONSE_CACHE[cache_key]
try:
result = _openai_chat_simple(model_name, system, user_json)
except Exception as e:
print(f"[GPT] Preferred model failed: {e}. Falling back to {FALLBACK_OPENAI_MODEL}.")
result = _openai_chat_simple(FALLBACK_OPENAI_MODEL, system, user_json)
API_RESPONSE_CACHE[cache_key] = result
return result
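# NOTE: run_tool below calls gpt_rewrite(), which is not defined anywhere in
# this file. The helper that follows is a minimal reconstruction, an ASSUMED
# implementation following the same pattern as the generators below, not the
# original code: it lightly polishes a sentence that already carries the
# anchor link and returns a dict with the 'sentence_html' key that run_tool
# expects.
def gpt_rewrite(sentence_html: str, anchor_text: str, target_url: str,
                language: str = "English", target_context: str = "") -> dict:
    if not OPENAI_API_KEY:
        return {"sentence_html": sentence_html}
    cache_key = hashlib.md5(
        f"rw_{sentence_html}_{anchor_text}_{target_url}_{language}".encode()
    ).hexdigest()
    system = (
        f"You are a precise copy editor in {language}. Lightly polish the given "
        "HTML sentence for flow while keeping the existing <a href> tag, the "
        "EXACT anchor text, and the target URL unchanged. "
        "Return JSON with key 'sentence_html'."
    )
    user = {
        "sentence_html": sentence_html,
        "anchor_text": anchor_text,
        "target_url": target_url,
        "target_context": target_context,
    }
    obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
    return {"sentence_html": obj.get("sentence_html", obj.get("text", sentence_html))}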
# =========================
# Target-aware paragraph generators
# =========================
def build_target_context_string(target_url: str) -> str:
title, meta, h1, blocks = get_target_context(target_url)
ctx_parts = []
if title: ctx_parts.append(f"Title: {title}")
if meta: ctx_parts.append(f"Meta: {meta}")
if h1: ctx_parts.append(f"H1: {h1}")
if blocks: ctx_parts.append("Body: " + " ".join(blocks[:3]))
return "\n".join(ctx_parts)[:2000]
def gpt_generate_insert_paragraph(anchor_text: str, target_url: str, language: str,
insert_after_sentence: str, article_context: List[str],
target_context: str) -> str:
"""
    Generate a short paragraph (1–3 sentences, HTML) that includes the exact anchor as a link,
written to fit right after the given sentence.
"""
if not OPENAI_API_KEY:
# simple fallback
return f'<p>For more details, see <a href="{target_url}">{anchor_text}</a>.</p>'
cache_key = hashlib.md5(
f"para_{anchor_text}_{target_url}_{language}_{insert_after_sentence}_{' '.join(article_context)[:400]}_{target_context[:400]}".encode()
).hexdigest()
system = (
f"You are a precise copywriter in {language}. "
"Write a short paragraph (1β3 sentences) that fits naturally into the article context, "
"goes immediately AFTER the given sentence, and includes an <a href> with the EXACT provided anchor text "
"pointing to the target URL. No em dashes. Output JSON with key 'paragraph_html'."
)
user = {
"insert_after_sentence": insert_after_sentence,
"article_context": article_context[:8],
"target_context": target_context,
"anchor_text": anchor_text,
"target_url": target_url
}
obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
return obj.get("paragraph_html", obj.get("text", f'<p><a href="{target_url}">{anchor_text}</a></p>'))
def gpt_get_search_keywords_from_context(ctx_text: str, target_url: str) -> List[str]:
if not OPENAI_API_KEY:
return []
cache_key = hashlib.md5(f"kw_{target_url}_{ctx_text[:600]}".encode()).hexdigest()
system = (
"You are an SEO assistant. From the provided target page context, return 5-10 realistic keyword phrases "
"users would search for to find it. Return JSON {'keywords': [...] } only."
)
user = {"url": target_url, "context": ctx_text}
obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
return obj.get("keywords", [])
def gpt_generate_content_with_keyword(source_blocks, keywords, target_url, language="English"):
if not OPENAI_API_KEY or not keywords:
return None
source_preview = " ".join(source_blocks[:3])[:500]
cache_key = hashlib.md5(f"gen_{source_preview}_{str(keywords)}_{target_url}_{language}".encode()).hexdigest()
system = (
f"You are a skilled content writer in {language}. Given article paragraphs and keyword candidates "
"for a target link, do: 1) choose ONE best keyword; 2) write 1β2 natural sentences that include it "
"as an <a href> to target_url; 3) provide the exact source sentence AFTER WHICH to insert. "
"Return JSON keys: chosen_keyword, new_content, insert_after_sentence."
)
user = {
"article_paragraphs": source_blocks[:7],
"available_keywords": keywords,
"target_url": target_url,
"language": language
}
obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user)
return obj
# =========================
# Alternative anchor pipeline
# =========================
def find_alternative_anchor(blocks, target_url, original_anchor):
try:
ctx = build_target_context_string(target_url)
print(f"[Alt] Target context len={len(ctx)}")
keywords = gpt_get_search_keywords_from_context(ctx, target_url)
if not keywords:
title, _, _, _ = get_target_context(target_url)
keywords = keyword_fallback_from_title_domain(title, target_url)
if not keywords:
return None, None
source_text = " ".join(blocks[:2])
language_name = get_language_name(detect_language(source_text))
result = gpt_generate_content_with_keyword(
source_blocks=blocks,
keywords=keywords,
target_url=target_url,
language=language_name
)
if not result:
return None, None
chosen_keyword = result.get("chosen_keyword", keywords[0])
new_content = result.get("new_content", "")
insert_after_sentence = result.get("insert_after_sentence", "")
if insert_after_sentence:
if len(insert_after_sentence) > 100:
position_text = f"[Insert after: ...{insert_after_sentence[-80:]}]"
else:
position_text = f"[Insert after: {insert_after_sentence}]"
else:
position_text = ""
return chosen_keyword, f"{position_text}\n\n{new_content}" if position_text else new_content
except Exception as e:
print(f"[Alt] Critical error: {e}")
return None, None
# =========================
# Main selector (paragraph-only, anchor-first, add-paragraph if missing)
# =========================
def suggest_insertions(source_url, target_url, anchor_text, top_k=1, suggest_alternative=False):
"""
- Pull sentences only from <p> tags.
- If anchor exists β return the exact sentence with anchor injection.
- If anchor doesn't exist β propose ADDITIONAL PARAGRAPH with [Insert after: β¦] marker.
"""
try:
para_sents = _paragraph_sentences_from_html(source_url)
if not para_sents:
blocks = get_text_blocks(source_url)
if not blocks:
return [{"error": f"No text blocks found on the page: {source_url}"}]
para_sents = []
for blk in blocks:
for s in re.split(r'(?<=[.!?])\s+|\n+', blk):
s = s.strip()
if len(s) >= 10:
para_sents.append(s)
if not para_sents:
return [{"error": f"No sentences found on the page: {source_url}"}]
keyword_present = any(_sentence_contains_anchor(s, anchor_text) for s in para_sents)
t_title, t_meta, t_h1, _ = get_target_context(target_url)
ext = tldextract.extract(target_url)
tgt_domain = ".".join([p for p in [ext.domain, ext.suffix] if p])
sent_query = f"{anchor_text} β relevant to: {t_title or t_h1} | {t_meta} ({tgt_domain})"
# Use full article blocks as context for paragraph generation when needed
article_blocks_ctx = get_text_blocks(source_url) or []
results = []
if keyword_present:
# Use the FIRST sentence that contains the anchor (exact edit)
best_sent = next(s for s in para_sents if _sentence_contains_anchor(s, anchor_text))
rewritten_sent, _ = inject_anchor_into_sentence(best_sent, anchor_text, target_url)
results.append({
"anchor_was_present": True,
"best_sentence_original": best_sent,
"best_sentence_with_anchor": rewritten_sent,
"keyword_in_article": True
})
else:
# Choose the best insertion point sentence by similarity
try:
q = embed([sent_query])[0]
s_embs = embed(para_sents)
sims = F.cosine_similarity(s_embs, q.repeat(len(para_sents), 1))
si = int(torch.argmax(sims).item())
insert_after_sentence = para_sents[si]
except Exception as e:
print(f"Sentence similarity error: {e}")
insert_after_sentence = para_sents[0]
# Generate a NEW PARAGRAPH (not a sentence change) with the specified anchor
            # detect_language already falls back to 'en' on failure
            language_name = get_language_name(detect_language(" ".join(para_sents[:2])))
target_ctx = build_target_context_string(target_url)
paragraph_html = gpt_generate_insert_paragraph(
anchor_text=anchor_text,
target_url=target_url,
language=language_name,
insert_after_sentence=insert_after_sentence,
article_context=article_blocks_ctx,
target_context=target_ctx
)
position_text = insert_after_sentence
results.append({
"anchor_was_present": False,
"best_sentence_original": position_text, # we use this field as the insert-after pointer
"best_sentence_with_anchor": paragraph_html, # the new paragraph HTML to add
"keyword_in_article": False,
"is_new_paragraph": True
})
# Alternative anchor block (Result 2)
if suggest_alternative:
alt_anchor, alt_content = find_alternative_anchor(article_blocks_ctx, target_url, anchor_text)
if alt_anchor and alt_content:
results[-1]["alternative_anchor"] = alt_anchor
results[-1]["alternative_sentence_original"] = ""
results[-1]["alternative_sentence"] = alt_content
results[-1]["alternative_exact_match"] = True
return results
except Exception as e:
print(f"Critical error in suggest_insertions: {e}")
return [{
"error": f"Error processing the page: {str(e)}",
"anchor_was_present": False,
"best_sentence_original": "Error occurred",
"best_sentence_with_anchor": f"Error occurred. Try manually: <a href='{target_url}'>{anchor_text}</a>",
"keyword_in_article": False
}]
# =========================
# Gradio UI
# =========================
def run_tool(source_url, target_url, anchor_text, smart_rewrite, plain_text, suggest_alternative_anchor):
if not source_url or not target_url or not anchor_text:
return "β Please provide Source URL, Target URL, and Anchor Text."
warn = ""
if looks_like_url(anchor_text) and not looks_like_url(target_url):
anchor_text, target_url = target_url, anchor_text
warn = "βΉοΈ Detected swapped inputs. I used the URL as Target URL and the text as Anchor.\n\n"
source_url = normalize_url(source_url)
target_url = normalize_url(target_url)
try:
results = suggest_insertions(source_url, target_url, anchor_text, top_k=1, suggest_alternative=suggest_alternative_anchor)
res = results[0]
except Exception as e:
return f"β Error processing the page: {str(e)}"
if "error" in res:
return f"β {res['error']}"
original_sentence = res['best_sentence_original']
draft_html = res["best_sentence_with_anchor"]
anchor_was_present = res.get("anchor_was_present", False)
keyword_in_article = res.get("keyword_in_article", False)
is_new_paragraph = res.get("is_new_paragraph", False)
# Optional polish only when we are changing an existing sentence (not needed for new paragraph usually)
final_html = draft_html
if smart_rewrite and not is_new_paragraph and anchor_was_present:
language_name = get_language_name(detect_language(original_sentence))
g = gpt_rewrite(final_html, anchor_text, target_url, language=language_name, target_context=build_target_context_string(target_url))
final_html = g["sentence_html"]
final_output = to_plain_text(final_html) if plain_text else final_html
if keyword_in_article and not is_new_paragraph:
result = warn + f"β
**Anchor text '{anchor_text}' found in article!**\n\n"
result += "π Add link here:\n\n"
result += f"{final_output}"
else:
# NEW DEFAULT: add paragraph after a sentence
result = warn + f"β οΈ **Anchor text '{anchor_text}' not found in article**\n\n"
result += "π Result 1 β **Add this NEW paragraph** after the sentence below:\n\n"
result += f"π [Insert after:] {original_sentence}\n\n"
result += f"{final_output}"
if suggest_alternative_anchor and res.get("alternative_anchor"):
alt_anchor = res["alternative_anchor"]
alt_content = res.get("alternative_sentence", "")
if alt_content:
if "[Insert after:" in alt_content:
parts = alt_content.split("\n\n", 1)
position_info = parts[0] if len(parts) > 0 else ""
actual_content = parts[1] if len(parts) > 1 else alt_content
else:
position_info = ""
actual_content = alt_content
alt_output = to_plain_text(actual_content) if plain_text else actual_content
result += f"\n\n{'='*50}\n\n"
result += "π Result 2 β **Suggested new anchor & paragraph**:\n"
result += f"π‘ Using keyword: '{alt_anchor}'\n"
if position_info and "[Insert after:" in position_info:
result += f"π {position_info}\n"
result += f"\n{alt_output}"
return result
def to_plain_text(html_or_text: str) -> str:
text = BeautifulSoup(html_or_text, "html.parser").get_text(separator=" ", strip=True)
return html.unescape(text)
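# For example:
#   >>> to_plain_text('<p>Read the <a href="https://t.example">full guide</a> today</p>')
#   'Read the full guide today'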
def clear_cache():
global EMBEDDING_CACHE, API_RESPONSE_CACHE
EMBEDDING_CACHE.clear()
API_RESPONSE_CACHE.clear()
embed_cached.cache_clear()
return "β
Cache cleared successfully!"
# Show GPT status in the header
gpt_status = "ON" if OPENAI_API_KEY else "OFF"
title_model = PREFERRED_OPENAI_MODEL if OPENAI_API_KEY else "OFF"
with gr.Blocks(title=f"Link Insertion Helper β’ GPT: {gpt_status}") as demo:
gr.Markdown(f"# Link Insertion Helper β’ GPT: {gpt_status} β’ Model: {title_model}")
gr.Markdown("Suggests the best place to add your link with intelligent language detection and caching.")
with gr.Row():
with gr.Column():
source_url = gr.Textbox(label="Source URL", placeholder="https://example.com/article")
target_url = gr.Textbox(label="Target URL", placeholder="https://example.com/target")
anchor_text = gr.Textbox(label="Anchor Text", placeholder="your anchor text")
with gr.Row():
smart_rewrite = gr.Checkbox(label="Smart rewrite (GPT)", value=True)
plain_text = gr.Checkbox(label="Plain text (no URL)", value=True)
suggest_alternative_anchor = gr.Checkbox(
label="Suggest alternative anchor",
value=True,
info="Also propose a second option with a different anchor and its own paragraph"
)
with gr.Row():
submit_btn = gr.Button("Process", variant="primary")
clear_cache_btn = gr.Button("Clear Cache", variant="secondary")
with gr.Column():
output = gr.Textbox(label="Result", lines=14)
cache_status = gr.Textbox(label="Cache Status", interactive=False)
submit_btn.click(
fn=run_tool,
inputs=[source_url, target_url, anchor_text, smart_rewrite, plain_text, suggest_alternative_anchor],
outputs=output
)
clear_cache_btn.click(
fn=clear_cache,
outputs=cache_status
)
gr.Markdown("""
### Features:
- 🧩 **Paragraph-Only Selection**: Never proposes headings; picks sentences from <p> tags only
- 🎯 **Anchor-First**: If the anchor exists, returns the exact sentence containing it
- ➕ **No Anchor? Add a Paragraph**: Result 1 always gives a new paragraph with [Insert after:]
- 🧠 **Target-Aware**: Uses the title/meta/H1/body of the target URL for relevance
- 🔁 **Alternative Anchor**: Optional Result 2 with a different anchor + ready paragraph
- 🧰 **Robust extraction**: Trafilatura + BS4; optional Cloudflare/PDF handling
""")
if __name__ == "__main__":
demo.launch() |