import os, re, json, requests, urllib.parse, hashlib, html from functools import lru_cache from typing import List, Optional, Tuple # Torch / Transformers import torch, torch.nn.functional as F from transformers import AutoTokenizer, AutoModel # Parsing / Extraction from bs4 import BeautifulSoup import tldextract import trafilatura # Optional fallbacks try: import cloudscraper HAS_CLOUDSCRAPER = True except Exception: HAS_CLOUDSCRAPER = False try: from pdfminer.high_level import extract_text as pdf_extract_text HAS_PDFMINER = True except Exception: HAS_PDFMINER = False # UI import gradio as gr # ========================= # Config # ========================= MODEL = "michiyasunaga/LinkBERT-base" UA = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/124.0 Safari/537.36" ) } # --- OpenAI settings (simplified for GPT-5) --- OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") PREFERRED_OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5") # per your request FALLBACK_OPENAI_MODEL = "gpt-4o-mini" OPENAI_CHAT_URL = "https://api.openai.com/v1/chat/completions" # Caches EMBEDDING_CACHE = {} API_RESPONSE_CACHE = {} # ========================= # Load LinkBERT (sentence-level embeddings) # ========================= tok = AutoTokenizer.from_pretrained(MODEL) enc = AutoModel.from_pretrained(MODEL) # ========================= # Language Detection # ========================= from langdetect import detect, LangDetectException def detect_language(text: str) -> str: try: return detect(text) except LangDetectException: return 'en' def get_language_name(lang_code: str) -> str: lang_map = { 'en': 'English', 'es': 'Spanish', 'fr': 'French', 'de': 'German', 'it': 'Italian', 'pt': 'Portuguese', 'ru': 'Russian', 'ja': 'Japanese', 'ko': 'Korean', 'zh': 'Chinese', 'ar': 'Arabic', 'hi': 'Hindi', 'sr': 'Serbian', 'hr': 'Croatian', 'bs': 'Bosnian', 'sl': 'Slovenian', 'mk': 'Macedonian', 'bg': 'Bulgarian', 'cs': 'Czech', 'sk': 'Slovak', 'pl': 'Polish', 'uk': 'Ukrainian', 'ro': 'Romanian', 'hu': 'Hungarian' } return lang_map.get(lang_code, 'English') # ========================= # Helpers # ========================= def looks_like_url(text: str) -> bool: if not text: return False text = text.strip() if re.match(r'^(https?://)', text, flags=re.I): return True parts = urllib.parse.urlparse("http://" + text if "://" not in text else text) return bool(parts.netloc and "." in parts.netloc) def normalize_url(url: str) -> str: if not url: return url if not re.match(r'^https?://', url, flags=re.I): return "https://" + url return url def _norm(s: str) -> str: return re.sub(r'\s+', ' ', re.sub(r'[^a-z0-9 ]', ' ', s.lower())).strip() def _contains_anchor(text: str, anchor: str) -> bool: if not text or not anchor: return False t = _norm(text) a = _norm(anchor) return a in t # ========================= # Robust fetching + text extraction # ========================= def _fetch_bytes(url: str, timeout: int = 25) -> Optional[requests.Response]: sess = requests.Session() sess.headers.update({ "User-Agent": UA["User-Agent"], "Accept-Language": "en-US,en;q=0.9", "Cache-Control": "no-cache", }) try: r = sess.get(url, timeout=timeout, allow_redirects=True) print(f"[fetch] requests: {r.status_code} {len(r.content)} bytes from {r.url}") if r.ok and r.content: return r except Exception as e: print(f"[fetch] requests error: {e}") if HAS_CLOUDSCRAPER: try: scraper = cloudscraper.create_scraper(browser={'custom': UA["User-Agent"]}) r = scraper.get(url, timeout=timeout, allow_redirects=True) print(f"[fetch] cloudscraper: {r.status_code} {len(r.content)} bytes from {r.url}") if r.ok and r.content: return r except Exception as e: print(f"[fetch] cloudscraper error: {e}") return None def _split_to_blocks(raw: str, max_paragraphs: int) -> List[str]: raw = re.sub(r'\r', '\n', raw) raw = re.sub(r'\n{3,}', '\n\n', raw) chunks = [c.strip() for c in re.split(r'\n\s*\n', raw) if c.strip()] blocks: List[str] = [] for c in chunks: merged = re.sub(r'\s*\n\s*', ' ', c) if len(merged) >= 40: blocks.append(merged) if len(blocks) >= max_paragraphs: break return blocks def get_text_blocks(url: str, max_paragraphs: int = 8) -> List[str]: try: if re.search(r'\.pdf($|\?)', url, flags=re.I): if HAS_PDFMINER: try: r = _fetch_bytes(url) if not r: print("PDF fetch failed.") return [] txt = pdf_extract_text(fp=bytes(r.content)) blocks = _split_to_blocks(txt or "", max_paragraphs) print(f"PDF extracted {len(blocks)} blocks") return blocks except Exception as pe: print(f"PDF extract error: {pe}") return [] else: print("PDF detected but pdfminer.six not installed.") return [] r = _fetch_bytes(url) if not r: print("No response fetched (blocked or network).") return [] try: txt = trafilatura.extract( r.content, base_url=r.url, include_comments=False, include_tables=False, deduplicate=True, output_format="txt", favor_precision=False ) except Exception as te: print(f"Trafilatura extract error: {te}") txt = None if txt: blocks = _split_to_blocks(txt, max_paragraphs) if blocks: print(f"Trafilatura extracted {len(blocks)} blocks") return blocks soup = BeautifulSoup(r.text, "html.parser") for tag in soup(["script", "style", "noscript", "header", "nav", "aside", "form", "footer"]): tag.decompose() paras = [p.get_text(" ", strip=True) for p in soup.find_all(["p", "li"]) if p.get_text(strip=True)] combined: List[str] = [] buf: List[str] = [] for p in paras: buf.append(p) if len(" ".join(buf)) >= 120: combined.append(" ".join(buf)) buf = [] if len(combined) >= max_paragraphs: break if buf and len(combined) < max_paragraphs: if len(" ".join(buf)) >= 40: combined.append(" ".join(buf)) if combined: print(f"BeautifulSoup fallback collected {len(combined)} blocks") return combined print("No usable text extracted after all fallbacks.") return [] except Exception as e: print(f"get_text_blocks fatal: {e}") return [] # -------- target context helpers -------- def get_target_context(url: str) -> Tuple[str, str, str, List[str]]: """ Return (title, meta_description, h1, content_blocks) """ title = ""; meta = ""; h1 = ""; blocks: List[str] = [] try: r = _fetch_bytes(url) if not r: return title, meta, h1, blocks soup = BeautifulSoup(r.text, "html.parser") if soup.title and soup.title.get_text(): title = soup.title.get_text().strip() md = soup.find("meta", attrs={"name": "description"}) or soup.find("meta", attrs={"property":"og:description"}) if md and md.get("content"): meta = md["content"].strip() h1_tag = soup.find("h1") if h1_tag: h1 = h1_tag.get_text(" ", strip=True) except Exception as e: print(f"[target] soup err: {e}") tb = get_text_blocks(url, max_paragraphs=6) if tb: blocks = tb return title, meta, h1, blocks def keyword_fallback_from_title_domain(title: str, url: str) -> List[str]: ext = tldextract.extract(url) brand = (ext.domain or "").replace("-", " ").strip() base = [] if title: t = _norm(title) tokens = [w for w in t.split() if len(w) >= 4] base.extend(tokens[:6]) if brand: base.extend([brand, f"{brand} reviews", f"{brand} guide"]) seen = set(); out=[] for k in base: k2 = k.strip() if k2 and k2 not in seen: out.append(k2); seen.add(k2) if not out: out = ["learn more", "full guide", "product details"] return out[:8] # ========================= # Extract paragraph sentences ONLY (no headings) # ========================= def _paragraph_sentences_from_html(url: str) -> List[str]: """ Return a flat list of sentences taken only from

