ihtesham0345's picture
fix: normalize search_volume (int→str), relevance (float→int), competition (lower→Title), post_count (int→str) for LongCat API responses
3ef98d0
Raw
History Blame Contribute Delete
17.6 kB
import json
import re
def clean_json_string(text: str) -> str:
    """Extract the JSON payload from a reply that may be wrapped in a Markdown fence.

    Args:
        text: Raw model output, optionally containing a fenced block that is
            either tagged ```json or untagged.

    Returns:
        The content of the first ```json fence when one exists (the tag is
        matched case-insensitively, so ```JSON works too), otherwise the
        content following the first plain fence, otherwise the input itself.
        The result is always stripped of surrounding whitespace.
    """
    tagged = re.search(r"```json", text, re.IGNORECASE)
    if tagged:
        # Take everything after the tag up to the next fence; when the reply
        # was cut off before the closing fence this keeps the remainder.
        return text[tagged.end():].split("```", 1)[0].strip()
    if "```" in text:
        # Splitting on a separator that is present always yields at least two
        # parts, so index 1 is the content following the first fence.
        return text.split("```")[1].strip()
    return text.strip()
def repair_json(json_str: str) -> str:
    """Close the containers that a truncated JSON document left open.

    Surrounding whitespace and trailing commas/spaces are stripped, then the
    text is scanned once while tracking open ``{`` / ``[`` containers.
    Characters inside string literals are ignored, so braces in text values
    do not skew the result. The missing closers are appended innermost-first,
    which keeps nested structures valid (``{"a": [1, 2`` -> ``{"a": [1, 2]}``).

    Args:
        json_str: Possibly truncated JSON text.

    Returns:
        The text with the required closing brackets/braces appended. The
        result is not guaranteed to parse (e.g. when the input ends inside a
        string literal).
    """
    json_str = json_str.strip().rstrip(", ")

    pending = []  # closers owed for containers that are still open
    in_string = False
    escaped = False
    for ch in json_str:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            pending.append("}")
        elif ch == "[":
            pending.append("]")
        elif ch in "}]" and pending and pending[-1] == ch:
            # A closer that does not match the innermost container is left
            # alone; it cannot be fixed by appending characters.
            pending.pop()

    return json_str + "".join(reversed(pending))
def parse_and_repair(raw_text: str, max_preview=300):
    """Parse model output as JSON, falling back to two repair passes.

    The text is first unwrapped with ``clean_json_string`` and parsed as-is.
    If that fails, ``repair_json`` is applied and the result parsed; if that
    fails too, ``repair_json_v2`` is applied to the already-repaired text.

    Args:
        raw_text: Raw model output.
        max_preview: Number of leading characters of ``raw_text`` included in
            the error record.

    Returns:
        ``(parsed, None)`` on success, or ``(None, info)`` where ``info`` has
        the keys ``"error"`` (message of the last decode failure) and
        ``"raw_preview"``.
    """
    candidate = clean_json_string(raw_text)
    try:
        return json.loads(candidate), None
    except json.JSONDecodeError:
        print("[WARN] JSON Parse Error. Attempting repair with v2...")

    # Each pass works on the output of the previous one.
    for fixer in (repair_json, repair_json_v2):
        candidate = fixer(candidate)
        try:
            return json.loads(candidate), None
        except json.JSONDecodeError as exc:
            failure = exc

    return None, {"error": str(failure), "raw_preview": raw_text[:max_preview]}
