"""Parse, repair and normalize JSON text returned by a text-generation model."""

import copy
import json
import re

# Characters that may trail a truncated JSON fragment and must be removed
# before closers are appended (a trailing comma makes the result invalid).
_TRAILING_JUNK = ", \t\r\n"

# Opening container character -> the closer it requires.
_CLOSER_FOR = {"{": "}", "[": "]"}

# A language tag glued to an opening Markdown fence, e.g. "JSON\n".
_FENCE_TAG = re.compile(r"[A-Za-z][\w+-]*[ \t]*\r?\n")

# An object key (with optional leading comma) left without a value at the end.
_DANGLING_KEY = re.compile(r',?\s*"(?:[^"\\]|\\.)*"\s*:\s*\Z')

_HASHTAG_TIERS = ("small", "medium", "large")
_CONTENT_IDEA_KEYS = ("reels", "carousels", "stories")

# Fields normalized with a dedicated normalize_field() type.
_TYPED_LIST_FIELDS = (
    ("video_titles", "object_list"),
    ("thumbnail_ideas", "string_list"),
    ("captions", "caption_list"),
    ("core_keywords", "keyword_list"),
    ("viral_hashtags", "hashtag_list"),
    ("pin_ideas", "pin_list"),
    ("seo_keywords", "string_list"),
)

# Fields that are always flattened to lists of strings.
_STRING_LIST_FIELDS = (
    "related_phrases", "strategy_tips", "content_titles", "tags",
    "engagement_strategies", "growth_strategies", "article_topics",
    "thought_leadership_angles", "engagement_hooks", "ad_copy_suggestions",
    "page_growth_tips", "viral_hooks", "engagement_tactics",
    "trending_angles", "viral_strategies", "board_organization",
    "traffic_strategies", "hashtags",
)


def _keyword_entry(keyword):
    """Build the default keyword record for a bare keyword value."""
    return {"keyword": keyword, "search_volume": "Medium",
            "competition": "Medium", "relevance": 80}


def _hashtag_entry(tag):
    """Build the default hashtag record for a bare tag value."""
    return {"tag": tag, "post_count": "N/A"}


# normalize_field() type -> how a bare string is expanded into a record.
_STRING_BUILDERS = {
    "object_list": lambda s: {"title": s, "expected_ctr": "Medium"},
    "keyword_list": _keyword_entry,
    "hashtag_list": _hashtag_entry,
    "caption_list": lambda s: {"caption": s, "tone": "engaging"},
    "pin_list": lambda s: {"title": s, "description": s, "keyword_focus": s},
}

# Fields where a lone dict is wrapped in a list and bare strings are expanded.
_WRAPPED_FIELDS = (
    ("post_drafts", lambda s: {"headline": s, "body": s, "hook": s}),
    ("post_ideas", lambda s: {"type": "text", "content": s}),
    ("video_concepts", lambda s: {"hook": s, "script_snippet": s,
                                  "sound_suggestion": "Trending"}),
)


def clean_json_string(text: str) -> str:
    """Return the JSON payload of *text* with any Markdown code fence removed.

    A "```json" fence is preferred when present; otherwise the content
    following the first "```" fence is used. Text without fences is only
    stripped of surrounding whitespace.
    """
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0]
    elif "```" in text:
        # Splitting on a fence that is present always yields >= 2 parts.
        fenced = text.split("```")[1]
        # Drop a language tag such as "JSON" glued to the opening fence, but
        # only when real content follows it.
        tag = _FENCE_TAG.match(fenced)
        if tag and fenced[tag.end():].strip():
            fenced = fenced[tag.end():]
        text = fenced
    return text.strip()


def _scan_open_containers(text):
    """Scan *text* and report what is still open at its end.

    Returns ``(closers, in_string)``: the closers required by unclosed
    ``{``/``[`` in opening order, and whether the text ends inside a string
    literal. Brackets inside string literals are ignored, as are closers that
    do not match the innermost open container.
    """
    closers = []
    in_string = False
    escape = False
    for ch in text:
        if in_string:
            if escape:
                escape = False
            elif ch == "\\":
                escape = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in _CLOSER_FOR:
            closers.append(_CLOSER_FOR[ch])
        elif closers and closers[-1] == ch:
            closers.pop()
    return closers, in_string


