Files
music-iyoyah-bot/yt_engine.py
2026-03-26 09:05:08 +07:00

136 lines
5.1 KiB
Python

import yt_dlp
import os
import asyncio
# Directory (relative to the current working directory) used as the output
# location in the yt-dlp output template of search_and_download.
DOWNLOAD_DIR = "downloads"
# Create the directory at import time; exist_ok=True makes this a no-op when
# the directory is already present.
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
def generate_netscape_cookies(raw_path: str = "raw_cookie.txt",
                              output_path: str = "cookies.txt") -> bool:
    """Convert a tab-separated ``name<TAB>value`` dump into a Netscape cookie file.

    Each input line is split on tabs; the first two fields are taken as the
    cookie name and value. Lines with fewer than two fields, an empty name,
    or a name equal to ``"name"`` (a header row, case-insensitive) are skipped.
    Every cookie is written for the ``.youtube.com`` domain, path ``/``,
    secure, with expiry ``2147483647``.

    The output is first written to a temporary file in the destination
    directory and then moved into place with ``os.replace``. Other functions
    in this module read the cookie file from worker threads, so it must never
    be observable in a truncated, half-written state.

    Args:
        raw_path: Path of the raw tab-separated cookie dump.
        output_path: Path of the Netscape cookie file to (re)generate.

    Returns:
        ``True`` when the cookie file was written, ``False`` when the raw file
        does not exist or reading/writing failed.
    """
    import tempfile  # function-scope import: only this function needs it

    if not os.path.exists(raw_path):
        return False

    tmp_path = None
    try:
        # The temp file must live on the same filesystem as the destination
        # for os.replace to be an atomic rename.
        out_dir = os.path.dirname(os.path.abspath(output_path))
        with open(raw_path, "r", encoding="utf-8") as src:
            with tempfile.NamedTemporaryFile(
                "w", encoding="utf-8", dir=out_dir,
                prefix=".cookies-", suffix=".tmp", delete=False,
            ) as dst:
                tmp_path = dst.name
                dst.write("# Netscape HTTP Cookie File\n")
                dst.write("# This is a generated file! Do not edit.\n\n")
                for line in src:
                    parts = line.split('\t')
                    if len(parts) < 2:
                        continue
                    name, value = parts[0].strip(), parts[1].strip()
                    # Skip the header row and lines without a cookie name.
                    if not name or name.lower() == "name":
                        continue
                    dst.write(
                        f".youtube.com\tTRUE\t/\tTRUE\t2147483647\t{name}\t{value}\n"
                    )
        os.replace(tmp_path, output_path)
        tmp_path = None  # moved into place; nothing left to clean up
        return True
    except (OSError, ValueError) as e:
        # OSError: file-system failures. ValueError covers UnicodeError
        # raised while decoding the input or encoding the output.
        print(f"Error parsing cookies: {e}")
        return False
    finally:
        if tmp_path is not None:
            # Best-effort removal of the temp file left behind by a failure.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def fetch_recommendations_sync(video_id: str, limit: int = 30):
    """Return up to *limit* recommended videos for *video_id*.

    Recommendations are taken from the playlist ``RD<video_id>`` requested
    together with the video itself. Extraction runs in flat mode with
    ``download=False``, so nothing is downloaded.

    Args:
        video_id: ID of the seed video.
        limit: Maximum number of recommendations to return.

    Returns:
        A list of ``{'id': ..., 'title': ...}`` dicts. The list is empty when
        extraction fails or yields no usable entries.
    """
    ydl_opts_recs = {
        'quiet': True,
        'extract_flat': True,
        # Request a couple of extra entries because the first one is skipped
        # below and entries without id/title are filtered out.
        'playlistend': limit + 2,
        # NOTE(review): yt-dlp's embedding docs show extractor_args as
        # {'youtube': {'player_client': [...]}}; confirm this list-of-strings
        # form is actually honored.
        'extractor_args': {'youtube': ['player_client=android,ios']},
    }
    if os.path.exists("cookies.txt"):
        ydl_opts_recs['cookiefile'] = 'cookies.txt'
    try:
        with yt_dlp.YoutubeDL(ydl_opts_recs) as ydl:
            mix_url = f"https://www.youtube.com/watch?v={video_id}&list=RD{video_id}"
            info = ydl.extract_info(mix_url, download=False)
            if not info or 'entries' not in info:
                return []
            # Materialize first so a lazy/None 'entries' value can be sliced.
            entries = list(info['entries'] or [])
            # The first entry is skipped (presumably the seed video itself;
            # confirm against real Mix responses).
            recs = [
                {'id': entry['id'], 'title': entry['title']}
                for entry in entries[1:]
                if entry and entry.get('id') and entry.get('title')
            ]
            # Honour the caller's limit: playlistend over-fetches by design.
            return recs[:max(limit, 0)]
    except Exception as e:
        # Best-effort lookup: report the failure (as the search helper in
        # this module does) and fall back to "no recommendations".
        print(f"Recommendation error: {e}")
        return []
