v1 milestone

This commit is contained in:
YOLANDO
2026-03-25 11:20:05 +07:00
commit 4ec369c18e
7307 changed files with 873141 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
# flake8: noqa: F401
from ._base import YoutubeBaseInfoExtractor
from ._clip import YoutubeClipIE
from ._mistakes import YoutubeTruncatedIDIE, YoutubeTruncatedURLIE
from ._notifications import YoutubeNotificationsIE
from ._redirect import (
YoutubeConsentRedirectIE,
YoutubeFavouritesIE,
YoutubeFeedsInfoExtractor,
YoutubeHistoryIE,
YoutubeLivestreamEmbedIE,
YoutubeRecommendedIE,
YoutubeShortsAudioPivotIE,
YoutubeSubscriptionsIE,
YoutubeWatchLaterIE,
YoutubeYtBeIE,
YoutubeYtUserIE,
)
from ._search import YoutubeMusicSearchURLIE, YoutubeSearchIE, YoutubeSearchURLIE
from ._tab import YoutubePlaylistIE, YoutubeTabBaseInfoExtractor, YoutubeTabIE
from ._video import YoutubeIE
# Rewrite __module__ so that plugins overriding the legacy flat module path
# 'yt_dlp.extractor.youtube' still match these classes after the package split.
for _extractor_cls in (
    YoutubeBaseInfoExtractor,
    YoutubeClipIE,
    YoutubeTruncatedIDIE,
    YoutubeTruncatedURLIE,
    YoutubeNotificationsIE,
    YoutubeConsentRedirectIE,
    YoutubeFavouritesIE,
    YoutubeFeedsInfoExtractor,
    YoutubeHistoryIE,
    YoutubeLivestreamEmbedIE,
    YoutubeRecommendedIE,
    YoutubeShortsAudioPivotIE,
    YoutubeSubscriptionsIE,
    YoutubeWatchLaterIE,
    YoutubeYtBeIE,
    YoutubeYtUserIE,
    YoutubeMusicSearchURLIE,
    YoutubeSearchIE,
    YoutubeSearchURLIE,
    YoutubePlaylistIE,
    YoutubeTabBaseInfoExtractor,
    YoutubeTabIE,
    YoutubeIE,
):
    _extractor_cls.__module__ = 'yt_dlp.extractor.youtube'

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,68 @@
from ._tab import YoutubeTabBaseInfoExtractor
from ._video import YoutubeIE
from ...utils import ExtractorError, traverse_obj
class YoutubeClipIE(YoutubeTabBaseInfoExtractor):
    """Extractor for YouTube clips (https://www.youtube.com/clip/<id>).

    A clip is a user-created excerpt of an existing video; this extractor
    resolves the clip to its base video and forwards the section start/end
    times via a url_transparent result.
    """
    IE_NAME = 'youtube:clip'
    _VALID_URL = r'https?://(?:www\.)?youtube\.com/clip/(?P<id>[^/?#]+)'
    _TESTS = [{
        # FIXME: Other metadata should be extracted from the clip, not from the base video
        'url': 'https://www.youtube.com/clip/UgytZKpehg-hEMBSn3F4AaABCQ',
        'info_dict': {
            'id': 'UgytZKpehg-hEMBSn3F4AaABCQ',
            'ext': 'mp4',
            'section_start': 29.0,
            'section_end': 39.7,
            'duration': 10.7,
            'age_limit': 0,
            'availability': 'public',
            'categories': ['Gaming'],
            'channel': 'Scott The Woz',
            'channel_id': 'UC4rqhyiTs7XyuODcECvuiiQ',
            'channel_url': 'https://www.youtube.com/channel/UC4rqhyiTs7XyuODcECvuiiQ',
            'description': 'md5:7a4517a17ea9b4bd98996399d8bb36e7',
            'like_count': int,
            'playable_in_embed': True,
            'tags': 'count:17',
            'thumbnail': 'https://i.ytimg.com/vi_webp/ScPX26pdQik/maxresdefault.webp',
            'title': 'Mobile Games on Console - Scott The Woz',
            'upload_date': '20210920',
            'uploader': 'Scott The Woz',
            'uploader_id': '@ScottTheWoz',
            'uploader_url': 'https://www.youtube.com/@ScottTheWoz',
            'view_count': int,
            'live_status': 'not_live',
            'channel_follower_count': int,
            'chapters': 'count:20',
            'comment_count': int,
            'heatmap': 'count:100',
            'media_type': 'clip',
        },
    }]

    def _real_extract(self, url):
        clip_id = self._match_id(url)
        _, data = self._extract_webpage(url, clip_id)

        video_id = traverse_obj(data, ('currentVideoEndpoint', 'watchEndpoint', 'videoId'))
        if not video_id:
            raise ExtractorError('Unable to find video ID')

        clip_data = traverse_obj(data, (
            'engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'clipSectionRenderer',
            'contents', ..., 'clipAttributionRenderer', 'onScrubExit', 'commandExecutorCommand', 'commands', ...,
            'openPopupAction', 'popup', 'notificationActionRenderer', 'actionButton', 'buttonRenderer', 'command',
            'commandExecutorCommand', 'commands', ..., 'loopCommand'), get_all=False)
        # If YouTube changes the page layout, the loopCommand renderer may be
        # missing; fail with a clear message instead of an opaque TypeError/KeyError
        if not clip_data or clip_data.get('startTimeMs') is None or clip_data.get('endTimeMs') is None:
            raise ExtractorError('Unable to extract clip start/end times')

        return {
            '_type': 'url_transparent',
            'url': f'https://www.youtube.com/watch?v={video_id}',
            'ie_key': YoutubeIE.ie_key(),
            'id': clip_id,
            'media_type': 'clip',
            'section_start': int(clip_data['startTimeMs']) / 1000,
            'section_end': int(clip_data['endTimeMs']) / 1000,
            '_format_sort_fields': (  # https protocol is prioritized for ffmpeg compatibility
                'proto:https', 'quality', 'res', 'fps', 'hdr:12', 'source', 'vcodec', 'channels', 'acodec', 'lang'),
        }

View File