def repair_json(json_str: str) -> str:
    """Close whatever a truncated JSON fragment left open.

    Trailing commas/whitespace are removed, an unterminated string literal is
    closed, and missing ``}``/``]`` are appended innermost-first so nested
    containers close in the correct order.
    """
    json_str = json_str.strip().rstrip(_TRAILING_JUNK)
    closers, in_string = _scan_open_containers(json_str)
    if in_string:
        # An odd number of trailing backslashes would escape the quote we
        # are about to add, so drop the dangling one.
        trailing = len(json_str) - len(json_str.rstrip("\\"))
        if trailing % 2:
            json_str = json_str[:-1]
        json_str += '"'
    return json_str + "".join(reversed(closers))


def parse_and_repair(raw_text: str, max_preview=300):
    """Parse model output as JSON, attempting two repairs on failure.

    Returns ``(value, None)`` on success. When every attempt fails, returns
    ``(None, info)`` where *info* holds the last decoder error message and the
    first *max_preview* characters of *raw_text*.
    """
    cleaned = clean_json_string(raw_text)
    try:
        return json.loads(cleaned), None
    except json.JSONDecodeError:
        print("[WARN] JSON Parse Error. Attempting repair with v2...")

    last_error = None
    # Each strategy starts from the cleaned text: feeding the first repair's
    # output into the second would let a bad guess poison it.
    for repair in (repair_json, repair_json_v2):
        try:
            return json.loads(repair(cleaned)), None
        except json.JSONDecodeError as e:
            last_error = e
    return None, {"error": str(last_error), "raw_preview": raw_text[:max_preview]}


def repair_json_v2(json_str: str) -> str:
    """Repair truncated JSON by discarding the incomplete tail, then closing.

    The text is cut after the last character that lies outside a string
    literal while no container has been closed more often than opened. This
    drops a half-written string and trailing garbage after a surplus closer.
    A key left without a value is removed and the open containers are closed
    innermost-first.
    """
    json_str = json_str.strip().rstrip(_TRAILING_JUNK)
    in_string = False
    escape = False
    brace_depth = 0
    bracket_depth = 0
    last_good_pos = 0
    for i, ch in enumerate(json_str):
        if escape:
            escape = False
            continue
        if ch == "\\":
            escape = True
            continue
        if ch == '"':
            in_string = not in_string
            continue
        if in_string:
            continue
        if ch == "{":
            brace_depth += 1
        elif ch == "}":
            brace_depth -= 1
        elif ch == "[":
            bracket_depth += 1
        elif ch == "]":
            bracket_depth -= 1
        if brace_depth >= 0 and bracket_depth >= 0:
            last_good_pos = i + 1

    trimmed = json_str[:last_good_pos].rstrip(_TRAILING_JUNK)
    # `"key":` with nothing after it cannot be completed; drop the whole pair.
    trimmed = _DANGLING_KEY.sub("", trimmed).rstrip(_TRAILING_JUNK)
    closers, _ = _scan_open_containers(trimmed)
    return trimmed + "".join(reversed(closers))


def _dict_to_text(obj):
    """Pick the first truthy descriptive value of *obj*, else its repr."""
    return (obj.get("caption") or obj.get("description") or obj.get("title")
            or obj.get("name") or obj.get("url") or str(obj))


def normalize_field(items, expected_type):
    """Convert model output to match Pydantic expectations.

    Non-list values are returned unchanged. For ``"string_list"`` every dict
    entry is flattened to a string; for the record types (``"object_list"``,
    ``"keyword_list"``, ``"hashtag_list"``, ``"caption_list"``,
    ``"pin_list"``) every bare string is expanded into a record with default
    values. Entries are converted one by one, so mixed lists are handled.
    """
    if not isinstance(items, list):
        return items
    if expected_type == "string_list":
        return [_dict_to_text(item) if isinstance(item, dict) else item
                for item in items]
    build = _STRING_BUILDERS.get(expected_type)
    if build is None:
        return items
    return [build(item) if isinstance(item, str) else item for item in items]


