Spaces:
Sleeping
Sleeping
fix: normalize search_volume (int→str), relevance (float→int), competition (lower→Title), post_count (int→str) for LongCat API responses
3ef98d0 | import json | |
| import re | |
def clean_json_string(text: str) -> str:
    """Strip a Markdown code fence from *text* and return the bare payload.

    Resolution order:

    1. If a literal "```json" fence is present, return what lies between the
       first such fence and the next fence (or the end of the text when the
       fence is never closed).
    2. Otherwise, if any "```" fence is present, return the content of the
       first fenced block. A leading language tag on the opening fence line
       (e.g. "JSON", "python") is dropped so it does not pollute the payload.
    3. Otherwise return the text itself.

    The result is always whitespace-stripped.
    """
    fence = "```"
    json_fence = "```json"

    start = text.find(json_fence)
    if start != -1:
        payload = text[start + len(json_fence):].split(fence, 1)[0]
        return payload.strip()

    start = text.find(fence)
    if start == -1:
        return text.strip()

    payload = text[start + len(fence):].split(fence, 1)[0]

    # A purely alphanumeric first line followed by more content is a fence
    # language tag, not data: no valid JSON document starts with a bare word
    # on its own line and then continues on the next one.
    first_line, newline, remainder = payload.partition("\n")
    if newline and remainder.strip() and first_line.strip().isalnum():
        payload = remainder
    return payload.strip()
def repair_json(json_str: str) -> str:
    """Close containers left open at the end of a truncated JSON document.

    Surrounding whitespace and trailing commas are removed, then one closing
    bracket is appended for every ``{`` / ``[`` that is still open, innermost
    first, so the nesting order stays valid. Brackets that appear inside
    string literals are ignored.

    Unterminated strings are deliberately left alone: this is the cheap first
    pass and it only appends characters, it never removes any.

    Args:
        json_str: Possibly truncated JSON text.

    Returns:
        The text with missing closing brackets appended.
    """
    json_str = json_str.strip().rstrip(", \t\r\n")

    closer_for = {"{": "}", "[": "]"}
    pending = []  # closers still owed, outermost first
    in_string = False
    escaped = False

    for ch in json_str:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in closer_for:
            pending.append(closer_for[ch])
        elif ch in "}]" and pending and pending[-1] == ch:
            pending.pop()

    return json_str + "".join(reversed(pending))
def parse_and_repair(raw_text: str, max_preview=300):
    """Parse model output as JSON, repairing it progressively if needed.

    The text is first stripped of code fences via ``clean_json_string`` and
    parsed as-is. If that fails, the following candidates are tried in order,
    each one built from the previous:

    1. the text with any prose before the first ``{`` / ``[`` removed
       (only attempted when such leading prose exists),
    2. the output of ``repair_json``,
    3. the output of ``repair_json_v2``.

    Args:
        raw_text: Raw text returned by the model.
        max_preview: Number of leading characters of ``raw_text`` to include
            in the error payload.

    Returns:
        ``(parsed, None)`` on success, otherwise
        ``(None, {"error": <message of the last decode error>,
        "raw_preview": <start of raw_text>})``.
    """
    cleaned = clean_json_string(raw_text)
    try:
        return json.loads(cleaned), None
    except json.JSONDecodeError:
        print("[WARN] JSON Parse Error. Attempting repair...")

    # Models often prefix the document with a sentence; drop everything
    # before the first container opener.
    openers = [pos for pos in (cleaned.find("{"), cleaned.find("[")) if pos != -1]
    json_start = min(openers) if openers else 0

    steps = [repair_json, repair_json_v2]
    if json_start > 0:
        steps.insert(0, lambda text: text)  # try the bare slice before repairing

    candidate = cleaned[json_start:]
    last_error = None
    for step in steps:
        candidate = step(candidate)
        try:
            return json.loads(candidate), None
        except json.JSONDecodeError as exc:
            last_error = exc

    return None, {"error": str(last_error), "raw_preview": raw_text[:max_preview]}