def repair_json_v2(json_str: str) -> str:
    """Repair truncated JSON by cutting back to the latest point that parses.

    The text is scanned once, ignoring characters inside string literals,
    while tracking the open ``{`` / ``[`` containers. Every structural comma
    and every properly matched closer is recorded as a possible cut point
    together with the closers owed at that position. Candidates are then
    tried from the most complete to the most truncated:

    1. the whole text plus the owed closers (skipped when the text ends
       inside a string literal),
    2. the text cut at each of the most recent cut points, plus the closers
       owed there (innermost first).

    The first candidate that ``json.loads`` accepts is returned, so an
    incomplete trailing element is dropped instead of invalidating the whole
    document.

    Args:
        json_str: Possibly truncated or over-closed JSON text.

    Returns:
        The first candidate that parses; when none does, the most complete
        candidate (or the stripped input if there are no candidates).
    """
    max_attempts = 25  # bound the number of re-parses on hopeless input
    json_str = json_str.strip().rstrip(", ")

    pending = []      # closers owed for containers that are still open
    cut_points = []   # (end offset, closers owed at that offset)
    in_string = False
    escaped = False
    for pos, ch in enumerate(json_str):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            pending.append("}")
        elif ch == "[":
            pending.append("]")
        elif ch in "}]":
            # Only a closer matching the innermost container ends a value;
            # stray closers are skipped and get cut off by an earlier point.
            if pending and pending[-1] == ch:
                pending.pop()
                cut_points.append((pos + 1, "".join(reversed(pending))))
        elif ch == ",":
            # Cutting just before a separator drops the element after it.
            cut_points.append((pos, "".join(reversed(pending))))

    candidates = []
    if not in_string:
        candidates.append(json_str + "".join(reversed(pending)))
    for end, closers in reversed(cut_points[-max_attempts:]):
        candidates.append(json_str[:end].rstrip(", ") + closers)

    for candidate in candidates:
        try:
            json.loads(candidate)
        except json.JSONDecodeError:
            continue
        return candidate

    return candidates[0] if candidates else json_str
# Builders that turn a bare string into the object shape used for each
# expected_type; keys and placeholder values are the ones the schemas expect.
_STRING_WRAPPERS = {
    "object_list": lambda s: {"title": s, "expected_ctr": "Medium"},
    "keyword_list": lambda s: {
        "keyword": s,
        "search_volume": "Medium",
        "competition": "Medium",
        "relevance": 80,
    },
    "hashtag_list": lambda s: {"tag": s, "post_count": "N/A"},
    "caption_list": lambda s: {"caption": s, "tone": "engaging"},
    "pin_list": lambda s: {"title": s, "description": s, "keyword_focus": s},
}


def _dict_to_text(obj):
    """Pick the first truthy text-like field of *obj*, else its ``str()`` form."""
    return (
        obj.get("caption")
        or obj.get("description")
        or obj.get("title")
        or obj.get("name")
        or obj.get("url")
        or str(obj)
    )


def normalize_field(items, expected_type):
    """Convert model output to match Pydantic expectations.

    Conversion is decided per item, so lists that mix strings and objects are
    handled instead of raising or wrapping objects as if they were strings.

    Args:
        items: Field value from the model. Anything that is not a list is
            returned unchanged.
        expected_type: One of ``"object_list"``, ``"keyword_list"``,
            ``"hashtag_list"``, ``"caption_list"``, ``"pin_list"`` (string
            items are wrapped into objects with placeholder values) or
            ``"string_list"`` (dict items are flattened to a string). Any
            other value returns ``items`` unchanged.

    Returns:
        A new list with the converted items, or ``items`` itself when no
        conversion applies.
    """
    if not isinstance(items, list):
        return items

    if expected_type == "string_list":
        return [_dict_to_text(item) if isinstance(item, dict) else item for item in items]

    wrap = _STRING_WRAPPERS.get(expected_type)
    if wrap is None:
        return items
    return [wrap(item) if isinstance(item, str) else item for item in items]
def _to_int(value, fallback):
    """Parse *value* as a number and truncate to int; *fallback* if unparseable."""
    try:
        return int(float(str(value)))
    except (ValueError, TypeError, OverflowError):
        return fallback


def _wrap_string_items(value, wrap):
    """Promote a lone dict to a one-item list and wrap the string items of a list."""
    if isinstance(value, dict):
        return [value]
    if isinstance(value, list):
        return [wrap(item) if isinstance(item, str) else item for item in value]
    return value


