from minio import Minio import os from dotenv import load_dotenv import asyncio load_dotenv() s3_client = Minio( endpoint=os.getenv("MINIO_ENDPOINT"), access_key=os.getenv("MINIO_ACCESS_KEY"), secret_key=os.getenv("MINIO_SECRET_KEY"), secure=os.getenv("MINIO_SECURE", "False").lower() == "true" ) BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME") def init_s3(): # Buat bucket otomatis jika belum ada found = s3_client.bucket_exists(BUCKET_NAME) if not found: s3_client.make_bucket(BUCKET_NAME) init_s3() def upload_audio_sync(file_path: str, object_name: str): try: s3_client.fput_object(BUCKET_NAME, object_name, file_path) return True except Exception as e: print(f"S3 Upload Error: {e}") return False async def upload_audio(file_path: str, object_name: str): return await asyncio.to_thread(upload_audio_sync, file_path, object_name) def get_audio_sync(object_name: str, download_path: str): try: s3_client.fget_object(BUCKET_NAME, object_name, download_path) return True except Exception as e: print(f"S3 Download Error: {e}") return False async def download_audio(object_name: str, download_path: str): return await asyncio.to_thread(get_audio_sync, object_name, download_path) def delete_audio_sync(object_name: str): try: s3_client.remove_object(BUCKET_NAME, object_name) except: pass async def delete_audio(object_name: str): await asyncio.to_thread(delete_audio_sync, object_name)