async def get_recommendations(video_id: str):
    """Run ``fetch_recommendations_sync(video_id)`` on a worker thread.

    The blocking lookup is handed to ``asyncio.to_thread`` and its result is
    returned unchanged.
    """
    pending = asyncio.to_thread(fetch_recommendations_sync, video_id)
    return await pending
def search_and_download(query_or_url: str):
    """Resolve *query_or_url* with yt-dlp and download its audio track.

    The cookie file is regenerated first via ``generate_netscape_cookies``.
    Input that is not a URL falls back to yt-dlp's ``ytsearch1`` default
    search. The file is saved under ``DOWNLOAD_DIR`` as ``<id>.<ext>``.

    Args:
        query_or_url: A search query or a video URL.

    Returns:
        On success, a dict with ``status`` ``"success"`` and the keys
        ``video_id``, ``title``, ``file_path`` and ``ext``. ``file_path`` is
        ``None`` when the expected file is not present on disk afterwards.
        On failure, ``{"status": "error", "message": <text>}``.
    """
    has_cookies = generate_netscape_cookies()
    ydl_opts = {
        # FFmpeg postprocessors were removed on purpose: request the native
        # m4a (AAC) stream directly so the CPU is not burdened with
        # transcoding.
        'format': 'bestaudio[ext=m4a]/bestaudio/best',
        'outtmpl': f'{DOWNLOAD_DIR}/%(id)s.%(ext)s',
        'max_filesize': 45000000,
        'quiet': True,
        'noplaylist': True,
        'default_search': 'ytsearch1',
        # NOTE(review): yt-dlp's embedding docs show extractor_args as
        # {'youtube': {'player_client': [...]}}; confirm this list-of-strings
        # form is actually honored.
        'extractor_args': {'youtube': ['player_client=android,ios']},
        'js_runtimes': {'node': {}},
    }
    if has_cookies and os.path.exists("cookies.txt"):
        ydl_opts['cookiefile'] = 'cookies.txt'
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(query_or_url, download=True)
            if info and 'entries' in info:
                # Search results arrive wrapped in a playlist-like dict; take
                # the first usable entry instead of blindly indexing [0],
                # which raised IndexError on an empty result.
                info = next(
                    (entry for entry in (info['entries'] or ()) if entry),
                    None,
                )
            if not info:
                return {
                    "status": "error",
                    "message": f"No results found for: {query_or_url}",
                }
            video_id = info.get('id')
            title = info.get('title')
            # Use the original extension (most likely m4a).
            ext = info.get('ext', 'm4a')
            file_path = f"{DOWNLOAD_DIR}/{video_id}.{ext}"
            return {
                "status": "success",
                "video_id": video_id,
                "title": title,
                # Report None rather than a path that does not exist.
                "file_path": file_path if os.path.exists(file_path) else None,
                "ext": ext
            }
    except Exception as e:
        # Boundary handler: any yt-dlp failure becomes an error result
        # instead of propagating to the caller.
        return {"status": "error", "message": str(e)}
async def process_youtube_request(query: str):
    """Run ``search_and_download(query)`` on a worker thread.

    The blocking call is handed to ``asyncio.to_thread`` and its result is
    returned unchanged.
    """
    pending = asyncio.to_thread(search_and_download, query)
    return await pending
# ==========================================
# ADDITIONAL FEATURE: SONG LIST SEARCH
# ==========================================
def fetch_search_results_sync(query: str, max_results: int = 30):
    """Fetch a list of search results without downloading any audio.

    Returns a list of ``{'id', 'title', 'uploader'}`` dicts, or an empty list
    when the search fails (the error is printed).
    """
    options = {
        'quiet': True,
        # Metadata only, which keeps the search fast.
        'extract_flat': True,
        # extractor_args (player_client) is deliberately left out here
        # because it caused search errors.
    }
    if os.path.exists("cookies.txt"):
        options['cookiefile'] = 'cookies.txt'

    try:
        with yt_dlp.YoutubeDL(options) as ydl:
            # Force yt-dlp into search mode by spelling out the ytsearch prefix.
            info = ydl.extract_info(f"ytsearch{max_results}:{query}", download=False)
            if 'entries' not in info:
                return []
            return [
                {
                    'id': item['id'],
                    'title': item['title'],
                    # Search results sometimes use the key 'channel' instead
                    # of 'uploader'.
                    'uploader': item.get('uploader') or item.get('channel') or 'Unknown',
                }
                for item in info['entries']
                if item and item.get('id') and item.get('title')
            ]
    except Exception as e:
        print(f"Search error: {e}")
        return []
async def search_youtube_list(query: str, max_results: int = 30):
    """Run ``fetch_search_results_sync(query, max_results)`` on a worker thread.

    The blocking search is handed to ``asyncio.to_thread`` and its result is
    returned unchanged.
    """
    pending = asyncio.to_thread(fetch_search_results_sync, query, max_results)
    return await pending