def _complete_records(items, wrap, fallbacks):
    """Wrap string items with *wrap* and fill missing keys of dict items from *fallbacks*."""
    completed = []
    for item in items:
        if isinstance(item, str):
            item = wrap(item)
        elif isinstance(item, dict):
            for key, value in fallbacks.items():
                item.setdefault(key, value)
        completed.append(item)
    return completed


def _fix_keyword_types(kw):
    """Coerce the search_volume / relevance / competition of one keyword dict in place."""
    volume = kw.get("search_volume")
    if volume is None:
        # Missing and explicit null both get the placeholder, never "None".
        kw["search_volume"] = "Medium"
    elif not isinstance(volume, str):
        kw["search_volume"] = str(volume)

    relevance = kw.get("relevance")
    if isinstance(relevance, float):
        try:
            # Fractions (including exactly 1.0) are scaled to a percentage.
            # round() avoids float truncation such as int(0.29 * 100) == 28.
            kw["relevance"] = round(relevance * 100) if relevance <= 1 else int(relevance)
        except (ValueError, OverflowError):
            # NaN / infinity cannot be converted; use the module's placeholder.
            kw["relevance"] = 80

    competition = kw.get("competition")
    if competition is None:
        kw["competition"] = "Medium"
    elif isinstance(competition, str):
        kw["competition"] = competition.title()