def _coerce_keyword(kw):
    """Fix the value types of one keyword record in place."""
    volume = kw.get("search_volume")
    if not isinstance(volume, str):
        # A missing or null volume falls back to the default label.
        kw["search_volume"] = "Medium" if volume is None else str(volume)
    relevance = kw.get("relevance")
    if isinstance(relevance, float):
        # Fractions below 1 are treated as ratios and scaled to a percentage.
        kw["relevance"] = int(relevance * 100) if relevance < 1 else int(relevance)
    competition = kw.get("competition")
    if competition is None:
        kw["competition"] = "Medium"
    elif isinstance(competition, str):
        kw["competition"] = competition.title()


def _wrap_entries(value, build):
    """Wrap a lone dict in a list and expand bare strings inside a list."""
    if isinstance(value, dict):
        return [value]
    if isinstance(value, list):
        return [build(item) if isinstance(item, str) else item for item in value]
    return value


def _fill_records(value, build, fallbacks):
    """Expand string entries and fill missing keys of dict entries in a list."""
    if not isinstance(value, list):
        return value
    filled = []
    for item in value:
        if isinstance(item, str):
            item = build(item)
        elif isinstance(item, dict):
            for key, fallback in fallbacks.items():
                item.setdefault(key, fallback)
        filled.append(item)
    return filled


def _to_int(value, fallback):
    """Convert *value* to int via float, returning *fallback* on failure."""
    try:
        return int(float(str(value)))
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers infinities, which float() accepts.
        return fallback


def normalize_response(data: dict) -> dict:
    """Post-process model output to match expected Pydantic schemas.

    *data* is modified in place and returned. Only keys that are present are
    touched; values of an unexpected type are left as they are.
    """
    for field, kind in _TYPED_LIST_FIELDS:
        if field in data:
            data[field] = normalize_field(data[field], kind)
    for field in _STRING_LIST_FIELDS:
        if field in data:
            data[field] = normalize_field(data[field], "string_list")

    hashtag_sets = data.get("hashtag_sets")
    if isinstance(hashtag_sets, dict):
        for tier in _HASHTAG_TIERS:
            tags = hashtag_sets.get(tier)
            if isinstance(tags, list):
                hashtag_sets[tier] = [
                    tag.get("tag", tag.get("name", str(tag)))
                    if isinstance(tag, dict) else tag
                    for tag in tags
                ]

    content_ideas = data.get("content_ideas")
    if isinstance(content_ideas, dict):
        for key in _CONTENT_IDEA_KEYS:
            if key in content_ideas:
                content_ideas[key] = normalize_field(content_ideas[key], "string_list")

    # LongCat returns search_volume as int, relevance as float → fix types
    if isinstance(data.get("core_keywords"), list):
        for kw in data["core_keywords"]:
            if isinstance(kw, dict):
                _coerce_keyword(kw)

    # LongCat returns post_count as int → convert to str
    if isinstance(data.get("viral_hashtags"), list):
        for tag in data["viral_hashtags"]:
            if isinstance(tag, dict) and not isinstance(tag.get("post_count"), str):
                count = tag.get("post_count")
                tag["post_count"] = "N/A" if count is None else str(count)

    for field, build in _WRAPPED_FIELDS:
        if field in data:
            data[field] = _wrap_entries(data[field], build)

    if "tweet_threads" in data:
        threads = data["tweet_threads"]
        if isinstance(threads, dict):
            data["tweet_threads"] = [threads]
        elif isinstance(threads, list) and any(isinstance(t, str) for t in threads):
            # Bare strings become the tweets of a single thread, so none of
            # them is dropped; entries that are already records are kept.
            loose = [t for t in threads if isinstance(t, str)]
            kept = [t for t in threads if not isinstance(t, str)]
            data["tweet_threads"] = kept + [{"tweets": loose, "theme": "topic"}]

    if "corrections" in data:
        data["corrections"] = _fill_records(
            data["corrections"],
            lambda s: {"original": s, "corrected": s, "error_type": "grammar",
                       "explanation": "Automatically corrected."},
            {"error_type": "grammar", "explanation": "Review and correct this."},
        )
    if "issues" in data:
        data["issues"] = _fill_records(
            data["issues"],
            lambda s: {"issue_type": s, "location": "text",
                       "suggestion": "Review this section."},
            {"issue_type": "style", "location": "text",
             "suggestion": "Review this section."},
        )

    if "grammar_score" in data:
        if not isinstance(data["grammar_score"], (int, float)):
            data["grammar_score"] = _to_int(data["grammar_score"], 85)
        data["grammar_score"] = max(0, min(100, data["grammar_score"]))
    for field in ("word_count", "sentence_count"):
        if field in data and not isinstance(data[field], int):
            data[field] = _to_int(data[field], 0)

    for field in ("description_template", "target_audience"):
        if isinstance(data.get(field), dict):
            data[field] = str(data[field])
    return data


