import sqlite_utils
from datetime import datetime
import os
import json
from typing import Dict, Optional, Any
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "articles.db")
def get_db():
    """Open the articles database, creating the schema on first use.

    Returns:
        A ``sqlite_utils.Database`` handle pointing at ``DB_PATH``. On the
        first call the ``articles`` table (keyed by URL) is created and
        full-text search is enabled over title, body, and author.
    """
    db = sqlite_utils.Database(DB_PATH)
    # First run: build the schema and turn on FTS in one shot.
    if "articles" not in db.table_names():
        schema = {
            "url": str,
            "title": str,
            "author": str,
            "publication": str,
            "markdown_content": str,
            "json_state": str,  # Raw Apollo/JSON-LD state
            "html_content": str,
            "last_scraped": str,
            "source": str,  # 'apollo', 'json-ld', 'html', 'archive', 'vision'
            "embedding": str,  # JSON string of float list
        }
        db["articles"].create(schema, pk="url")
        # Enable Full Text Search
        db["articles"].enable_fts(["title", "markdown_content", "author"])
    return db
def save_article(data: Dict[str, Any]):
    """Saves or updates an article in the database.

    Args:
        data: Scrape result dict. Recognized keys: ``url``, ``title``,
            ``author`` (string or ``{"name": ...}`` dict), ``publication``,
            ``markdownContent``, ``json_state``, ``html_debug``, ``source``,
            ``embedding``.

    Side effects:
        Upserts one row into ``articles`` keyed by URL. Short pages that look
        like error responses are silently skipped.
    """
    db = get_db()
    # Author may arrive as a plain string or as a {"name": ...} dict.
    author = data.get("author")
    if isinstance(author, dict):
        author = author.get("name")
    # Prepare record. `or ""` guards cover keys present with a None value,
    # which plain .get(key, default) would pass through unchanged.
    record = {
        "url": data.get("url"),
        "title": data.get("title"),
        "author": author,
        "publication": data.get("publication"),
        "markdown_content": data.get("markdownContent"),
        "json_state": json.dumps(data.get("json_state", {})),
        "html_content": (data.get("html_debug") or "")[:100000],  # Truncate if too huge
        "last_scraped": datetime.now().isoformat(),
        "source": data.get("source", "unknown"),
        "embedding": json.dumps(data.get("embedding")) if data.get("embedding") else None,
    }
    # Validation: Don't save if it looks like an error page.
    # NOTE: parentheses matter — without them `A or B and C` parses as
    # `A or (B and C)`, so any long article merely containing "500" would
    # bypass the length guard. We only distrust *short* pages that carry an
    # error marker.
    content = record.get("markdown_content") or ""
    error_markers = ("Apologies, but something went wrong", "500")
    if any(marker in content for marker in error_markers) and len(content) < 500:
        print(f"Refusing to save invalid article: {record['url']}")
        return
    # Upsert
    db["articles"].upsert(record, pk="url")
def get_article(url: str) -> Optional[Dict[str, Any]]:
    """Retrieves a cached article from the database.

    Args:
        url: Primary-key URL of the article.

    Returns:
        A dict in the scraper's expected shape (``markdownContent``,
        ``author`` re-wrapped as ``{"name": ...}``, ``cached=True``),
        or ``None`` when the URL is not in the cache.
    """
    db = get_db()
    try:
        record = db["articles"].get(url)
        if record:
            # Parse JSON state back. Narrowly catch the failures json.loads
            # can actually produce (None value -> TypeError, corrupt text ->
            # JSONDecodeError) instead of a bare except that would also
            # swallow KeyboardInterrupt/SystemExit.
            try:
                record["json_state"] = json.loads(record["json_state"])
            except (TypeError, json.JSONDecodeError):
                record["json_state"] = {}
            # Map back to expected format
            return {
                "url": record["url"],
                "title": record["title"],
                "author": {"name": record["author"]},
                "publication": record["publication"],
                "markdownContent": record["markdown_content"],
                "json_state": record["json_state"],
                "source": record["source"],
                "cached": True
            }
    except sqlite_utils.db.NotFoundError:
        return None
    return None
def is_fresh(url: str, max_age_hours: int = 24) -> bool:
    """Checks if the cached article is fresh enough.

    Args:
        url: Primary-key URL of the article.
        max_age_hours: Maximum allowed cache age in hours (default 24).

    Returns:
        True if the article exists and was scraped within the window.
    """
    db = get_db()
    # Fetch the row directly once — the previous implementation went through
    # get_article() (which also parses json_state) and then re-fetched the
    # same row, doing two lookups for one timestamp.
    try:
        record = db["articles"].get(url)
    except sqlite_utils.db.NotFoundError:
        return False
    last_scraped_raw = record.get("last_scraped")
    if not last_scraped_raw:
        # Row exists but was never timestamped; treat as stale rather than
        # crashing on fromisoformat(None).
        return False
    last_scraped = datetime.fromisoformat(last_scraped_raw)
    age_hours = (datetime.now() - last_scraped).total_seconds() / 3600
    return age_hours < max_age_hours
def search_similar(query_embedding: list, limit: int = 5) -> list:
    """
    Searches for similar articles using cosine similarity.

    Note: This is a brute-force implementation in Python.
    In a real production system, use a Vector DB.

    Args:
        query_embedding: Query vector as a list of floats.
        limit: Maximum number of results to return (default 5).

    Returns:
        Up to ``limit`` dicts with ``url``, ``title`` and ``similarity``,
        sorted by similarity descending. Empty list for a zero/empty query.
    """
    import numpy as np
    query_vec = np.asarray(query_embedding, dtype=float)
    norm_query = np.linalg.norm(query_vec)
    # Hoisted out of the loop: a zero-norm query can never match anything,
    # so skip the full table scan entirely.
    if norm_query == 0:
        return []
    db = get_db()
    results = []
    # Fetch all rows that carry an embedding.
    rows = db.query("SELECT url, title, embedding FROM articles WHERE embedding IS NOT NULL")
    for row in rows:
        try:
            vec = np.asarray(json.loads(row["embedding"]), dtype=float)
            norm_vec = np.linalg.norm(vec)
            if norm_vec == 0:
                continue
            similarity = np.dot(query_vec, vec) / (norm_query * norm_vec)
        except (TypeError, ValueError, json.JSONDecodeError):
            # Corrupt/mismatched embedding for this row — skip it rather
            # than aborting the whole search (bare except would also have
            # hidden KeyboardInterrupt).
            continue
        results.append({
            "url": row["url"],
            "title": row["title"],
            "similarity": float(similarity),
        })
    # Sort by similarity desc
    results.sort(key=lambda x: x["similarity"], reverse=True)
    return results[:limit]
|