170 lines
7.0 KiB
Python
170 lines
7.0 KiB
Python
import asyncio
|
||
import os
|
||
import time
|
||
import logging
|
||
from aiogram import Bot, Dispatcher, types, BaseMiddleware, F
|
||
from aiogram.filters import Command
|
||
from aiogram.types import FSInputFile, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
|
||
from aiogram.client.default import DefaultBotProperties
|
||
from aiogram.client.session.aiohttp import AiohttpSession
|
||
from dotenv import load_dotenv
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||
|
||
# Import modul lokal
|
||
from yt_engine import process_youtube_request, get_recommendations
|
||
from db_manager import db
|
||
from s3_manager import upload_audio, download_audio, delete_audio
|
||
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
load_dotenv()
|
||
BOT_TOKEN = os.getenv("BOT_TOKEN")
|
||
AUTHORIZED_USER_ID = int(os.getenv("AUTHORIZED_USER_ID", 0))
|
||
|
||
session = AiohttpSession(timeout=300)
|
||
bot = Bot(token=BOT_TOKEN, session=session, default=DefaultBotProperties(parse_mode=None))
|
||
dp = Dispatcher()
|
||
|
||
# --- SISTEM ANTREAN PINTAR (PRIORITY QUEUE) ---
|
||
# Format isi antrean: (priority, timestamp, query, chat_id, status_msg_id)
|
||
music_queue = asyncio.PriorityQueue()
|
||
|
||
async def queue_worker():
|
||
"""Pekerja di balik layar yang memproses antrean satu per satu"""
|
||
while True:
|
||
priority, _, query, chat_id, msg_id = await music_queue.get()
|
||
try:
|
||
await process_music_request(query, chat_id, msg_id)
|
||
except Exception as e:
|
||
logger.error(f"Worker Error: {e}")
|
||
finally:
|
||
music_queue.task_done()
|
||
|
||
# --- MIDDLEWARE / SECURITY ---
|
||
class SecurityMiddleware(BaseMiddleware):
|
||
async def __call__(self, handler, event: types.Message | CallbackQuery, data: dict):
|
||
user_id = event.from_user.id
|
||
if user_id != AUTHORIZED_USER_ID:
|
||
return
|
||
return await handler(event, data)
|
||
|
||
dp.message.middleware(SecurityMiddleware())
|
||
dp.callback_query.middleware(SecurityMiddleware())
|
||
|
||
# --- COMMAND HANDLERS ---
|
||
@dp.message(Command("start"))
|
||
async def cmd_start(message: types.Message):
|
||
await message.answer("🎧 **Kantor-Bypass Music Bot**\nKirim format: `/play <judul>`")
|
||
|
||
@dp.message(Command("play"))
|
||
async def cmd_play(message: types.Message):
|
||
query = message.text.replace("/play", "").strip()
|
||
if not query:
|
||
return await message.answer("⚠️ Masukkan judul lagu. Contoh: `/play Dewa 19 Kangen`")
|
||
|
||
status_msg = await message.answer("⏳ *Dimasukkan ke antrean reguler...*")
|
||
# Priority 1 = Reguler (Add to Queue)
|
||
await music_queue.put((1, time.time(), query, message.chat.id, status_msg.message_id))
|
||
|
||
# --- CALLBACK HANDLERS (TOMBOL INTERAKTIF) ---
|
||
@dp.callback_query(F.data.startswith("play_") | F.data.startswith("queue_"))
|
||
async def handle_action_buttons(callback: CallbackQuery):
|
||
action, video_id = callback.data.split("_", 1)
|
||
|
||
# Hapus tombol di pesan yang diklik biar gak di-spam klik
|
||
await callback.message.edit_reply_markup(reply_markup=None)
|
||
|
||
query_url = f"https://www.youtube.com/watch?v={video_id}"
|
||
|
||
if action == "play":
|
||
status_msg = await bot.send_message(callback.message.chat.id, "🚀 *VIP Prioritas! Memproses sekarang...*")
|
||
# Priority 0 = VIP (Bypass Antrean)
|
||
await music_queue.put((0, time.time(), query_url, callback.message.chat.id, status_msg.message_id))
|
||
await callback.answer("Memutar langsung!")
|
||
elif action == "queue":
|
||
status_msg = await bot.send_message(callback.message.chat.id, "📥 *Ditambahkan ke antrean...*")
|
||
# Priority 1 = Reguler
|
||
await music_queue.put((1, time.time(), query_url, callback.message.chat.id, status_msg.message_id))
|
||
await callback.answer("Masuk antrean!")
|
||
|
||
# --- CORE LOGIC ---
|
||
async def process_music_request(query: str, chat_id: int, status_msg_id: int):
|
||
local_file = None
|
||
try:
|
||
await bot.edit_message_text("🔍 Mencari dan memproses lagu...", chat_id, status_msg_id)
|
||
yt_result = await process_youtube_request(query)
|
||
|
||
if yt_result["status"] == "error":
|
||
return await bot.edit_message_text(f"❌ Gagal: {yt_result['message']}", chat_id, status_msg_id)
|
||
|
||
video_id = yt_result["video_id"]
|
||
title = yt_result["title"]
|
||
local_file = yt_result["file_path"]
|
||
s3_object_name = f"{video_id}.mp3"
|
||
|
||
cached_data = await db.get_cache(video_id)
|
||
if cached_data:
|
||
await bot.edit_message_text("⚡ Mengambil dari Cache S3...", chat_id, status_msg_id)
|
||
local_file = f"downloads/cache_{video_id}.mp3"
|
||
await download_audio(s3_object_name, local_file)
|
||
else:
|
||
await bot.edit_message_text("☁️ Mengunggah ke S3 Storage...", chat_id, status_msg_id)
|
||
if local_file and os.path.exists(local_file):
|
||
await upload_audio(local_file, s3_object_name)
|
||
await db.save_cache(video_id, title, s3_object_name)
|
||
|
||
await bot.edit_message_text("📤 Mengirim audio ke Telegram...", chat_id, status_msg_id)
|
||
|
||
if local_file and os.path.exists(local_file):
|
||
audio = FSInputFile(local_file)
|
||
await bot.send_audio(chat_id=chat_id, audio=audio, title=title, performer="Music Bot", request_timeout=300)
|
||
await bot.delete_message(chat_id, status_msg_id)
|
||
|
||
# --- MENCARI REKOMENDASI LAGU (MIX ALGORITHM) ---
|
||
recs = await get_recommendations(video_id)
|
||
if recs:
|
||
kb = []
|
||
# Buat baris tombol untuk setiap lagu rekomendasi
|
||
for rec in recs:
|
||
short_title = rec['title'][:30] + "..." if len(rec['title']) > 30 else rec['title']
|
||
kb.append([InlineKeyboardButton(text=f"🎵 {short_title}", callback_data="ignore")])
|
||
kb.append([
|
||
InlineKeyboardButton(text="▶️ Play Now", callback_data=f"play_{rec['id']}"),
|
||
InlineKeyboardButton(text="➕ Queue", callback_data=f"queue_{rec['id']}")
|
||
])
|
||
|
||
reply_markup = InlineKeyboardMarkup(inline_keyboard=kb)
|
||
await bot.send_message(chat_id, f"💡 **Rekomendasi Selanjutnya untukmu:**", reply_markup=reply_markup, parse_mode="Markdown")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error: {e}")
|
||
finally:
|
||
if local_file and os.path.exists(local_file):
|
||
os.remove(local_file)
|
||
|
||
# --- AUTO CLEANUP ---
|
||
async def cleanup_expired_cache():
|
||
expired_items = await db.get_expired_cache(days=7)
|
||
for item in expired_items:
|
||
await delete_audio(item['s3_object_key'])
|
||
await db.delete_cache(item['youtube_id'])
|
||
|
||
# --- MAIN LOOP ---
|
||
async def main():
|
||
await db.connect()
|
||
|
||
# Menyalakan Background Worker untuk Queue
|
||
asyncio.create_task(queue_worker())
|
||
|
||
scheduler = AsyncIOScheduler()
|
||
scheduler.add_job(cleanup_expired_cache, 'cron', hour=3, minute=0)
|
||
scheduler.start()
|
||
|
||
logger.info("🚀 Bot Music Ready dengan Antrean & Rekomendasi!")
|
||
await bot.delete_webhook(drop_pending_updates=True)
|
||
await dp.start_polling(bot)
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|