# ss / github_storage.py
# Uploaded by Dooratre - "Upload 13 files" (commit a16b959, verified)
"""
GitHub Storage for Cards Microservice.
Clean implementation with proper SHA handling.
"""
import json
import base64
import threading
from datetime import datetime
import requests
class GitHubCardsStorage:
    """GitHub-based persistent storage for cards.

    The card collection is kept as one JSON file in a GitHub repository and
    is read and written through the REST "contents" endpoint. The blob SHA
    of the last seen revision is cached in ``_file_sha`` because GitHub
    requires it when updating an existing file.

    None of the public methods raise on network or API failures: problems
    are logged with ``print`` and reported through the return value.
    """

    def __init__(self):
        """Read settings from ``config`` and build a retrying HTTP session."""
        # Imported here (not at module level) exactly as in the original, so
        # importing this module does not require ``config`` to be importable.
        from config import (
            GITHUB_TOKEN, GITHUB_REPO, GITHUB_BRANCH,
            GITHUB_DATA_DIR, CARDS_GITHUB_FILE,
        )
        self._token = GITHUB_TOKEN
        self._repo = GITHUB_REPO
        self._branch = GITHUB_BRANCH
        self._data_dir = GITHUB_DATA_DIR
        self._filename = CARDS_GITHUB_FILE
        # SHA of the file as last seen on GitHub; None means "unknown/absent".
        self._file_sha = None
        # Guards writes to ``_file_sha``.
        self._lock = threading.Lock()
        # Usable only with a plausible token and an "owner/name" repo string.
        self._configured = bool(
            self._token
            and len(self._token) > 10
            and self._repo
            and "/" in self._repo
        )
        self._session = requests.Session()
        # Transport-level retries for gateway errors on GET and PUT.
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=requests.adapters.Retry(
                total=3,
                backoff_factor=1.0,
                status_forcelist=[502, 503, 504],
                allowed_methods=["GET", "PUT"],
            ),
        )
        self._session.mount("https://", adapter)
        if self._configured:
            print(f" [GitHub] Configured: {self._repo}/{self._file_path()}")
        else:
            print(" [GitHub] NOT configured - no token or repo")

    def is_configured(self) -> bool:
        """Return True when a token and an ``owner/name`` repo are set."""
        return self._configured

    def _headers(self) -> dict:
        """Return a fresh dict of request headers for the GitHub API."""
        return {
            "Authorization": f"token {self._token}",
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json",
        }

    def _file_path(self) -> str:
        """Return the repository-relative path of the cards file.

        An empty data directory yields just the file name, so the result
        never starts with a stray ``/``.
        """
        if self._data_dir:
            return f"{self._data_dir}/{self._filename}"
        return self._filename

    def _file_url(self) -> str:
        """Return the contents-API URL of the cards file (without ``ref``)."""
        return (
            f"https://api.github.com/repos/{self._repo}"
            f"/contents/{self._file_path()}"
        )

    @staticmethod
    def _error_message(resp) -> str:
        """Extract the ``message`` field of an error response.

        Falls back to the first 300 characters of the body when the body is
        not a JSON object.
        """
        try:
            return resp.json().get('message', resp.text[:300])
        except (ValueError, AttributeError):
            return resp.text[:300]

    @staticmethod
    def _is_rate_limited(resp) -> bool:
        """Tell whether a 403/429 response is a rate limit, not a denial.

        A 403 is also what GitHub answers for insufficient permissions, so
        the status code alone is not enough to call it a rate limit.
        """
        if resp.status_code == 429:
            return True
        headers = resp.headers
        if headers.get("X-RateLimit-Remaining") == "0":
            return True
        if "Retry-After" in headers:
            return True
        return "rate limit" in resp.text.lower()

    def _fetch_sha(self):
        """Fetch the current file SHA from GitHub and cache it.

        Returns:
            The SHA string on success. ``None`` when the file does not exist
            (the cache is cleared) or when the lookup failed (the cache is
            left untouched).
        """
        try:
            resp = self._session.get(
                self._file_url(),
                headers=self._headers(),
                params={"ref": self._branch},
                timeout=15,
            )
            if resp.status_code == 200:
                sha = resp.json().get('sha', '')
                with self._lock:
                    self._file_sha = sha
                return sha
            if resp.status_code == 404:
                with self._lock:
                    self._file_sha = None
                return None
            # Any other status is a failed lookup, not "file absent".
            print(f" [GitHub] SHA fetch failed: HTTP {resp.status_code}")
        except Exception as e:
            print(f" [GitHub] SHA fetch error: {e}")
        return None

    def _fetch_raw_content(self) -> str:
        """Download the file body as text using the raw media type.

        Used when the JSON representation carries no inline content, which
        is what the contents API does for files larger than 1 MB.

        Raises:
            requests.RequestException: on transport errors or non-2xx status.
            UnicodeDecodeError: if the body is not valid UTF-8.
        """
        headers = self._headers()
        headers["Accept"] = "application/vnd.github.raw+json"
        resp = self._session.get(
            self._file_url(),
            headers=headers,
            params={"ref": self._branch},
            timeout=60,
        )
        resp.raise_for_status()
        return resp.content.decode('utf-8')

    def pull_cards(self) -> dict:
        """Pull the cards file from GitHub.

        Returns:
            The parsed JSON object. An empty dict is returned when storage
            is not configured, the file does not exist, the request fails,
            or the file does not contain a JSON object.
        """
        if not self._configured:
            return {}
        try:
            resp = self._session.get(
                self._file_url(),
                headers=self._headers(),
                params={"ref": self._branch},
                timeout=30,
            )
            if resp.status_code == 404:
                print(" [GitHub] File not found (first run)")
                with self._lock:
                    self._file_sha = None
                return {}
            if resp.status_code in (403, 429):
                if self._is_rate_limited(resp):
                    print(" [GitHub] Rate limited")
                else:
                    print(f" [GitHub] Forbidden: {self._error_message(resp)}")
                return {}
            resp.raise_for_status()
            data = resp.json()
            sha = data.get('sha', '')
            with self._lock:
                self._file_sha = sha
            content_b64 = data.get('content', '')
            if content_b64:
                content_str = base64.b64decode(content_b64).decode('utf-8')
            elif data.get('size', 0):
                # Non-empty file without inline content: too large for the
                # JSON representation, so fetch the raw body instead of
                # reporting an empty card set.
                content_str = self._fetch_raw_content()
            else:
                return {}
            parsed = json.loads(content_str)
            if not isinstance(parsed, dict):
                print(" [GitHub] Warning: file is not a dict")
                return {}
            print(f" [GitHub] Pulled {len(parsed)} cards (SHA: {sha[:8]})")
            return parsed
        except json.JSONDecodeError as e:
            print(f" [GitHub] Invalid JSON: {e}")
            return {}
        except Exception as e:
            # Best-effort boundary: callers rely on never getting an exception.
            print(f" [GitHub] Pull error: {e}")
            return {}

    def push_cards(self, data_dict):
        """Push the cards file to GitHub.

        Args:
            data_dict: JSON-serialisable data to store (normally a dict).

        Returns:
            ``(True, None)`` on success, otherwise ``(False, message)``.
        """
        if not self._configured:
            return False, "GitHub not configured"
        # Serialise first: a bad payload must not cost any network traffic
        # and must be reported through the return value, not as an exception.
        try:
            content_str = json.dumps(
                data_dict, ensure_ascii=False, separators=(',', ':')
            )
            content_bytes = content_str.encode('utf-8')
        except (TypeError, ValueError) as e:
            return False, f"Serialization error: {e}"
        content_b64 = base64.b64encode(content_bytes).decode('utf-8')

        url = self._file_url()
        record_count = len(data_dict) if isinstance(data_dict, dict) else 0
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        payload = {
            "message": f"Cards backup - {record_count} cards - {timestamp}",
            "content": content_b64,
            "branch": self._branch,
        }
        # Always ask for a fresh SHA before pushing. If the lookup itself
        # failed, the cached SHA is the best guess (a 404 clears the cache,
        # so a missing file still results in a create request).
        current_sha = self._fetch_sha() or self._file_sha
        if current_sha:
            payload["sha"] = current_sha

        max_retries = 3
        for attempt in range(max_retries):
            is_last = attempt == max_retries - 1
            try:
                resp = self._session.put(
                    url, headers=self._headers(),
                    json=payload, timeout=60,
                )
            except requests.exceptions.Timeout:
                if is_last:
                    return False, "Push timeout"
                print(f" [GitHub] Timeout attempt {attempt+1}, retrying...")
                continue
            except Exception as e:
                if is_last:
                    return False, f"Push error: {e}"
                print(f" [GitHub] Push error attempt {attempt+1}: {e}, retrying...")
                continue

            status_code = resp.status_code
            if status_code in (200, 201):
                # The write already happened; a malformed response body must
                # not trigger another PUT.
                try:
                    new_sha = resp.json().get('content', {}).get('sha', '')
                except (ValueError, AttributeError):
                    new_sha = ''
                if new_sha:
                    with self._lock:
                        self._file_sha = new_sha
                print(f" [GitHub] Pushed {record_count} cards OK")
                return True, None

            if status_code in (409, 422):
                print(f" [GitHub] SHA conflict attempt {attempt+1}, refreshing...")
                fresh_sha = self._fetch_sha() or self._file_sha
                if fresh_sha:
                    payload["sha"] = fresh_sha
                else:
                    payload.pop("sha", None)
                continue

            if status_code in (403, 429) and self._is_rate_limited(resp):
                return False, "GitHub rate limited"

            return False, f"GitHub {status_code}: {self._error_message(resp)}"
        return False, f"Failed after {max_retries} attempts"

    def get_status(self) -> dict:
        """Return configuration details plus the current API rate limit.

        Returns:
            ``{"configured": False}`` when storage is not configured.
            Otherwise a dict with ``repo``, ``branch``, ``file``, ``has_sha``,
            ``current_sha`` and ``rate_limit``. ``rate_limit`` holds
            ``limit``/``remaining``/``used`` on success, ``{"error": ...}``
            if the request raised, and ``None`` on a non-200 answer.
        """
        if not self._configured:
            return {"configured": False}
        # Snapshot once so the fields below describe the same value.
        sha = self._file_sha
        status = {
            "configured": True,
            "repo": self._repo,
            "branch": self._branch,
            "file": self._file_path(),
            "has_sha": sha is not None,
            "current_sha": sha[:12] + "..." if sha else None,
            "rate_limit": None,
        }
        try:
            resp = self._session.get(
                "https://api.github.com/rate_limit",
                headers=self._headers(),
                timeout=10,
            )
            if resp.status_code == 200:
                core = resp.json().get('resources', {}).get('core', {})
                status["rate_limit"] = {
                    "limit": core.get('limit', 0),
                    "remaining": core.get('remaining', 0),
                    "used": core.get('used', 0),
                }
        except Exception as e:
            status["rate_limit"] = {"error": str(e)}
        return status