def _as_list(value):
    """Return *value* when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _pick(record, *keys):
    """Return the first non-None value of *record* among *keys*, else its repr."""
    for key in keys:
        value = record.get(key)
        if value is not None:
            return value
    return str(record)


def cross_map_fields(data: dict) -> dict:
    """Derive missing generic fields from the platform-specific ones present.

    A target key is only written when it is absent from *data* (the catch-all
    for ``viral_hashtags`` also replaces ``None``). Source values that are not
    lists contribute nothing instead of raising. *data* is modified in place
    and returned.
    """
    def missing(key):
        return key not in data

    def records(key):
        return [item for item in _as_list(data.get(key)) if isinstance(item, dict)]

    if "video_titles" in data:
        video_texts = [_pick(t, "title") for t in records("video_titles")]
        if missing("content_titles"):
            data["content_titles"] = video_texts
        if missing("related_phrases"):
            titles = data.get("content_titles")
            if not titles or not isinstance(titles, list):
                titles = video_texts
            data["related_phrases"] = titles[:3]

    if "tags" in data:
        if missing("hashtags"):
            data["hashtags"] = data["tags"]
        if missing("core_keywords"):
            data["core_keywords"] = [_keyword_entry(t) for t in _as_list(data["tags"])]

    for source in ("engagement_strategies", "growth_strategies"):
        if source in data and missing("strategy_tips"):
            data["strategy_tips"] = data[source]
    if "hashtags" in data and missing("viral_hashtags"):
        data["viral_hashtags"] = [_hashtag_entry(h) for h in _as_list(data["hashtags"])]
    if "best_posting_time" in data and missing("strategy_tips"):
        data["strategy_tips"] = [f"Post during: {data['best_posting_time']}"]

    # Instagram cross-maps
    hashtag_sets = data.get("hashtag_sets")
    if isinstance(hashtag_sets, dict) and missing("hashtags"):
        all_tags = []
        for tier in _HASHTAG_TIERS:
            all_tags.extend(_as_list(hashtag_sets.get(tier)))
        if all_tags:
            data["hashtags"] = all_tags
    if "hashtags" in data and missing("core_keywords"):
        # str() guards against non-string tags before stripping the "#".
        data["core_keywords"] = [_keyword_entry(str(t).lstrip("#"))
                                 for t in _as_list(data["hashtags"])]
    if "captions" in data and missing("content_titles"):
        data["content_titles"] = [_pick(c, "caption") for c in records("captions")]
    content_ideas = data.get("content_ideas")
    if isinstance(content_ideas, dict) and missing("strategy_tips"):
        ideas = []
        for key in _CONTENT_IDEA_KEYS:
            ideas.extend(_as_list(content_ideas.get(key)))
        if ideas:
            data["strategy_tips"] = ideas

    # Pinterest cross-maps
    if "seo_keywords" in data and missing("core_keywords"):
        data["core_keywords"] = [_keyword_entry(k) for k in _as_list(data["seo_keywords"])]
    if "pin_ideas" in data:
        pins = records("pin_ideas")
        if missing("content_titles"):
            data["content_titles"] = [_pick(p, "title") for p in pins]
        if missing("related_phrases"):
            data["related_phrases"] = [
                str(_pick(p, "keyword_focus", "description"))[:100] for p in pins
            ]
    for source in ("traffic_strategies", "board_organization"):
        if source in data and missing("strategy_tips"):
            data["strategy_tips"] = data[source]
    if "pin_ideas" in data and missing("hashtags"):
        keywords = [p.get("keyword_focus", "") for p in records("pin_ideas")]
        keywords = [k for k in keywords if k]
        if keywords:
            data["hashtags"] = keywords

    # Catch-all: if hashtags was set by any platform mapping above but viral_hashtags wasn't
    if "hashtags" in data and data.get("viral_hashtags") is None:
        data["viral_hashtags"] = [_hashtag_entry(h) for h in _as_list(data["hashtags"])]

    # LinkedIn cross-maps
    if "post_drafts" in data:
        drafts = records("post_drafts")
        if missing("content_titles"):
            data["content_titles"] = [_pick(d, "headline") for d in drafts]
        if missing("related_phrases"):
            data["related_phrases"] = [str(_pick(d, "body"))[:100] for d in drafts]
    if "thought_leadership_angles" in data and missing("strategy_tips"):
        data["strategy_tips"] = data["thought_leadership_angles"]
    if "engagement_prompts" in data and missing("related_phrases"):
        data["related_phrases"] = data["engagement_prompts"]
    return data


def validate_and_fill_data_defaults(data: dict, defaults: dict) -> dict:
    """Fill keys of *data* that are missing or ``None`` from *defaults*.

    Default values are deep-copied so later in-place edits of *data* never
    leak into the caller's *defaults*. *data* is modified and returned.
    """
    for key, default_val in defaults.items():
        if data.get(key) is None:
            data[key] = copy.deepcopy(default_val)
    return data


def _error_payload(defaults, message):
    """Return an independent copy of *defaults* carrying *message* as "error"."""
    base = copy.deepcopy(dict(defaults)) if defaults else {}
    base["error"] = message
    return base


def run_analysis(messages, defaults=None, temperature=0.3, max_new_tokens=2000):
    """Generate text for *messages* and return it as a normalized dict.

    ``generate_text(messages, temperature, max_new_tokens)`` is imported from
    ``services.model_router`` at call time. Its output is parsed (with
    repair), normalized, filled from *defaults* when given, and cross-mapped.
    Any failure yields a copy of *defaults* (or an empty dict) with an
    ``"error"`` message instead of raising.
    """
    try:
        from services.model_router import generate_text
        raw = generate_text(messages, temperature, max_new_tokens)
    except Exception as e:
        print(f"[generate_text error] {e}")
        return _error_payload(defaults, "AI model temporarily unavailable. Please try again.")

    if raw is None:
        return _error_payload(defaults, "Model failed to load on server startup. Check logs.")
    if not isinstance(raw, str):
        # The parser only handles text; anything else is a malformed response.
        print(f"[run_analysis] Expected str, got {type(raw).__name__}")
        return _error_payload(defaults, "The AI response was malformed. Please try again.")

    data, err = parse_and_repair(raw)
    if err:
        return _error_payload(defaults, "The AI response was malformed. Please try again.")
    if not isinstance(data, dict):
        print(f"[run_analysis] Expected dict, got {type(data).__name__}")
        return _error_payload(defaults, "The AI response was malformed. Please try again.")

    try:
        data = normalize_response(data)
    except Exception as e:
        print(f"[normalize_response error] {e}")
        return _error_payload(defaults, "Failed to process response. Please try again.")

    # Cross-mapping is guarded whether or not defaults were supplied, so an
    # unexpected shape can never escape as an exception.
    try:
        if defaults:
            data = validate_and_fill_data_defaults(data, defaults)
        return cross_map_fields(data)
    except Exception as e:
        print(f"[validate error] {e}")
        return _error_payload(defaults, "Failed to validate response. Please try again.")