def repair_json_v2(json_str: str) -> str:
    """Salvage a truncated or over-long JSON document by trimming and closing.

    The text is scanned once while tracking string state and the stack of
    open containers. It is then cut back to the last position that can be
    safely closed, and the missing closers are appended innermost first.

    Handled situations:

    * an unterminated trailing string (key or value) is dropped,
    * a complete trailing string *value* is kept,
    * a dangling ``"key":`` with no value is dropped,
    * trailing commas/whitespace (including newlines) are removed,
    * anything after the end of the top-level value, or after a closer that
      does not match the innermost open container, is discarded.

    A truncated bare literal (e.g. ``tru``) is not repaired; the result may
    still be invalid JSON and callers must parse it defensively.

    Args:
        json_str: Possibly malformed JSON text.

    Returns:
        The trimmed text with missing closing brackets appended.
    """
    trailing_junk = ", \t\r\n"
    json_str = json_str.strip().rstrip(trailing_junk)

    closer_for = {"{": "}", "[": "]"}
    pending = []            # closers owed for the containers currently open
    in_string = False
    escape = False
    string_is_value = False  # whether the string being read is a value (not a key)
    last_key_start = None    # index of the opening quote of the latest object key
    prev_significant = ""    # last non-whitespace character seen outside strings
    last_good_pos = 0

    for i, ch in enumerate(json_str):
        if in_string:
            if escape:
                escape = False
            elif ch == "\\":
                escape = True
            elif ch == '"':
                in_string = False
                # A finished value is a safe cut point; a finished key is not,
                # because it still needs its colon and value.
                if string_is_value:
                    last_good_pos = i + 1
            continue

        if ch == '"':
            in_string = True
            inside_object = bool(pending) and pending[-1] == "}"
            string_is_value = not inside_object or prev_significant == ":"
            if not string_is_value:
                last_key_start = i
            continue

        if ch in closer_for:
            pending.append(closer_for[ch])
        elif ch in "}]":
            if not pending or pending[-1] != ch:
                break  # stray or mismatched closer: everything from here is junk
            pending.pop()
            if not pending:
                last_good_pos = i + 1
                break  # top-level value is complete; ignore trailing commentary

        last_good_pos = i + 1
        if not ch.isspace():
            prev_significant = ch

    trimmed = json_str[:last_good_pos].rstrip(trailing_junk)

    # Drop a key whose value was lost to truncation.
    if (
        trimmed.endswith(":")
        and last_key_start is not None
        and last_key_start < len(trimmed)
    ):
        trimmed = json_str[:last_key_start].rstrip(trailing_junk)

    return trimmed + "".join(reversed(pending))
def _text_from_object(obj):
    """Pick the most text-like entry of a dict, falling back to its repr."""
    value = (
        obj.get("caption")
        or obj.get("description")
        or obj.get("title")
        or obj.get("name")
        or obj.get("url")
        or str(obj)
    )
    return value if isinstance(value, str) else str(value)


def normalize_field(items, expected_type):
    """Convert model output to match Pydantic expectations.

    Each element is normalized on its own, so lists mixing strings and
    objects are handled instead of being judged by their first element.

    Args:
        items: Value produced by the model. Anything that is not a list is
            returned unchanged.
        expected_type: One of ``"string_list"``, ``"object_list"``,
            ``"keyword_list"``, ``"hashtag_list"``, ``"caption_list"`` or
            ``"pin_list"``. Unknown values leave ``items`` unchanged.

    Returns:
        For ``"string_list"``: dict elements are replaced by their first
        truthy ``caption`` / ``description`` / ``title`` / ``name`` / ``url``
        entry (or the dict's ``str()``), other elements are kept.
        For the record types: string elements are wrapped into a record with
        placeholder values, other elements are kept.
    """
    if not isinstance(items, list):
        return items

    if expected_type == "string_list":
        return [
            _text_from_object(item) if isinstance(item, dict) else item
            for item in items
        ]

    record_builders = {
        "object_list": lambda s: {"title": s, "expected_ctr": "Medium"},
        "keyword_list": lambda s: {
            "keyword": s,
            "search_volume": "Medium",
            "competition": "Medium",
            "relevance": 80,
        },
        "hashtag_list": lambda s: {"tag": s, "post_count": "N/A"},
        "caption_list": lambda s: {"caption": s, "tone": "engaging"},
        "pin_list": lambda s: {"title": s, "description": s, "keyword_focus": s},
    }
    build = record_builders.get(expected_type)
    if build is None:
        return items
    return [build(item) if isinstance(item, str) else item for item in items]
