[YouTube] Re-work client selection

* use `android_sdkless` by default
* use `web_safari` (HLS only) if logged in
* skip any non-HLS format with n-challenge
This commit is contained in:
dirkf
2025-11-04 20:58:12 +00:00
parent a1e2c7d90b
commit 5d445f8c5f

View File

@@ -483,6 +483,12 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
('responseContext', 'visitorData')), ('responseContext', 'visitorData')),
T(compat_str))) T(compat_str)))
# @functools.cached_property
def is_authenticated(self, _cache={}):
    """Return True if a SAPISIDHASH authorization header can be generated.

    The answer is computed once per extractor instance and then served
    from `_cache`. The mutable default argument is intentional: it is the
    shared memo that stands in for the commented-out cached_property.

    `_cache` maps instance -> bool and is not meant to be passed by callers.
    """
    if self in _cache:
        return _cache[self]
    # first query for this instance: compute the flag and remember it
    authenticated = bool(self._generate_sapisidhash_header())
    _cache[self] = authenticated
    return authenticated
def _extract_ytcfg(self, video_id, webpage): def _extract_ytcfg(self, video_id, webpage):
ytcfg = self._search_json( ytcfg = self._search_json(
r'ytcfg\.set\s*\(', webpage, 'ytcfg', video_id, r'ytcfg\.set\s*\(', webpage, 'ytcfg', video_id,
@@ -2101,8 +2107,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
return self._cached(self._decrypt_nsig, 'nsig', n, player_url) return self._cached(self._decrypt_nsig, 'nsig', n, player_url)
for fmt in formats: for fmt in formats:
parsed_fmt_url = compat_urllib_parse.urlparse(fmt['url']) n_param = parse_qs(fmt['url']).get('n')
n_param = compat_parse_qs(parsed_fmt_url.query).get('n')
if not n_param: if not n_param:
continue continue
n_param = n_param[-1] n_param = n_param[-1]
@@ -2268,6 +2273,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
(r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE), (r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE),
regex), webpage, name, default='{}'), video_id, fatal=False) regex), webpage, name, default='{}'), video_id, fatal=False)
def _is_premium_subscriber(self, initial_data):
if not self.is_authenticated or not initial_data:
return False
tlr = traverse_obj(
initial_data, ('topbar', 'desktopTopbarRenderer', 'logo', 'topbarLogoRenderer'))
return (
traverse_obj(tlr, ('iconImage', 'iconType')) == 'YOUTUBE_PREMIUM_LOGO'
or 'premium' in (self._get_text(tlr, 'tooltipText') or '').lower()
)
def _real_extract(self, url): def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {}) url, smuggled_data = unsmuggle_url(url, {})
video_id = self._match_id(url) video_id = self._match_id(url)
@@ -2303,24 +2319,30 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if sts: if sts:
pb_context['signatureTimestamp'] = sts pb_context['signatureTimestamp'] = sts
client_names = traverse_obj(self._INNERTUBE_CLIENTS, ( auth = self._generate_sapisidhash_header(origin)
T(dict.items), lambda _, k_v: not k_v[1].get('REQUIRE_PO_TOKEN'),
0))[:1] client_names = []
if auth or self._is_premium_subscriber(player_response):
client_names = traverse_obj(self._INNERTUBE_CLIENTS, (
T(dict_items), lambda _, k_v: k_v[0] == 'web_safari', 0))[:1]
if not client_names:
client_names = traverse_obj(self._INNERTUBE_CLIENTS, (
T(dict_items), lambda _, k_v: not (
k_v[1].get('REQUIRE_PO_TOKEN')
or (bool(k_v[1].get('WITH_COOKIES', auth)) ^ bool(auth))
), 0))[:1]
if 'web' not in client_names: if 'web' not in client_names:
# webpage links won't download: ignore links and playability # only live HLS webpage links will download: ignore playability
player_response = filter_dict( player_response = filter_dict(
player_response or {}, player_response or {},
lambda k, _: k not in ('streamingData', 'playabilityStatus')) lambda k, _: k != 'playabilityStatus')
if is_live and 'ios' not in client_names:
client_names.append('ios')
headers = { headers = {
'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Mode': 'navigate',
'Origin': origin, 'Origin': origin,
'X-Goog-Visitor-Id': self._extract_visitor_data(ytcfg) or '', 'X-Goog-Visitor-Id': self._extract_visitor_data(ytcfg) or '',
} }
auth = self._generate_sapisidhash_header(origin)
if auth is not None: if auth is not None:
headers['Authorization'] = auth headers['Authorization'] = auth
headers['X-Origin'] = origin headers['X-Origin'] = origin
@@ -2350,7 +2372,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'INNERTUBE_CONTEXT', 'client', 'clientVersion'), 'INNERTUBE_CONTEXT', 'client', 'clientVersion'),
'User-Agent': ( 'User-Agent': (
'INNERTUBE_CONTEXT', 'client', 'userAgent'), 'INNERTUBE_CONTEXT', 'client', 'userAgent'),
})) }) or {})
api_player_response = self._call_api( api_player_response = self._call_api(
'player', query, video_id, fatal=False, headers=api_headers, 'player', query, video_id, fatal=False, headers=api_headers,
@@ -2359,19 +2381,19 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'context', 'client', 'clientName')), 'context', 'client', 'clientName')),
'API JSON', delim=' ')) 'API JSON', delim=' '))
hls = traverse_obj( # be sure to find HLS in case of is_live
(player_response, api_player_response), hls = traverse_obj(player_response, (
(Ellipsis, 'streamingData', 'hlsManifestUrl', T(url_or_none))) 'streamingData', 'hlsManifestUrl', T(url_or_none)))
fetched_timestamp = int(time.time()) fetched_timestamp = int(time.time())
if len(hls) == 2 and not hls[0] and hls[1]: video_details = merge_dicts(*traverse_obj(
player_response['streamingData']['hlsManifestUrl'] = hls[1] (player_response, api_player_response),
else: (Ellipsis, 'videoDetails', T(dict))))
video_details = merge_dicts(*traverse_obj( player_response.update(filter_dict(
(player_response, api_player_response), api_player_response or {}, cndn=lambda k, _: k != 'captions'))
(Ellipsis, 'videoDetails', T(dict)))) player_response['videoDetails'] = video_details
player_response.update(filter_dict( if hls and not traverse_obj(player_response, (
api_player_response or {}, cndn=lambda k, _: k != 'captions')) 'streamingData', 'hlsManifestUrl', T(url_or_none))):
player_response['videoDetails'] = video_details player_response['streamingData']['hlsManifestUrl'] = hls
def is_agegated(playability): def is_agegated(playability):
# playability: dict # playability: dict
@@ -2575,6 +2597,10 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self.write_debug(error_to_compat_str(e), only_once=True) self.write_debug(error_to_compat_str(e), only_once=True)
continue continue
if parse_qs(fmt_url).get('n'):
# this and (we assume) all the formats here are n-scrambled
break
language_preference = ( language_preference = (
10 if audio_track.get('audioIsDefault') 10 if audio_track.get('audioIsDefault')
else -10 if 'descriptive' in (traverse_obj(audio_track, ('displayName', T(lower))) or '') else -10 if 'descriptive' in (traverse_obj(audio_track, ('displayName', T(lower))) or '')