def normalize_response(data: dict) -> dict:
    """Post-process model output to match expected Pydantic schemas.

    Only keys that are present in ``data`` are touched, and ``data`` is
    modified in place. List fields are converted per item (strings wrapped
    into objects or objects flattened to strings, depending on the field),
    numeric fields are coerced, and dict-valued text fields are stringified.
    Values of an unexpected container type are left as they are rather than
    raising.

    Args:
        data: Parsed model response.

    Returns:
        The same ``data`` dict after normalisation.
    """
    if "video_titles" in data:
        data["video_titles"] = normalize_field(data["video_titles"], "object_list")
    if "thumbnail_ideas" in data:
        data["thumbnail_ideas"] = normalize_field(data["thumbnail_ideas"], "string_list")
    if "captions" in data:
        data["captions"] = normalize_field(data["captions"], "caption_list")

    hashtag_sets = data.get("hashtag_sets")
    if isinstance(hashtag_sets, dict):
        for tier in ("small", "medium", "large"):
            tags = hashtag_sets.get(tier)
            if isinstance(tags, list):
                # Tiers hold plain tag strings; flatten any tag objects.
                hashtag_sets[tier] = [
                    tag.get("tag", tag.get("name", str(tag))) if isinstance(tag, dict) else tag
                    for tag in tags
                ]

    content_ideas = data.get("content_ideas")
    if isinstance(content_ideas, dict):
        for key in ("reels", "carousels", "stories"):
            if key in content_ideas:
                content_ideas[key] = normalize_field(content_ideas[key], "string_list")

    if "core_keywords" in data:
        data["core_keywords"] = normalize_field(data["core_keywords"], "keyword_list")
        # LongCat returns search_volume as int, relevance as float → fix types
        if isinstance(data["core_keywords"], list):
            for kw in data["core_keywords"]:
                if isinstance(kw, dict):
                    _fix_keyword_types(kw)

    if "viral_hashtags" in data:
        data["viral_hashtags"] = normalize_field(data["viral_hashtags"], "hashtag_list")
        # LongCat returns post_count as int → convert to str
        if isinstance(data["viral_hashtags"], list):
            for tag in data["viral_hashtags"]:
                if not isinstance(tag, dict):
                    continue
                count = tag.get("post_count")
                if count is None:
                    tag["post_count"] = "N/A"
                elif not isinstance(count, str):
                    tag["post_count"] = str(count)

    if "pin_ideas" in data:
        data["pin_ideas"] = normalize_field(data["pin_ideas"], "pin_list")
    if "seo_keywords" in data:
        data["seo_keywords"] = normalize_field(data["seo_keywords"], "string_list")

    for field in ("related_phrases", "strategy_tips", "content_titles",
                  "tags", "engagement_strategies", "growth_strategies",
                  "article_topics", "thought_leadership_angles",
                  "engagement_hooks", "ad_copy_suggestions", "page_growth_tips",
                  "viral_hooks", "engagement_tactics",
                  "trending_angles", "viral_strategies",
                  "board_organization", "traffic_strategies",
                  "hashtags"):
        if field in data:
            data[field] = normalize_field(data[field], "string_list")

    if "post_drafts" in data:
        data["post_drafts"] = _wrap_string_items(
            data["post_drafts"], lambda s: {"headline": s, "body": s, "hook": s})
    if "post_ideas" in data:
        data["post_ideas"] = _wrap_string_items(
            data["post_ideas"], lambda s: {"type": "text", "content": s})

    if "tweet_threads" in data:
        threads = data["tweet_threads"]
        if isinstance(threads, dict):
            data["tweet_threads"] = [threads]
        elif isinstance(threads, list) and threads and isinstance(threads[0], str):
            # A flat list of strings is one thread; keep every tweet rather
            # than only the first one.
            tweets = [t for t in threads if isinstance(t, str)]
            others = [t for t in threads if not isinstance(t, str)]
            data["tweet_threads"] = [{"tweets": tweets, "theme": "topic"}] + others

    if "video_concepts" in data:
        data["video_concepts"] = _wrap_string_items(
            data["video_concepts"],
            lambda s: {"hook": s, "script_snippet": s, "sound_suggestion": "Trending"})

    if isinstance(data.get("corrections"), list):
        data["corrections"] = _complete_records(
            data["corrections"],
            lambda s: {"original": s, "corrected": s, "error_type": "grammar",
                       "explanation": "Automatically corrected."},
            {"error_type": "grammar", "explanation": "Review and correct this."})

    if isinstance(data.get("issues"), list):
        data["issues"] = _complete_records(
            data["issues"],
            lambda s: {"issue_type": s, "location": "text", "suggestion": "Review this section."},
            {"issue_type": "style", "location": "text", "suggestion": "Review this section."})

    if "grammar_score" in data:
        score = data["grammar_score"]
        if not isinstance(score, (int, float)):
            score = _to_int(score, 85)
        # Clamp to the 0-100 range.
        data["grammar_score"] = max(0, min(100, score))

    for counter in ("word_count", "sentence_count"):
        if counter in data and not isinstance(data[counter], int):
            data[counter] = _to_int(data[counter], 0)

    for text_field in ("description_template", "target_audience"):
        if isinstance(data.get(text_field), dict):
            data[text_field] = str(data[text_field])

    return data
def _xmap_rows(data, key):
    """Return ``data[key]`` when it is a list, otherwise an empty list."""
    value = data.get(key)
    return value if isinstance(value, list) else []


def _xmap_dicts(data, key):
    """Return only the dict items of the list stored under *key*."""
    return [row for row in _xmap_rows(data, key) if isinstance(row, dict)]


def _xmap_snippet(obj, *keys):
    """Return up to 100 chars of the first string among *keys*, else of ``str(obj)``."""
    for key in keys:
        value = obj.get(key)
        if isinstance(value, str):
            return value[:100]
    return str(obj)[:100]


def _xmap_keywords(terms):
    """Build placeholder keyword objects from bare terms."""
    return [
        {"keyword": term, "search_volume": "Medium", "competition": "Medium", "relevance": 80}
        for term in terms
    ]


def _xmap_hashtags(tags):
    """Build placeholder hashtag objects from bare tags."""
    return [{"tag": tag, "post_count": "N/A"} for tag in tags]