@@ -0,0 +1,69 @@
from ._base import YoutubeBaseInfoExtractor
from ...utils import ExtractorError
class YoutubeTruncatedURLIE(YoutubeBaseInfoExtractor):
    """Catches watch/attribution URLs that lost their video id (usually an
    unquoted '&' in the shell) and raises a helpful error."""
    IE_NAME = 'youtube:truncated_url'
    IE_DESC = False  # Do not list
    _VALID_URL = r'''(?x)
        (?:https?://)?
        (?:\w+\.)?[yY][oO][uU][tT][uU][bB][eE](?:-nocookie)?\.com/
        (?:watch\?(?:
            feature=[a-z_]+|
            annotation_id=annotation_[^&]+|
            x-yt-cl=[0-9]+|
            hl=[^&]*|
            t=[0-9]+
        )?
        |
            attribution_link\?a=[^&]+
        )
        $
    '''
    _TESTS = [{
        'url': 'https://www.youtube.com/watch?annotation_id=annotation_3951667041',
        'only_matching': True,
    }, {
        'url': 'https://www.youtube.com/watch?',
        'only_matching': True,
    }, {
        'url': 'https://www.youtube.com/watch?x-yt-cl=84503534',
        'only_matching': True,
    }, {
        'url': 'https://www.youtube.com/watch?feature=foo',
        'only_matching': True,
    }, {
        'url': 'https://www.youtube.com/watch?hl=en-GB',
        'only_matching': True,
    }, {
        'url': 'https://www.youtube.com/watch?t=2372',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        # There is nothing to download here; every match is a user error
        message = (
            'Did you forget to quote the URL? Remember that & is a meta '
            'character in most shells, so you want to put the URL in quotes, '
            'like yt-dlp '
            '"https://www.youtube.com/watch?feature=foo&v=BaW_jenozKc" '
            ' or simply yt-dlp BaW_jenozKc .'
        )
        raise ExtractorError(message, expected=True)
class YoutubeTruncatedIDIE(YoutubeBaseInfoExtractor):
    """Catches watch URLs whose video id is shorter than the required 11 chars."""
    IE_NAME = 'youtube:truncated_id'
    IE_DESC = False  # Do not list
    _VALID_URL = r'https?://(?:www\.)?youtube\.com/watch\?v=(?P<id>[0-9A-Za-z_-]{1,10})$'
    _TESTS = [{
        'url': 'https://www.youtube.com/watch?v=N_708QY7Ob',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        video_id = self._match_id(url)
        # A valid YouTube video id is always 11 characters; anything matched
        # here was cut off somewhere along the way
        message = f'Incomplete YouTube ID {video_id}. URL {url} looks truncated.'
        raise ExtractorError(message, expected=True)

View File

@@ -0,0 +1,98 @@
import itertools
import re
from ._tab import YoutubeTabBaseInfoExtractor, YoutubeTabIE
from ._video import YoutubeIE
from ...utils import traverse_obj
class YoutubeNotificationsIE(YoutubeTabBaseInfoExtractor):
    """Extracts the logged-in user's notification feed via the ':ytnotif' keyword."""
    IE_NAME = 'youtube:notif'
    IE_DESC = 'YouTube notifications; ":ytnotif" keyword (requires cookies)'
    _VALID_URL = r':ytnotif(?:ication)?s?'
    _LOGIN_REQUIRED = True
    _TESTS = [{
        'url': ':ytnotif',
        'only_matching': True,
    }, {
        'url': ':ytnotifications',
        'only_matching': True,
    }]
    def _extract_notification_menu(self, response, continuation_list):
        """Yield entries from one notification-menu API response.

        ``continuation_list`` is a single-element list used as an out-param:
        index 0 is reset to None, then set to the continuation renderer for
        the next page if one is present in this response.
        """
        notification_list = traverse_obj(
            response,
            # First page: items nested inside the popup menu renderer
            ('actions', 0, 'openPopupAction', 'popup', 'multiPageMenuRenderer', 'sections', 0, 'multiPageMenuNotificationSectionRenderer', 'items'),
            # Subsequent pages: items arrive as plain continuation items
            ('actions', 0, 'appendContinuationItemsAction', 'continuationItems'),
            expected_type=list) or []
        continuation_list[0] = None
        for item in notification_list:
            entry = self._extract_notification_renderer(item.get('notificationRenderer'))
            if entry:
                yield entry
            continuation = item.get('continuationItemRenderer')
            if continuation:
                continuation_list[0] = continuation
    def _extract_notification_renderer(self, notification):
        """Convert a single notificationRenderer into a url result dict.

        Returns None for channel/post notifications that can't be resolved
        (missing channel id or post id).
        """
        video_id = traverse_obj(
            notification, ('navigationEndpoint', 'watchEndpoint', 'videoId'), expected_type=str)
        url = f'https://www.youtube.com/watch?v={video_id}'
        channel_id = None
        if not video_id:
            # Not a video notification; try to resolve it as a community post
            browse_ep = traverse_obj(
                notification, ('navigationEndpoint', 'browseEndpoint'), expected_type=dict)
            channel_id = self.ucid_or_none(traverse_obj(browse_ep, 'browseId', expected_type=str))
            post_id = self._search_regex(
                r'/post/(.+)', traverse_obj(browse_ep, 'canonicalBaseUrl', expected_type=str),
                'post id', default=None)
            if not channel_id or not post_id:
                return
            # The direct /post url redirects to this in the browser
            url = f'https://www.youtube.com/channel/{channel_id}/community?lb={post_id}'
        channel = traverse_obj(
            notification, ('contextualMenu', 'menuRenderer', 'items', 1, 'menuServiceItemRenderer', 'text', 'runs', 1, 'text'),
            expected_type=str)
        notification_title = self._get_text(notification, 'shortMessage')
        if notification_title:
            notification_title = notification_title.replace('\xad', '')  # remove soft hyphens
        # TODO: handle recommended videos
        # Strip the '<channel> ...: ' prefix from the notification text
        title = self._search_regex(
            rf'{re.escape(channel or "")}[^:]+: (.+)', notification_title,
            'video title', default=None)
        # Timestamps are parsed from relative text ('2 days ago'), so they are
        # only approximate and opt-in
        timestamp = (self._parse_time_text(self._get_text(notification, 'sentTimeText'))
            if self._configuration_arg('approximate_date', ie_key=YoutubeTabIE)
            else None)
        return {
            '_type': 'url',
            'url': url,
            'ie_key': (YoutubeIE if video_id else YoutubeTabIE).ie_key(),
            'video_id': video_id,
            'title': title,
            'channel_id': channel_id,
            'channel': channel,
            'uploader': channel,
            'thumbnails': self._extract_thumbnails(notification, 'videoThumbnail'),
            'timestamp': timestamp,
        }
    def _notification_menu_entries(self, ytcfg):
        """Paginate the notification menu endpoint, yielding all entries."""
        continuation_list = [None]
        response = None
        for page in itertools.count(1):
            # ctoken is absent on the first request and taken from the
            # previous page's continuation renderer afterwards
            ctoken = traverse_obj(
                continuation_list, (0, 'continuationEndpoint', 'getNotificationMenuEndpoint', 'ctoken'), expected_type=str)
            response = self._extract_response(
                item_id=f'page {page}', query={'ctoken': ctoken} if ctoken else {}, ytcfg=ytcfg,
                ep='notification/get_notification_menu', check_get_keys='actions',
                headers=self.generate_api_headers(ytcfg=ytcfg, visitor_data=self._extract_visitor_data(response)))
            yield from self._extract_notification_menu(response, continuation_list)
            if not continuation_list[0]:
                break
    def _real_extract(self, url):
        display_id = 'notifications'
        # ytcfg supplies auth/visitor data for the innertube API calls
        ytcfg = self._download_ytcfg('web', display_id) if not self.skip_webpage else {}
        self._report_playlist_authcheck(ytcfg)
        return self.playlist_result(self._notification_menu_entries(ytcfg), display_id, display_id)

View File

@@ -0,0 +1,248 @@
import base64
import urllib.parse
from ._base import YoutubeBaseInfoExtractor
from ._tab import YoutubeTabIE
from ...utils import ExtractorError, classproperty, parse_qs, update_url_query, url_or_none
class YoutubeYtBeIE(YoutubeBaseInfoExtractor):
    """Handles youtu.be short links that also carry a playlist id; redirects
    them to the equivalent full watch URL."""
    IE_DESC = 'youtu.be'
    _VALID_URL = rf'https?://youtu\.be/(?P<id>[0-9A-Za-z_-]{{11}})/*?.*?\blist=(?P<playlist_id>{YoutubeBaseInfoExtractor._PLAYLIST_ID_RE})'
    _TESTS = [{
        'url': 'https://youtu.be/yeWKywCrFtk?list=PL2qgrgXsNUG5ig9cat4ohreBjYLAPC0J5',
        'info_dict': {
            'id': 'yeWKywCrFtk',
            'ext': 'mp4',
            'title': 'Small Scale Baler and Braiding Rugs',
            'uploader': 'Backus-Page House Museum',
            'uploader_id': '@backuspagemuseum',
            'uploader_url': r're:https?://(?:www\.)?youtube\.com/@backuspagemuseum',
            'upload_date': '20161008',
            'description': 'md5:800c0c78d5eb128500bffd4f0b4f2e8a',
            'categories': ['Nonprofits & Activism'],
            'tags': list,
            'like_count': int,
            'age_limit': 0,
            'playable_in_embed': True,
            'thumbnail': r're:^https?://.*\.webp',
            'channel': 'Backus-Page House Museum',
            'channel_id': 'UCEfMCQ9bs3tjvjy1s451zaw',
            'live_status': 'not_live',
            'view_count': int,
            'channel_url': 'https://www.youtube.com/channel/UCEfMCQ9bs3tjvjy1s451zaw',
            'availability': 'public',
            'duration': 59,
            'comment_count': int,
            'channel_follower_count': int,
            'media_type': 'short',
        },
        'params': {
            'noplaylist': True,
            'skip_download': True,
        },
    }, {
        'url': 'https://youtu.be/uWyaPkt-VOI?list=PL9D9FC436B881BA21',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        video_id, playlist_id = self._match_valid_url(url).group('id', 'playlist_id')
        watch_url = update_url_query('https://www.youtube.com/watch', {
            'v': video_id,
            'list': playlist_id,
            'feature': 'youtu.be',
        })
        return self.url_result(watch_url, ie=YoutubeTabIE.ie_key(), video_id=playlist_id)
class YoutubeLivestreamEmbedIE(YoutubeBaseInfoExtractor):
    """Redirects channel livestream embeds to the channel's /live page."""
    IE_DESC = 'YouTube livestream embeds'
    _VALID_URL = r'https?://(?:\w+\.)?youtube\.com/embed/live_stream/?\?(?:[^#]+&)?channel=(?P<id>[^&#]+)'
    _TESTS = [{
        'url': 'https://www.youtube.com/embed/live_stream?channel=UC2_KI6RB__jGdlnK6dvFEZA',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        channel_id = self._match_id(url)
        live_url = f'https://www.youtube.com/channel/{channel_id}/live'
        return self.url_result(live_url, ie=YoutubeTabIE.ie_key(), video_id=channel_id)
class YoutubeYtUserIE(YoutubeBaseInfoExtractor):
    """Redirects the 'ytuser:<name>' pseudo-URL to the user's channel page."""
    IE_DESC = 'YouTube user videos; "ytuser:" prefix'
    IE_NAME = 'youtube:user'
    _VALID_URL = r'ytuser:(?P<id>.+)'
    _TESTS = [{
        'url': 'ytuser:phihag',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        user_id = self._match_id(url)
        user_url = f'https://www.youtube.com/user/{user_id}'
        return self.url_result(user_url, YoutubeTabIE, user_id)
class YoutubeFavouritesIE(YoutubeBaseInfoExtractor):
    """Redirects ':ytfav' to the logged-in account's liked-videos playlist."""
    IE_NAME = 'youtube:favorites'
    IE_DESC = 'YouTube liked videos; ":ytfav" keyword (requires cookies)'
    _VALID_URL = r':ytfav(?:ou?rite)?s?'
    _LOGIN_REQUIRED = True
    _TESTS = [{
        'url': ':ytfav',
        'only_matching': True,
    }, {
        'url': ':ytfavorites',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        # Liked videos live in the special 'LL' playlist of the current account
        return self.url_result('https://www.youtube.com/playlist?list=LL', ie=YoutubeTabIE.ie_key())
class YoutubeFeedsInfoExtractor(YoutubeBaseInfoExtractor):
    """
    Base class for feed extractors
    Subclasses must re-define the _FEED_NAME property.
    """
    _LOGIN_REQUIRED = True
    _FEED_NAME = 'feeds'

    @classproperty
    def IE_NAME(cls):
        # Derive the extractor name from the feed, e.g. 'youtube:history'
        return 'youtube:' + cls._FEED_NAME

    def _real_extract(self, url):
        feed_url = f'https://www.youtube.com/feed/{self._FEED_NAME}'
        return self.url_result(feed_url, ie=YoutubeTabIE.ie_key())
class YoutubeWatchLaterIE(YoutubeBaseInfoExtractor):
    """Redirects ':ytwatchlater' to the account's Watch Later playlist."""
    IE_NAME = 'youtube:watchlater'
    IE_DESC = 'Youtube watch later list; ":ytwatchlater" keyword (requires cookies)'
    _VALID_URL = r':ytwatchlater'
    _TESTS = [{
        'url': ':ytwatchlater',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        # Watch Later is the special 'WL' playlist of the current account
        return self.url_result('https://www.youtube.com/playlist?list=WL', ie=YoutubeTabIE.ie_key())
class YoutubeRecommendedIE(YoutubeFeedsInfoExtractor):
    """Recommended-videos feed; also matches the bare youtube.com homepage."""
    IE_DESC = 'YouTube recommended videos; ":ytrec" keyword'
    _VALID_URL = r'https?://(?:www\.)?youtube\.com/?(?:[?#]|$)|:ytrec(?:ommended)?'
    _FEED_NAME = 'recommended'
    _LOGIN_REQUIRED = False  # unlike other feeds, recommendations work without cookies
    _TESTS = [{
        'url': ':ytrec',
        'only_matching': True,
    }, {
        'url': ':ytrecommended',
        'only_matching': True,
    }, {
        'url': 'https://youtube.com',
        'only_matching': True,
    }]
class YoutubeSubscriptionsIE(YoutubeFeedsInfoExtractor):
    """Subscriptions feed keyword extractor; extraction logic is in the base class."""
    IE_DESC = 'YouTube subscriptions feed; ":ytsubs" keyword (requires cookies)'
    _VALID_URL = r':ytsub(?:scription)?s?'
    _FEED_NAME = 'subscriptions'
    _TESTS = [{
        'url': ':ytsubs',
        'only_matching': True,
    }, {
        'url': ':ytsubscriptions',
        'only_matching': True,
    }]
class YoutubeHistoryIE(YoutubeFeedsInfoExtractor):
    """Watch-history feed keyword extractor; extraction logic is in the base class."""
    IE_DESC = 'Youtube watch history; ":ythis" keyword (requires cookies)'
    _VALID_URL = r':ythis(?:tory)?'
    _FEED_NAME = 'history'
    _TESTS = [{
        'url': ':ythistory',
        'only_matching': True,
    }]
class YoutubeShortsAudioPivotIE(YoutubeBaseInfoExtractor):
    """Redirects a /source/<id>/shorts URL to the sfv_audio_pivot feed that
    lists Shorts reusing the audio of the given video."""
    IE_DESC = 'YouTube Shorts audio pivot (Shorts using audio of a given video)'
    IE_NAME = 'youtube:shorts:pivot:audio'
    _VALID_URL = r'https?://(?:www\.)?youtube\.com/source/(?P<id>[\w-]{11})/shorts'
    _TESTS = [{
        'url': 'https://www.youtube.com/source/Lyj-MZSAA9o/shorts',
        'only_matching': True,
    }]

    @staticmethod
    def _generate_audio_pivot_params(video_id):
        """
        Generates sfv_audio_pivot browse params for this video id
        """
        # The protobuf blob embeds the video id three times
        encoded_id = video_id.encode()
        pb_params = b'\xf2\x05+\n)\x12\'\n\x0b%b\x12\x0b%b\x1a\x0b%b' % (encoded_id, encoded_id, encoded_id)
        return urllib.parse.quote(base64.b64encode(pb_params).decode())

    def _real_extract(self, url):
        video_id = self._match_id(url)
        browse_params = self._generate_audio_pivot_params(video_id)
        return self.url_result(
            f'https://www.youtube.com/feed/sfv_audio_pivot?bp={browse_params}',
            ie=YoutubeTabIE)
class YoutubeConsentRedirectIE(YoutubeBaseInfoExtractor):
    """Follows consent.youtube.com interstitials to their target URL."""
    IE_NAME = 'youtube:consent'
    IE_DESC = False  # Do not list
    _VALID_URL = r'https?://consent\.youtube\.com/m\?'
    _TESTS = [{
        'url': 'https://consent.youtube.com/m?continue=https%3A%2F%2Fwww.youtube.com%2Flive%2FqVv6vCqciTM%3Fcbrd%3D1&gl=NL&m=0&pc=yt&hl=en&src=1',
        'info_dict': {
            'id': 'qVv6vCqciTM',
            'ext': 'mp4',
            'age_limit': 0,
            'uploader_id': '@sana_natori',
            'comment_count': int,
            'chapters': 'count:13',
            'upload_date': '20221223',
            'thumbnail': 'https://i.ytimg.com/vi/qVv6vCqciTM/maxresdefault.jpg',
            'channel_url': 'https://www.youtube.com/channel/UCIdEIHpS0TdkqRkHL5OkLtA',
            'uploader_url': 'https://www.youtube.com/@sana_natori',
            'like_count': int,
            'release_date': '20221223',
            'tags': ['Vtuber', '月ノ美兎', '名取さな', 'にじさんじ', 'クリスマス', '3D配信'],
            'title': '【 #インターネット女クリスマス 】3Dで歌ってはしゃぐインターネットの女たち【月美兎/名取さな】',
            'view_count': int,
            'playable_in_embed': True,
            'duration': 4438,
            'availability': 'public',
            'channel_follower_count': int,
            'channel_id': 'UCIdEIHpS0TdkqRkHL5OkLtA',
            'categories': ['Entertainment'],
            'live_status': 'was_live',
            'release_timestamp': 1671793345,
            'channel': 'さなちゃんねる',
            'description': 'md5:6aebf95cc4a1d731aebc01ad6cc9806d',
            'uploader': 'さなちゃんねる',
            'channel_is_verified': True,
            'heatmap': 'count:100',
        },
        'add_ie': ['Youtube'],
        'params': {'skip_download': 'Youtube'},
    }]

    def _real_extract(self, url):
        # The target is carried in the last 'continue' query parameter
        continue_values = parse_qs(url).get('continue', [None])
        redirect_url = url_or_none(continue_values[-1])
        if not redirect_url:
            raise ExtractorError('Invalid cookie consent redirect URL', expected=True)
        return self.url_result(redirect_url)

View File

@@ -0,0 +1,152 @@
import urllib.parse
from ._tab import YoutubeTabBaseInfoExtractor
from ..common import SearchInfoExtractor
from ...utils import join_nonempty, parse_qs
class YoutubeSearchIE(YoutubeTabBaseInfoExtractor, SearchInfoExtractor):
    """Search extractor for the 'ytsearchN:<query>' pseudo-URL scheme.

    This class only supplies the search key and serialized search params;
    the actual search logic lives in the base classes.
    """
    IE_DESC = 'YouTube search'
    IE_NAME = 'youtube:search'
    _SEARCH_KEY = 'ytsearch'
    _SEARCH_PARAMS = 'EgIQAfABAQ=='  # Videos only
    _TESTS = [{
        'url': 'ytsearch5:youtube-dl test video',
        'playlist_count': 5,
        'info_dict': {
            'id': 'youtube-dl test video',
            'title': 'youtube-dl test video',
        },
    }, {
        'note': 'Suicide/self-harm search warning',
        'url': 'ytsearch1:i hate myself and i wanna die',
        'playlist_count': 1,
        'info_dict': {
            'id': 'i hate myself and i wanna die',
            'title': 'i hate myself and i wanna die',
        },
    }]
class YoutubeSearchURLIE(YoutubeTabBaseInfoExtractor):
    """Extracts results pages (youtube.com/results, /search), keeping any
    sort/filter params carried in the 'sp' query parameter."""
    IE_DESC = 'YouTube search URLs with sorting and filter support'
    IE_NAME = YoutubeSearchIE.IE_NAME + '_url'
    _VALID_URL = r'https?://(?:www\.)?youtube\.com/(?:results|search)\?([^#]+&)?(?:search_query|q)=(?:[^&]+)(?:[&#]|$)'
    _TESTS = [{
        'url': 'https://www.youtube.com/results?baz=bar&search_query=youtube-dl+test+video&filters=video&lclk=video',
        'playlist_mincount': 5,
        'info_dict': {
            'id': 'youtube-dl test video',
            'title': 'youtube-dl test video',
        },
    }, {
        'url': 'https://www.youtube.com/results?search_query=python&sp=EgIQAg%253D%253D',
        'playlist_mincount': 5,
        'info_dict': {
            'id': 'python',
            'title': 'python',
        },
    }, {
        'url': 'https://www.youtube.com/results?search_query=%23cats',
        'playlist_mincount': 1,
        'info_dict': {
            'id': '#cats',
            'title': '#cats',
            # The test suite does not have support for nested playlists
            # 'entries': [{
            #     'url': r're:https://(www\.)?youtube\.com/hashtag/cats',
            #     'title': '#cats',
            # }],
        },
    }, {
        # Channel results
        'url': 'https://www.youtube.com/results?search_query=kurzgesagt&sp=EgIQAg%253D%253D',
        'info_dict': {
            'id': 'kurzgesagt',
            'title': 'kurzgesagt',
        },
        'playlist': [{
            'info_dict': {
                '_type': 'url',
                'id': 'UCsXVk37bltHxD1rDPwtNM8Q',
                'url': 'https://www.youtube.com/channel/UCsXVk37bltHxD1rDPwtNM8Q',
                'ie_key': 'YoutubeTab',
                'channel': 'Kurzgesagt In a Nutshell',
                'description': 'md5:4ae48dfa9505ffc307dad26342d06bfc',
                'title': 'Kurzgesagt In a Nutshell',
                'channel_id': 'UCsXVk37bltHxD1rDPwtNM8Q',
                # No longer available for search as it is set to the handle.
                # 'playlist_count': int,
                'channel_url': 'https://www.youtube.com/channel/UCsXVk37bltHxD1rDPwtNM8Q',
                'thumbnails': list,
                'uploader_id': '@kurzgesagt',
                'uploader_url': 'https://www.youtube.com/@kurzgesagt',
                'uploader': 'Kurzgesagt In a Nutshell',
                'channel_is_verified': True,
                'channel_follower_count': int,
            },
        }],
        'params': {'extract_flat': True, 'playlist_items': '1'},
        'playlist_mincount': 1,
    }, {
        'url': 'https://www.youtube.com/results?q=test&sp=EgQIBBgB',
        'only_matching': True,
    }]

    def _real_extract(self, url):
        query_params = parse_qs(url)
        # 'search_query' takes precedence over the shorter 'q' alias
        query = (query_params.get('search_query') or query_params.get('q'))[0]
        search_params = query_params.get('sp', (None,))[0]
        return self.playlist_result(self._search_results(query, search_params), query, query)
class YoutubeMusicSearchURLIE(YoutubeTabBaseInfoExtractor):
    """Extracts music.youtube.com search pages, resolving section selection
    from either the 'sp' query param or a URL fragment like '#songs'."""
    IE_DESC = 'YouTube music search URLs with selectable sections, e.g. #songs'
    IE_NAME = 'youtube:music:search_url'
    _VALID_URL = r'https?://music\.youtube\.com/search\?([^#]+&)?(?:search_query|q)=(?:[^&]+)(?:[&#]|$)'
    _TESTS = [{
        'url': 'https://music.youtube.com/search?q=royalty+free+music',
        'playlist_count': 16,
        'info_dict': {
            'id': 'royalty free music',
            'title': 'royalty free music',
        },
    }, {
        'url': 'https://music.youtube.com/search?q=royalty+free+music&sp=EgWKAQIIAWoKEAoQAxAEEAkQBQ%3D%3D',
        'playlist_mincount': 30,
        'info_dict': {
            'id': 'royalty free music - songs',
            'title': 'royalty free music - songs',
        },
        'params': {'extract_flat': 'in_playlist'},
    }, {
        'url': 'https://music.youtube.com/search?q=royalty+free+music#community+playlists',
        'playlist_mincount': 30,
        'info_dict': {
            'id': 'royalty free music - community playlists',
            'title': 'royalty free music - community playlists',
        },
        'params': {'extract_flat': 'in_playlist'},
    }]
    # Serialized search params ('sp') for each selectable result section
    _SECTIONS = {
        'albums': 'EgWKAQIYAWoKEAoQAxAEEAkQBQ==',
        'artists': 'EgWKAQIgAWoKEAoQAxAEEAkQBQ==',
        'community playlists': 'EgeKAQQoAEABagoQChADEAQQCRAF',
        'featured playlists': 'EgeKAQQoADgBagwQAxAJEAQQDhAKEAU==',
        'songs': 'EgWKAQIIAWoKEAoQAxAEEAkQBQ==',
        'videos': 'EgWKAQIQAWoKEAoQAxAEEAkQBQ==',
    }
    def _real_extract(self, url):
        qs = parse_qs(url)
        query = (qs.get('search_query') or qs.get('q'))[0]
        params = qs.get('sp', (None,))[0]
        if params:
            # Map a known 'sp' value back to its section name for the title;
            # unknown values are kept as-is
            section = next((k for k, v in self._SECTIONS.items() if v == params), params)
        else:
            # No 'sp': the URL fragment (e.g. '#songs') selects the section
            section = urllib.parse.unquote_plus(([*url.split('#'), ''])[1]).lower()
            params = self._SECTIONS.get(section)
            if not params:
                section = None
        title = join_nonempty(query, section, delim=' - ')
        return self.playlist_result(self._search_results(query, params, default_client='web_music'), title, title)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,5 @@
# Trigger import of built-in providers
from ._builtin.bun import BunJCP as _BunJCP # noqa: F401
from ._builtin.deno import DenoJCP as _DenoJCP # noqa: F401
from ._builtin.node import NodeJCP as _NodeJCP # noqa: F401
from ._builtin.quickjs import QuickJSJCP as _QuickJSJCP # noqa: F401

View File

@@ -0,0 +1,150 @@
from __future__ import annotations
import os
import re
import shlex
import subprocess
import urllib.parse
from yt_dlp.extractor.youtube.jsc._builtin.ejs import (
_EJS_WIKI_URL,
EJSBaseJCP,
Script,
ScriptSource,
ScriptType,
ScriptVariant,
)
from yt_dlp.extractor.youtube.jsc._builtin.vendor import load_script
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot.provider import provider_bug_report_message
from yt_dlp.utils import Popen
from yt_dlp.utils.networking import HTTPHeaderDict, clean_proxies
# KNOWN ISSUES:
# - If node_modules is present and includes a requested lib, the version we request is ignored
# and whatever installed in node_modules is used.
# - No way to ignore existing node_modules, lock files, etc.
# - No sandboxing options available
# - Cannot detect if npm packages are cached without potentially downloading them.
# `--no-install` appears to disable the cache.
# - npm auto-install may fail with an integrity error when using HTTP proxies
# - npm auto-install HTTP proxy support may be limited on older Bun versions
# - Cannot disable the transpiler / specify lang for stdin
@register_provider
class BunJCP(EJSBaseJCP, BuiltinIEContentProvider):
    """EJS challenge-solver provider that runs the solver script under the Bun runtime."""
    PROVIDER_NAME = 'bun'
    JS_RUNTIME_NAME = 'bun'
    # Lib-script build that relies on Bun's npm auto-install instead of bundled deps
    BUN_NPM_LIB_FILENAME = 'yt.solver.bun.lib.js'
    # Bun npm downloads only work through HTTP(S) proxies (see _check_env_proxies)
    SUPPORTED_PROXY_SCHEMES = ['http', 'https']
    def _iter_script_sources(self):
        """Extend the base script sources with the Bun npm-based builtin lib script."""
        yield from super()._iter_script_sources()
        yield ScriptSource.BUILTIN, self._bun_npm_source
    def _bun_npm_source(self, script_type: ScriptType, /):
        """Return the Bun-specific LIB Script (npm auto-install variant), or None."""
        if script_type != ScriptType.LIB:
            return None
        # npm downloads are a remote component; require explicit user opt-in
        if 'ejs:npm' not in self.ie.get_param('remote_components', []):
            return self._skip_component('ejs:npm')
        # Check to see if the environment proxies are compatible with Bun npm source
        if unsupported_scheme := self._check_env_proxies(self._get_env_options()):
            self.logger.warning(
                f'Bun NPM package downloads only support HTTP/HTTPS proxies; skipping remote NPM package downloads. '
                f'Provide another distribution of the challenge solver script or use '
                f'another JS runtime that supports "{unsupported_scheme}" proxies. '
                f'For more information and alternatives, refer to {_EJS_WIKI_URL}')
            return None
        # Bun-specific lib scripts that uses Bun autoimport
        # https://bun.com/docs/runtime/autoimport
        error_hook = lambda e: self.logger.warning(
            f'Failed to read bun challenge solver lib script: {e}{provider_bug_report_message(self)}')
        code = load_script(
            self.BUN_NPM_LIB_FILENAME, error_hook=error_hook)
        if code:
            return Script(script_type, ScriptVariant.BUN_NPM, ScriptSource.BUILTIN, self._SCRIPT_VERSION, code)
        return None
    def _check_env_proxies(self, env):
        """Return the first unsupported proxy scheme found in env, or None if all are OK."""
        # check that the schemes of both HTTP_PROXY and HTTPS_PROXY are supported
        for key in ('HTTP_PROXY', 'HTTPS_PROXY'):
            proxy = env.get(key)
            if not proxy:
                continue
            scheme = urllib.parse.urlparse(proxy).scheme.lower()
            if scheme not in self.SUPPORTED_PROXY_SCHEMES:
                return scheme
        return None
    def _get_env_options(self) -> dict[str, str]:
        """Build the bun subprocess environment from os.environ plus yt-dlp settings."""
        options = os.environ.copy()  # pass through existing bun env vars
        request_proxies = self.ie._downloader.proxies.copy()
        clean_proxies(request_proxies, HTTPHeaderDict())
        # Apply 'all' proxy first, then allow per-scheme overrides
        if request_proxies.get('all') is not None:
            options['HTTP_PROXY'] = options['HTTPS_PROXY'] = request_proxies['all']
        for key, env in (('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY')):
            val = request_proxies.get(key)
            if val is not None:
                options[env] = val
        if self.ie.get_param('nocheckcertificate'):
            options['NODE_TLS_REJECT_UNAUTHORIZED'] = '0'
        # Disable Bun transpiler cache
        options['BUN_RUNTIME_TRANSPILER_CACHE_PATH'] = '0'
        # Prevent segfault: <https://github.com/oven-sh/bun/issues/22901>
        options.pop('JSC_useJIT', None)
        if self.ejs_setting('jitless', ['false']) != ['false']:
            options['BUN_JSC_useJIT'] = '0'
        return options
    def _run_js_runtime(self, stdin: str, /) -> str:
        """Run the solver script (passed on stdin) through bun and return its stdout.

        Raises JsChallengeProviderError on a non-zero exit code or any
        unfiltered stderr output.
        """
        # https://bun.com/docs/cli/run
        options = ['--no-addons', '--prefer-offline']
        if self._lib_script.variant == ScriptVariant.BUN_NPM:
            # Enable auto-install even if node_modules is present
            options.append('--install=fallback')
        else:
            options.append('--no-install')
        cmd = [self.runtime_info.path, '--bun', 'run', *options, '-']
        self.logger.debug(f'Running bun: {shlex.join(cmd)}')
        with Popen(
            cmd,
            text=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._get_env_options(),
        ) as proc:
            stdout, stderr = proc.communicate_or_kill(stdin)
            stderr = self._clean_stderr(stderr)
            # Any remaining stderr output is treated as fatal, not just a bad returncode
            if proc.returncode or stderr:
                msg = f'Error running bun process (returncode: {proc.returncode})'
                if stderr:
                    msg = f'{msg}: {stderr.strip()}'
                raise JsChallengeProviderError(msg)
            return stdout
    def _clean_stderr(self, stderr):
        """Strip Bun's version banner line from stderr, keeping real errors."""
        return '\n'.join(
            line for line in stderr.splitlines()
            if not re.match(r'^Bun v\d+\.\d+\.\d+ \([\w\s]+\)$', line))
@register_preference(BunJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
    # Static priority for the bun runtime (lower than deno's 1000, so deno
    # is preferred when both runtimes are available)
    return 800

View File

@@ -0,0 +1,131 @@
from __future__ import annotations
import os
import re
import shlex
import subprocess
from yt_dlp.extractor.youtube.jsc._builtin.ejs import (
EJSBaseJCP,
Script,
ScriptSource,
ScriptType,
ScriptVariant,
)
from yt_dlp.extractor.youtube.jsc._builtin.vendor import load_script
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot.provider import provider_bug_report_message
from yt_dlp.utils import Popen, remove_terminal_sequences
from yt_dlp.utils.networking import HTTPHeaderDict, clean_proxies
# KNOWN ISSUES:
# - Can't avoid analysis cache: https://github.com/yt-dlp/yt-dlp/pull/14849#issuecomment-3475840821
@register_provider
class DenoJCP(EJSBaseJCP, BuiltinIEContentProvider):
    """EJS challenge-solver provider that runs the solver script under the Deno runtime."""
    PROVIDER_NAME = 'deno'
    JS_RUNTIME_NAME = 'deno'
    # Hardened defaults applied to every deno invocation
    _DENO_BASE_OPTIONS = [
        '--ext=js', '--no-code-cache', '--no-prompt', '--no-remote',
        '--no-lock', '--node-modules-dir=none', '--no-config',
    ]
    # Lib-script build that uses Deno's npm: imports instead of bundled deps
    DENO_NPM_LIB_FILENAME = 'yt.solver.deno.lib.js'
    # Set per-instance in _deno_npm_source when the npm deps are already cached
    _NPM_PACKAGES_CACHED = False
    def _iter_script_sources(self):
        """Extend the base script sources with the Deno npm-based builtin lib script."""
        yield from super()._iter_script_sources()
        yield ScriptSource.BUILTIN, self._deno_npm_source
    def _deno_npm_source(self, script_type: ScriptType, /):
        """Return the Deno-specific LIB Script (npm-import variant), or None."""
        if script_type != ScriptType.LIB:
            return None
        # Deno-specific lib scripts that use Deno NPM imports
        error_hook = lambda e: self.logger.warning(
            f'Failed to read deno challenge solver lib script: {e}{provider_bug_report_message(self)}')
        code = load_script(
            self.DENO_NPM_LIB_FILENAME, error_hook=error_hook)
        if not code:
            return None
        if 'ejs:npm' not in self.ie.get_param('remote_components', []):
            # We may still be able to continue if the npm packages are available/cached
            self._NPM_PACKAGES_CACHED = self._npm_packages_cached(code)
            if not self._NPM_PACKAGES_CACHED:
                return self._skip_component('ejs:npm')
        return Script(script_type, ScriptVariant.DENO_NPM, ScriptSource.BUILTIN, self._SCRIPT_VERSION, code)
    def _npm_packages_cached(self, stdin: str) -> bool:
        """Return True if the script's npm deps are already in Deno's cache."""
        # Check if npm packages are cached, so we can run without --remote-components ejs:npm
        self.logger.debug('Checking if npm packages are cached')
        try:
            # --cached-only forbids any download, so success implies a warm cache
            self._run_deno(stdin, [*self._DENO_BASE_OPTIONS, '--cached-only'])
        except JsChallengeProviderError as e:
            self.logger.trace(f'Deno npm packages not cached: {e}')
            return False
        return True
    def _run_js_runtime(self, stdin: str, /) -> str:
        """Run the solver script on stdin through deno with the appropriate flags."""
        options = [*self._DENO_BASE_OPTIONS]
        if self._lib_script.variant == ScriptVariant.DENO_NPM and self._NPM_PACKAGES_CACHED:
            options.append('--cached-only')
        elif self._lib_script.variant != ScriptVariant.DENO_NPM:
            # Non-npm script variants never need network access
            options.append('--no-npm')
            options.append('--cached-only')
        if self.ie.get_param('nocheckcertificate'):
            options.append('--unsafely-ignore-certificate-errors')
        # XXX: Convert this extractor-arg into a general option if/when a JSI framework is implemented
        if self.ejs_setting('jitless', ['false']) != ['false']:
            options.append('--v8-flags=--jitless')
        return self._run_deno(stdin, options)
    def _get_env_options(self) -> dict[str, str]:
        """Build the deno subprocess environment from os.environ plus yt-dlp proxies."""
        options = os.environ.copy()  # pass through existing deno env vars
        request_proxies = self.ie._downloader.proxies.copy()
        clean_proxies(request_proxies, HTTPHeaderDict())
        # Apply 'all' proxy first, then allow per-scheme overrides
        if 'all' in request_proxies and request_proxies['all'] is not None:
            options['HTTP_PROXY'] = options['HTTPS_PROXY'] = request_proxies['all']
        for key, env in (('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY'), ('no', 'NO_PROXY')):
            if key in request_proxies and request_proxies[key] is not None:
                options[env] = request_proxies[key]
        return options
    def _run_deno(self, stdin, options) -> str:
        """Execute deno with the given options, feeding stdin; return its stdout.

        Raises JsChallengeProviderError on a non-zero exit code or any
        unfiltered stderr output.
        """
        cmd = [self.runtime_info.path, 'run', *options, '-']
        self.logger.debug(f'Running deno: {shlex.join(cmd)}')
        with Popen(
            cmd,
            text=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._get_env_options(),
        ) as proc:
            stdout, stderr = proc.communicate_or_kill(stdin)
            stderr = self._clean_stderr(stderr)
            # Any remaining stderr output is treated as fatal, not just a bad returncode
            if proc.returncode or stderr:
                msg = f'Error running deno process (returncode: {proc.returncode})'
                if stderr:
                    msg = f'{msg}: {stderr.strip()}'
                raise JsChallengeProviderError(msg)
            return stdout
    def _clean_stderr(self, stderr):
        """Filter deno's download-progress and TLS-disabled warnings from stderr."""
        return '\n'.join(
            line for line in stderr.splitlines()
            if not (
                re.match(r'^Download\s+https\S+$', remove_terminal_sequences(line))
                or re.match(r'DANGER: TLS certificate validation is disabled for all hostnames', remove_terminal_sequences(line))))
@register_preference(DenoJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
    # Static priority for the deno runtime; highest of the builtin providers
    # (bun registers 800), so deno is tried first when available
    return 1000

View File

@@ -0,0 +1,328 @@
from __future__ import annotations
import collections
import dataclasses
import enum
import functools
import hashlib
import json
from yt_dlp.dependencies import yt_dlp_ejs as _has_ejs
from yt_dlp.extractor.youtube.jsc._builtin import vendor
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeProviderRejectedRequest,
JsChallengeProviderResponse,
JsChallengeResponse,
JsChallengeType,
NChallengeOutput,
SigChallengeOutput,
)
from yt_dlp.extractor.youtube.pot._provider import configuration_arg
from yt_dlp.extractor.youtube.pot.provider import provider_bug_report_message
from yt_dlp.utils import version_tuple
from yt_dlp.utils._jsruntime import JsRuntimeInfo
if _has_ejs:
import yt_dlp_ejs.yt.solver
TYPE_CHECKING = False
if TYPE_CHECKING:
from collections.abc import Callable, Generator
from yt_dlp.extractor.youtube.jsc.provider import JsChallengeRequest
# User-facing documentation for setting up EJS (external JS runtime) support
_EJS_WIKI_URL = 'https://github.com/yt-dlp/yt-dlp/wiki/EJS'
class ScriptType(enum.Enum):
    """Kind of challenge-solver script: the dependency bundle (LIB) or the solver itself (CORE)."""
    LIB = 'lib'
    CORE = 'core'
class ScriptVariant(enum.Enum):
    """Build/packaging variant of a solver script."""
    UNKNOWN = 'unknown'
    MINIFIED = 'minified'
    UNMINIFIED = 'unminified'
    DENO_NPM = 'deno_npm'  # import-only lib; dependencies resolved via deno's npm support
    BUN_NPM = 'bun_npm'  # import-only lib; dependencies resolved via bun's npm support
class ScriptSource(enum.Enum):
    """Where a solver script was obtained from."""
    PYPACKAGE = 'python package'  # PyPI, PyInstaller exe, zipimport binary, etc
    CACHE = 'cache'  # GitHub release assets (cached)
    WEB = 'web'  # GitHub release assets (downloaded)
    BUILTIN = 'builtin'  # vendored (full core script; import-only lib script + NPM cache)
@dataclasses.dataclass
class Script:
    """A challenge-solver script together with its provenance metadata."""
    # kind of script (LIB or CORE)
    type: ScriptType
    # packaging variant (minified, unminified, npm-based, ...)
    variant: ScriptVariant
    # where the script came from (package, cache, web, builtin)
    source: ScriptSource
    # script version string, compared against the supported version
    version: str
    # the full JS source text
    code: str

    @functools.cached_property
    def hash(self, /) -> str:
        """SHA3-512 hex digest of the script source; computed once per instance."""
        return hashlib.sha3_512(self.code.encode()).hexdigest()

    def __str__(self, /):
        # Abbreviated hash keeps log lines readable
        return f'<Script {self.type.value!r} v{self.version} (source: {self.source.value}) variant={self.variant.value!r} size={len(self.code)} hash={self.hash[:7]}...>'
class EJSBaseJCP(JsChallengeProvider):
    """Shared base for the built-in EJS (external JS runtime) challenge providers.

    A subclass binds one JS runtime (deno, node, quickjs, ...) by setting
    JS_RUNTIME_NAME and implementing _run_js_runtime(). This base locates a
    challenge-solver script (python package, cache, vendored builtin, or a
    GitHub release download), verifies its version and hash, and drives the
    JSON solve protocol over the runtime's stdin/stdout.

    Fix vs original: the "not supported" and "hash mismatch" warnings
    interpolated the raw ScriptVariant enum (e.g. ``ScriptVariant.MINIFIED``)
    while the success debug line used ``.value`` — both warnings now use
    ``.value`` for consistent log output.
    """

    JS_RUNTIME_NAME: str  # key into ydl._js_runtimes; set by subclasses
    _CACHE_SECTION = 'challenge-solver'
    _REPOSITORY = 'yt-dlp/ejs'
    _SUPPORTED_TYPES = [JsChallengeType.N, JsChallengeType.SIG]
    _SCRIPT_VERSION = vendor.VERSION

    # TODO: Integration tests for each kind of scripts source
    # NOTE: values are single SHA3-512 hex digests (str); the `in` check in
    # _get_script degenerates to equality since digests have equal length
    _ALLOWED_HASHES = {
        ScriptType.LIB: {
            ScriptVariant.UNMINIFIED: vendor.HASHES['yt.solver.lib.js'],
            ScriptVariant.MINIFIED: vendor.HASHES['yt.solver.lib.min.js'],
            ScriptVariant.DENO_NPM: vendor.HASHES['yt.solver.deno.lib.js'],
            ScriptVariant.BUN_NPM: vendor.HASHES['yt.solver.bun.lib.js'],
        },
        ScriptType.CORE: {
            ScriptVariant.MINIFIED: vendor.HASHES['yt.solver.core.min.js'],
            ScriptVariant.UNMINIFIED: vendor.HASHES['yt.solver.core.js'],
        },
    }
    # vendored (unminified) filenames, used by the builtin source
    _SCRIPT_FILENAMES = {
        ScriptType.LIB: 'yt.solver.lib.js',
        ScriptType.CORE: 'yt.solver.core.js',
    }
    # minified filenames, used for GitHub release downloads
    _MIN_SCRIPT_FILENAMES = {
        ScriptType.LIB: 'yt.solver.lib.min.js',
        ScriptType.CORE: 'yt.solver.core.min.js',
    }
    # currently disabled as files are large and we do not support rotation
    _ENABLE_PREPROCESSED_PLAYER_CACHE = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._available = True
        self.ejs_settings = self.ie.get_param('extractor_args', {}).get('youtube-ejs', {})
        # Note: The following 3 args are for developer use only & intentionally not documented.
        # - dev: bypasses verification of script hashes and versions.
        # - repo: use a custom GitHub repository to fetch web script from.
        # - script_version: use a custom script version.
        # E.g. --extractor-args "youtube-ejs:dev=true;script_version=0.1.4"
        self.is_dev = self.ejs_setting('dev', ['false'])[0] == 'true'
        if self.is_dev:
            self.report_dev_option('You have enabled dev mode for EJS JCP Providers.')
        custom_repo = self.ejs_setting('repo', [None])[0]
        if custom_repo:
            self.report_dev_option(f'You have set a custom GitHub repository for EJS JCP Providers ({custom_repo}).')
            self._REPOSITORY = custom_repo
        custom_version = self.ejs_setting('script_version', [None])[0]
        if custom_version:
            self.report_dev_option(f'You have set a custom EJS script version for EJS JCP Providers ({custom_version}).')
            self._SCRIPT_VERSION = custom_version

    def ejs_setting(self, key, *args, **kwargs):
        """Read a key from the shared ``youtube-ejs`` extractor-args namespace."""
        return configuration_arg(self.ejs_settings, key, *args, **kwargs)

    def report_dev_option(self, message: str):
        """Warn (once) that an undocumented developer option is active."""
        self.ie.report_warning(
            f'{message} '
            f'This is a developer option intended for debugging. \n'
            ' If you experience any issues while using this option, '
            f'{self.ie._downloader._format_err("DO NOT", self.ie._downloader.Styles.ERROR)} open a bug report', only_once=True)

    def _run_js_runtime(self, stdin: str, /) -> str:
        """To be implemented by subclasses"""
        raise NotImplementedError

    def _real_bulk_solve(self, /, requests: list[JsChallengeRequest]):
        """Solve requests grouped per player URL; yields one response per request."""
        grouped: dict[str, list[JsChallengeRequest]] = collections.defaultdict(list)
        for request in requests:
            grouped[request.input.player_url].append(request)
        for player_url, grouped_requests in grouped.items():
            player = None
            if self._ENABLE_PREPROCESSED_PLAYER_CACHE:
                player = self.ie.cache.load(self._CACHE_SECTION, f'player:{player_url}')
            if player:
                cached = True
            else:
                cached = False
                video_id = next((request.video_id for request in grouped_requests), None)
                player = self._get_player(video_id, player_url)
            # NB: This output belongs after the player request
            self.logger.info(f'Solving JS challenges using {self.JS_RUNTIME_NAME}')
            stdin = self._construct_stdin(player, cached, grouped_requests)
            stdout = self._run_js_runtime(stdin)
            output = json.loads(stdout)
            if output['type'] == 'error':
                # top-level failure: abort the whole group
                raise JsChallengeProviderError(output['error'])
            if self._ENABLE_PREPROCESSED_PLAYER_CACHE and (preprocessed := output.get('preprocessed_player')):
                self.ie.cache.store(self._CACHE_SECTION, f'player:{player_url}', preprocessed)
            # responses come back in request order; strict=True guards protocol drift
            for request, response_data in zip(grouped_requests, output['responses'], strict=True):
                if response_data['type'] == 'error':
                    yield JsChallengeProviderResponse(request, None, response_data['error'])
                else:
                    yield JsChallengeProviderResponse(request, JsChallengeResponse(request.type, (
                        NChallengeOutput(response_data['data']) if request.type is JsChallengeType.N
                        else SigChallengeOutput(response_data['data']))))

    def _construct_stdin(self, player: str, preprocessed: bool, requests: list[JsChallengeRequest], /) -> str:
        """Build the JS program fed to the runtime: lib + core + a single jsc() call."""
        json_requests = [{
            'type': request.type.value,
            'challenges': request.input.challenges,
        } for request in requests]
        data = {
            'type': 'preprocessed',
            'preprocessed_player': player,
            'requests': json_requests,
        } if preprocessed else {
            'type': 'player',
            'player': player,
            'requests': json_requests,
            'output_preprocessed': True,
        }
        return f'''\
{self._lib_script.code}
Object.assign(globalThis, lib);
{self._core_script.code}
console.log(JSON.stringify(jsc({json.dumps(data)})));
'''

    # region: challenge solver script

    @functools.cached_property
    def _lib_script(self, /):
        return self._get_script(ScriptType.LIB)

    @functools.cached_property
    def _core_script(self, /):
        return self._get_script(ScriptType.CORE)

    def _get_script(self, script_type: ScriptType, /) -> Script:
        """Return a verified script of *script_type* from the first usable source.

        Sources are tried in the order given by _iter_script_sources().
        Unless dev mode is on, candidates failing the version or hash check
        are discarded (and evicted from the cache if cached).
        Raises JsChallengeProviderRejectedRequest if no source yields a
        usable script, carrying any skipped remote components.
        """
        skipped_components: list[_SkippedComponent] = []
        for _, from_source in self._iter_script_sources():
            script = from_source(script_type)
            if not script:
                continue
            if isinstance(script, _SkippedComponent):
                skipped_components.append(script)
                continue
            if not self.is_dev:
                # Matching patch version is expected to have same hash
                if version_tuple(script.version, lenient=True)[:2] != version_tuple(self._SCRIPT_VERSION, lenient=True)[:2]:
                    self.logger.warning(
                        f'Challenge solver {script_type.value} script version {script.version} '
                        f'is not supported (source: {script.source.value}, variant: {script.variant.value}, supported version: {self._SCRIPT_VERSION})')
                    if script.source is ScriptSource.CACHE:
                        self.logger.debug('Clearing outdated cached script')
                        self.ie.cache.store(self._CACHE_SECTION, script_type.value, None)
                    continue
                script_hashes = self._ALLOWED_HASHES[script.type].get(script.variant, [])
                if script_hashes and script.hash not in script_hashes:
                    self.logger.warning(
                        f'Hash mismatch on challenge solver {script.type.value} script '
                        f'(source: {script.source.value}, variant: {script.variant.value}, hash: {script.hash})!{provider_bug_report_message(self)}')
                    if script.source is ScriptSource.CACHE:
                        self.logger.debug('Clearing invalid cached script')
                        self.ie.cache.store(self._CACHE_SECTION, script_type.value, None)
                    continue
            self.logger.debug(
                f'Using challenge solver {script.type.value} script v{script.version} '
                f'(source: {script.source.value}, variant: {script.variant.value})')
            break
        else:
            # all sources exhausted: mark provider unavailable for future calls
            self._available = False
            raise JsChallengeProviderRejectedRequest(
                f'No usable challenge solver {script_type.value} script available',
                _skipped_components=skipped_components or None,
            )
        return script

    def _iter_script_sources(self) -> Generator[tuple[ScriptSource, Callable[[ScriptType], Script | None]]]:
        """Yield (source, loader) pairs in preference order; subclasses may extend."""
        yield from [
            (ScriptSource.PYPACKAGE, self._pypackage_source),
            (ScriptSource.CACHE, self._cached_source),
            (ScriptSource.BUILTIN, self._builtin_source),
            (ScriptSource.WEB, self._web_release_source)]

    def _pypackage_source(self, script_type: ScriptType, /) -> Script | None:
        """Load the script from the optional yt_dlp_ejs python package, if installed."""
        if not _has_ejs:
            return None
        try:
            code = yt_dlp_ejs.yt.solver.core() if script_type is ScriptType.CORE else yt_dlp_ejs.yt.solver.lib()
        except Exception as e:
            self.logger.warning(
                f'Failed to load challenge solver {script_type.value} script from python package: {e}{provider_bug_report_message(self)}')
            return None
        return Script(script_type, ScriptVariant.MINIFIED, ScriptSource.PYPACKAGE, yt_dlp_ejs.version, code)

    def _cached_source(self, script_type: ScriptType, /) -> Script | None:
        """Load a previously downloaded script from yt-dlp's cache."""
        if data := self.ie.cache.load(self._CACHE_SECTION, script_type.value):
            return Script(script_type, ScriptVariant(data['variant']), ScriptSource.CACHE, data['version'], data['code'])
        return None

    def _builtin_source(self, script_type: ScriptType, /) -> Script | None:
        """Load the vendored (builtin) script shipped with yt-dlp."""
        error_hook = lambda _: self.logger.warning(
            f'Failed to read builtin challenge solver {script_type.value} script{provider_bug_report_message(self)}')
        code = vendor.load_script(
            self._SCRIPT_FILENAMES[script_type], error_hook=error_hook)
        if code:
            return Script(script_type, ScriptVariant.UNMINIFIED, ScriptSource.BUILTIN, self._SCRIPT_VERSION, code)
        return None

    def _web_release_source(self, script_type: ScriptType, /):
        """Download the minified script from a GitHub release and cache it.

        Skipped (returning a _SkippedComponent) unless the user enabled the
        'ejs:github' remote component.
        """
        if 'ejs:github' not in (self.ie.get_param('remote_components') or ()):
            return self._skip_component('ejs:github')
        url = f'https://github.com/{self._REPOSITORY}/releases/download/{self._SCRIPT_VERSION}/{self._MIN_SCRIPT_FILENAMES[script_type]}'
        if code := self.ie._download_webpage_with_retries(
            url, None, f'[{self.logger.prefix}] Downloading challenge solver {script_type.value} script from {url}',
            f'[{self.logger.prefix}] Failed to download challenge solver {script_type.value} script', fatal=False,
        ):
            self.ie.cache.store(self._CACHE_SECTION, script_type.value, {
                'version': self._SCRIPT_VERSION,
                'variant': ScriptVariant.MINIFIED.value,
                'code': code,
            })
            return Script(script_type, ScriptVariant.MINIFIED, ScriptSource.WEB, self._SCRIPT_VERSION, code)
        return None

    # endregion: challenge solver script

    @property
    def runtime_info(self) -> JsRuntimeInfo | None:
        """Info for this provider's JS runtime, or None if missing/unsupported."""
        runtime = self.ie._downloader._js_runtimes.get(self.JS_RUNTIME_NAME)
        if not runtime or not runtime.info or not runtime.info.supported:
            return None
        return runtime.info

    def is_available(self, /) -> bool:
        """Available while the runtime exists and no script lookup has failed."""
        if not self.runtime_info:
            return False
        return self._available

    def _skip_component(self, component: str, /):
        """Record that a remote *component* was not enabled for this runtime."""
        return _SkippedComponent(component, self.JS_RUNTIME_NAME)
@dataclasses.dataclass
class _SkippedComponent:
    """A remote component that was skipped because its download was not enabled."""
    component: str  # remote component identifier, e.g. 'ejs:github'
    runtime: str  # name of the JS runtime that wanted the component

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
import re
import shlex
import subprocess
from yt_dlp.extractor.youtube.jsc._builtin.ejs import EJSBaseJCP
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.utils import Popen
@register_provider
class NodeJCP(EJSBaseJCP, BuiltinIEContentProvider):
    """Solve JS challenges using the Node.js runtime."""
    PROVIDER_NAME = 'node'
    JS_RUNTIME_NAME = 'node'
    _ARGS = ['-']

    def _run_js_runtime(self, stdin: str, /) -> str:
        """Run node with the solver program on stdin and return its stdout."""
        flags = []
        if self.ejs_setting('jitless', ['false']) != ['false']:
            flags.append('--v8-flags=--jitless')
        # Node permission flag changed from experimental to stable in v23.5.0
        if self.runtime_info.version_tuple < (23, 5, 0):
            flags.append('--experimental-permission')
            flags.append('--no-warnings=ExperimentalWarning')
        else:
            flags.append('--permission')
        cmd = [self.runtime_info.path, *flags, *self._ARGS]
        self.logger.debug(f'Running node: {shlex.join(cmd)}')
        with Popen(
            cmd,
            text=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            stdout, stderr = proc.communicate_or_kill(stdin)
        stderr = self._clean_stderr(stderr)
        if proc.returncode or stderr:
            message = f'Error running node process (returncode: {proc.returncode})'
            if stderr:
                message = f'{message}: {stderr.strip()}'
            raise JsChallengeProviderError(message)
        return stdout

    def _clean_stderr(self, stderr):
        """Drop known-noise node stderr lines so only real errors remain."""
        def is_noise(line):
            return bool(
                re.match(r'^\[stdin\]:', line)
                or re.match(r'^var jsc', line)
                or line == '(Use `node --trace-uncaught ...` to show where the exception was thrown)'
                or re.match(r'^Node\.js v\d+\.\d+\.\d+$', line))

        return '\n'.join(line for line in stderr.splitlines() if not is_noise(line))
@register_preference(NodeJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
    # Rank node below deno (1000) but above quickjs (850)
    return 900

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import pathlib
import shlex
import subprocess
import tempfile
from yt_dlp.extractor.youtube.jsc._builtin.ejs import EJSBaseJCP
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeRequest,
register_preference,
register_provider,
)
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.utils import Popen
@register_provider
class QuickJSJCP(EJSBaseJCP, BuiltinIEContentProvider):
    """Solve JS challenges using the QuickJS runtime."""
    PROVIDER_NAME = 'quickjs'
    JS_RUNTIME_NAME = 'quickjs'

    def _run_js_runtime(self, stdin: str, /) -> str:
        """Run QuickJS on the solver program (written to a temp file) and return its stdout."""
        if self.runtime_info.name == 'quickjs-ng':
            self.logger.warning('QuickJS-NG is missing some optimizations making this very slow. Consider using upstream QuickJS instead.')
        elif self.runtime_info.version_tuple < (2025, 4, 26):
            self.logger.warning('Older QuickJS versions are missing optimizations making this very slow. Consider upgrading.')
        # QuickJS does not support reading from stdin, so we have to use a temp file
        script_file = tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False, encoding='utf-8')
        try:
            with script_file:
                script_file.write(stdin)
            cmd = [self.runtime_info.path, '--script', script_file.name]
            self.logger.debug(f'Running QuickJS: {shlex.join(cmd)}')
            with Popen(
                cmd,
                text=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            ) as proc:
                stdout, stderr = proc.communicate_or_kill()
            if proc.returncode or stderr:
                message = f'Error running QuickJS process (returncode: {proc.returncode})'
                if stderr:
                    message = f'{message}: {stderr.strip()}'
                raise JsChallengeProviderError(message)
        finally:
            # always remove the temp script, even on failure
            pathlib.Path(script_file.name).unlink(missing_ok=True)
        return stdout
@register_preference(QuickJSJCP)
def preference(provider: JsChallengeProvider, requests: list[JsChallengeRequest]) -> int:
    # Least preferred built-in runtime (deno scores 1000, node 900)
    return 850

View File

@@ -0,0 +1,17 @@
import importlib.resources
from yt_dlp.extractor.youtube.jsc._builtin.vendor._info import HASHES, VERSION
__all__ = ['HASHES', 'VERSION', 'load_script']
def load_script(filename, error_hook=None):
    """Read a vendored script bundled with this package.

    Returns the file's text, or None if the file does not exist or cannot
    be read. On a read failure, *error_hook* (if given) is invoked with the
    exception before None is returned; a missing file is silent by design.
    """
    file = importlib.resources.files(__package__) / filename
    if not file.is_file():
        return None
    try:
        return file.read_text(encoding='utf-8')
    # FileNotFoundError is a subclass of OSError, so listing it separately was
    # redundant. ModuleNotFoundError is kept defensively — presumably for
    # zipimport/frozen installs where package resources can vanish at runtime;
    # confirm before removing.
    except (OSError, ModuleNotFoundError) as e:
        if error_hook:
            error_hook(e)
        return None

View File

@@ -0,0 +1,11 @@
# This file is generated by devscripts/update_ejs.py. DO NOT MODIFY!
VERSION = '0.8.0'
HASHES = {
'yt.solver.bun.lib.js': '6ff45e94de9f0ea936a183c48173cfa9ce526ee4b7544cd556428427c1dd53c8073ef0174e79b320252bf0e7c64b0032cc1cf9c4358f3fda59033b7caa01c241',
'yt.solver.core.js': 'c163a6f376db6ce3da47d516a28a8f2a0554ae95c58dc766f0a6e2b3894f2cef1ee07fa84beb442fa471aac4f300985added1657c7c94c4d1cfefe68920ab599',
'yt.solver.core.min.js': 'ee5b307d07f55e91e4723edf5ac205cc877a474187849d757dc1322e38427b157a9d706d510c1723d3670f98e5a3f8cbcde77874a80406bd7204bc9fea30f283',
'yt.solver.deno.lib.js': '9c8ee3ab6c23e443a5a951e3ac73c6b8c1c8fb34335e7058a07bf99d349be5573611de00536dcd03ecd3cf34014c4e9b536081de37af3637c5390c6a6fd6a0f0',
'yt.solver.lib.js': '1ee3753a8222fc855f5c39db30a9ccbb7967dbe1fb810e86dc9a89aa073a0907f294c720e9b65427d560a35aa1ce6af19ef854d9126a05ca00afe03f72047733',
'yt.solver.lib.min.js': '8420c259ad16e99ce004e4651ac1bcabb53b4457bf5668a97a9359be9a998a789fee8ab124ee17f91a2ea8fd84e0f2b2fc8eabcaf0b16a186ba734cf422ad053',
}

View File

@@ -0,0 +1,9 @@
/*!
* SPDX-License-Identifier: Unlicense
* This file was automatically generated by https://github.com/yt-dlp/ejs
*/
const lib = {
meriyah: await import('meriyah@6.1.4'),
astring: await import('astring@1.9.0'),
};
export { lib };

View File

@@ -0,0 +1,368 @@
/*!
* SPDX-License-Identifier: Unlicense
* This file was automatically generated by https://github.com/yt-dlp/ejs
*/
var jsc = (function (meriyah, astring) {
'use strict';
function matchesStructure(obj, structure) {
if (Array.isArray(structure)) {
if (!Array.isArray(obj)) {
return false;
}
return (
structure.length === obj.length &&
structure.every((value, index) => matchesStructure(obj[index], value))
);
}
if (typeof structure === 'object') {
if (!obj) {
return !structure;
}
if ('or' in structure) {
return structure.or.some((node) => matchesStructure(obj, node));
}
if ('anykey' in structure && Array.isArray(structure.anykey)) {
const haystack = Array.isArray(obj) ? obj : Object.values(obj);
return structure.anykey.every((value) =>
haystack.some((el) => matchesStructure(el, value)),
);
}
for (const [key, value] of Object.entries(structure)) {
if (!matchesStructure(obj[key], value)) {
return false;
}
}
return true;
}
return structure === obj;
}
function isOneOf(value, ...of) {
return of.includes(value);
}
function generateArrowFunction(data) {
return meriyah.parse(data).body[0].expression;
}
function _optionalChain$1(ops) {
let lastAccessLHS = undefined;
let value = ops[0];
let i = 1;
while (i < ops.length) {
const op = ops[i];
const fn = ops[i + 1];
i += 2;
if ((op === 'optionalAccess' || op === 'optionalCall') && value == null) {
return undefined;
}
if (op === 'access' || op === 'optionalAccess') {
lastAccessLHS = value;
value = fn(value);
} else if (op === 'call' || op === 'optionalCall') {
value = fn((...args) => value.call(lastAccessLHS, ...args));
lastAccessLHS = undefined;
}
}
return value;
}
const identifier = {
or: [
{
type: 'ExpressionStatement',
expression: {
type: 'AssignmentExpression',
operator: '=',
left: { or: [{ type: 'Identifier' }, { type: 'MemberExpression' }] },
right: { type: 'FunctionExpression', async: false },
},
},
{ type: 'FunctionDeclaration', async: false, id: { type: 'Identifier' } },
{
type: 'VariableDeclaration',
declarations: {
anykey: [
{
type: 'VariableDeclarator',
init: { type: 'FunctionExpression', async: false },
},
],
},
},
],
};
const asdasd = {
type: 'ExpressionStatement',
expression: {
type: 'CallExpression',
callee: {
type: 'MemberExpression',
object: { type: 'Identifier' },
property: {},
optional: false,
},
arguments: [
{ type: 'Literal', value: 'alr' },
{ type: 'Literal', value: 'yes' },
],
optional: false,
},
};
function extract(node) {
if (!matchesStructure(node, identifier)) {
return null;
}
const options = [];
if (node.type === 'FunctionDeclaration') {
if (
node.id &&
_optionalChain$1([
node,
'access',
(_) => _.body,
'optionalAccess',
(_2) => _2.body,
])
) {
options.push({
name: node.id,
statements: _optionalChain$1([
node,
'access',
(_3) => _3.body,
'optionalAccess',
(_4) => _4.body,
]),
});
}
} else if (node.type === 'ExpressionStatement') {
if (node.expression.type !== 'AssignmentExpression') {
return null;
}
const name = node.expression.left;
const body = _optionalChain$1([
node.expression.right,
'optionalAccess',
(_5) => _5.body,
'optionalAccess',
(_6) => _6.body,
]);
if (name && body) {
options.push({ name: name, statements: body });
}
} else if (node.type === 'VariableDeclaration') {
for (const declaration of node.declarations) {
const name = declaration.id;
const body = _optionalChain$1([
declaration.init,
'optionalAccess',
(_7) => _7.body,
'optionalAccess',
(_8) => _8.body,
]);
if (name && body) {
options.push({ name: name, statements: body });
}
}
}
for (const { name: name, statements: statements } of options) {
if (matchesStructure(statements, { anykey: [asdasd] })) {
return createSolver(name);
}
}
return null;
}
function createSolver(expression) {
return generateArrowFunction(
`\n({sig, n}) => {\n const url = (${astring.generate(expression)})("https://youtube.com/watch?v=yt-dlp-wins", "s", sig ? encodeURIComponent(sig) : undefined);\n url.set("n", n);\n const proto = Object.getPrototypeOf(url);\n const keys = Object.keys(proto).concat(Object.getOwnPropertyNames(proto));\n for (const key of keys) {\n if (!["constructor", "set", "get", "clone"].includes(key)) {\n url[key]();\n break;\n }\n }\n const s = url.get("s");\n return {\n sig: s ? decodeURIComponent(s) : null,\n n: url.get("n") ?? null,\n };\n}\n`,
);
}
const setupNodes = meriyah.parse(
`\nif (typeof globalThis.XMLHttpRequest === "undefined") {\n globalThis.XMLHttpRequest = { prototype: {} };\n}\nif (typeof URL === "undefined") {\n globalThis.location = {\n hash: "",\n host: "www.youtube.com",\n hostname: "www.youtube.com",\n href: "https://www.youtube.com/watch?v=yt-dlp-wins",\n origin: "https://www.youtube.com",\n password: "",\n pathname: "/watch",\n port: "",\n protocol: "https:",\n search: "?v=yt-dlp-wins",\n username: "",\n };\n} else {\n globalThis.location = new URL("https://www.youtube.com/watch?v=yt-dlp-wins");\n}\nif (typeof globalThis.document === "undefined") {\n globalThis.document = Object.create(null);\n}\nif (typeof globalThis.navigator === "undefined") {\n globalThis.navigator = Object.create(null);\n}\nif (typeof globalThis.self === "undefined") {\n globalThis.self = globalThis;\n}\nif (typeof globalThis.window === "undefined") {\n globalThis.window = globalThis;\n}\n`,
).body;
function _optionalChain(ops) {
let lastAccessLHS = undefined;
let value = ops[0];
let i = 1;
while (i < ops.length) {
const op = ops[i];
const fn = ops[i + 1];
i += 2;
if ((op === 'optionalAccess' || op === 'optionalCall') && value == null) {
return undefined;
}
if (op === 'access' || op === 'optionalAccess') {
lastAccessLHS = value;
value = fn(value);
} else if (op === 'call' || op === 'optionalCall') {
value = fn((...args) => value.call(lastAccessLHS, ...args));
lastAccessLHS = undefined;
}
}
return value;
}
function preprocessPlayer(data) {
const program = meriyah.parse(data);
const plainStatements = modifyPlayer(program);
const solutions = getSolutions(plainStatements);
for (const [name, options] of Object.entries(solutions)) {
plainStatements.push({
type: 'ExpressionStatement',
expression: {
type: 'AssignmentExpression',
operator: '=',
left: {
type: 'MemberExpression',
computed: false,
object: { type: 'Identifier', name: '_result' },
property: { type: 'Identifier', name: name },
optional: false,
},
right: multiTry(options),
},
});
}
program.body.splice(0, 0, ...setupNodes);
return astring.generate(program);
}
function modifyPlayer(program) {
const body = program.body;
const block = (() => {
switch (body.length) {
case 1: {
const func = body[0];
if (
_optionalChain([func, 'optionalAccess', (_) => _.type]) ===
'ExpressionStatement' &&
func.expression.type === 'CallExpression' &&
func.expression.callee.type === 'MemberExpression' &&
func.expression.callee.object.type === 'FunctionExpression'
) {
return func.expression.callee.object.body;
}
break;
}
case 2: {
const func = body[1];
if (
_optionalChain([func, 'optionalAccess', (_2) => _2.type]) ===
'ExpressionStatement' &&
func.expression.type === 'CallExpression' &&
func.expression.callee.type === 'FunctionExpression'
) {
const block = func.expression.callee.body;
block.body.splice(0, 1);
return block;
}
break;
}
}
throw 'unexpected structure';
})();
block.body = block.body.filter((node) => {
if (node.type === 'ExpressionStatement') {
if (node.expression.type === 'AssignmentExpression') {
return true;
}
return node.expression.type === 'Literal';
}
return true;
});
return block.body;
}
function getSolutions(statements) {
const found = { n: [], sig: [] };
for (const statement of statements) {
const result = extract(statement);
if (result) {
found.n.push(makeSolver(result, { type: 'Identifier', name: 'n' }));
found.sig.push(makeSolver(result, { type: 'Identifier', name: 'sig' }));
}
}
return found;
}
function makeSolver(result, ident) {
return {
type: 'ArrowFunctionExpression',
params: [ident],
body: {
type: 'MemberExpression',
object: {
type: 'CallExpression',
callee: result,
arguments: [
{
type: 'ObjectExpression',
properties: [
{
type: 'Property',
key: ident,
value: ident,
kind: 'init',
computed: false,
method: false,
shorthand: true,
},
],
},
],
optional: false,
},
computed: false,
property: ident,
optional: false,
},
async: false,
expression: true,
generator: false,
};
}
function getFromPrepared(code) {
const resultObj = { n: null, sig: null };
Function('_result', code)(resultObj);
return resultObj;
}
function multiTry(generators) {
return generateArrowFunction(
`\n(_input) => {\n const _results = new Set();\n const errors = [];\n for (const _generator of ${astring.generate({ type: 'ArrayExpression', elements: generators })}) {\n try {\n _results.add(_generator(_input));\n } catch (e) {\n errors.push(e);\n }\n }\n if (!_results.size) {\n throw \`no solutions: \${errors.join(", ")}\`;\n }\n if (_results.size !== 1) {\n throw \`invalid solutions: \${[..._results].map(x => JSON.stringify(x)).join(", ")}\`;\n }\n return _results.values().next().value;\n}\n`,
);
}
function main(input) {
const preprocessedPlayer =
input.type === 'player'
? preprocessPlayer(input.player)
: input.preprocessed_player;
const solvers = getFromPrepared(preprocessedPlayer);
const responses = input.requests.map((input) => {
if (!isOneOf(input.type, 'n', 'sig')) {
return { type: 'error', error: `Unknown request type: ${input.type}` };
}
const solver = solvers[input.type];
if (!solver) {
return {
type: 'error',
error: `Failed to extract ${input.type} function`,
};
}
try {
return {
type: 'result',
data: Object.fromEntries(
input.challenges.map((challenge) => [challenge, solver(challenge)]),
),
};
} catch (error) {
return {
type: 'error',
error:
error instanceof Error
? `${error.message}\n${error.stack}`
: `${error}`,
};
}
});
const output = { type: 'result', responses: responses };
if (input.type === 'player' && input.output_preprocessed) {
output.preprocessed_player = preprocessedPlayer;
}
return output;
}
return main;
})(meriyah, astring);

View File

@@ -0,0 +1,9 @@
/*!
* SPDX-License-Identifier: Unlicense
* This file was automatically generated by https://github.com/yt-dlp/ejs
*/
const lib = {
meriyah: await import('npm:meriyah@6.1.4'),
astring: await import('npm:astring@1.9.0'),
};
export { lib };

View File

@@ -0,0 +1,287 @@
from __future__ import annotations
import collections
import dataclasses
import typing
from yt_dlp.extractor.youtube.jsc._builtin.ejs import _EJS_WIKI_URL
from yt_dlp.extractor.youtube.jsc._registry import (
_jsc_preferences,
_jsc_providers,
)
from yt_dlp.extractor.youtube.jsc.provider import (
JsChallengeProvider,
JsChallengeProviderError,
JsChallengeProviderRejectedRequest,
JsChallengeProviderResponse,
JsChallengeRequest,
JsChallengeResponse,
JsChallengeType,
NChallengeInput,
NChallengeOutput,
SigChallengeInput,
SigChallengeOutput,
)
from yt_dlp.extractor.youtube.pot._director import YoutubeIEContentProviderLogger, provider_display_list
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProviderLogger,
)
from yt_dlp.extractor.youtube.pot.provider import (
provider_bug_report_message,
)
if typing.TYPE_CHECKING:
from collections.abc import Iterable
from yt_dlp.extractor.youtube.jsc._builtin.ejs import _SkippedComponent
from yt_dlp.extractor.youtube.jsc.provider import Preference as JsChallengePreference
class JsChallengeRequestDirector:
def __init__(self, logger: IEContentProviderLogger):
self.providers: dict[str, JsChallengeProvider] = {}
self.preferences: list[JsChallengePreference] = []
self.logger = logger
def register_provider(self, provider: JsChallengeProvider):
self.providers[provider.PROVIDER_KEY] = provider
def register_preference(self, preference: JsChallengePreference):
self.preferences.append(preference)
def _get_providers(self, requests: list[JsChallengeRequest]) -> Iterable[JsChallengeProvider]:
"""Sorts available providers by preference, given a request"""
preferences = {
provider: sum(pref(provider, requests) for pref in self.preferences)
for provider in self.providers.values()
}
if self.logger.log_level <= self.logger.LogLevel.TRACE:
# calling is_available() for every JS Challenge provider upfront may have some overhead
self.logger.trace(f'JS Challenge Providers: {provider_display_list(self.providers.values())}')
self.logger.trace('JS Challenge Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_NAME}={pref}' for provider, pref in preferences.items())))
return (
provider for provider in sorted(
self.providers.values(), key=preferences.get, reverse=True)
if provider.is_available()
)
def _handle_error(self, e: Exception, provider: JsChallengeProvider, requests: list[JsChallengeRequest]):
if isinstance(e, JsChallengeProviderRejectedRequest):
self.logger.trace(
f'JS Challenge Provider "{provider.PROVIDER_NAME}" rejected '
f'{"this request" if len(requests) == 1 else f"{len(requests)} requests"}, '
f'trying next available provider. Reason: {e}',
)
elif isinstance(e, JsChallengeProviderError):
if len(requests) == 1:
self.logger.warning(
f'Error solving {requests[0].type.value} challenge request using "{provider.PROVIDER_NAME}" provider: {e}.\n'
f' input = {requests[0].input}\n'
f' {(provider_bug_report_message(provider, before="") if not e.expected else "")}')
else:
self.logger.warning(
f'Error solving {len(requests)} challenge requests using "{provider.PROVIDER_NAME}" provider: {e}.\n'
f' requests = {requests}\n'
f' {(provider_bug_report_message(provider, before="") if not e.expected else "")}')
else:
self.logger.error(
f'Unexpected error solving {len(requests)} challenge request(s) using "{provider.PROVIDER_NAME}" provider: {e!r}\n'
f' requests = {requests}\n'
f' {provider_bug_report_message(provider, before="")}', cause=e)
def bulk_solve(self, requests: list[JsChallengeRequest]) -> list[tuple[JsChallengeRequest, JsChallengeResponse]]:
"""Solves multiple JS Challenges in bulk, returning a list of responses"""
if not self.providers:
self.logger.trace('No JS Challenge providers registered')
return []
results = []
next_requests = requests[:]
skipped_components = []
for provider in self._get_providers(next_requests):
if not next_requests:
break
self.logger.trace(
f'Attempting to solve {len(next_requests)} challenges using "{provider.PROVIDER_NAME}" provider')
try:
for response in provider.bulk_solve([dataclasses.replace(request) for request in next_requests]):
if not validate_provider_response(response):
self.logger.warning(
f'JS Challenge Provider "{provider.PROVIDER_NAME}" returned an invalid response:'
f' response = {response!r}\n'
f' {provider_bug_report_message(provider, before="")}')
continue
if response.error:
self._handle_error(response.error, provider, [response.request])
continue
if (vr_msg := validate_response(response.response, response.request)) is not True:
self.logger.warning(
f'Invalid JS Challenge response received from "{provider.PROVIDER_NAME}" provider: {vr_msg or ""}\n'
f' response = {response.response}\n'
f' request = {response.request}\n'
f' {provider_bug_report_message(provider, before="")}')
continue
try:
next_requests.remove(response.request)
except ValueError:
self.logger.warning(
f'JS Challenge Provider "{provider.PROVIDER_NAME}" returned a response for an unknown request:\n'
f' request = {response.request}\n'
f' {provider_bug_report_message(provider, before="")}')
continue
results.append((response.request, response.response))
except Exception as e:
if isinstance(e, JsChallengeProviderRejectedRequest) and e._skipped_components:
skipped_components.extend(e._skipped_components)
self._handle_error(e, provider, next_requests)
continue
if skipped_components:
self.__report_skipped_components(skipped_components)
if len(results) != len(requests):
self.logger.trace(
f'Not all JS Challenges were solved, expected {len(requests)} responses, got {len(results)}')
self.logger.trace(f'Unsolved requests: {next_requests}')
else:
self.logger.trace(f'Solved all {len(requests)} requested JS Challenges')
return results
def __report_skipped_components(self, components: list[_SkippedComponent], /):
    """Warn the user about skipped remote components that may be needed to solve JS challenges.

    Groups the skipped components by component name, then builds a single
    human-readable warning listing each component and the runtimes it was
    skipped for, along with the --remote-components flag(s) to enable it.
    """
    # Map component name -> sorted list of runtimes it was skipped for
    runtime_components = collections.defaultdict(list)
    for component in components:
        runtime_components[component.component].append(component.runtime)
    for runtimes in runtime_components.values():
        runtimes.sort()
    # Friendly display names for known component identifiers
    description_lookup = {
        'ejs:npm': 'NPM package',
        'ejs:github': 'challenge solver script',
    }
    descriptions = [
        f'{description_lookup.get(component, component)} ({", ".join(runtimes)})'
        for component, runtimes in runtime_components.items()
        if runtimes
    ]
    flags = [
        f' --remote-components {f"{component} (recommended)" if component == "ejs:github" else f"{component} "}'
        for component, runtimes in runtime_components.items()
        if runtimes
    ]

    def join_parts(parts, joiner):
        # Oxford-less natural-language join: "a", "a and b", "a, b and c"
        if not parts:
            return ''
        if len(parts) == 1:
            return parts[0]
        return f'{", ".join(parts[:-1])} {joiner} {parts[-1]}'

    # Use singular/plural phrasing depending on the number of skipped components
    if len(descriptions) == 1:
        msg = (
            f'Remote component {descriptions[0]} was skipped. '
            f'It may be required to solve JS challenges. '
            f'You can enable the download with {flags[0]}')
    else:
        msg = (
            f'Remote components {join_parts(descriptions, "and")} were skipped. '
            f'These may be required to solve JS challenges. '
            f'You can enable these downloads with {join_parts(flags, "or")}, respectively')
    self.logger.warning(f'{msg}. For more information and alternatives, refer to {_EJS_WIKI_URL}')
def close(self):
    """Close every registered JS challenge provider, releasing its resources."""
    for jsc_provider in list(self.providers.values()):
        jsc_provider.close()
# Prefix for per-provider extractor-args keys, e.g. "youtubejsc-<providerkey>:..."
EXTRACTOR_ARG_PREFIX = 'youtubejsc'


def initialize_jsc_director(ie):
    """Build a JsChallengeRequestDirector wired with all registered providers and preferences."""
    assert ie._downloader is not None, 'Downloader not set'

    # The "jsc_trace" youtube extractor-arg enables TRACE-level logging
    enable_trace = ie._configuration_arg(
        'jsc_trace', ['false'], ie_key='youtube', casesense=False)[0] == 'true'

    if enable_trace:
        log_level = IEContentProviderLogger.LogLevel.TRACE
    elif ie.get_param('verbose', False):
        log_level = IEContentProviderLogger.LogLevel.DEBUG
    else:
        log_level = IEContentProviderLogger.LogLevel.INFO

    def get_provider_logger_and_settings(provider, logger_key):
        # Each provider gets a prefixed logger and its own extractor-args dict
        logger_prefix = f'{logger_key}:{provider.PROVIDER_NAME}'
        extractor_key = f'{EXTRACTOR_ARG_PREFIX}-{provider.PROVIDER_KEY.lower()}'
        return (
            YoutubeIEContentProviderLogger(ie, logger_prefix, log_level=log_level),
            ie.get_param('extractor_args', {}).get(extractor_key, {}))

    director = JsChallengeRequestDirector(
        logger=YoutubeIEContentProviderLogger(ie, 'jsc', log_level=log_level),
    )
    # Ensure provider resources are released when the downloader closes
    ie._downloader.add_close_hook(director.close)

    for provider in _jsc_providers.value.values():
        logger, settings = get_provider_logger_and_settings(provider, 'jsc')
        director.register_provider(provider(ie, logger, settings))

    for preference in _jsc_preferences.value:
        director.register_preference(preference)

    if director.logger.log_level <= director.logger.LogLevel.DEBUG:
        # calling is_available() for every JS Challenge provider upfront may have some overhead
        director.logger.debug(f'JS Challenge Providers: {provider_display_list(director.providers.values())}')
        director.logger.trace(f'Registered {len(director.preferences)} JS Challenge provider preferences')

    return director
def validate_provider_response(response: JsChallengeProviderResponse) -> bool:
    """Check that a provider response envelope is structurally valid.

    A valid envelope wraps a JsChallengeRequest and carries either a
    JsChallengeResponse payload or an Exception error.
    """
    if not isinstance(response, JsChallengeProviderResponse):
        return False
    if not isinstance(response.request, JsChallengeRequest):
        return False
    has_payload = isinstance(response.response, JsChallengeResponse)
    has_error = response.error is not None and isinstance(response.error, Exception)
    return has_payload or has_error
def validate_response(response: JsChallengeResponse, request: JsChallengeRequest) -> bool | str:
    """Validate a challenge response against its request.

    Returns True when valid, otherwise an error message string.
    """
    if not isinstance(response, JsChallengeResponse):
        return 'Response is not a JsChallengeResponse'
    # Dispatch to the type-specific validator
    validator = (
        validate_nsig_challenge_output
        if request.type == JsChallengeType.N
        else validate_sig_challenge_output)
    return validator(response.output, request.input)
def validate_nsig_challenge_output(challenge_output: NChallengeOutput, challenge_input: NChallengeInput) -> bool | str:
    """Validate an n-challenge output: True if valid, otherwise an error string."""
    structure_ok = (
        isinstance(challenge_output, NChallengeOutput)
        and len(challenge_output.results) == len(challenge_input.challenges)
        and all(isinstance(k, str) and isinstance(v, str) for k, v in challenge_output.results.items())
        and all(c in challenge_output.results for c in challenge_input.challenges))
    if not structure_ok:
        return 'Invalid NChallengeOutput'
    # Validate n results are valid - if they end with the input challenge then the js function returned with an exception.
    for challenge, result in challenge_output.results.items():
        if result.endswith(challenge):
            return f'n result is invalid for {challenge!r}: {result!r}'
    return True
def validate_sig_challenge_output(challenge_output: SigChallengeOutput, challenge_input: SigChallengeInput) -> bool | str:
    """Validate a sig-challenge output.

    Returns True when the output is structurally valid, otherwise the error
    string 'Invalid SigChallengeOutput'. The return annotation is
    ``bool | str`` (not ``bool``) to reflect the string fallback, matching
    validate_nsig_challenge_output.
    """
    return (
        isinstance(challenge_output, SigChallengeOutput)
        and len(challenge_output.results) == len(challenge_input.challenges)
        and all(isinstance(k, str) and isinstance(v, str) for k, v in challenge_output.results.items())
        and all(challenge in challenge_output.results for challenge in challenge_input.challenges)
    ) or 'Invalid SigChallengeOutput'

View File

@@ -0,0 +1,4 @@
from yt_dlp.globals import Indirect

# Global registries for JS challenge providers (keyed by PROVIDER_KEY) and
# provider preference callables, shared across the extractor.
_jsc_providers = Indirect({})
_jsc_preferences = Indirect(set())

View File

@@ -0,0 +1,161 @@
"""PUBLIC API"""
from __future__ import annotations
import abc
import dataclasses
import enum
import typing
from yt_dlp.extractor.youtube.jsc._registry import _jsc_preferences, _jsc_providers
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProvider,
IEContentProviderError,
register_preference_generic,
register_provider_generic,
)
from yt_dlp.utils import ExtractorError
__all__ = [
'JsChallengeProvider',
'JsChallengeProviderError',
'JsChallengeProviderRejectedRequest',
'JsChallengeProviderResponse',
'JsChallengeRequest',
'JsChallengeResponse',
'JsChallengeType',
'NChallengeInput',
'NChallengeOutput',
'SigChallengeInput',
'SigChallengeOutput',
'register_preference',
'register_provider',
]
class JsChallengeType(enum.Enum):
    """Kind of JS challenge to be solved."""
    N = 'n'      # "n" parameter challenge
    SIG = 'sig'  # signature ("sig") challenge
@dataclasses.dataclass(frozen=True)
class JsChallengeRequest:
    """A request to solve one or more JS challenges of a single type."""
    type: JsChallengeType
    # NChallengeInput for type N, SigChallengeInput for type SIG
    input: NChallengeInput | SigChallengeInput
    # Video ID the challenges relate to, if known (passed to the player loader)
    video_id: str | None = None
@dataclasses.dataclass(frozen=True)
class NChallengeInput:
    """Input for "n" challenges: the player URL and the challenge strings to solve."""
    player_url: str
    challenges: list[str] = dataclasses.field(default_factory=list)
@dataclasses.dataclass(frozen=True)
class SigChallengeInput:
    """Input for "sig" challenges: the player URL and the challenge strings to solve."""
    player_url: str
    challenges: list[str] = dataclasses.field(default_factory=list)
@dataclasses.dataclass(frozen=True)
class NChallengeOutput:
    """Output of "n" challenge solving; maps each input challenge to its result."""
    results: dict[str, str] = dataclasses.field(default_factory=dict)
@dataclasses.dataclass(frozen=True)
class SigChallengeOutput:
    """Output of "sig" challenge solving; maps each input challenge to its result."""
    results: dict[str, str] = dataclasses.field(default_factory=dict)
@dataclasses.dataclass
class JsChallengeProviderResponse:
    """Per-request result from a provider: either a response or an error."""
    request: JsChallengeRequest
    response: JsChallengeResponse | None = None
    error: Exception | None = None
@dataclasses.dataclass
class JsChallengeResponse:
    """A solved JS challenge; output type corresponds to the challenge type."""
    type: JsChallengeType
    output: NChallengeOutput | SigChallengeOutput
class JsChallengeProviderRejectedRequest(IEContentProviderError):
    """Reject the JsChallengeRequest (cannot handle the request)"""

    def __init__(self, msg=None, expected: bool = False, *, _skipped_components=None):
        super().__init__(msg, expected)
        # Internal: remote components that were skipped (not downloaded); the
        # director collects these and reports them to the user in a warning.
        self._skipped_components = _skipped_components
class JsChallengeProviderError(IEContentProviderError):
    """An error occurred while solving the challenge"""
    # Set expected=True to suppress the bug-report hint in director logs.
class JsChallengeProvider(IEContentProvider, abc.ABC, suffix='JCP'):
    """Base class for JS challenge solver providers."""

    # Challenge types this provider supports; requests of other types are rejected.
    # Set to None to disable the check
    _SUPPORTED_TYPES: tuple[JsChallengeType] | None = ()

    def __validate_request(self, request: JsChallengeRequest):
        # Reject early if the provider is unavailable or cannot handle this type
        if not self.is_available():
            raise JsChallengeProviderRejectedRequest(f'{self.PROVIDER_NAME} is not available')

        # Validate request using built-in settings
        if (
            self._SUPPORTED_TYPES is not None
            and request.type not in self._SUPPORTED_TYPES
        ):
            raise JsChallengeProviderRejectedRequest(
                f'JS Challenge type "{request.type}" is not supported by {self.PROVIDER_NAME}')

    def bulk_solve(self, requests: list[JsChallengeRequest]) -> typing.Generator[JsChallengeProviderResponse, None, None]:
        """Solve multiple JS challenges and return the results"""
        validated_requests = []
        for request in requests:
            try:
                self.__validate_request(request)
                validated_requests.append(request)
            except JsChallengeProviderRejectedRequest as e:
                # Surface the rejection as an error response instead of aborting the batch
                yield JsChallengeProviderResponse(request=request, error=e)
                continue
        yield from self._real_bulk_solve(validated_requests)

    @abc.abstractmethod
    def _real_bulk_solve(self, requests: list[JsChallengeRequest]) -> typing.Generator[JsChallengeProviderResponse, None, None]:
        """Subclasses can override this method to handle bulk solving"""
        raise NotImplementedError(f'{self.PROVIDER_NAME} does not implement bulk solving')

    def _get_player(self, video_id, player_url):
        """Load the player JS via the extractor; failures are wrapped in JsChallengeProviderError."""
        try:
            return self.ie._load_player(
                video_id=video_id,
                player_url=player_url,
                fatal=True,
            )
        except ExtractorError as e:
            raise JsChallengeProviderError(
                f'Failed to load player for JS challenge: {e}') from e
def register_provider(provider: type[JsChallengeProvider]):
    """Register a JsChallengeProvider class (usable as a class decorator)."""
    registry = _jsc_providers.value
    return register_provider_generic(
        provider=provider, base_class=JsChallengeProvider, registry=registry)
def register_preference(*providers: type[JsChallengeProvider]) -> typing.Callable[[Preference], Preference]:
    """Decorator factory: register a preference function for JsChallengeProvider classes."""
    preference_registry = _jsc_preferences.value
    return register_preference_generic(JsChallengeProvider, preference_registry, *providers)
if typing.TYPE_CHECKING:
    # A preference callable scores a provider for a batch of requests;
    # providers with higher summed scores are tried first by the director.
    Preference = typing.Callable[[JsChallengeProvider, list[JsChallengeRequest]], int]
    __all__.append('Preference')

View File

@@ -0,0 +1,3 @@
# Trigger import of built-in providers
from ._builtin.memory_cache import MemoryLRUPCP as _MemoryLRUPCP # noqa: F401
from ._builtin.webpo_cachespec import WebPoPCSP as _WebPoPCSP # noqa: F401

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
import datetime as dt
import typing
from threading import Lock
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot._registry import _pot_memory_cache
from yt_dlp.extractor.youtube.pot.cache import (
PoTokenCacheProvider,
register_preference,
register_provider,
)
def initialize_global_cache(max_size: int):
    """Create (once) and return the process-wide cache dict, its lock and max size.

    The cache lives in the _pot_memory_cache global so that all provider
    instances share one store; the size is fixed on first initialization.
    """
    store = _pot_memory_cache.value
    if store.get('cache') is None:
        store['cache'] = {}
        store['lock'] = Lock()
        store['max_size'] = max_size

    if store['max_size'] != max_size:
        raise ValueError('Cannot change max_size of initialized global memory cache')

    return store['cache'], store['lock'], store['max_size']
@register_provider
class MemoryLRUPCP(PoTokenCacheProvider, BuiltinIEContentProvider):
    """Built-in in-memory LRU cache for PO Token responses (shared process-wide)."""
    PROVIDER_NAME = 'memory'
    # Maximum number of entries kept before the least recently used is evicted
    DEFAULT_CACHE_SIZE = 25

    def __init__(
        self,
        *args,
        initialize_cache: typing.Callable[[int], tuple[dict[str, tuple[str, int]], Lock, int]] = initialize_global_cache,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Cache maps key -> (value, expires_at); injectable factory eases testing
        self.cache, self.lock, self.max_size = initialize_cache(self.DEFAULT_CACHE_SIZE)

    def is_available(self) -> bool:
        return True

    def get(self, key: str) -> str | None:
        """Return the cached value for key, or None if missing or expired."""
        with self.lock:
            if key not in self.cache:
                return None
            value, expires_at = self.cache.pop(key)
            if expires_at < int(dt.datetime.now(dt.timezone.utc).timestamp()):
                # Expired entry was already popped, so it is dropped here
                return None
            # Re-insert to mark as most recently used (dicts keep insertion order)
            self.cache[key] = (value, expires_at)
            return value

    def store(self, key: str, value: str, expires_at: int):
        """Store value under key, evicting the least recently used entry if full."""
        with self.lock:
            if expires_at < int(dt.datetime.now(dt.timezone.utc).timestamp()):
                # Refuse to store entries that are already expired
                return
            if key in self.cache:
                self.cache.pop(key)
            self.cache[key] = (value, expires_at)
            if len(self.cache) > self.max_size:
                # First key in insertion order is the least recently used
                oldest_key = next(iter(self.cache))
                self.cache.pop(oldest_key)

    def delete(self, key: str):
        """Remove key from the cache if present."""
        with self.lock:
            self.cache.pop(key, None)
@register_preference(MemoryLRUPCP)
def memorylru_preference(*_, **__):
    """Constant preference score for the built-in memory cache provider."""
    # Memory LRU Cache SHOULD be the highest priority
    return 10000

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from yt_dlp.extractor.youtube.pot._provider import BuiltinIEContentProvider
from yt_dlp.extractor.youtube.pot.cache import (
CacheProviderWritePolicy,
PoTokenCacheSpec,
PoTokenCacheSpecProvider,
register_spec,
)
from yt_dlp.extractor.youtube.pot.provider import (
PoTokenRequest,
)
from yt_dlp.extractor.youtube.pot.utils import ContentBindingType, get_webpo_content_binding
from yt_dlp.utils import traverse_obj
@register_spec
class WebPoPCSP(PoTokenCacheSpecProvider, BuiltinIEContentProvider):
    """Cache spec provider for WebPO tokens: keys caches by content binding and network identity."""
    PROVIDER_NAME = 'webpo'

    def generate_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
        """Build a cache spec for the request, or None if no content binding is available."""
        # "bind_to_visitor_id" provider setting defaults to true
        bind_to_visitor_id = self._configuration_arg(
            'bind_to_visitor_id', default=['true'])[0] == 'true'
        content_binding, content_binding_type = get_webpo_content_binding(
            request, bind_to_visitor_id=bind_to_visitor_id)
        if not content_binding or not content_binding_type:
            return None

        # Video-id-bound entries are only written to the highest-priority cache provider
        write_policy = CacheProviderWritePolicy.WRITE_ALL
        if content_binding_type == ContentBindingType.VIDEO_ID:
            write_policy = CacheProviderWritePolicy.WRITE_FIRST

        return PoTokenCacheSpec(
            key_bindings={
                't': 'webpo',
                'cb': content_binding,
                'cbt': content_binding_type.value,
                'ip': traverse_obj(request.innertube_context, ('client', 'remoteHost')),
                'sa': request.request_source_address,
                'px': request.request_proxy,
            },
            # Integrity token response usually states it has a ttl of 12 hours (43200 seconds).
            # We will default to 6 hours to be safe.
            default_ttl=21600,
            write_policy=write_policy,
        )

View File

@@ -0,0 +1,471 @@
from __future__ import annotations
import base64
import binascii
import dataclasses
import datetime as dt
import hashlib
import json
import traceback
import typing
import urllib.parse
from collections.abc import Iterable
from yt_dlp.extractor.youtube.pot._provider import (
BuiltinIEContentProvider,
IEContentProvider,
IEContentProviderLogger,
)
from yt_dlp.extractor.youtube.pot._registry import (
_pot_cache_provider_preferences,
_pot_cache_providers,
_pot_pcs_providers,
_pot_providers,
_ptp_preferences,
)
from yt_dlp.extractor.youtube.pot.cache import (
CacheProviderWritePolicy,
PoTokenCacheProvider,
PoTokenCacheProviderError,
PoTokenCacheSpec,
PoTokenCacheSpecProvider,
)
from yt_dlp.extractor.youtube.pot.provider import (
PoTokenProvider,
PoTokenProviderError,
PoTokenProviderRejectedRequest,
PoTokenRequest,
PoTokenResponse,
provider_bug_report_message,
)
from yt_dlp.utils import bug_reports_message, format_field, join_nonempty
if typing.TYPE_CHECKING:
from yt_dlp.extractor.youtube.pot.cache import CacheProviderPreference
from yt_dlp.extractor.youtube.pot.provider import Preference
class YoutubeIEContentProviderLogger(IEContentProviderLogger):
    """IEContentProviderLogger implementation that forwards to the extractor/downloader."""

    def __init__(self, ie, prefix, log_level: IEContentProviderLogger.LogLevel | None = None):
        self.__ie = ie
        self.prefix = prefix
        self.log_level = log_level if log_level is not None else self.LogLevel.INFO

    def _format_msg(self, message: str):
        # Render "[prefix] message" when a prefix is set, plain message otherwise
        prefixstr = format_field(self.prefix, None, '[%s] ')
        return f'{prefixstr}{message}'

    def trace(self, message: str):
        # TRACE messages are emitted through the debug channel with a marker
        if self.log_level <= self.LogLevel.TRACE:
            self.__ie.write_debug(self._format_msg('TRACE: ' + message))

    def debug(self, message: str, *, once=False):
        if self.log_level <= self.LogLevel.DEBUG:
            self.__ie.write_debug(self._format_msg(message), only_once=once)

    def info(self, message: str):
        if self.log_level <= self.LogLevel.INFO:
            self.__ie.to_screen(self._format_msg(message))

    def warning(self, message: str, *, once=False):
        if self.log_level <= self.LogLevel.WARNING:
            self.__ie.report_warning(self._format_msg(message), only_once=once)

    def error(self, message: str, cause=None):
        # is_error=False: report without aborting; include traceback when a cause is given
        if self.log_level <= self.LogLevel.ERROR:
            self.__ie._downloader.report_error(
                self._format_msg(message), is_error=False,
                tb=''.join(traceback.format_exception(None, cause, cause.__traceback__)) if cause else None)
class PoTokenCache:
    """Caches PO Token responses across cache providers, keyed via provider-generated cache specs."""

    def __init__(
        self,
        logger: IEContentProviderLogger,
        cache_providers: list[PoTokenCacheProvider],
        cache_spec_providers: list[PoTokenCacheSpecProvider],
        cache_provider_preferences: list[CacheProviderPreference] | None = None,
    ):
        # Providers keyed by PROVIDER_KEY for stable identification
        self.cache_providers: dict[str, PoTokenCacheProvider] = {
            provider.PROVIDER_KEY: provider for provider in (cache_providers or [])}
        self.cache_provider_preferences: list[CacheProviderPreference] = cache_provider_preferences or []
        self.cache_spec_providers: dict[str, PoTokenCacheSpecProvider] = {
            provider.PROVIDER_KEY: provider for provider in (cache_spec_providers or [])}
        self.logger = logger

    def _get_cache_providers(self, request: PoTokenRequest) -> Iterable[PoTokenCacheProvider]:
        """Sorts available cache providers by preference, given a request"""
        # Each provider's score is the sum of all preference callables
        preferences = {
            provider: sum(pref(provider, request) for pref in self.cache_provider_preferences)
            for provider in self.cache_providers.values()
        }
        if self.logger.log_level <= self.logger.LogLevel.TRACE:
            # calling is_available() for every PO Token provider upfront may have some overhead
            self.logger.trace(f'PO Token Cache Providers: {provider_display_list(self.cache_providers.values())}')
            self.logger.trace('Cache Provider preferences for this request: {}'.format(', '.join(
                f'{provider.PROVIDER_KEY}={pref}' for provider, pref in preferences.items())))
        return (
            provider for provider in sorted(
                self.cache_providers.values(), key=preferences.get, reverse=True) if provider.is_available())

    def _get_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
        """Return the first valid cache spec produced by an available spec provider, or None."""
        for provider in self.cache_spec_providers.values():
            if not provider.is_available():
                continue
            try:
                spec = provider.generate_cache_spec(request)
                if not spec:
                    continue
                if not validate_cache_spec(spec):
                    self.logger.error(
                        f'PoTokenCacheSpecProvider "{provider.PROVIDER_KEY}" generate_cache_spec() '
                        f'returned invalid spec {spec}{provider_bug_report_message(provider)}')
                    continue
                # Record which provider produced the spec (used in key bindings)
                spec = dataclasses.replace(spec, _provider=provider)
                self.logger.trace(
                    f'Retrieved cache spec {spec} from cache spec provider "{provider.PROVIDER_NAME}"')
                return spec
            except Exception as e:
                self.logger.error(
                    f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache spec provider: '
                    f'{e!r}{provider_bug_report_message(provider)}')
                continue
        return None

    def _generate_key_bindings(self, spec: PoTokenCacheSpec) -> dict[str, str]:
        """Normalize spec key bindings: drop None values and add internal versioning keys."""
        bindings_cleaned = {
            **{k: v for k, v in spec.key_bindings.items() if v is not None},
            # Allow us to invalidate caches if such need arises
            '_dlp_cache': 'v1',
        }
        if spec._provider:
            bindings_cleaned['_p'] = spec._provider.PROVIDER_KEY
        self.logger.trace(f'Generated cache key bindings: {bindings_cleaned}')
        return bindings_cleaned

    def _generate_key(self, bindings: dict) -> str:
        """Derive a deterministic cache key: SHA-256 over the sorted bindings repr."""
        binding_string = ''.join(repr(dict(sorted(bindings.items()))))
        return hashlib.sha256(binding_string.encode()).hexdigest()

    def get(self, request: PoTokenRequest) -> PoTokenResponse | None:
        """Fetch a cached PO Token response for the request, or None on miss/failure."""
        spec = self._get_cache_spec(request)
        if not spec:
            self.logger.trace('No cache spec available for this request, unable to fetch from cache')
            return None

        cache_key = self._generate_key(self._generate_key_bindings(spec))
        self.logger.trace(f'Attempting to access PO Token cache using key: {cache_key}')

        for idx, provider in enumerate(self._get_cache_providers(request)):
            try:
                self.logger.trace(
                    f'Attempting to fetch PO Token response from "{provider.PROVIDER_NAME}" cache provider')
                cache_response = provider.get(cache_key)
                if not cache_response:
                    continue
                try:
                    po_token_response = PoTokenResponse(**json.loads(cache_response))
                except (TypeError, ValueError, json.JSONDecodeError):
                    po_token_response = None
                if not validate_response(po_token_response):
                    self.logger.error(
                        f'Invalid PO Token response retrieved from cache provider "{provider.PROVIDER_NAME}": '
                        f'{cache_response}{provider_bug_report_message(provider)}')
                    # Drop the corrupt entry so it is not served again
                    provider.delete(cache_key)
                    continue
                self.logger.trace(
                    f'PO Token response retrieved from cache using "{provider.PROVIDER_NAME}" provider: '
                    f'{po_token_response}')
                if idx > 0:
                    # Write back to the highest priority cache provider,
                    # so we stop trying to fetch from lower priority providers
                    self.logger.trace('Writing PO Token response to highest priority cache provider')
                    self.store(request, po_token_response, write_policy=CacheProviderWritePolicy.WRITE_FIRST)
                return po_token_response
            except PoTokenCacheProviderError as e:
                self.logger.warning(
                    f'Error from "{provider.PROVIDER_NAME}" PO Token cache provider: '
                    f'{e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
                continue
            except Exception as e:
                self.logger.error(
                    f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache provider: '
                    f'{e!r}{provider_bug_report_message(provider)}',
                )
                continue
        return None

    def store(
        self,
        request: PoTokenRequest,
        response: PoTokenResponse,
        write_policy: CacheProviderWritePolicy | None = None,
    ):
        """Store a validated PO Token response in the cache providers per the write policy."""
        spec = self._get_cache_spec(request)
        if not spec:
            self.logger.trace('No cache spec available for this request. Not caching.')
            return

        if not validate_response(response):
            self.logger.error(
                f'Invalid PO Token response provided to PoTokenCache.store(): '
                f'{response}{bug_reports_message()}')
            return

        cache_key = self._generate_key(self._generate_key_bindings(spec))
        self.logger.trace(f'Attempting to access PO Token cache using key: {cache_key}')

        # Fall back to the spec's default TTL when the response has no expiry
        default_expires_at = int(dt.datetime.now(dt.timezone.utc).timestamp()) + spec.default_ttl
        cache_response = dataclasses.replace(response, expires_at=response.expires_at or default_expires_at)

        write_policy = write_policy or spec.write_policy
        self.logger.trace(f'Using write policy: {write_policy}')

        for idx, provider in enumerate(self._get_cache_providers(request)):
            try:
                self.logger.trace(
                    f'Caching PO Token response in "{provider.PROVIDER_NAME}" cache provider '
                    f'(key={cache_key}, expires_at={cache_response.expires_at})')
                provider.store(
                    key=cache_key,
                    value=json.dumps(dataclasses.asdict(cache_response)),
                    expires_at=cache_response.expires_at)
            except PoTokenCacheProviderError as e:
                self.logger.warning(
                    f'Error from "{provider.PROVIDER_NAME}" PO Token cache provider: '
                    f'{e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
            except Exception as e:
                self.logger.error(
                    f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache provider: '
                    f'{e!r}{provider_bug_report_message(provider)}')

            # WRITE_FIRST should not write to lower priority providers in the case the highest priority provider fails
            if idx == 0 and write_policy == CacheProviderWritePolicy.WRITE_FIRST:
                return

    def close(self):
        """Close all cache providers and cache spec providers."""
        for provider in self.cache_providers.values():
            provider.close()
        for spec_provider in self.cache_spec_providers.values():
            spec_provider.close()
class PoTokenRequestDirector:
    """Routes PO Token requests to registered providers, consulting the cache first."""

    def __init__(self, logger: IEContentProviderLogger, cache: PoTokenCache):
        self.providers: dict[str, PoTokenProvider] = {}
        self.preferences: list[Preference] = []
        self.cache = cache
        self.logger = logger

    def register_provider(self, provider: PoTokenProvider):
        """Register a PO Token provider instance, keyed by its PROVIDER_KEY."""
        self.providers[provider.PROVIDER_KEY] = provider

    def register_preference(self, preference: Preference):
        """Register a preference callable used to rank providers per request."""
        self.preferences.append(preference)

    def _get_providers(self, request: PoTokenRequest) -> Iterable[PoTokenProvider]:
        """Sorts available providers by preference, given a request"""
        preferences = {
            provider: sum(pref(provider, request) for pref in self.preferences)
            for provider in self.providers.values()
        }
        if self.logger.log_level <= self.logger.LogLevel.TRACE:
            # calling is_available() for every PO Token provider upfront may have some overhead
            self.logger.trace(f'PO Token Providers: {provider_display_list(self.providers.values())}')
            self.logger.trace('Provider preferences for this request: {}'.format(', '.join(
                f'{provider.PROVIDER_NAME}={pref}' for provider, pref in preferences.items())))
        return (
            provider for provider in sorted(
                self.providers.values(), key=preferences.get, reverse=True)
            if provider.is_available()
        )

    def _get_po_token(self, request) -> PoTokenResponse | None:
        """Try providers in preference order; return the first valid response, or None."""
        for provider in self._get_providers(request):
            try:
                self.logger.trace(
                    f'Attempting to fetch a PO Token from "{provider.PROVIDER_NAME}" provider')
                # Each provider gets its own copy of the request
                response = provider.request_pot(request.copy())
            except PoTokenProviderRejectedRequest as e:
                self.logger.trace(
                    f'PO Token Provider "{provider.PROVIDER_NAME}" rejected this request, '
                    f'trying next available provider. Reason: {e}')
                continue
            except PoTokenProviderError as e:
                self.logger.warning(
                    f'Error fetching PO Token from "{provider.PROVIDER_NAME}" provider: '
                    f'{e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
                continue
            except Exception as e:
                self.logger.error(
                    f'Unexpected error when fetching PO Token from "{provider.PROVIDER_NAME}" provider: '
                    f'{e!r}{provider_bug_report_message(provider)}')
                continue

            self.logger.trace(f'PO Token response from "{provider.PROVIDER_NAME}" provider: {response}')

            if not validate_response(response):
                self.logger.error(
                    f'Invalid PO Token response received from "{provider.PROVIDER_NAME}" provider: '
                    f'{response}{provider_bug_report_message(provider)}')
                continue

            return response

        self.logger.trace('No PO Token providers were able to provide a valid PO Token')
        return None

    def get_po_token(self, request: PoTokenRequest) -> str | None:
        """Return a cleaned PO Token for the request (from cache or a provider), or None."""
        if not request.bypass_cache:
            if pot_response := self.cache.get(request):
                return clean_pot(pot_response.po_token)

        if not self.providers:
            self.logger.trace('No PO Token providers registered')
            return None

        pot_response = self._get_po_token(request)
        if not pot_response:
            return None

        pot_response.po_token = clean_pot(pot_response.po_token)

        # expires_at=None lets the cache apply its default TTL; <= 0 disables caching
        if pot_response.expires_at is None or pot_response.expires_at > 0:
            self.cache.store(request, pot_response)
        else:
            self.logger.trace(
                f'PO Token response will not be cached (expires_at={pot_response.expires_at})')

        return pot_response.po_token

    def close(self):
        """Close all providers and the cache."""
        for provider in self.providers.values():
            provider.close()
        self.cache.close()
# Prefix for per-provider extractor-args keys, e.g. "youtubepot-<providerkey>:..."
EXTRACTOR_ARG_PREFIX = 'youtubepot'


def initialize_pot_director(ie):
    """Build the PO Token director, wiring cache/spec/token providers and preferences."""
    assert ie._downloader is not None, 'Downloader not set'

    # The "pot_trace" youtube extractor-arg enables TRACE-level logging
    enable_trace = ie._configuration_arg(
        'pot_trace', ['false'], ie_key='youtube', casesense=False)[0] == 'true'

    if enable_trace:
        log_level = IEContentProviderLogger.LogLevel.TRACE
    elif ie.get_param('verbose', False):
        log_level = IEContentProviderLogger.LogLevel.DEBUG
    else:
        log_level = IEContentProviderLogger.LogLevel.INFO

    def get_provider_logger_and_settings(provider, logger_key):
        # Each provider gets a prefixed logger and its own extractor-args dict
        logger_prefix = f'{logger_key}:{provider.PROVIDER_NAME}'
        extractor_key = f'{EXTRACTOR_ARG_PREFIX}-{provider.PROVIDER_KEY.lower()}'
        return (
            YoutubeIEContentProviderLogger(ie, logger_prefix, log_level=log_level),
            ie.get_param('extractor_args', {}).get(extractor_key, {}))

    cache_providers = []
    for cache_provider in _pot_cache_providers.value.values():
        logger, settings = get_provider_logger_and_settings(cache_provider, 'pot:cache')
        cache_providers.append(cache_provider(ie, logger, settings))
    cache_spec_providers = []
    for cache_spec_provider in _pot_pcs_providers.value.values():
        logger, settings = get_provider_logger_and_settings(cache_spec_provider, 'pot:cache:spec')
        cache_spec_providers.append(cache_spec_provider(ie, logger, settings))

    cache = PoTokenCache(
        logger=YoutubeIEContentProviderLogger(ie, 'pot:cache', log_level=log_level),
        cache_providers=cache_providers,
        cache_spec_providers=cache_spec_providers,
        cache_provider_preferences=list(_pot_cache_provider_preferences.value),
    )

    director = PoTokenRequestDirector(
        logger=YoutubeIEContentProviderLogger(ie, 'pot', log_level=log_level),
        cache=cache,
    )
    # Ensure provider resources are released when the downloader closes
    ie._downloader.add_close_hook(director.close)

    for provider in _pot_providers.value.values():
        logger, settings = get_provider_logger_and_settings(provider, 'pot')
        director.register_provider(provider(ie, logger, settings))
    for preference in _ptp_preferences.value:
        director.register_preference(preference)

    if director.logger.log_level <= director.logger.LogLevel.DEBUG:
        # calling is_available() for every PO Token provider upfront may have some overhead
        director.logger.debug(f'PO Token Providers: {provider_display_list(director.providers.values())}')
        director.logger.debug(f'PO Token Cache Providers: {provider_display_list(cache.cache_providers.values())}')
        director.logger.debug(f'PO Token Cache Spec Providers: {provider_display_list(cache.cache_spec_providers.values())}')
        director.logger.trace(f'Registered {len(director.preferences)} provider preferences')
        director.logger.trace(f'Registered {len(cache.cache_provider_preferences)} cache provider preferences')

    return director
def provider_display_list(providers: Iterable[IEContentProvider]):
    """Human-readable, comma-separated summary of providers ('none' when empty)."""
    def _describe(provider):
        is_builtin = isinstance(provider, BuiltinIEContentProvider)
        # Builtins track yt-dlp's own version, so printing it adds nothing
        name = join_nonempty(
            provider.PROVIDER_NAME,
            None if is_builtin else provider.PROVIDER_VERSION)
        statuses = []
        if not is_builtin:
            statuses.append('external')
        if not provider.is_available():
            statuses.append('unavailable')
        return f'{name} ({", ".join(statuses)})' if statuses else name

    return ', '.join(map(_describe, providers)) or 'none'
def clean_pot(po_token: str):
    """Normalize and validate a PO Token via a urlsafe-base64 round-trip.

    Unquotes any URL-encoding and strips characters that are not valid base64
    (e.g. additional url params the user may accidentally include).

    @raises ValueError if the token is not decodable as urlsafe base64
    """
    unquoted = urllib.parse.unquote(po_token)
    try:
        raw = base64.urlsafe_b64decode(unquoted)
    except (binascii.Error, ValueError):
        raise ValueError('Invalid PO Token')
    return base64.urlsafe_b64encode(raw).decode()
def validate_response(response: PoTokenResponse | None):
    """Structural validation of a PoTokenResponse: token present, decodable, expiry sane."""
    if not isinstance(response, PoTokenResponse):
        return False
    if not isinstance(response.po_token, str) or not response.po_token:
        return False

    try:
        clean_pot(response.po_token)
    except ValueError:
        return False

    expires_at = response.expires_at
    if not isinstance(expires_at, int):
        # Only None is acceptable as a non-int expiry (cache applies its default TTL)
        return expires_at is None
    # Non-positive expiry disables caching; a positive one must lie in the future
    now = int(dt.datetime.now(dt.timezone.utc).timestamp())
    return expires_at <= 0 or expires_at > now
def validate_cache_spec(spec: PoTokenCacheSpec):
    """Structural validation of a PoTokenCacheSpec returned by a spec provider."""
    if not isinstance(spec, PoTokenCacheSpec):
        return False
    if not isinstance(spec.write_policy, CacheProviderWritePolicy):
        return False
    if not isinstance(spec.default_ttl, int):
        return False
    bindings = spec.key_bindings
    if not isinstance(bindings, dict):
        return False
    if not all(isinstance(key, str) for key in bindings):
        return False
    if not all(value is None or isinstance(value, str) for value in bindings.values()):
        return False
    # At least one binding must carry a real (non-None) value
    return any(value is not None for value in bindings.values())

View File

@@ -0,0 +1,167 @@
from __future__ import annotations
import abc
import enum
import functools
from yt_dlp.extractor.common import InfoExtractor
from yt_dlp.utils import NO_DEFAULT, bug_reports_message, classproperty, traverse_obj
from yt_dlp.version import __version__
# xxx: these could be generalized outside YoutubeIE eventually
class IEContentProviderLogger(abc.ABC):
    """Abstract logger interface used by IE content providers."""

    class LogLevel(enum.IntEnum):
        TRACE = 0
        DEBUG = 10
        INFO = 20
        WARNING = 30
        ERROR = 40

        @classmethod
        def _missing_(cls, value):
            # Accept case-insensitive level names; unknown values fall back to INFO
            if isinstance(value, str):
                value = value.upper()
                if value in dir(cls):
                    return cls[value]
            return cls.INFO

    # Minimum level that implementations should emit
    log_level = LogLevel.INFO

    @abc.abstractmethod
    def trace(self, message: str):
        pass

    @abc.abstractmethod
    def debug(self, message: str, *, once=False):
        pass

    @abc.abstractmethod
    def info(self, message: str):
        pass

    @abc.abstractmethod
    def warning(self, message: str, *, once=False):
        pass

    @abc.abstractmethod
    def error(self, message: str, cause=None):
        pass
class IEContentProviderError(Exception):
    """Base error for IE content providers."""

    def __init__(self, msg=None, expected=False):
        super().__init__(msg)
        # Expected errors are "normal" failures; callers skip the bug-report hint for them
        self.expected = expected
class IEContentProvider(abc.ABC):
    """Base class for extractor content providers (PO Token, JS challenge, cache, etc.)."""

    PROVIDER_VERSION: str = '0.0.0'
    BUG_REPORT_LOCATION: str = '(developer has not provided a bug report location)'

    def __init__(
        self,
        ie: InfoExtractor,
        logger: IEContentProviderLogger,
        settings: dict[str, list[str]], *_, **__,
    ):
        self.ie = ie
        self.settings = settings or {}
        self.logger = logger
        super().__init__()

    @classmethod
    def __init_subclass__(cls, *, suffix=None, **kwargs):
        # Subclass families declare a suffix (e.g. 'JCP') used to derive provider keys
        if suffix:
            cls._PROVIDER_KEY_SUFFIX = suffix
        return super().__init_subclass__(**kwargs)

    @classproperty
    def PROVIDER_NAME(cls) -> str:
        # Defaults to the class name with the family suffix stripped
        return cls.__name__[:-len(cls._PROVIDER_KEY_SUFFIX)]

    @classproperty
    def BUG_REPORT_MESSAGE(cls):
        return f'please report this issue to the provider developer at  {cls.BUG_REPORT_LOCATION}  .'

    @classproperty
    def PROVIDER_KEY(cls) -> str:
        assert hasattr(cls, '_PROVIDER_KEY_SUFFIX'), 'Content Provider implementation must define a suffix for the provider key'
        assert cls.__name__.endswith(cls._PROVIDER_KEY_SUFFIX), f'Class name must end with "{cls._PROVIDER_KEY_SUFFIX}"'
        return cls.__name__[:-len(cls._PROVIDER_KEY_SUFFIX)]

    @abc.abstractmethod
    def is_available(self) -> bool:
        """
        Check if the provider is available (e.g. all required dependencies are available)
        This is used to determine if the provider should be used and to provide debug information.

        IMPORTANT: This method should not make any network requests or perform any expensive operations.

        It is called multiple times.
        """
        raise NotImplementedError

    def close(self):  # noqa: B027
        # Optional hook: release any resources held by the provider
        pass

    def _configuration_arg(self, key, default=NO_DEFAULT, *, casesense=False):
        """
        @returns A list of values for the setting given by "key"
        or "default" if no such key is present
        @param default The default value to return when the key is not present (default: [])
        @param casesense When false, the values are converted to lower case
        """
        return configuration_arg(self.settings, key, default=default, casesense=casesense)
class BuiltinIEContentProvider(IEContentProvider, abc.ABC):
    """Base for providers bundled with yt-dlp (version-locked to yt-dlp itself)."""
    # Built-in providers are versioned with yt-dlp
    PROVIDER_VERSION = __version__
    # Plain attribute here shadows the classproperty on IEContentProvider
    BUG_REPORT_MESSAGE = bug_reports_message(before='')
def configuration_arg(config, key, default=NO_DEFAULT, *, casesense=False):
    """
    @returns A list of values for the setting given by "key"
    or "default" if no such key is present
    @param config The configuration dictionary
    @param key The setting to look up
    @param default The default value to return when the key is not present (default: [])
    @param casesense When false, the values are converted to lower case
    """
    values = traverse_obj(config, key)
    if values is None:
        return default if default is not NO_DEFAULT else []
    if casesense:
        return list(values)
    return [value.lower() for value in values]
def register_provider_generic(
    provider,
    base_class,
    registry,
):
    """Generic function to register a provider class.

    @param provider The provider class to register (must subclass *base_class*)
    @param base_class The expected base class for this kind of provider
    @param registry Mapping of PROVIDER_KEY -> provider class to insert into
    @returns *provider*, so this can be used as a class decorator
    """
    # NOTE: validation uses `assert`, so these checks are skipped under `python -O`
    assert issubclass(provider, base_class), f'{provider} must be a subclass of {base_class.__name__}'
    assert provider.PROVIDER_KEY not in registry, f'{base_class.__name__} {provider.PROVIDER_KEY} already registered'
    registry[provider.PROVIDER_KEY] = provider
    return provider
def register_preference_generic(
    base_class,
    registry,
    *providers,
):
    """Generic decorator factory to register a preference function.

    The returned decorator wraps *preference* so it only applies to instances
    of the given *providers* (or to all providers when none are given),
    returning a neutral score of 0 otherwise, then adds the wrapper to
    *registry* and hands back the original function.
    """
    assert all(issubclass(provider, base_class) for provider in providers)

    def decorator(preference):
        @functools.wraps(preference)
        def guarded(provider, *args, **kwargs):
            if providers and not isinstance(provider, providers):
                return 0
            return preference(provider, *args, **kwargs)
        registry.add(guarded)
        return preference

    return decorator

View File

@@ -0,0 +1,8 @@
from yt_dlp.globals import Indirect
# Global registries wrapped in Indirect so the extractor and plugin code
# share the same underlying containers.
_pot_providers = Indirect({})  # PO Token provider classes, keyed by PROVIDER_KEY
_ptp_preferences = Indirect(set())  # PO Token provider preference functions
_pot_pcs_providers = Indirect({})  # PO Token cache-spec provider classes, keyed by PROVIDER_KEY
_pot_cache_providers = Indirect({})  # PO Token cache provider classes, keyed by PROVIDER_KEY
_pot_cache_provider_preferences = Indirect(set())  # cache provider preference functions
_pot_memory_cache = Indirect({})  # presumably backing storage for an in-memory cache provider — confirm with consumers

View File

@@ -0,0 +1,97 @@
"""PUBLIC API"""
from __future__ import annotations
import abc
import dataclasses
import enum
import typing
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProvider,
IEContentProviderError,
register_preference_generic,
register_provider_generic,
)
from yt_dlp.extractor.youtube.pot._registry import (
_pot_cache_provider_preferences,
_pot_cache_providers,
_pot_pcs_providers,
)
from yt_dlp.extractor.youtube.pot.provider import PoTokenRequest
class PoTokenCacheProviderError(IEContentProviderError):
    """An error occurred within a PO Token cache provider"""
class PoTokenCacheProvider(IEContentProvider, abc.ABC, suffix='PCP'):
    """Interface for a PO Token cache backend (string key/value store with expiry)."""

    @abc.abstractmethod
    def get(self, key: str) -> str | None:
        """Return the value stored under *key*, or None if not present."""
        pass

    @abc.abstractmethod
    def store(self, key: str, value: str, expires_at: int):
        """Store *value* under *key*; *expires_at* marks when the entry expires
        (presumably a unix timestamp — confirm with callers)."""
        pass

    @abc.abstractmethod
    def delete(self, key: str):
        """Remove any value stored under *key*."""
        pass
class CacheProviderWritePolicy(enum.Enum):
    """Which cache providers a new entry should be written to."""
    WRITE_ALL = enum.auto()  # Write to all cache providers
    WRITE_FIRST = enum.auto()  # Write to only the first cache provider
@dataclasses.dataclass
class PoTokenCacheSpec:
    """Describes how a PO Token response should be cached."""
    # Values contributing to the cache key (None values permitted by the type)
    key_bindings: dict[str, str | None]
    # Default time-to-live for entries — presumably seconds; confirm with consumers
    default_ttl: int
    write_policy: CacheProviderWritePolicy = CacheProviderWritePolicy.WRITE_ALL

    # Internal
    _provider: PoTokenCacheSpecProvider | None = None
class PoTokenCacheSpecProvider(IEContentProvider, abc.ABC, suffix='PCSP'):
    """Produces PoTokenCacheSpec objects describing how to cache PO Token requests."""

    def is_available(self) -> bool:
        # Spec providers have no external dependencies; always available
        return True

    @abc.abstractmethod
    def generate_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
        """Generate a cache spec for the given request.

        May return None (per the annotation) when no spec applies to *request*.
        """
        pass
def register_provider(provider: type[PoTokenCacheProvider]):
    """Register a PoTokenCacheProvider class"""
    return register_provider_generic(provider, PoTokenCacheProvider, _pot_cache_providers.value)
def register_spec(provider: type[PoTokenCacheSpecProvider]):
    """Register a PoTokenCacheSpecProvider class"""
    return register_provider_generic(provider, PoTokenCacheSpecProvider, _pot_pcs_providers.value)
def register_preference(
        *providers: type[PoTokenCacheProvider]) -> typing.Callable[[CacheProviderPreference], CacheProviderPreference]:
    """Register a preference for a PoTokenCacheProvider"""
    return register_preference_generic(PoTokenCacheProvider, _pot_cache_provider_preferences.value, *providers)
if typing.TYPE_CHECKING:
    # Signature of a cache provider preference function: takes the provider
    # instance and the request, returns an int score.
    CacheProviderPreference = typing.Callable[[PoTokenCacheProvider, PoTokenRequest], int]

View File

@@ -0,0 +1,283 @@
"""PUBLIC API"""
from __future__ import annotations
import abc
import copy
import dataclasses
import enum
import functools
import typing
import urllib.parse
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.extractor.youtube.pot._provider import (
IEContentProvider,
IEContentProviderError,
register_preference_generic,
register_provider_generic,
)
from yt_dlp.extractor.youtube.pot._registry import _pot_providers, _ptp_preferences
from yt_dlp.networking import Request, Response
from yt_dlp.utils import traverse_obj
from yt_dlp.utils.networking import HTTPHeaderDict
# Public API of the PO Token provider framework ('Preference' is appended
# under TYPE_CHECKING at the bottom of this module)
__all__ = [
    'ExternalRequestFeature',
    'PoTokenContext',
    'PoTokenProvider',
    'PoTokenProviderError',
    'PoTokenProviderRejectedRequest',
    'PoTokenRequest',
    'PoTokenResponse',
    'provider_bug_report_message',
    'register_preference',
    'register_provider',
]
class PoTokenContext(enum.Enum):
    """The context/operation a PO Token is requested for."""
    GVS = 'gvs'
    PLAYER = 'player'
    SUBS = 'subs'
@dataclasses.dataclass
class PoTokenRequest:
    """All parameters for a single PO Token request, passed to providers.

    Use `copy()` before mutating so the caller's instance is unaffected.
    """
    # YouTube parameters
    context: PoTokenContext
    innertube_context: InnertubeContext
    innertube_host: str | None = None
    session_index: str | None = None
    player_url: str | None = None
    is_authenticated: bool = False
    video_webpage: str | None = None
    internal_client_name: str | None = None

    # Content binding parameters
    visitor_data: str | None = None
    data_sync_id: str | None = None
    video_id: str | None = None

    # Internal, YouTube experiment on whether to bind GVS PO Token to video_id.
    _gvs_bind_to_video_id: bool = False

    # Networking parameters
    request_cookiejar: YoutubeDLCookieJar = dataclasses.field(default_factory=YoutubeDLCookieJar)
    request_proxy: str | None = None
    request_headers: HTTPHeaderDict = dataclasses.field(default_factory=HTTPHeaderDict)
    request_timeout: float | None = None
    request_source_address: str | None = None
    request_verify_tls: bool = True

    # Generate a new token, do not use a cached token
    # The token should still be cached for future requests
    bypass_cache: bool = False

    def copy(self):
        # Shallow dataclass copy, but with the header dict rebuilt and the
        # nested innertube context deep-copied so the copy can be mutated safely.
        return dataclasses.replace(
            self,
            request_headers=HTTPHeaderDict(self.request_headers),
            innertube_context=copy.deepcopy(self.innertube_context),
        )
@dataclasses.dataclass
class PoTokenResponse:
    """A PO Token returned by a provider."""
    po_token: str
    # When the token expires — presumably a unix timestamp; None for no explicit expiry
    expires_at: int | None = None
class PoTokenProviderRejectedRequest(IEContentProviderError):
    """Reject the PoTokenRequest (cannot handle the request).

    Raised during request validation, or by a provider that cannot serve the request.
    """
class PoTokenProviderError(IEContentProviderError):
    """An error occurred while fetching a PO Token"""
class ExternalRequestFeature(enum.Enum):
    """Networking capabilities a provider's own HTTP stack supports.

    Declared via PoTokenProvider._SUPPORTED_EXTERNAL_REQUEST_FEATURES and
    validated against the request's networking parameters.
    """
    # Supported proxy schemes for external requests
    PROXY_SCHEME_HTTP = enum.auto()
    PROXY_SCHEME_HTTPS = enum.auto()
    PROXY_SCHEME_SOCKS4 = enum.auto()
    PROXY_SCHEME_SOCKS4A = enum.auto()
    PROXY_SCHEME_SOCKS5 = enum.auto()
    PROXY_SCHEME_SOCKS5H = enum.auto()
    # Can bind outgoing connections to request_source_address
    SOURCE_ADDRESS = enum.auto()
    # Can skip TLS certificate verification
    DISABLE_TLS_VERIFICATION = enum.auto()
class PoTokenProvider(IEContentProvider, abc.ABC, suffix='PTP'):
    """Base class for PO Token providers.

    Subclasses implement `_real_request_pot` and declare the contexts,
    Innertube clients and external-request features they support; incoming
    requests are validated against these declarations by `request_pot`
    before being handed to the implementation.
    """

    # Set to None to disable the check
    _SUPPORTED_CONTEXTS: tuple[PoTokenContext, ...] | None = ()

    # Innertube Client Name.
    # For example, "WEB", "ANDROID", "TVHTML5".
    # For a list of WebPO client names, see yt_dlp.extractor.youtube.pot.utils.WEBPO_CLIENTS.
    # Also see yt_dlp.extractor.youtube._base.INNERTUBE_CLIENTS
    # for a list of client names currently supported by the YouTube extractor.
    _SUPPORTED_CLIENTS: tuple[str, ...] | None = ()

    # If making external requests to websites (i.e. to youtube.com)
    # using another library or service (i.e., not _request_webpage),
    # add the request features that are supported.
    # If only using _request_webpage to make external requests, set this to None.
    _SUPPORTED_EXTERNAL_REQUEST_FEATURES: tuple[ExternalRequestFeature, ...] | None = ()

    def __validate_request(self, request: PoTokenRequest):
        """Raise PoTokenProviderRejectedRequest if this provider cannot serve *request*."""
        if not self.is_available():
            raise PoTokenProviderRejectedRequest(f'{self.PROVIDER_NAME} is not available')

        # Validate request using built-in settings
        if (
            self._SUPPORTED_CONTEXTS is not None
            and request.context not in self._SUPPORTED_CONTEXTS
        ):
            raise PoTokenProviderRejectedRequest(
                f'PO Token Context "{request.context}" is not supported by {self.PROVIDER_NAME}')

        if self._SUPPORTED_CLIENTS is not None:
            client_name = traverse_obj(
                request.innertube_context, ('client', 'clientName'))
            if client_name not in self._SUPPORTED_CLIENTS:
                raise PoTokenProviderRejectedRequest(
                    f'Client "{client_name}" is not supported by {self.PROVIDER_NAME}. '
                    f'Supported clients: {", ".join(self._SUPPORTED_CLIENTS) or "none"}')

        self.__validate_external_request_features(request)

    @functools.cached_property
    def _supported_proxy_schemes(self):
        # Map of proxy scheme -> feature flag, restricted to the features this
        # provider declared in _SUPPORTED_EXTERNAL_REQUEST_FEATURES.
        return {
            scheme: feature
            for scheme, feature in {
                'http': ExternalRequestFeature.PROXY_SCHEME_HTTP,
                'https': ExternalRequestFeature.PROXY_SCHEME_HTTPS,
                'socks4': ExternalRequestFeature.PROXY_SCHEME_SOCKS4,
                'socks4a': ExternalRequestFeature.PROXY_SCHEME_SOCKS4A,
                'socks5': ExternalRequestFeature.PROXY_SCHEME_SOCKS5,
                'socks5h': ExternalRequestFeature.PROXY_SCHEME_SOCKS5H,
            }.items()
            if feature in (self._SUPPORTED_EXTERNAL_REQUEST_FEATURES or [])
        }

    def __validate_external_request_features(self, request: PoTokenRequest):
        """Check the request's networking parameters against the declared features."""
        # None indicates only _request_webpage is used; skip these checks entirely
        if self._SUPPORTED_EXTERNAL_REQUEST_FEATURES is None:
            return

        if request.request_proxy:
            scheme = urllib.parse.urlparse(request.request_proxy).scheme
            if scheme.lower() not in self._supported_proxy_schemes:
                raise PoTokenProviderRejectedRequest(
                    f'External requests by "{self.PROVIDER_NAME}" provider do not '
                    f'support proxy scheme "{scheme}". Supported proxy schemes: '
                    f'{", ".join(self._supported_proxy_schemes) or "none"}')

        if (
            request.request_source_address
            and ExternalRequestFeature.SOURCE_ADDRESS not in self._SUPPORTED_EXTERNAL_REQUEST_FEATURES
        ):
            raise PoTokenProviderRejectedRequest(
                f'External requests by "{self.PROVIDER_NAME}" provider '
                f'do not support setting source address')

        if (
            not request.request_verify_tls
            and ExternalRequestFeature.DISABLE_TLS_VERIFICATION not in self._SUPPORTED_EXTERNAL_REQUEST_FEATURES
        ):
            raise PoTokenProviderRejectedRequest(
                f'External requests by "{self.PROVIDER_NAME}" provider '
                f'do not support ignoring TLS certificate failures')

    def request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """Validate *request* and fetch a PO Token from this provider.

        @raises PoTokenProviderRejectedRequest when the request fails validation
        """
        self.__validate_request(request)
        return self._real_request_pot(request)

    @abc.abstractmethod
    def _real_request_pot(self, request: PoTokenRequest) -> PoTokenResponse:
        """To be implemented by subclasses"""
        pass

    # Helper functions

    def _request_webpage(self, request: Request, pot_request: PoTokenRequest | None = None, note=None, **kwargs) -> Response:
        """Make a request using the internal HTTP Client.
        Use this instead of calling requests, urllib3 or other HTTP client libraries directly!

        YouTube cookies will be automatically applied if this request is made to YouTube.

        @param request: The request to make
        @param pot_request: The PoTokenRequest to use. Request parameters will be merged from it.
        @param note: Custom log message to display when making the request. Set to `False` to disable logging.

        Tips:
        - Disable proxy (e.g. if calling local service): Request(..., proxies={'all': None})
        - Set request timeout: Request(..., extensions={'timeout': 5.0})
        """
        req = request.copy()

        # Merge some ctx request settings into the request
        # Most of these will already be used by the configured ydl instance,
        # however, the YouTube extractor may override some.
        if pot_request is not None:
            req.headers = HTTPHeaderDict(pot_request.request_headers, req.headers)
            req.proxies = req.proxies or ({'all': pot_request.request_proxy} if pot_request.request_proxy else {})
            if pot_request.request_cookiejar is not None:
                # Keep any cookiejar already set on the request; only fall
                # back to the PO Token request's cookiejar
                req.extensions['cookiejar'] = req.extensions.get('cookiejar', pot_request.request_cookiejar)

        if note is not False:
            self.logger.info(str(note) if note else 'Requesting webpage')

        return self.ie._downloader.urlopen(req)
def register_provider(provider: type[PoTokenProvider]):
    """Register a PoTokenProvider class"""
    return register_provider_generic(provider, PoTokenProvider, _pot_providers.value)
def provider_bug_report_message(provider: IEContentProvider, before=';'):
    """Build a bug-report hint for *provider*, appended after *before*.

    The message is capitalized when it starts a sentence, i.e. when the
    (right-stripped) prefix is empty or ends with sentence punctuation.
    """
    message = provider.BUG_REPORT_MESSAGE
    before = before.rstrip()
    if not before or before.endswith(('.', '!', '?')):
        message = message[0].title() + message[1:]
    if not before:
        return message
    return f'{before} {message}'
def register_preference(*providers: type[PoTokenProvider]) -> typing.Callable[[Preference], Preference]:
    """Register a preference for a PoTokenProvider"""
    return register_preference_generic(PoTokenProvider, _ptp_preferences.value, *providers)
if typing.TYPE_CHECKING:
    # Signature of a provider preference function: takes the provider instance
    # and the request, returns an int score. Exported only for type checkers.
    Preference = typing.Callable[[PoTokenProvider, PoTokenRequest], int]
    __all__.append('Preference')
# Barebones innertube context. There may be more fields.
class ClientInfo(typing.TypedDict, total=False):
    """Shape of the "client" object within an innertube context (not exhaustive)."""
    hl: str | None
    gl: str | None
    remoteHost: str | None
    deviceMake: str | None
    deviceModel: str | None
    visitorData: str | None
    userAgent: str | None
    clientName: str
    clientVersion: str
    osName: str | None
    osVersion: str | None
class InnertubeContext(typing.TypedDict, total=False):
    """Barebones innertube context object; there may be more fields."""
    client: ClientInfo
    request: dict
    user: dict

View File

@@ -0,0 +1,77 @@
"""PUBLIC API"""
from __future__ import annotations
import base64
import contextlib
import enum
import re
import urllib.parse
from yt_dlp.extractor.youtube.pot.provider import PoTokenContext, PoTokenRequest
from yt_dlp.utils import traverse_obj
__all__ = ['WEBPO_CLIENTS', 'ContentBindingType', 'get_webpo_content_binding']

# Innertube client names for which WebPO content binding applies
# (see get_webpo_content_binding below)
WEBPO_CLIENTS = (
    'WEB',
    'MWEB',
    'TVHTML5',
    'WEB_EMBEDDED_PLAYER',
    'WEB_CREATOR',
    'WEB_REMIX',
    'TVHTML5_SIMPLY',
    'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
)
class ContentBindingType(enum.Enum):
    """The kind of value a PO Token is bound to."""
    VISITOR_DATA = 'visitor_data'
    DATASYNC_ID = 'datasync_id'
    VIDEO_ID = 'video_id'
    VISITOR_ID = 'visitor_id'
def get_webpo_content_binding(
    request: PoTokenRequest,
    webpo_clients=WEBPO_CLIENTS,
    bind_to_visitor_id=False,
) -> tuple[str | None, ContentBindingType | None]:
    """Determine the content binding (value, type) for a WebPO token request.

    Returns (None, None) when the request's client is not one of
    *webpo_clients* or when no binding applies. With *bind_to_visitor_id*
    set, the visitor ID extracted from visitor_data is preferred over the
    raw visitor_data for unauthenticated GVS/WEB_REMIX requests.
    """
    client_name = traverse_obj(request.innertube_context, ('client', 'clientName'))
    if not client_name or client_name not in webpo_clients:
        return None, None

    is_gvs = request.context == PoTokenContext.GVS
    if is_gvs and request._gvs_bind_to_video_id:
        return request.video_id, ContentBindingType.VIDEO_ID

    if is_gvs or client_name in ('WEB_REMIX', ):
        if request.is_authenticated:
            return request.data_sync_id, ContentBindingType.DATASYNC_ID
        if bind_to_visitor_id:
            visitor_id = _extract_visitor_id(request.visitor_data)
            if visitor_id:
                return visitor_id, ContentBindingType.VISITOR_ID
        return request.visitor_data, ContentBindingType.VISITOR_DATA

    if request.context in (PoTokenContext.PLAYER, PoTokenContext.SUBS):
        return request.video_id, ContentBindingType.VIDEO_ID

    return None, None
def _extract_visitor_id(visitor_data):
if not visitor_data:
return None
# Attempt to extract the visitor ID from the visitor_data protobuf
# xxx: ideally should use a protobuf parser
with contextlib.suppress(Exception):
visitor_id = base64.urlsafe_b64decode(
urllib.parse.unquote_plus(visitor_data))[2:13].decode()
# check that visitor id is all letters and numbers
if re.fullmatch(r'[A-Za-z0-9_-]{11}', visitor_id):
return visitor_id
return None