import asyncio import os import logging import time from aiogram import Bot, Dispatcher, types, BaseMiddleware, F from aiogram.filters import Command from aiogram.types import FSInputFile, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery from aiogram.client.default import DefaultBotProperties from aiogram.client.session.aiohttp import AiohttpSession from dotenv import load_dotenv from apscheduler.schedulers.asyncio import AsyncIOScheduler from yt_engine import process_youtube_request, get_recommendations from db_manager import db from s3_manager import upload_audio, download_audio, delete_audio logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) load_dotenv() BOT_TOKEN = os.getenv("BOT_TOKEN") AUTHORIZED_USER_ID = int(os.getenv("AUTHORIZED_USER_ID", 0)) session = AiohttpSession(timeout=300) # Matikan parse_mode default agar aman dari error karakter aneh di judul lagu YouTube bot = Bot(token=BOT_TOKEN, session=session, default=DefaultBotProperties(parse_mode=None)) dp = Dispatcher() # Antrean sederhana (FIFO) music_queue = asyncio.Queue() async def queue_worker(): """Memproses lagu satu per satu dari antrean agar server tidak meledak""" while True: query, chat_id, msg_id = await music_queue.get() try: await process_music_request(query, chat_id, msg_id) except Exception as e: logger.error(f"Worker Error: {e}") finally: music_queue.task_done() # Satpam Bot class SecurityMiddleware(BaseMiddleware): async def __call__(self, handler, event: types.Message | CallbackQuery, data: dict): if event.from_user.id != AUTHORIZED_USER_ID: return return await handler(event, data) dp.message.middleware(SecurityMiddleware()) dp.callback_query.middleware(SecurityMiddleware()) # --- COMMANDS --- @dp.message(Command("start")) async def cmd_start(message: types.Message): await message.answer("🎧 **Music Bot Ready!**\nKirim format: `/play `\nSetelah lagu terkirim, klik rekomendasi di bawahnya untuk melanjutkan.", parse_mode="Markdown") @dp.message(Command("play")) async def cmd_play(message: types.Message): query = message.text.replace("/play", "").strip() if not query: return await message.answer("⚠️ Masukkan judul lagu. Contoh: /play Dewa 19 Kangen") status_msg = await message.answer("⏳ Memproses pesananmu...") await music_queue.put((query, message.chat.id, status_msg.message_id)) # --- HANDLER TOMBOL SUGGESTION --- @dp.callback_query(F.data.startswith("play_")) async def handle_suggestion_click(callback: CallbackQuery): video_id = callback.data.split("_", 1)[1] # 1. Hapus tombol di pesan sebelumnya biar chat nggak penuh tombol mati await callback.message.edit_reply_markup(reply_markup=None) # 2. Kirim status baru status_msg = await bot.send_message(callback.message.chat.id, "⏳ Mengunduh lagu pilihanmu...") query_url = f"https://www.youtube.com/watch?v={video_id}" # 3. Masukkan ke antrean await music_queue.put((query_url, callback.message.chat.id, status_msg.message_id)) await callback.answer("Siapp! Diproses...") # --- CORE LOGIC --- async def process_music_request(query: str, chat_id: int, status_msg_id: int): local_file = None total_start_time = time.time() # ⏱️ START TIMER TOTAL try: logger.info(f"▶️ MEMULAI PROSES LAGU: {query}") await bot.edit_message_text("🔍 Mencari ke YouTube...", chat_id, status_msg_id) # --- FASE 1: YOUTUBE & FFMPEG --- t_yt_start = time.time() yt_result = await process_youtube_request(query) t_yt_end = time.time() logger.info(f"⏱️ [FASE 1] yt-dlp & FFmpeg memakan waktu: {t_yt_end - t_yt_start:.2f} detik") if yt_result["status"] == "error": return await bot.edit_message_text(f"❌ Gagal: {yt_result['message']}", chat_id, status_msg_id) video_id = yt_result["video_id"] title = yt_result["title"] local_file = yt_result["file_path"] s3_object_name = f"{video_id}.mp3" # --- FASE 2: MINIO S3 --- t_s3_start = time.time() cached_data = await db.get_cache(video_id) if cached_data: await bot.edit_message_text("⚡ Mengambil dari Cache (MinIO)...", chat_id, status_msg_id) local_file = f"downloads/cache_{video_id}.mp3" await download_audio(s3_object_name, local_file) logger.info(f"⏱️ [FASE 2] Download dari MinIO memakan waktu: {time.time() - t_s3_start:.2f} detik") else: await bot.edit_message_text("☁️ Mengunggah ke MinIO Server...", chat_id, status_msg_id) if local_file and os.path.exists(local_file): await upload_audio(local_file, s3_object_name) await db.save_cache(video_id, title, s3_object_name) logger.info(f"⏱️ [FASE 2] Upload ke MinIO memakan waktu: {time.time() - t_s3_start:.2f} detik") # --- FASE 3: TELEGRAM UPLOAD --- await bot.edit_message_text("📤 Mengirim file ke Telegram...", chat_id, status_msg_id) if local_file and os.path.exists(local_file): t_tele_start = time.time() audio = FSInputFile(local_file) await bot.send_audio(chat_id=chat_id, audio=audio, title=title, performer="Music Bot", request_timeout=300) t_tele_end = time.time() logger.info(f"⏱️ [FASE 3] Upload ke Telegram memakan waktu: {t_tele_end - t_tele_start:.2f} detik") if status_msg_id: await bot.delete_message(chat_id, status_msg_id) # --- SUGGEST 5 LAGU --- t_recs_start = time.time() recs = await get_recommendations(video_id) logger.info(f"⏱️ [FASE 4] Fetch Rekomendasi memakan waktu: {time.time() - t_recs_start:.2f} detik") if recs: kb = [] for rec in recs[:5]: short_title = rec['title'][:60] + "..." if len(rec['title']) > 60 else rec['title'] kb.append([InlineKeyboardButton(text=f"▶️ {short_title}", callback_data=f"play_{rec['id']}")]) reply_markup = InlineKeyboardMarkup(inline_keyboard=kb) await bot.send_message( chat_id=chat_id, text=f"💡 **Rekomendasi dari:**\n_{title}_", reply_markup=reply_markup, parse_mode="Markdown" ) logger.info(f"✅ TOTAL WAKTU E2E: {time.time() - total_start_time:.2f} detik\n{'-'*40}") except Exception as e: logger.error(f"Error: {e}") try: await bot.edit_message_text("❌ Terjadi kesalahan internal saat memproses lagu.", chat_id, status_msg_id) except: pass finally: if local_file and os.path.exists(local_file): os.remove(local_file) # --- AUTO CLEANUP CACHE --- async def cleanup_expired_cache(): expired_items = await db.get_expired_cache(days=7) for item in expired_items: await delete_audio(item['s3_object_key']) await db.delete_cache(item['youtube_id']) # --- MAIN RUNNER --- async def main(): await db.connect() asyncio.create_task(queue_worker()) scheduler = AsyncIOScheduler() scheduler.add_job(cleanup_expired_cache, 'cron', hour=3, minute=0) scheduler.start() logger.info("🚀 Bot Music - Mode Interactive Playlist Berjalan!") await bot.delete_webhook(drop_pending_updates=True) await dp.start_polling(bot) if __name__ == "__main__": asyncio.run(main())