def _coerce_int(value, fallback):
    """Parse *value* as a (possibly fractional) number and truncate to int."""
    try:
        return int(float(str(value)))
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers "inf"; ValueError covers "nan" and junk text.
        return fallback


def _text_or(value, fallback):
    """Return *value* as a string, or *fallback* when it is ``None``."""
    if isinstance(value, str):
        return value
    return fallback if value is None else str(value)


def _records_from(value, build):
    """Turn a dict, a bare string or a list with strings into a record list."""
    if isinstance(value, dict):
        return [value]
    if isinstance(value, str):
        return [build(value)] if value else value
    if isinstance(value, list):
        return [build(item) if isinstance(item, str) else item for item in value]
    return value


def _complete_records(value, build, field_defaults):
    """Wrap string elements and fill missing keys on dict elements."""
    if not isinstance(value, list):
        return value
    completed = []
    for item in value:
        if isinstance(item, str):
            item = build(item)
        elif isinstance(item, dict):
            for key, default in field_defaults.items():
                item.setdefault(key, default)
        completed.append(item)
    return completed


def normalize_response(data: dict) -> dict:
    """Post-process model output to match expected Pydantic schemas.

    ``data`` is modified in place and also returned. Only keys that are
    present are touched. Values of an unexpected shape are left as they are
    rather than raising, so one odd field does not discard the whole response.
    """
    if "video_titles" in data:
        data["video_titles"] = normalize_field(data["video_titles"], "object_list")
    if "thumbnail_ideas" in data:
        data["thumbnail_ideas"] = normalize_field(data["thumbnail_ideas"], "string_list")
    if "captions" in data:
        data["captions"] = normalize_field(data["captions"], "caption_list")

    # Hashtag tiers must be plain strings; unwrap {"tag": ...} / {"name": ...}.
    hashtag_sets = data.get("hashtag_sets")
    if isinstance(hashtag_sets, dict):
        for tier in ["small", "medium", "large"]:
            if isinstance(hashtag_sets.get(tier), list):
                hashtag_sets[tier] = [
                    obj.get("tag", obj.get("name", str(obj))) if isinstance(obj, dict) else obj
                    for obj in hashtag_sets[tier]
                ]

    content_ideas = data.get("content_ideas")
    if isinstance(content_ideas, dict):
        for key in ["reels", "carousels", "stories"]:
            if key in content_ideas:
                content_ideas[key] = normalize_field(content_ideas[key], "string_list")

    if "core_keywords" in data:
        data["core_keywords"] = normalize_field(data["core_keywords"], "keyword_list")
        # LongCat returns search_volume as int, relevance as float → fix types
        if isinstance(data["core_keywords"], list):
            for kw in data["core_keywords"]:
                if not isinstance(kw, dict):
                    continue
                kw["search_volume"] = _text_or(kw.get("search_volume"), "Medium")
                relevance = kw.get("relevance")
                if isinstance(relevance, float):
                    try:
                        # Floats up to 1.0 are treated as a 0-1 score and scaled
                        # to 0-100; round() avoids 0.29 * 100 -> 28.
                        kw["relevance"] = round(relevance * 100) if relevance <= 1 else int(relevance)
                    except (ValueError, OverflowError):
                        kw["relevance"] = 80  # NaN / infinity: same placeholder as elsewhere
                if kw.get("competition") is None:
                    kw["competition"] = "Medium"
                elif isinstance(kw["competition"], str):
                    kw["competition"] = kw["competition"].title()

    if "viral_hashtags" in data:
        data["viral_hashtags"] = normalize_field(data["viral_hashtags"], "hashtag_list")
        # LongCat returns post_count as int → convert to str
        if isinstance(data["viral_hashtags"], list):
            for h in data["viral_hashtags"]:
                if isinstance(h, dict):
                    h["post_count"] = _text_or(h.get("post_count"), "N/A")

    if "pin_ideas" in data:
        data["pin_ideas"] = normalize_field(data["pin_ideas"], "pin_list")
    if "seo_keywords" in data:
        data["seo_keywords"] = normalize_field(data["seo_keywords"], "string_list")

    for field in ["related_phrases", "strategy_tips", "content_titles",
                  "tags", "engagement_strategies", "growth_strategies",
                  "article_topics", "thought_leadership_angles",
                  "engagement_hooks", "ad_copy_suggestions", "page_growth_tips",
                  "viral_hooks", "engagement_tactics",
                  "trending_angles", "viral_strategies",
                  "board_organization", "traffic_strategies",
                  "hashtags"]:
        if field in data:
            data[field] = normalize_field(data[field], "string_list")

    if "post_drafts" in data:
        data["post_drafts"] = _records_from(
            data["post_drafts"], lambda s: {"headline": s, "body": s, "hook": s})
    if "post_ideas" in data:
        data["post_ideas"] = _records_from(
            data["post_ideas"], lambda s: {"type": "text", "content": s})

    if "tweet_threads" in data:
        threads = data["tweet_threads"]
        if isinstance(threads, dict):
            data["tweet_threads"] = [threads]
        elif isinstance(threads, str):
            if threads:
                data["tweet_threads"] = [{"tweets": [threads], "theme": "topic"}]
        elif isinstance(threads, list):
            # Loose strings are the tweets of a single thread; keep all of
            # them, not just the first one.
            loose = [t for t in threads if isinstance(t, str)]
            if loose:
                structured = [t for t in threads if not isinstance(t, str)]
                data["tweet_threads"] = [{"tweets": loose, "theme": "topic"}] + structured

    if "video_concepts" in data:
        data["video_concepts"] = _records_from(
            data["video_concepts"],
            lambda s: {"hook": s, "script_snippet": s, "sound_suggestion": "Trending"})

    if "corrections" in data:
        data["corrections"] = _complete_records(
            data["corrections"],
            lambda s: {"original": s, "corrected": s, "error_type": "grammar",
                       "explanation": "Automatically corrected."},
            {"error_type": "grammar", "explanation": "Review and correct this."})
    if "issues" in data:
        data["issues"] = _complete_records(
            data["issues"],
            lambda s: {"issue_type": s, "location": "text", "suggestion": "Review this section."},
            {"issue_type": "style", "location": "text", "suggestion": "Review this section."})

    if "grammar_score" in data:
        if not isinstance(data["grammar_score"], (int, float)):
            data["grammar_score"] = _coerce_int(data["grammar_score"], 85)
        data["grammar_score"] = max(0, min(100, data["grammar_score"]))
    if "word_count" in data and not isinstance(data["word_count"], int):
        data["word_count"] = _coerce_int(data["word_count"], 0)
    if "sentence_count" in data and not isinstance(data["sentence_count"], int):
        data["sentence_count"] = _coerce_int(data["sentence_count"], 0)

    # These fields are expected to be text; flatten an object to its repr.
    for key in ("description_template", "target_audience"):
        if isinstance(data.get(key), dict):
            data[key] = str(data[key])
    return data
