Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import re | |
| import threading | |
| import random | |
| import string | |
| import hashlib | |
| from datetime import datetime | |
| from io import BytesIO | |
| import requests | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from huggingface_hub import HfApi, list_repo_files | |
# =====================================================
# CONFIG
# =====================================================
# Chat-completions endpoint of the HF Space that hosts the story-writing LLM.
HF_SPACE_URL = (
    "https://rahul7star-qwen3-4b-thinking-2509-ai-storey.hf.space/v1/chat/completions"
)
# Optional token; used both for dataset uploads and as Bearer auth on the Space.
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repo that receives story markdown folders and jobs/*.json tickets.
DATASET_REPO = "rahul7star/zimg-story-factory"
api = HfApi(token=HF_TOKEN)
app = FastAPI(title="auto-storey")
# Wide-open CORS: any origin/method/header may call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ===================================================== | |
| # LOGGING | |
| # ===================================================== | |
def log(msg):
    """Print *msg* prefixed with a local timestamp and return the printed line."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {msg}"
    print(entry)
    return entry
| # ===================================================== | |
| # UNIQUE JOB ID | |
| # ===================================================== | |
def generate_job_id(title: str = ""):
    """Build a unique job id: ``job_<UTC ts>_<5 random chars>[_<4-hex title hash>]``.

    The optional md5 fragment ties the id to the title so ids for the same
    title remain distinguishable from purely random collisions.

    Uses time.gmtime() for the UTC timestamp instead of the deprecated
    datetime.utcnow() (deprecated since Python 3.12); output is identical.
    """
    ts = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
    rand = ''.join(
        random.choices(string.ascii_lowercase + string.digits, k=5)
    )
    if title:
        h = hashlib.md5(title.encode()).hexdigest()[:4]
        return f"job_{ts}_{rand}_{h}"
    return f"job_{ts}_{rand}"
| # ===================================================== | |
| # HF RESPONSE EXTRACTOR | |
| # ===================================================== | |
def extract_llm_content(resp_json):
    """Return the stripped assistant message text from an OpenAI-style response.

    Robust to missing keys, an empty ``choices`` list, and a ``None`` content
    value — all of those yield ``""`` instead of raising (the original
    version raised IndexError on ``choices: []`` and AttributeError on
    ``content: None``).
    """
    choices = resp_json.get("choices") or [{}]
    message = choices[0].get("message") or {}
    content = message.get("content") or ""
    return content.strip()
| # ===================================================== | |
| # CALL HF SPACE | |
| # ===================================================== | |
def llama_chat(system, user):
    """Send a system+user chat request to the HF Space and return the reply text.

    Raises requests.HTTPError on a bad status code and ValueError when the
    model returns an empty reply.
    """
    request_headers = {"Content-Type": "application/json"}
    if HF_TOKEN:
        request_headers["Authorization"] = f"Bearer {HF_TOKEN}"

    body = {
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        # High temperature + penalties: bias the model toward novel stories.
        "temperature": 1.05,
        "top_p": 0.92,
        "presence_penalty": 0.6,
        "frequency_penalty": 0.4,
        "stream": False,
        "max_tokens": 1024,
    }

    log("Sending request to HF Space")
    response = requests.post(HF_SPACE_URL, json=body, headers=request_headers, timeout=300)
    response.raise_for_status()
    log(f"Response received")

    reply = extract_llm_content(response.json())
    if not reply:
        raise ValueError("Empty LLM response")
    return reply
| # ===================================================== | |
| # DATASET TITLE SCAN | |
| # ===================================================== | |
def fetch_existing_titles(dataset_repo):
    """Scan the dataset repo and return the set of lowercased title slugs.

    Looks at the top-level folder of every ``.md`` file; the slug is the third
    ``_``-separated piece of the folder name. Any failure (network, auth, ...)
    is logged and yields an empty set — uniqueness checks then degrade to
    best-effort rather than blocking generation.
    """
    try:
        repo_files = list_repo_files(dataset_repo, repo_type="dataset")
    except Exception as exc:
        log(f"Dataset scan failed: {exc}")
        return set()

    slugs = set()
    for path in repo_files:
        if not path.endswith(".md"):
            continue
        top_folder = path.split("/")[0]
        pieces = top_folder.split("_", 2)
        if len(pieces) == 3:
            slugs.add(pieces[2].lower())
    return slugs
def title_similarity(a, b):
    """Jaccard similarity of the '_'-separated word sets of two title slugs."""
    words_a = set(a.split("_"))
    words_b = set(b.split("_"))
    union = words_a | words_b
    if not union:
        return 0
    return len(words_a & words_b) / len(union)
| # ===================================================== | |
| # CLEAN STORY | |
| # ===================================================== | |
def clean_generated_story(story_text, prompt):
    """Strip an echoed prompt, dedupe paragraphs, keep at most the first three."""
    text = story_text.strip()
    # Some models echo the prompt back; drop it (case-insensitive match).
    if text.lower().startswith(prompt.lower()):
        text = text[len(prompt):].strip()

    paragraphs = [line.strip() for line in text.split("\n") if line.strip()]
    # dict.fromkeys preserves first-seen order while removing duplicates.
    deduped = list(dict.fromkeys(paragraphs))
    return "\n\n".join(deduped[:3])
| # ===================================================== | |
| # UNIQUE IDEA GENERATOR | |
| # ===================================================== | |
def generate_idea(max_retries=6):
    """Ask the LLM for a story idea whose title is unique within the dataset.

    Retries up to *max_retries* times when the reply is not parseable JSON,
    lacks a title, or the title duplicates / closely resembles (Jaccard > 0.6)
    an existing one.

    Returns the idea dict; raises RuntimeError when all retries are exhausted.
    """
    existing_titles = fetch_existing_titles(DATASET_REPO)

    system = """
You are a highly creative children's author.
Rules:
- Title MUST be UNIQUE.
- Title should usually be 3 words.
- Avoid patterns like "The Day", "Moon", "Magic", "Adventure".
- Be imaginative and unexpected.
- Return STRICT JSON only.
"""
    user = """
Invent a UNIQUE children's story idea.
Return JSON:
{
"title":"",
"character":"",
"setting":"",
"twist":"",
"style":"cinematic storybook, kid-friendly",
"author":"OhamLab"
}
"""
    for attempt in range(max_retries):
        log(f"Idea attempt {attempt+1}")
        raw = llama_chat(system, user)
        raw = raw.replace("```json", "").replace("```", "")

        match = re.search(r"\{.*\}", raw, re.S)
        if not match:
            continue

        # BUG FIX: a malformed JSON blob or missing "title" key previously
        # raised out of the loop and wasted the remaining retries.
        try:
            idea = json.loads(match.group())
            title_clean = re.sub(r"\W+", "_", idea["title"]).lower()
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            log(f"Malformed idea payload → retry ({e})")
            continue

        if title_clean in existing_titles:
            log("Duplicate title → retry")
            continue

        if any(title_similarity(title_clean, t) > 0.6 for t in existing_titles):
            log("Similar title → retry")
            continue

        log(f"Unique idea accepted: {idea['title']}")
        return idea

    raise RuntimeError("Failed to generate unique idea")
| # ===================================================== | |
| # STORY GENERATION | |
| # ===================================================== | |
def generate_story_full(idea):
    """Turn an idea dict into a cleaned, at-most-three-paragraph story string."""
    prompt = (
        "Write ONLY story text. EXACTLY 3 paragraphs.\n\n"
        f"Character: {idea['character']}\n"
        f"Setting: {idea['setting']}\n"
        f"Twist: {idea['twist']}\n"
        f"Style: {idea['style']}\n"
    )
    system_msg = "You are an imaginative children's storyteller. Never repeat common plots."
    raw_story = llama_chat(system_msg, prompt)
    return clean_generated_story(raw_story, prompt)
| # ===================================================== | |
| # JOB PUBLISHER | |
| # ===================================================== | |
def publish_job(dataset_repo, folder_name, filename,
                title, author, theme,
                style_descriptor, length, email,
                job_id):
    """Publish a job-ticket JSON to ``jobs/<job_id>.json`` in the dataset repo.

    The ticket points at the story markdown uploaded separately under
    ``<folder_name>/<filename>`` and starts in status "NEW".
    """
    # BUG FIX: the story URL previously ended in a literal placeholder text
    # instead of the uploaded markdown filename, producing dead links.
    raw_url = (
        f"https://huggingface.co/datasets/"
        f"{dataset_repo}/resolve/main/{folder_name}/{filename}"
    )
    job = {
        "job_id": job_id,
        "email": email,
        "story_url": raw_url,
        "repo": dataset_repo,
        "folder": folder_name,
        "title": title,
        "author": author,
        "theme": theme,
        "style_descriptor": style_descriptor,
        "length": length,
        "status": "NEW",
        # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated in
        # 3.12 — kept to preserve the exact created_at format consumers may parse.
        "created_at": datetime.utcnow().isoformat()
    }
    # Upload the JSON straight from memory instead of racing over a fixed
    # /tmp/job.json path (the scheduler thread and the API handler can run
    # concurrently); mirrors the BytesIO upload in create_story_pipeline.
    payload = json.dumps(job, indent=2).encode("utf-8")
    api.upload_file(
        path_or_fileobj=BytesIO(payload),
        path_in_repo=f"jobs/{job_id}.json",
        repo_id=dataset_repo,
        repo_type="dataset"
    )
| # ===================================================== | |
| # STORY PIPELINE | |
| # ===================================================== | |
def create_story_pipeline():
    """End-to-end pipeline: idea → story → upload markdown → publish job ticket.

    Returns a summary dict: job_id, title, the idea dict, and the story text.
    """
    log("🧠 Generating idea...")
    idea = generate_idea()

    log("✍️ Writing story...")
    story_text = generate_story_full(idea)

    title = idea["title"]
    job_id = generate_job_id(title)
    safe_title = re.sub(r"\W+", "_", title.strip())
    folder_name = f"{job_id}_{safe_title}"
    filename = f"{safe_title}.md"

    # BUG FIX: upload under the real markdown filename — the repo path
    # previously ended in a literal placeholder, orphaning the story file.
    api.upload_file(
        path_or_fileobj=BytesIO(story_text.encode()),
        path_in_repo=f"{folder_name}/{filename}",
        repo_id=DATASET_REPO,
        repo_type="dataset"
    )

    publish_job(
        DATASET_REPO,
        folder_name,
        filename,
        title,
        idea.get("author", "OhamLab"),
        idea["character"],   # passed as "theme" — looks intentional but TODO confirm
        idea["style"],
        "Short",
        "auto@system.ai",
        job_id
    )

    log(f"✅ Job completed: {job_id}")
    return {
        "job_id": job_id,
        "title": title,
        "idea": idea,
        "story_preview": story_text,
    }
| # ===================================================== | |
| # API | |
| # ===================================================== | |
def create_story():
    """Run the full pipeline, reporting failures as an error payload instead of raising."""
    # NOTE(review): no @app.post decorator is visible on this handler — confirm
    # the route is registered elsewhere.
    try:
        result = create_story_pipeline()
    except Exception as exc:
        log(f"Error: {exc}")
        return {"error": str(exc)}
    return result
def health():
    """Liveness-probe payload."""
    # NOTE(review): no @app.get decorator is visible here — confirm routing.
    return dict(status="ok")
| # ===================================================== | |
| # SCHEDULER | |
| # ===================================================== | |
def scheduler():
    # Background loop: run the full story pipeline forever, once every
    # 900 seconds (15 minutes). Any pipeline failure is logged and the
    # loop continues, so one bad run never kills the scheduler.
    while True:
        try:
            log("🚀 Scheduler triggered")
            create_story_pipeline()
        except Exception as e:
            log(f"Scheduler error: {e}")
        # assumes the sleep sits at loop level (runs after success or
        # failure) — indentation was ambiguous in the source; confirm.
        time.sleep(900)
def start_scheduler():
    """Launch the story scheduler on a daemon thread (dies with the process)."""
    worker = threading.Thread(target=scheduler, daemon=True)
    worker.start()
| # ===================================================== | |
| # LOCAL RUN | |
| # ===================================================== | |
# Local/dev entry point; port 7860 is the Hugging Face Spaces convention.
# NOTE(review): start_scheduler() is never invoked anywhere in this file —
# confirm the background loop is started elsewhere (e.g. a startup hook).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)