Spaces:
Build error
Build error
| from sqlalchemy.orm import Session | |
| from database.models import RSSEntry, InstagramPostRecord, SessionLocal | |
| from typing import Optional, List | |
| from datetime import datetime | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def is_entry_processed(db: Session, entry_id: str) -> bool: | |
| """Check if RSS entry already processed""" | |
| return db.query(RSSEntry).filter(RSSEntry.entry_id == entry_id).first() is not None | |
| def save_rss_entry(db: Session, entry_id: str, feed_url: str, title: str, | |
| link: str, published: datetime, content: str) -> RSSEntry: | |
| """Save new RSS entry""" | |
| entry = RSSEntry( | |
| entry_id=entry_id, | |
| feed_url=feed_url, | |
| title=title, | |
| link=link, | |
| published=published, | |
| content=content | |
| ) | |
| db.add(entry) | |
| db.commit() | |
| db.refresh(entry) | |
| return entry | |
| def save_instagram_post(db: Session, rss_entry_id: str, post_data: dict, | |
| image_url: str = None, drive_url: str = None, | |
| instagram_post_id: str = None, status: str = "pending", | |
| error_message: str = None) -> InstagramPostRecord: | |
| """Save Instagram post record""" | |
| post = InstagramPostRecord( | |
| rss_entry_id=rss_entry_id, | |
| post_data=post_data, | |
| image_url=image_url, | |
| drive_url=drive_url, | |
| instagram_post_id=instagram_post_id, | |
| status=status, | |
| error_message=error_message | |
| ) | |
| db.add(post) | |
| db.commit() | |
| db.refresh(post) | |
| return post | |
| def update_post_status(db: Session, post_id: int, status: str, | |
| instagram_post_id: str = None, error_message: str = None): | |
| """Update post status""" | |
| post = db.query(InstagramPostRecord).filter(InstagramPostRecord.id == post_id).first() | |
| if post: | |
| post.status = status | |
| if instagram_post_id: | |
| post.instagram_post_id = instagram_post_id | |
| post.posted_at = datetime.utcnow() | |
| if error_message: | |
| post.error_message = error_message | |
| db.commit() | |
| def get_pending_posts(db: Session, limit: int = 10) -> List[InstagramPostRecord]: | |
| """Get pending posts""" | |
| return db.query(InstagramPostRecord).filter( | |
| InstagramPostRecord.status == "pending" | |
| ).limit(limit).all() |