# Source: Hugging Face Space "app.py" by rahul7star — commit be7437d (verified).
import os
import time
import json
import re
import threading
import requests
from io import BytesIO
from datetime import datetime
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import HfApi
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from debug import router as debug_router
# =====================================================
# CONFIG
# =====================================================
# OpenAI-compatible chat-completions endpoint of the LLM Space used for
# idea and story generation.
HF_SPACE_URL = "https://rahul7star-qwen3-4b-thinking-2509-ai-storey.hf.space/v1/chat/completions"
# Optional Hugging Face token; used for dataset uploads and forwarded as a
# Bearer token to the Space when present.
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repo that receives generated stories and job manifests.
DATASET_REPO = "rahul7star/ltx-story-factory"
api = HfApi(token=HF_TOKEN)
app = FastAPI(title="auto-storey")
app.include_router(debug_router)
# Fully open CORS: any origin/method/header may call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# =====================================================
# HTTP SESSION (🔥 FIX FOR TIMEOUT + RETRIES)
# =====================================================
# Shared requests session with automatic retries on transient failures.
session = requests.Session()
retry = Retry(
    total=3,  # up to 3 transport-level retries per request
    backoff_factor=1,  # exponential backoff between retries
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["POST"],  # POST is not retried by urllib3 by default
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("https://", adapter)
session.mount("http://", adapter)
# =====================================================
# LOGGING
# =====================================================
def log(msg):
    """Print *msg* prefixed with a local timestamp and return the formatted line."""
    stamped = "[{}] {}".format(time.strftime("%Y-%m-%d %H:%M:%S"), msg)
    print(stamped)
    return stamped
# =====================================================
# RESPONSE PARSER
# =====================================================
def extract_llm_content(resp_json: dict) -> str:
    """Pull the assistant message text out of an OpenAI-style response payload.

    Returns "" when any expected key or index is missing or malformed.
    """
    try:
        choices = resp_json.get("choices", [{}])
        message = choices[0].get("message", {})
        return message.get("content", "").strip()
    except Exception:
        # Any structural surprise (bad index, None message, ...) -> empty string.
        return ""
# =====================================================
# HF SPACE CALL (🔥 FIXED TIMEOUT LOGIC)
# =====================================================
def llama_chat(system: str, user: str) -> str:
    """Send a system+user chat request to the HF Space and return the reply text.

    Makes up to 3 application-level attempts (on top of the session's
    transport-level retries), sleeping 2s between attempts.

    Raises:
        RuntimeError: when all attempts fail or every reply is empty.
    """
    headers = {"Content-Type": "application/json"}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"
    payload = {
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        "temperature": 0.8,
        "stream": False,
        "max_tokens": 1024,
    }
    log("🚀 Calling HF Space...")
    last_error = None
    attempts = 3
    for attempt in range(attempts):
        try:
            r = session.post(
                HF_SPACE_URL,
                json=payload,
                headers=headers,
                # connect=10s, read=380s (Space may be cold/slow); the old
                # comment claimed 180s while the value was 380 — corrected.
                timeout=(10, 380),
            )
            r.raise_for_status()
            data = r.json()
            content = extract_llm_content(data)
            if not content:
                raise ValueError("Empty LLM response")
            log(f"✅ LLM success (attempt {attempt+1})")
            return content
        except Exception as e:
            last_error = e
            log(f"⚠️ Attempt {attempt+1} failed: {e}")
            # FIX: only sleep between attempts, not after the final failure.
            if attempt < attempts - 1:
                time.sleep(2)
    raise RuntimeError(f"HF Space failed after retries: {last_error}")
# =====================================================
# STORY CLEANER
# =====================================================
import re
def clean_generated_story(story_text, prompt):
    """Normalize a raw LLM story into three roughly equal paragraphs.

    Strips an echoed prompt prefix, collapses all whitespace runs, then
    reflows the words into three paragraphs (the last takes the remainder).

    Args:
        story_text: raw text returned by the LLM.
        prompt: the prompt that produced it (for echo removal).

    Returns:
        Paragraphs joined with blank lines, or "" for empty input.
    """
    text = story_text.strip()
    # FIX: compare against the *stripped* prompt. The prompt is built from an
    # indented f-string starting with "\n", so the raw prompt could never
    # prefix the stripped story text and echo removal silently never fired.
    echoed = prompt.strip()
    if echoed and text.lower().startswith(echoed.lower()):
        text = text[len(echoed):].strip()
    # Collapse every run of whitespace/newlines to a single space.
    text = re.sub(r"\s+", " ", text)
    words = text.split()
    if not words:
        return ""
    # Three roughly equal chunks; for <3 words the first chunks are empty.
    chunk = len(words) // 3
    parts = [words[:chunk], words[chunk:2 * chunk], words[2 * chunk:]]
    # FIX: drop empty paragraphs so very short stories don't start with
    # blank lines (previously "hi" -> "\n\n\n\nhi").
    paragraphs = [" ".join(p) for p in parts if p]
    return "\n\n".join(paragraphs)
def clean_generated_story0(story_text, prompt):
    """Legacy cleaner: strip echoed prompt, dedupe paragraphs, keep first 3.

    Kept for reference; clean_generated_story() is the active implementation.
    """
    text = story_text.strip()
    # FIX: compare against the *stripped* prompt — the prompt f-string begins
    # with a newline, so the raw prompt could never prefix the stripped text.
    echoed = prompt.strip()
    if echoed and text.lower().startswith(echoed.lower()):
        text = text[len(echoed):].strip()
    # Keep the first occurrence of each non-blank paragraph, in order.
    paragraphs = [p.strip() for p in text.split("\n") if p.strip()]
    seen = set()
    unique = []
    for p in paragraphs:
        if p not in seen:
            unique.append(p)
            seen.add(p)
    return "\n\n".join(unique[:3])
# =====================================================
# IDEA GENERATION
# =====================================================
# Visual/style descriptors rotated deterministically by generate_idea()
# (picked via int(time.time()) % len(styles)).
styles = [
    "cinematic storybook, kid-friendly",
    "anime storybook",
    "dreamy watercolor adventure",
    "glimmering fairy tale style",
]
def generate_idea() -> dict:
    """Ask the LLM for a unique story idea and return it as a parsed dict.

    The prompt varies with the current hour (mood context) and a style that
    rotates with wall-clock time.

    Raises:
        ValueError: when no JSON object can be located in the LLM reply.
    """
    hour = datetime.now().hour
    if 5 <= hour < 12:
        time_context = "Morning, full of light and fresh beginnings."
    elif 12 <= hour < 17:
        time_context = "Afternoon, warm and adventurous."
    elif 17 <= hour < 21:
        time_context = "Evening, magical golden glow."
    else:
        time_context = "Night, mysterious and dreamy."
    style_choice = styles[int(time.time()) % len(styles)]
    system_prompt = "You are a creative children's story author. Return STRICT JSON only."
    user_prompt = f"""
Create a UNIQUE children's story idea.
Context:
{time_context}
Rules:
- Title max 3 words
- Fully original character (never repeat names like Luna)
- Magical setting
- Positive twist
- Style: {style_choice}
Return JSON only:
{{
"title": "",
"character": "",
"setting": "",
"twist": "",
"style": "{style_choice}",
"author": "OhamLab"
}}
"""
    raw = llama_chat(system_prompt, user_prompt)
    # Strip Markdown code fences the model sometimes wraps JSON in.
    raw = raw.replace("```json", "").replace("```", "")
    match = re.search(r"\{.*\}", raw, re.S)
    if match is None:
        raise ValueError(f"Invalid JSON: {raw}")
    idea = json.loads(match.group())
    log(f"🧠 Idea: {idea}")
    return idea
# =====================================================
# STORY GENERATION
# =====================================================
def generate_story_full(idea: dict):
    """Generate the story text for *idea* and return it cleaned into paragraphs."""
    prompt = f"""
Write a 100 word children's creative story.
Character: {idea['character']}
Setting: {idea['setting']}
Twist: {idea['twist']}
Style: {idea['style']}
Must be:
- magical
- emotional
- happy ending
"""
    system_msg = "You are a professional children's story writer."
    raw_story = llama_chat(system_msg, prompt)
    # Pass the prompt along so an echoed prefix can be stripped.
    return clean_generated_story(raw_story, prompt)
# =====================================================
# UPLOAD + JOB
# =====================================================
def publish_job(dataset_repo, folder_name, filename, title, author, theme, style, length, email, job_id):
    """Write a job manifest JSON locally and upload it to the dataset repo.

    Args:
        dataset_repo: HF dataset repo id the story lives in.
        folder_name: folder within the repo holding the story file.
        filename: markdown filename of the uploaded story.
        title/author/theme/style/length/email: manifest metadata fields.
        job_id: unique job identifier; also names the manifest file.

    Returns:
        The job_id, unchanged.
    """
    # FIX: point the story URL at the uploaded file. The `filename` parameter
    # was previously unused and the URL ended in a literal placeholder.
    raw_url = f"https://huggingface.co/datasets/{dataset_repo}/resolve/main/{folder_name}/{filename}"
    job = {
        "job_id": job_id,
        "email": email,
        "story_url": raw_url,
        "repo": dataset_repo,
        "folder": folder_name,
        "title": title,
        "author": author,
        "theme": theme,
        "style_descriptor": style,
        "length": length,
        "status": "NEW",
        "created_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    # Stage the manifest on disk, then push it under jobs/<job_id>.json.
    job_path = "/tmp/job.json"
    with open(job_path, "w") as f:
        json.dump(job, f, indent=2)
    api.upload_file(
        path_or_fileobj=job_path,
        path_in_repo=f"jobs/{job_id}.json",
        repo_id=dataset_repo,
        repo_type="dataset",
    )
    return job_id
# =====================================================
# PIPELINE
# =====================================================
def create_story_pipeline():
    """End-to-end pipeline: idea -> story -> upload -> job manifest.

    Returns:
        Dict with job_id, title, the idea dict and the cleaned story text.
    """
    log("🧠 Generating idea...")
    idea = generate_idea()
    log("✍️ Writing story...")
    story = generate_story_full(idea)
    job_id = f"job_{int(time.time())}"
    title = idea["title"]
    # Slug the title so it is safe as a repo path component.
    safe = re.sub(r"\W+", "_", title)
    folder = f"{job_id}_{safe}"
    filename = f"{safe}.md"
    # FIX: upload under the computed markdown filename (path previously ended
    # in a literal placeholder), so the URL in the job manifest resolves.
    api.upload_file(
        path_or_fileobj=BytesIO(story.encode()),
        path_in_repo=f"{folder}/{filename}",
        repo_id=DATASET_REPO,
        repo_type="dataset",
    )
    log("📤 Story uploaded")
    publish_job(
        DATASET_REPO,
        folder,
        filename,
        title,
        "OhamLab",
        idea["character"],
        idea["style"],
        "short",
        "auto@system.ai",
        job_id,
    )
    log(f"✅ DONE: {job_id}")
    return {"job_id": job_id, "title": title, "idea": idea, "story": story}
# =====================================================
# API
# =====================================================
@app.post("/create-story")
def create_story():
    """Run the full story pipeline; on failure return {"error": ...} instead of a 500."""
    try:
        result = create_story_pipeline()
    except Exception as exc:
        log(f"❌ ERROR: {exc}")
        return {"error": str(exc)}
    return result
@app.get("/health")
def health():
    # Liveness probe for the Space / load balancer.
    return {"status": "ok"}
# =====================================================
# SAFE SCHEDULER (NO CRASH LOOP)
# =====================================================
def scheduler():
    """Background loop: run the story pipeline every 4 hours, never crash the thread."""
    interval_seconds = 4 * 60 * 60  # 4 hours
    while True:
        try:
            log("⏰ Scheduler trigger")
            create_story_pipeline()
        except Exception as exc:
            # Swallow and log so a single failure never kills the loop.
            log(f"❌ Scheduler error: {exc}")
        time.sleep(interval_seconds)
@app.on_event("startup")
def start():
    # Daemon thread: the scheduler dies with the server process instead of
    # blocking shutdown.
    threading.Thread(target=scheduler, daemon=True).start()
# =====================================================
# RUN
# =====================================================
# Local entry point (`python app.py`); in production the platform may launch
# uvicorn itself.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)