| def cross_map_fields(data: dict) -> dict: | |
| if "video_titles" in data and "content_titles" not in data: | |
| data["content_titles"] = [t.get("title", str(t)) for t in data["video_titles"] if isinstance(t, dict)] | |
| if "video_titles" in data and "related_phrases" not in data: | |
| titles = data.get("content_titles") or [t.get("title", str(t)) for t in data["video_titles"] if isinstance(t, dict)] | |
| data["related_phrases"] = titles[:3] | |
| if "tags" in data and "hashtags" not in data: | |
| data["hashtags"] = data["tags"] | |
| if "tags" in data and "core_keywords" not in data: | |
| data["core_keywords"] = [{"keyword": t, "search_volume": "Medium", "competition": "Medium", "relevance": 80} for t in data["tags"]] | |
| if "engagement_strategies" in data and "strategy_tips" not in data: | |
| data["strategy_tips"] = data["engagement_strategies"] | |
| if "growth_strategies" in data and "strategy_tips" not in data: | |
| data["strategy_tips"] = data["growth_strategies"] | |
| if "hashtags" in data and "viral_hashtags" not in data: | |
| data["viral_hashtags"] = [{"tag": h, "post_count": "N/A"} for h in data["hashtags"]] | |
| if "best_posting_time" in data and "strategy_tips" not in data: | |
| data["strategy_tips"] = [f"Post during: {data['best_posting_time']}"] | |
| # Instagram cross-maps | |
| if "hashtag_sets" in data and isinstance(data["hashtag_sets"], dict) and "hashtags" not in data: | |
| all_tags = [] | |
| for tier in ["small", "medium", "large"]: | |
| if tier in data["hashtag_sets"] and isinstance(data["hashtag_sets"][tier], list): | |
| all_tags.extend(data["hashtag_sets"][tier]) | |
| if all_tags: | |
| data["hashtags"] = all_tags | |
| if "hashtags" in data and "core_keywords" not in data: | |
| data["core_keywords"] = [{"keyword": t.lstrip("#"), "search_volume": "Medium", "competition": "Medium", "relevance": 80} for t in data["hashtags"]] | |
| if "captions" in data and "content_titles" not in data: | |
| data["content_titles"] = [c.get("caption", str(c)) for c in data["captions"] if isinstance(c, dict)] | |
| if "content_ideas" in data and isinstance(data["content_ideas"], dict) and "strategy_tips" not in data: | |
| ideas = [] | |
| for key in ["reels", "carousels", "stories"]: | |
| if key in data["content_ideas"] and isinstance(data["content_ideas"][key], list): | |
| ideas.extend(data["content_ideas"][key]) | |
| if ideas: | |
| data["strategy_tips"] = ideas | |
| # Pinterest cross-maps | |
| if "seo_keywords" in data and "core_keywords" not in data: | |
| data["core_keywords"] = [{"keyword": k, "search_volume": "Medium", "competition": "Medium", "relevance": 80} for k in data["seo_keywords"]] | |
| if "pin_ideas" in data and "content_titles" not in data: | |
| data["content_titles"] = [p.get("title", str(p)) for p in data["pin_ideas"] if isinstance(p, dict)] | |
| if "pin_ideas" in data and "related_phrases" not in data: | |
| data["related_phrases"] = [p.get("keyword_focus", p.get("description", str(p)))[:100] for p in data["pin_ideas"] if isinstance(p, dict)] | |
| if "traffic_strategies" in data and "strategy_tips" not in data: | |
| data["strategy_tips"] = data["traffic_strategies"] | |
| if "board_organization" in data and "strategy_tips" not in data: | |
| data["strategy_tips"] = data["board_organization"] | |
| if "pin_ideas" in data and "hashtags" not in data: | |
| keywords = [p.get("keyword_focus", "") for p in data["pin_ideas"] if isinstance(p, dict)] | |
| keywords = [k for k in keywords if k] | |
| if keywords: | |
| data["hashtags"] = keywords | |
| # Catch-all: if hashtags was set by any platform mapping above but viral_hashtags wasn't | |
| if "hashtags" in data and ("viral_hashtags" not in data or data.get("viral_hashtags") is None): | |
| data["viral_hashtags"] = [{"tag": h, "post_count": "N/A"} for h in data["hashtags"]] | |
| # LinkedIn cross-maps | |
| if "post_drafts" in data and "content_titles" not in data: | |
| data["content_titles"] = [d.get("headline", str(d)) for d in data["post_drafts"] if isinstance(d, dict)] | |
| if "post_drafts" in data and "related_phrases" not in data: | |
| data["related_phrases"] = [d.get("body", str(d))[:100] for d in data["post_drafts"] if isinstance(d, dict)] | |
| if "thought_leadership_angles" in data and "strategy_tips" not in data: | |
| data["strategy_tips"] = data["thought_leadership_angles"] | |
| if "engagement_prompts" in data and "related_phrases" not in data: | |
| data["related_phrases"] = data["engagement_prompts"] | |
| return data | |
def validate_and_fill_data_defaults(data: dict, defaults: dict) -> dict:
    """Fill keys of *data* that are missing or ``None`` from *defaults*.

    ``data`` is modified in place and returned. Each default is deep-copied
    before being inserted, so mutating the result (e.g. appending to a list)
    never alters the ``defaults`` mapping that may be shared across calls.
    """
    import copy

    for key, default_val in defaults.items():
        if data.get(key) is None:
            data[key] = copy.deepcopy(default_val)
    return data