def cross_map_fields(data: dict) -> dict:
    """Derive missing generic fields from platform-specific ones.

    Each rule fills a target key only when it is absent from ``data``
    (``viral_hashtags`` is additionally re-derived when it is ``None``), so
    the first matching rule wins; rules run in the order written. Source
    fields that are expected to be lists but are not are treated as empty,
    and non-string values are tolerated where text is sliced or stripped.
    ``data`` is modified in place.

    Args:
        data: Normalised model response.

    Returns:
        The same ``data`` dict with the derived keys added.
    """
    if "video_titles" in data and "content_titles" not in data:
        data["content_titles"] = [t.get("title", str(t)) for t in _xmap_dicts(data, "video_titles")]
    if "video_titles" in data and "related_phrases" not in data:
        existing = data.get("content_titles")
        if isinstance(existing, list) and existing:
            titles = existing
        else:
            titles = [t.get("title", str(t)) for t in _xmap_dicts(data, "video_titles")]
        data["related_phrases"] = titles[:3]
    if "tags" in data and "hashtags" not in data:
        data["hashtags"] = data["tags"]
    if "tags" in data and "core_keywords" not in data:
        data["core_keywords"] = _xmap_keywords(_xmap_rows(data, "tags"))
    if "engagement_strategies" in data and "strategy_tips" not in data:
        data["strategy_tips"] = data["engagement_strategies"]
    if "growth_strategies" in data and "strategy_tips" not in data:
        data["strategy_tips"] = data["growth_strategies"]
    if "hashtags" in data and "viral_hashtags" not in data:
        data["viral_hashtags"] = _xmap_hashtags(_xmap_rows(data, "hashtags"))
    if "best_posting_time" in data and "strategy_tips" not in data:
        data["strategy_tips"] = [f"Post during: {data['best_posting_time']}"]

    # Instagram cross-maps
    if isinstance(data.get("hashtag_sets"), dict) and "hashtags" not in data:
        all_tags = []
        for tier in ("small", "medium", "large"):
            tier_tags = data["hashtag_sets"].get(tier)
            if isinstance(tier_tags, list):
                all_tags.extend(tier_tags)
        if all_tags:
            data["hashtags"] = all_tags
    if "hashtags" in data and "core_keywords" not in data:
        # Only strings carry a leading '#'; other values are passed through.
        data["core_keywords"] = _xmap_keywords(
            [t.lstrip("#") if isinstance(t, str) else t for t in _xmap_rows(data, "hashtags")])
    if "captions" in data and "content_titles" not in data:
        data["content_titles"] = [c.get("caption", str(c)) for c in _xmap_dicts(data, "captions")]
    if isinstance(data.get("content_ideas"), dict) and "strategy_tips" not in data:
        ideas = []
        for key in ("reels", "carousels", "stories"):
            bucket = data["content_ideas"].get(key)
            if isinstance(bucket, list):
                ideas.extend(bucket)
        if ideas:
            data["strategy_tips"] = ideas

    # Pinterest cross-maps
    if "seo_keywords" in data and "core_keywords" not in data:
        data["core_keywords"] = _xmap_keywords(_xmap_rows(data, "seo_keywords"))
    if "pin_ideas" in data and "content_titles" not in data:
        data["content_titles"] = [p.get("title", str(p)) for p in _xmap_dicts(data, "pin_ideas")]
    if "pin_ideas" in data and "related_phrases" not in data:
        data["related_phrases"] = [
            _xmap_snippet(p, "keyword_focus", "description") for p in _xmap_dicts(data, "pin_ideas")
        ]
    if "traffic_strategies" in data and "strategy_tips" not in data:
        data["strategy_tips"] = data["traffic_strategies"]
    if "board_organization" in data and "strategy_tips" not in data:
        data["strategy_tips"] = data["board_organization"]
    if "pin_ideas" in data and "hashtags" not in data:
        keywords = [p.get("keyword_focus", "") for p in _xmap_dicts(data, "pin_ideas")]
        keywords = [k for k in keywords if k]
        if keywords:
            data["hashtags"] = keywords

    # Catch-all: if hashtags was set by any platform mapping above but viral_hashtags wasn't
    if "hashtags" in data and data.get("viral_hashtags") is None:
        data["viral_hashtags"] = _xmap_hashtags(_xmap_rows(data, "hashtags"))

    # LinkedIn cross-maps
    if "post_drafts" in data and "content_titles" not in data:
        data["content_titles"] = [d.get("headline", str(d)) for d in _xmap_dicts(data, "post_drafts")]
    if "post_drafts" in data and "related_phrases" not in data:
        data["related_phrases"] = [_xmap_snippet(d, "body") for d in _xmap_dicts(data, "post_drafts")]
    if "thought_leadership_angles" in data and "strategy_tips" not in data:
        data["strategy_tips"] = data["thought_leadership_angles"]
    if "engagement_prompts" in data and "related_phrases" not in data:
        data["related_phrases"] = data["engagement_prompts"]
    return data
