Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import time | |
| import asyncio | |
| from datetime import datetime | |
| from fastapi import APIRouter, UploadFile, File, Form | |
| from fastapi.responses import FileResponse | |
| from huggingface_hub import hf_hub_download, upload_file | |
| from core import get_db_list, save_db, api, REPO_ID, TOKEN, ShareLikeReq, ShareCommentReq, ShareCommentActionReq, ShareEditReq, ShareDeleteReq | |
| router = APIRouter() | |
| async def get_share_list(): | |
| return {"status": "success", "data": get_db_list("community_posts.json")} | |
| async def get_share_image(name: str): | |
| try: | |
| path = await asyncio.to_thread(hf_hub_download, repo_id=REPO_ID, filename=f"community_images/{name}", repo_type="dataset", token=TOKEN) | |
| return FileResponse(path) | |
| except: return {"status": "error"} | |
| async def publish_share(username: str = Form(...), title: str = Form(...), content: str = Form(...), image: UploadFile = File(None)): | |
| try: | |
| db = get_db_list("community_posts.json") | |
| post_id = uuid.uuid4().hex | |
| post_data = { "id": post_id, "author": username, "title": title[:20], "content": content, "time": int(time.time() * 1000), "likes": 0, "liked_by": [], "comments": [] } | |
| if image and image.filename: | |
| ext = os.path.splitext(image.filename)[1].lower() | |
| image_name = f"{post_id}{ext}" | |
| temp_path = f"temp_share_{uuid.uuid4().hex}{ext}" | |
| with open(temp_path, "wb") as f: f.write(await image.read()) | |
| await asyncio.to_thread(upload_file, path_or_fileobj=temp_path, path_in_repo=f"community_images/{image_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN) | |
| if os.path.exists(temp_path): os.remove(temp_path) | |
| post_data["image"] = image_name | |
| db.insert(0, post_data) | |
| save_db(db, "community_posts.json") | |
| return {"status": "success", "post": post_data} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| async def edit_share(post_id: str = Form(...), username: str = Form(...), title: str = Form(...), content: str = Form(...), image: UploadFile = File(None)): | |
| try: | |
| db = get_db_list("community_posts.json") | |
| for p in db: | |
| if p["id"] == post_id: | |
| if p["author"] != username: return {"status": "error", "message": "无权限"} | |
| p["title"] = title[:20] | |
| p["content"] = content | |
| p["last_edit_date"] = datetime.now().strftime("%Y-%m-%d") | |
| if image and image.filename: | |
| ext = os.path.splitext(image.filename)[1].lower() | |
| image_name = f"{post_id}_{uuid.uuid4().hex[:8]}{ext}" | |
| temp_path = f"temp_share_edit_{uuid.uuid4().hex}{ext}" | |
| with open(temp_path, "wb") as f: f.write(await image.read()) | |
| await asyncio.to_thread(upload_file, path_or_fileobj=temp_path, path_in_repo=f"community_images/{image_name}", repo_id=REPO_ID, repo_type="dataset", token=TOKEN) | |
| if os.path.exists(temp_path): os.remove(temp_path) | |
| if p.get("image"): | |
| try: asyncio.create_task(asyncio.to_thread(api.delete_file, path_in_repo=f"community_images/{p['image']}", repo_id=REPO_ID, token=TOKEN)) | |
| except: pass | |
| p["image"] = image_name | |
| save_db(db, "community_posts.json") | |
| return {"status": "success", "post": p} # 🌟 核心:返回更新后的单条帖子 | |
| return {"status": "error", "message": "帖子不存在"} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| async def delete_share(req: ShareDeleteReq): | |
| try: | |
| db = get_db_list("community_posts.json") | |
| for i, p in enumerate(db): | |
| if p["id"] == req.post_id: | |
| if p["author"] != req.username and req.username != "猪的飞行梦": return {"status": "error", "message": "无权限"} | |
| del_post = db.pop(i) | |
| if del_post.get("image"): | |
| try: asyncio.create_task(asyncio.to_thread(api.delete_file, path_in_repo=f"community_images/{del_post['image']}", repo_id=REPO_ID, token=TOKEN)) | |
| except: pass | |
| save_db(db, "community_posts.json") | |
| return {"status": "success", "deleted_id": req.post_id} # 🌟 核心:返回被删 ID | |
| return {"status": "error", "message": "帖子不存在"} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| async def like_share(req: ShareLikeReq): | |
| try: | |
| db = get_db_list("community_posts.json") | |
| for p in db: | |
| if p["id"] == req.post_id: | |
| liked_by = p.get("liked_by", []) | |
| if req.username in liked_by: liked_by.remove(req.username); p["likes"] = max(0, p.get("likes", 1) - 1) | |
| else: liked_by.append(req.username); p["likes"] = p.get("likes", 0) + 1 | |
| p["liked_by"] = liked_by | |
| save_db(db, "community_posts.json") | |
| return {"status": "success", "likes": p["likes"], "post": p} # 🌟 返回更新后对象 | |
| return {"status": "error", "message": "帖子不存在"} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| async def comment_share(req: ShareCommentReq): | |
| try: | |
| if len(req.content) > 99: return {"status": "error", "message": "评论过长"} | |
| db = get_db_list("community_posts.json") | |
| for p in db: | |
| if p["id"] == req.post_id: | |
| new_comment = {"id": uuid.uuid4().hex, "user": req.username, "text": req.content, "time": int(time.time() * 1000)} | |
| if req.parent_id: | |
| parent_found = False | |
| for c in p.get("comments", []): | |
| if c.get("id") == req.parent_id: | |
| if "replies" not in c: c["replies"] = [] | |
| c["replies"].append(new_comment) | |
| parent_found = True; break | |
| if not parent_found: return {"status": "error", "message": "原评论已不存在"} | |
| else: | |
| if "comments" not in p: p["comments"] = [] | |
| p["comments"].append(new_comment) | |
| save_db(db, "community_posts.json") | |
| return {"status": "success", "post": p} # 🌟 返回更新后对象 | |
| return {"status": "error", "message": "帖子不存在"} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| async def comment_action_share(req: ShareCommentActionReq): | |
| try: | |
| db = get_db_list("community_posts.json") | |
| for p in db: | |
| if p["id"] == req.post_id: | |
| if p["author"] != req.username: return {"status": "error", "message": "无权限"} | |
| if req.action == "delete": p["comments"] = [c for c in p.get("comments", []) if c.get("id") != req.comment_id] | |
| elif req.action == "pin": | |
| for c in p.get("comments", []): | |
| if c.get("id") == req.comment_id: c["pinned"] = not c.get("pinned", False) | |
| save_db(db, "community_posts.json") | |
| return {"status": "success", "post": p} # 🌟 返回更新后对象 | |
| return {"status": "error", "message": "帖子不存在"} | |
| except Exception as e: return {"status": "error", "message": str(e)} |