import asyncpg import os from dotenv import load_dotenv load_dotenv() class DatabaseManager: def __init__(self): self.pool = None async def connect(self): self.pool = await asyncpg.create_pool( host=os.getenv("DB_HOST"), port=os.getenv("DB_PORT"), user=os.getenv("DB_USER"), password=os.getenv("DB_PASS"), database=os.getenv("DB_NAME") ) await self.init_tables() async def init_tables(self): query = """ CREATE TABLE IF NOT EXISTS audio_cache ( id SERIAL PRIMARY KEY, youtube_id VARCHAR(50) UNIQUE NOT NULL, title VARCHAR(255) NOT NULL, s3_object_key VARCHAR(255) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """ async with self.pool.acquire() as conn: await conn.execute(query) async def get_cache(self, youtube_id: str): query = "SELECT * FROM audio_cache WHERE youtube_id = $1" async with self.pool.acquire() as conn: return await conn.fetchrow(query, youtube_id) async def save_cache(self, youtube_id: str, title: str, s3_object_key: str): query = """ INSERT INTO audio_cache (youtube_id, title, s3_object_key) VALUES ($1, $2, $3) ON CONFLICT (youtube_id) DO UPDATE SET created_at = CURRENT_TIMESTAMP; """ async with self.pool.acquire() as conn: await conn.execute(query, youtube_id, title, s3_object_key) async def get_expired_cache(self, days=7): query = "SELECT * FROM audio_cache WHERE created_at < NOW() - INTERVAL '$1 days'" async with self.pool.acquire() as conn: return await conn.fetch(query, days) async def delete_cache(self, youtube_id: str): query = "DELETE FROM audio_cache WHERE youtube_id = $1" async with self.pool.acquire() as conn: await conn.execute(query, youtube_id) db = DatabaseManager()