tags of the source page. Excludes headings/lists to avoid proposing H tags. """ sents: List[str] = [] try: r = _fetch_bytes(url) if not r: return sents soup = BeautifulSoup(r.text, "html.parser") paras = [p.get_text(" ", strip=True) for p in soup.find_all("p") if p.get_text(strip=True)] for p in paras: split = re.split(r'(?<=[.!?])\s+|\n+', p) for s in split: s = s.strip() if len(s) >= 10: sents.append(s) except Exception as e: print(f"[p-sents] error: {e}") return sents def _sentence_contains_anchor(s: str, anchor: str) -> bool: return _contains_anchor(s, anchor) # ========================= # Embedding helpers # ========================= def mean_pool(last_hidden_state, mask): x = last_hidden_state mask = mask.unsqueeze(-1) return (x * mask).sum(1) / mask.sum(1) @lru_cache(maxsize=1000) def embed_cached(text_tuple): texts = list(text_tuple) batch = tok(texts, padding=True, truncation=True, return_tensors="pt") with torch.no_grad(): out = enc(**batch) return mean_pool(out.last_hidden_state, batch["attention_mask"]) def embed(texts: List[str]): return embed_cached(tuple(texts)) # ========================= # Anchor injection helper # ========================= def inject_anchor_into_sentence(sentence, anchor_text, target_url): """ If the sentence already has the anchor text → wrap it; else append a short clause. (Used only when anchor exists in article.) """ if not sentence or not anchor_text: return sentence, False try: pattern = re.compile(r'\b' + re.escape(anchor_text) + r'\b', re.IGNORECASE) if pattern.search(sentence): result = pattern.sub(f'{anchor_text}', sentence) return result, True except Exception: pass if len(sentence) > 0 and sentence[-1] in '.!?': base, punct = sentence[:-1], sentence[-1] else: base, punct = sentence, '.' rewritten = f'{base} {anchor_text}{punct}' return rewritten, False # ========================= # OpenAI helpers (SIMPLE BODY for GPT-5) # ========================= def _openai_chat_simple(model_name: str, system: str, user_json: dict): """ Minimal body: model + messages only (no response_format/max_tokens/etc.). """ if not OPENAI_API_KEY: raise RuntimeError("OPENAI_API_KEY not set") headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"} body = { "model": model_name, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": json.dumps(user_json, ensure_ascii=False)} ] } r = requests.post(OPENAI_CHAT_URL, headers=headers, json=body, timeout=60) print(f"[GPT] Model={model_name} HTTP {r.status_code}") r.raise_for_status() txt = r.json()["choices"][0]["message"]["content"] try: return json.loads(txt) except Exception: return {"text": txt} def _openai_chat_cached(cache_key: str, model_name: str, system: str, user_json: dict): if cache_key in API_RESPONSE_CACHE: print(f"[GPT] Using cached response for {cache_key[:8]}...") return API_RESPONSE_CACHE[cache_key] try: result = _openai_chat_simple(model_name, system, user_json) except Exception as e: print(f"[GPT] Preferred model failed: {e}. Falling back to {FALLBACK_OPENAI_MODEL}.") result = _openai_chat_simple(FALLBACK_OPENAI_MODEL, system, user_json) API_RESPONSE_CACHE[cache_key] = result return result # ========================= # Target-aware paragraph generators # ========================= def build_target_context_string(target_url: str) -> str: title, meta, h1, blocks = get_target_context(target_url) ctx_parts = [] if title: ctx_parts.append(f"Title: {title}") if meta: ctx_parts.append(f"Meta: {meta}") if h1: ctx_parts.append(f"H1: {h1}") if blocks: ctx_parts.append("Body: " + " ".join(blocks[:3])) return "\n".join(ctx_parts)[:2000] def gpt_generate_insert_paragraph(anchor_text: str, target_url: str, language: str, insert_after_sentence: str, article_context: List[str], target_context: str) -> str: """ Generate 1–3 sentences paragraph (HTML) that includes the exact anchor as a link, written to fit right after the given sentence. """ if not OPENAI_API_KEY: # simple fallback return f'

For more details, see {anchor_text}.

' cache_key = hashlib.md5( f"para_{anchor_text}_{target_url}_{language}_{insert_after_sentence}_{' '.join(article_context)[:400]}_{target_context[:400]}".encode() ).hexdigest() system = ( f"You are a precise copywriter in {language}. " "Write a short paragraph (1–3 sentences) that fits naturally into the article context, " "goes immediately AFTER the given sentence, and includes an with the EXACT provided anchor text " "pointing to the target URL. No em dashes. Output JSON with key 'paragraph_html'." ) user = { "insert_after_sentence": insert_after_sentence, "article_context": article_context[:8], "target_context": target_context, "anchor_text": anchor_text, "target_url": target_url } obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user) return obj.get("paragraph_html", obj.get("text", f'

{anchor_text}

')) def gpt_get_search_keywords_from_context(ctx_text: str, target_url: str) -> List[str]: if not OPENAI_API_KEY: return [] cache_key = hashlib.md5(f"kw_{target_url}_{ctx_text[:600]}".encode()).hexdigest() system = ( "You are an SEO assistant. From the provided target page context, return 5-10 realistic keyword phrases " "users would search for to find it. Return JSON {'keywords': [...] } only." ) user = {"url": target_url, "context": ctx_text} obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user) return obj.get("keywords", []) def gpt_generate_content_with_keyword(source_blocks, keywords, target_url, language="English"): if not OPENAI_API_KEY or not keywords: return None source_preview = " ".join(source_blocks[:3])[:500] cache_key = hashlib.md5(f"gen_{source_preview}_{str(keywords)}_{target_url}_{language}".encode()).hexdigest() system = ( f"You are a skilled content writer in {language}. Given article paragraphs and keyword candidates " "for a target link, do: 1) choose ONE best keyword; 2) write 1–2 natural sentences that include it " "as an to target_url; 3) provide the exact source sentence AFTER WHICH to insert. " "Return JSON keys: chosen_keyword, new_content, insert_after_sentence." ) user = { "article_paragraphs": source_blocks[:7], "available_keywords": keywords, "target_url": target_url, "language": language } obj = _openai_chat_cached(cache_key, PREFERRED_OPENAI_MODEL, system, user) return obj # ========================= # Alternative anchor pipeline # ========================= def find_alternative_anchor(blocks, target_url, original_anchor): try: ctx = build_target_context_string(target_url) print(f"[Alt] Target context len={len(ctx)}") keywords = gpt_get_search_keywords_from_context(ctx, target_url) if not keywords: title, _, _, _ = get_target_context(target_url) keywords = keyword_fallback_from_title_domain(title, target_url) if not keywords: return None, None source_text = " ".join(blocks[:2]) language_name = get_language_name(detect_language(source_text)) result = gpt_generate_content_with_keyword( source_blocks=blocks, keywords=keywords, target_url=target_url, language=language_name ) if not result: return None, None chosen_keyword = result.get("chosen_keyword", keywords[0]) new_content = result.get("new_content", "") insert_after_sentence = result.get("insert_after_sentence", "") if insert_after_sentence: if len(insert_after_sentence) > 100: position_text = f"[Insert after: ...{insert_after_sentence[-80:]}]" else: position_text = f"[Insert after: {insert_after_sentence}]" else: position_text = "" return chosen_keyword, f"{position_text}\n\n{new_content}" if position_text else new_content except Exception as e: print(f"[Alt] Critical error: {e}") return None, None # ========================= # Main selector (paragraph-only, anchor-first, add-paragraph if missing) # ========================= def suggest_insertions(source_url, target_url, anchor_text, top_k=1, suggest_alternative=False): """ - Pull sentences only from

