# TheWineStain / database.py
# obaes's picture
# Upload 13 files
# 64deae3 verified
import logging
import os
from typing import Optional, Union

import asyncpg
# Removed experimental playsequene function as storage is now handled by Agno in TWS_fonctions.py
# DDL script for the two tables owned by this module (two statements).
# - ``user`` is a reserved word in PostgreSQL, so the column name must be
#   double-quoted or the CREATE TABLE statement is a syntax error.
# - ``IF NOT EXISTS`` makes the script safe to run more than once.
strcreate = """CREATE TABLE IF NOT EXISTS usercomment
(
    id INT PRIMARY KEY NOT NULL,
    content VARCHAR(100),
    "user" VARCHAR(100)
);
CREATE TABLE IF NOT EXISTS usercave
(
    id INT PRIMARY KEY NOT NULL,
    wine VARCHAR(100)
);
"""
_logger = logging.getLogger(__name__)


class NetworkDB:
    """Small asyncpg wrapper around the wine-cave and text-post tables.

    Each query method opens a dedicated connection, runs a single statement
    and closes the connection again, even when the statement fails.
    Failures are logged and reported through sentinel return values
    (``False`` or an ``[Internal Message: ...]`` string) rather than raised,
    so callers never have to handle database exceptions.
    """

    _SERVER_ERROR = "[Internal Message: Server Error]"

    def __init__(self, database_url=None):
        """Remember the DSN, falling back to the ``DATABASE_URL`` env var."""
        self.pool = None
        self.database_url = database_url or os.getenv("DATABASE_URL")

    async def _run(self, method, query, *args):
        """Call ``conn.<method>(query, *args)`` on a short-lived connection.

        ``method`` is the name of an asyncpg ``Connection`` coroutine
        (``"execute"``, ``"fetch"`` or ``"fetchval"``). The connection is
        closed in ``finally`` so a failing query cannot leak it.
        """
        conn = await asyncpg.connect(self.database_url)
        try:
            return await getattr(conn, method)(query, *args)
        finally:
            await conn.close()

    async def create_table(self) -> Optional[str]:
        """Run the ``strcreate`` DDL script.

        Returns:
            ``None`` on success, or the server-error message string when the
            script could not be executed.
        """
        try:
            # execute() without arguments accepts several statements at once;
            # fetchval() prepares the query and rejects multi-command scripts.
            await self._run("execute", strcreate)
        except Exception:
            _logger.exception("create_table failed")
            return self._SERVER_ERROR
        return None

    async def insert_wine(self, id: str, wine: list[float]) -> bool:
        """Insert one row into ``usercave``.

        Args:
            id: Primary key of the new row.
                NOTE(review): annotated ``str`` while ``strcreate`` declares
                the column as INT - confirm what callers actually pass.
            wine: Value stored as its ``str()`` representation.

        Returns:
            ``True`` when the row was inserted, ``False`` on any failure.
        """
        try:
            # Target the same table that list_wine() reads and that the
            # schema script creates.
            inserted_id = await self._run(
                "fetchval",
                "INSERT INTO usercave (id, wine) VALUES ($1, $2) RETURNING id",
                id,
                f"{wine}",
            )
        except Exception:
            _logger.exception("insert_wine failed")
            return False
        return inserted_id is not None

    async def list_wine(self) -> Union[str, bool]:
        """Return every stored wine as one formatted string.

        Returns:
            The entries formatted as ``wine id: <id>`` followed by the wine
            on the next line, separated by blank lines; an internal message
            when no wine is stored; ``False`` when the query failed.
        """
        try:
            # fetch() returns whole rows; fetchval() would only yield the
            # first column of the first row and cannot be unpacked.
            rows = await self._run("fetch", "select id, wine from usercave;")
        except Exception:
            _logger.exception("list_wine failed")
            return False
        entries = [
            f"wine id: {row['id']}\n{row['wine']}"
            for row in rows
            if row["wine"] is not None
        ]
        if not entries:
            return "[Internal Message: No wine found!]"
        return "\n\n".join(entries)

    async def get_pool(self):
        """Return the shared connection pool, creating it on first use."""
        if not self.pool:
            self.pool = await asyncpg.create_pool(
                self.database_url, min_size=1, max_size=10
            )
        return self.pool

    async def post_text(self, content: str, embeddings: list[float]) -> bool:
        """Insert a text post; the embedding is stored as plain text.

        Returns:
            ``True`` when the row was inserted, ``False`` on any failure.
        """
        try:
            post_id = await self._run(
                "fetchval",
                "INSERT INTO text_posts (content, embedding) VALUES ($1, $2) RETURNING id",
                content,
                f"{embeddings}",  # Stored as text instead of VECTOR
            )
        except Exception:
            _logger.exception("post_text failed")
            return False
        return post_id is not None

    async def get_text_post_similar(self, query_embedding: list[float]) -> str:
        """Report that vector search is disabled; ``query_embedding`` is unused."""
        # Vector search using <-> removed as per pgvector dependency removal
        return "[Internal Message: Vector search disabled]"

    async def get_text_post_comments(self, post_id: int) -> str:
        """Return up to five comments of a post, ordered by ``uploaded_at`` descending.

        Returns:
            The comments, each prefixed with ``<|Comment_i|>`` and separated
            by blank lines; an internal message when the post has no
            comments; the server-error message when the query failed.
        """
        try:
            comments = await self._run(
                "fetch",
                "SELECT content FROM text_posts_comments WHERE post_id = $1 ORDER BY uploaded_at DESC LIMIT 5",
                post_id,
            )
        except Exception:
            _logger.exception("get_text_post_comments failed")
            # Return a string (not a list) to honour the declared return type.
            return self._SERVER_ERROR
        if not comments:
            return "[Internal Message: No Comments on this post]"
        return "\n\n".join(
            f"<|Comment_{i}|>\n{comment['content']}"
            for i, comment in enumerate(comments)
        )

    async def comment_on_text_post(self, post_id: int, content: str) -> bool:
        """Attach a comment to a text post.

        Returns:
            ``True`` when the comment was inserted, ``False`` on any failure.
        """
        try:
            comment_id = await self._run(
                "fetchval",
                "INSERT INTO text_posts_comments (post_id, content) VALUES ($1, $2) RETURNING id",
                post_id,
                content,
            )
        except Exception:
            _logger.exception("comment_on_text_post failed")
            return False
        return comment_id is not None

    async def disconnect(self) -> None:
        """Close the connection pool, if one was created."""
        if self.pool:
            # Pool.close() is a coroutine: without ``await`` it never runs.
            await self.pool.close()
            self.pool = None