def validate_and_fill_data_defaults(data: dict, defaults: dict) -> dict:
    """Fill keys that are missing or ``None`` in *data* from *defaults*.

    Each inserted default is deep-copied so that later in-place changes to
    the response (e.g. appending to a filled-in list) cannot leak into the
    shared ``defaults`` mapping. ``data`` is modified in place.

    Args:
        data: Parsed model response.
        defaults: Mapping of key to fallback value.

    Returns:
        The same ``data`` dict with the gaps filled.
    """
    import copy  # function-scope import: not part of the module's import block

    for key, default_val in defaults.items():
        if data.get(key) is None:
            data[key] = copy.deepcopy(default_val)
    return data
def _failure_payload(defaults, message):
    """Return a shallow copy of *defaults* (or an empty dict) carrying an ``error`` message."""
    payload = dict(defaults) if defaults else {}
    payload["error"] = message
    return payload


def run_analysis(messages, defaults=None, temperature=0.3, max_new_tokens=2000):
    """Run the model on *messages* and return a normalised response dict.

    Pipeline: generate text, parse (with JSON repair), normalise field
    types, fill defaults when given, then cross-map fields. Every failure is
    reported as a copy of ``defaults`` (or an empty dict) with an ``"error"``
    key instead of raising; this includes a cross-mapping failure when no
    ``defaults`` are supplied.

    Args:
        messages: Passed through to ``generate_text``.
        defaults: Optional mapping of fallback values for missing keys; also
            the base of every error response.
        temperature: Passed through to ``generate_text``.
        max_new_tokens: Passed through to ``generate_text``.

    Returns:
        The processed response dict, or an error payload.
    """
    try:
        # Imported lazily, so an import failure is reported like any other
        # generation error.
        from services.model_router import generate_text
        raw = generate_text(messages, temperature, max_new_tokens)
    except Exception as e:
        print(f"[generate_text error] {e}")
        return _failure_payload(defaults, "AI model temporarily unavailable. Please try again.")

    if raw is None:
        return _failure_payload(defaults, "Model failed to load on server startup. Check logs.")

    data, err = parse_and_repair(raw)
    if err:
        return _failure_payload(defaults, "The AI response was malformed. Please try again.")
    if not isinstance(data, dict):
        print(f"[run_analysis] Expected dict, got {type(data).__name__}")
        return _failure_payload(defaults, "The AI response was malformed. Please try again.")

    try:
        data = normalize_response(data)
    except Exception as e:
        print(f"[normalize_response error] {e}")
        return _failure_payload(defaults, "Failed to process response. Please try again.")

    try:
        # NOTE(review): defaults are filled before cross-mapping, and
        # cross_map_fields only fills keys that are absent, so a target key
        # that `defaults` already provides is never cross-mapped. Confirm
        # this ordering is intended.
        if defaults:
            data = validate_and_fill_data_defaults(data, defaults)
        return cross_map_fields(data)
    except Exception as e:
        print(f"[validate error] {e}")
        return _failure_payload(defaults, "Failed to validate response. Please try again.")