def _error_response(defaults, message):
    """Build a fallback payload: a private copy of *defaults* plus ``error``."""
    import copy

    # Deep copy so callers mutating the payload cannot corrupt shared defaults.
    base = copy.deepcopy(dict(defaults)) if defaults else {}
    base["error"] = message
    return base


def run_analysis(messages, defaults=None, temperature=0.3, max_new_tokens=2000):
    """Query the model and return its JSON answer as a normalized dict.

    Pipeline: ``generate_text`` → ``parse_and_repair`` → ``normalize_response``
    → ``validate_and_fill_data_defaults`` (only when ``defaults`` is truthy)
    → ``cross_map_fields``.

    This function does not raise for failures in any of those stages. On
    failure it returns a copy of ``defaults`` (or an empty dict) with an
    ``"error"`` key holding a user-facing message, and prints a diagnostic.

    Args:
        messages: Passed through to ``generate_text`` unchanged.
        defaults: Optional mapping of fallback values for missing fields.
        temperature: Passed through to ``generate_text``.
        max_new_tokens: Passed through to ``generate_text``.

    Returns:
        The processed response dict, or the fallback payload described above.
    """
    try:
        from services.model_router import generate_text
        raw = generate_text(messages, temperature, max_new_tokens)
    except Exception as e:
        print(f"[generate_text error] {e}")
        return _error_response(defaults, "AI model temporarily unavailable. Please try again.")

    if raw is None:
        return _error_response(defaults, "Model failed to load on server startup. Check logs.")

    data, err = parse_and_repair(raw)
    if err:
        print(f"[parse error] {err}")
        return _error_response(defaults, "The AI response was malformed. Please try again.")
    if not isinstance(data, dict):
        print(f"[run_analysis] Expected dict, got {type(data).__name__}")
        return _error_response(defaults, "The AI response was malformed. Please try again.")

    try:
        data = normalize_response(data)
    except Exception as e:
        print(f"[normalize_response error] {e}")
        return _error_response(defaults, "Failed to process response. Please try again.")

    # NOTE(review): defaults are filled before cross-mapping, so a cross-map
    # target that also appears in `defaults` is never derived from
    # platform-specific fields. Confirm this ordering is intended.
    try:
        if defaults:
            data = validate_and_fill_data_defaults(data, defaults)
        return cross_map_fields(data)
    except Exception as e:
        print(f"[validate error] {e}")
        return _error_response(defaults, "Failed to validate response. Please try again.")