tags. - If anchor exists → return the exact sentence with anchor injection. - If anchor doesn't exist → propose ADDITIONAL PARAGRAPH with [Insert after: …] marker. """ try: para_sents = _paragraph_sentences_from_html(source_url) if not para_sents: blocks = get_text_blocks(source_url) if not blocks: return [{"error": f"No text blocks found on the page: {source_url}"}] para_sents = [] for blk in blocks: for s in re.split(r'(?<=[.!?])\s+|\n+', blk): s = s.strip() if len(s) >= 10: para_sents.append(s) if not para_sents: return [{"error": f"No sentences found on the page: {source_url}"}] keyword_present = any(_sentence_contains_anchor(s, anchor_text) for s in para_sents) t_title, t_meta, t_h1, _ = get_target_context(target_url) ext = tldextract.extract(target_url) tgt_domain = ".".join([p for p in [ext.domain, ext.suffix] if p]) sent_query = f"{anchor_text} — relevant to: {t_title or t_h1} | {t_meta} ({tgt_domain})" # Use full article blocks as context for paragraph generation when needed article_blocks_ctx = get_text_blocks(source_url) or [] results = [] if keyword_present: # Use the FIRST sentence that contains the anchor (exact edit) best_sent = next(s for s in para_sents if _sentence_contains_anchor(s, anchor_text)) rewritten_sent, _ = inject_anchor_into_sentence(best_sent, anchor_text, target_url) results.append({ "anchor_was_present": True, "best_sentence_original": best_sent, "best_sentence_with_anchor": rewritten_sent, "keyword_in_article": True }) else: # Choose the best insertion point sentence by similarity try: q = embed([sent_query])[0] s_embs = embed(para_sents) sims = F.cosine_similarity(s_embs, q.repeat(len(para_sents), 1)) si = int(torch.argmax(sims).item()) insert_after_sentence = para_sents[si] except Exception as e: print(f"Sentence similarity error: {e}") insert_after_sentence = para_sents[0] # Generate a NEW PARAGRAPH (not a sentence change) with the specified anchor language_name = get_language_name(detect_language(" ".join(para_sents[:2]) or "en")) target_ctx = build_target_context_string(target_url) paragraph_html = gpt_generate_insert_paragraph( anchor_text=anchor_text, target_url=target_url, language=language_name, insert_after_sentence=insert_after_sentence, article_context=article_blocks_ctx, target_context=target_ctx ) position_text = insert_after_sentence results.append({ "anchor_was_present": False, "best_sentence_original": position_text, # we use this field as the insert-after pointer "best_sentence_with_anchor": paragraph_html, # the new paragraph HTML to add "keyword_in_article": False, "is_new_paragraph": True }) # Alternative anchor block (Result 2) if suggest_alternative: alt_anchor, alt_content = find_alternative_anchor(article_blocks_ctx, target_url, anchor_text) if alt_anchor and alt_content: results[-1]["alternative_anchor"] = alt_anchor results[-1]["alternative_sentence_original"] = "" results[-1]["alternative_sentence"] = alt_content results[-1]["alternative_exact_match"] = True return results except Exception as e: print(f"Critical error in suggest_insertions: {e}") return [{ "error": f"Error processing the page: {str(e)}", "anchor_was_present": False, "best_sentence_original": "Error occurred", "best_sentence_with_anchor": f"Error occurred. Try manually: {anchor_text}", "keyword_in_article": False }] # ========================= # Gradio UI # ========================= def run_tool(source_url, target_url, anchor_text, smart_rewrite, plain_text, suggest_alternative_anchor): if not source_url or not target_url or not anchor_text: return "❌ Please provide Source URL, Target URL, and Anchor Text." warn = "" if looks_like_url(anchor_text) and not looks_like_url(target_url): anchor_text, target_url = target_url, anchor_text warn = "ℹ️ Detected swapped inputs. I used the URL as Target URL and the text as Anchor.\n\n" source_url = normalize_url(source_url) target_url = normalize_url(target_url) try: results = suggest_insertions(source_url, target_url, anchor_text, top_k=1, suggest_alternative=suggest_alternative_anchor) res = results[0] except Exception as e: return f"❌ Error processing the page: {str(e)}" if "error" in res: return f"❌ {res['error']}" original_sentence = res['best_sentence_original'] draft_html = res["best_sentence_with_anchor"] anchor_was_present = res.get("anchor_was_present", False) keyword_in_article = res.get("keyword_in_article", False) is_new_paragraph = res.get("is_new_paragraph", False) # Optional polish only when we are changing an existing sentence (not needed for new paragraph usually) final_html = draft_html if smart_rewrite and not is_new_paragraph and anchor_was_present: language_name = get_language_name(detect_language(original_sentence)) g = gpt_rewrite(final_html, anchor_text, target_url, language=language_name, target_context=build_target_context_string(target_url)) final_html = g["sentence_html"] final_output = to_plain_text(final_html) if plain_text else final_html if keyword_in_article and not is_new_paragraph: result = warn + f"✅ **Anchor text '{anchor_text}' found in article!**\n\n" result += "🔗 Add link here:\n\n" result += f"{final_output}" else: # NEW DEFAULT: add paragraph after a sentence result = warn + f"⚠️ **Anchor text '{anchor_text}' not found in article**\n\n" result += "🔗 Result 1 — **Add this NEW paragraph** after the sentence below:\n\n" result += f"📍 [Insert after:] {original_sentence}\n\n" result += f"{final_output}" if suggest_alternative_anchor and res.get("alternative_anchor"): alt_anchor = res["alternative_anchor"] alt_content = res.get("alternative_sentence", "") if alt_content: if "[Insert after:" in alt_content: parts = alt_content.split("\n\n", 1) position_info = parts[0] if len(parts) > 0 else "" actual_content = parts[1] if len(parts) > 1 else alt_content else: position_info = "" actual_content = alt_content alt_output = to_plain_text(actual_content) if plain_text else actual_content result += f"\n\n{'='*50}\n\n" result += "🔗 Result 2 — **Suggested new anchor & paragraph**:\n" result += f"💡 Using keyword: '{alt_anchor}'\n" if position_info and "[Insert after:" in position_info: result += f"📍 {position_info}\n" result += f"\n{alt_output}" return result def to_plain_text(html_or_text: str) -> str: text = BeautifulSoup(html_or_text, "html.parser").get_text(separator=" ", strip=True) return html.unescape(text) def clear_cache(): global EMBEDDING_CACHE, API_RESPONSE_CACHE EMBEDDING_CACHE.clear() API_RESPONSE_CACHE.clear() embed_cached.cache_clear() return "✅ Cache cleared successfully!" # Show GPT status in the header gpt_status = "ON" if OPENAI_API_KEY else "OFF" title_model = PREFERRED_OPENAI_MODEL if OPENAI_API_KEY else "OFF" with gr.Blocks(title=f"Link Insertion Helper • GPT: {gpt_status}") as demo: gr.Markdown(f"# Link Insertion Helper • GPT: {gpt_status} • Model: {title_model}") gr.Markdown("Suggests the best place to add your link with intelligent language detection and caching.") with gr.Row(): with gr.Column(): source_url = gr.Textbox(label="Source URL", placeholder="https://example.com/article") target_url = gr.Textbox(label="Target URL", placeholder="https://example.com/target") anchor_text = gr.Textbox(label="Anchor Text", placeholder="your anchor text") with gr.Row(): smart_rewrite = gr.Checkbox(label="Smart rewrite (GPT)", value=True) plain_text = gr.Checkbox(label="Plain text (no URL)", value=True) suggest_alternative_anchor = gr.Checkbox( label="Suggest alternative anchor", value=True, info="Also propose a second option with a different anchor and its own paragraph" ) with gr.Row(): submit_btn = gr.Button("Process", variant="primary") clear_cache_btn = gr.Button("Clear Cache", variant="secondary") with gr.Column(): output = gr.Textbox(label="Result", lines=14) cache_status = gr.Textbox(label="Cache Status", interactive=False) submit_btn.click( fn=run_tool, inputs=[source_url, target_url, anchor_text, smart_rewrite, plain_text, suggest_alternative_anchor], outputs=output ) clear_cache_btn.click( fn=clear_cache, outputs=cache_status ) gr.Markdown(""" ### Features: - 🧩 **Paragraph-Only Selection**: Never proposes headings; picks sentences from

tags only - 🎯 **Anchor-First**: If anchor exists, returns the exact sentence containing it - ➕ **No Anchor? Add a Paragraph**: Result 1 always gives a new paragraph with [Insert after:] - 🧠 **Target-Aware**: Uses title/meta/H1/body of the target URL for relevance - 🔄 **Alternative Anchor**: Optional Result 2 with a different anchor + ready paragraph - 🧰 Robust extraction: Trafilatura + BS4; optional Cloudflare/PDF handling """) if __name__ == "__main__": demo.launch()