Compare commits

...

26 Commits

Author SHA1 Message Date
dirkf
956b8c5855 [YouTube] Bug-fix for c1f5c3274a 2025-11-26 03:02:36 +00:00
dirkf
d5f561166b [core] Re-work format_note display in format list with abbreviated codec name 2025-11-26 03:02:36 +00:00
dirkf
d0283f5385 [YouTube] Revert forcing player JS by default
* still leaving the parameters in place

thx bashonly for confirming this suggestion
2025-11-21 01:52:11 +00:00
dirkf
6315f4b1df [utils] Support additional codecs and dynamic_range 2025-11-21 01:52:11 +00:00
dirkf
aeb1254fcf [YouTube] Fix playlist thumbnail extraction
Thx seproDev, yt-dlp/yt-dlp#11615
2025-11-21 01:52:11 +00:00
dirkf
25890f2ad1 [YouTube] Improve detection of geo-restriction
Thx yt-dlp
2025-11-21 01:52:11 +00:00
dirkf
d65882a022 [YouTube] Improve mark_watched()
Thx: Brett824, yt-dlp/yt-dlp#4146
2025-11-21 01:52:11 +00:00
dirkf
39378f7b5c [YouTube] Fix incorrect chapter extraction
* align `_get_text()` with yt-dlp (thx, passim) at last
2025-11-21 01:52:11 +00:00
dirkf
6f5d4c3289 [YouTube] Improve targeting of pre-roll wait
Experimental for now.
Thx: yt-dlp/yt-dlp#14646
2025-11-21 01:52:11 +00:00
dirkf
5d445f8c5f [YouTube] Re-work client selection
* use `android_sdkless` by default
* use `web_safari` (HLS only) if logged in
* skip any non-HLS format with n-challenge
2025-11-21 01:52:11 +00:00
dirkf
a1e2c7d90b [YouTube] Add further InnerTube clients
FWIW: android-sdkless, tv_downgraded, web_creator
Thx yt-dlp passim
2025-11-21 01:52:11 +00:00
dirkf
c55ace3c50 [YouTube] Use insertion-order-preserving dict for InnerTube client data 2025-11-21 01:52:11 +00:00
dirkf
43e3121020 [utils] Align parse_duration() behaviour with yt-dlp
* handle comma-separated long-form durations
* support : as millisecond separator.
2025-11-21 01:52:11 +00:00
dirkf
7a488f7fae [utils] Stabilise traversal results using compat_dict
In `traverse_obj()`, use `compat_dict` to construct dicts,
ensuring insertion order sort, but`compat_builtin_dict`
to test for dict-iness...
2025-11-21 01:52:11 +00:00
dirkf
5585d76da6 [compat] Add compat_dict
A dict that preserves insertion order and otherwise resembles the
dict builtin (if it isn't it) rather than `collections.OrderedDict`.

Also:
* compat_builtins_dict: the built-in definition in case `compat_dict`
  was imported as `dict`
* compat_dict_items: use instead of `dict.items` to get items from
  a `compat_dict` in insertion order, if you didn't define `dict` as
  `compat_dict`.
2025-11-21 01:52:11 +00:00
dirkf
931e15621c [compat] Add compat_abc_ABC
Base class for abstract classes
2025-11-21 01:52:11 +00:00
dirkf
27867cc814 [compat] Add compat_thread 2025-11-21 01:52:11 +00:00
dirkf
70b40dd1ef [utils] Add subs_list_to_dict() traversal helper
Thx: yt-dlp/yt-dlp#10653, etc
2025-11-21 01:52:11 +00:00
dirkf
a9b4649d92 [utils] Apply partial_application decorator to existing functions
Thx: yt-dlp/yt-dlp#10653 (etc)
2025-11-21 01:52:11 +00:00
dirkf
23a848c314 [utils] Add partial_application decorator function
Thx: yt-dlp/yt-dlp#10653
2025-11-21 01:52:11 +00:00
dirkf
a96a778750 [core] Fix housekeeping for available_at 2025-11-21 01:52:11 +00:00
dirkf
68fe8c1781 [utils] Support traversal helper functions require, value, unpack
Thx: yt-dlp/yt-dlp#10653
2025-11-21 01:52:11 +00:00
dirkf
96419fa706 [utils] Support filter traversal key
Thx yt-dlp/yt-dlp#10653
2025-11-21 01:52:11 +00:00
dirkf
cca41c9d2c [test] Move dict_get() traversal test to its own class
Matches yt-dlp/yt-dlp#9426
2025-11-21 01:52:11 +00:00
dirkf
bc39e5e678 [test] Fix test_traversal_morsel for Py 3.14+
Thx: yt-dlp/yt-dlp#13471
2025-11-21 01:52:11 +00:00
dirkf
014ae63a11 [test] Support additional args and kwargs in report_warning() mocks 2025-11-21 01:52:11 +00:00
10 changed files with 865 additions and 239 deletions

View File

@@ -85,10 +85,10 @@ class FakeYDL(YoutubeDL):
# Silence an expected warning matching a regex
old_report_warning = self.report_warning
def report_warning(self, message):
def report_warning(self, message, *args, **kwargs):
if re.match(regex, message):
return
old_report_warning(message)
old_report_warning(message, *args, **kwargs)
self.report_warning = types.MethodType(report_warning, self)
@@ -265,11 +265,11 @@ def assertRegexpMatches(self, text, regexp, msg=None):
def expect_warnings(ydl, warnings_re):
real_warning = ydl.report_warning
def _report_warning(w):
def _report_warning(self, w, *args, **kwargs):
if not any(re.search(w_re, w) for w_re in warnings_re):
real_warning(w)
ydl.report_warning = _report_warning
ydl.report_warning = types.MethodType(_report_warning, ydl)
def http_server_port(httpd):

View File

@@ -9,21 +9,32 @@ import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import itertools
import re
from youtube_dl.traversal import (
dict_get,
get_first,
require,
subs_list_to_dict,
T,
traverse_obj,
unpack,
value,
)
from youtube_dl.compat import (
compat_chr as chr,
compat_etree_fromstring,
compat_http_cookies,
compat_map as map,
compat_str,
compat_zip as zip,
)
from youtube_dl.utils import (
determine_ext,
ExtractorError,
int_or_none,
join_nonempty,
str_or_none,
)
@@ -446,42 +457,164 @@ class TestTraversal(_TestCase):
msg='`any` should allow further branching')
def test_traversal_morsel(self):
values = {
'expires': 'a',
'path': 'b',
'comment': 'c',
'domain': 'd',
'max-age': 'e',
'secure': 'f',
'httponly': 'g',
'version': 'h',
'samesite': 'i',
}
# SameSite added in Py3.8, breaks .update for 3.5-3.7
if sys.version_info < (3, 8):
del values['samesite']
morsel = compat_http_cookies.Morsel()
# SameSite added in Py3.8, breaks .update for 3.5-3.7
# Similarly Partitioned, Py3.14, thx Grub4k
values = dict(zip(morsel, map(chr, itertools.count(ord('a')))))
morsel.set(str('item_key'), 'item_value', 'coded_value')
morsel.update(values)
values['key'] = str('item_key')
values['value'] = 'item_value'
values.update({
'key': str('item_key'),
'value': 'item_value',
}),
values = dict((str(k), v) for k, v in values.items())
# make test pass even without ordered dict
value_set = set(values.values())
for key, value in values.items():
self.assertEqual(traverse_obj(morsel, key), value,
for key, val in values.items():
self.assertEqual(traverse_obj(morsel, key), val,
msg='Morsel should provide access to all values')
self.assertEqual(set(traverse_obj(morsel, Ellipsis)), value_set,
msg='`...` should yield all values')
self.assertEqual(set(traverse_obj(morsel, lambda k, v: True)), value_set,
msg='function key should yield all values')
values = list(values.values())
self.assertMaybeCountEqual(traverse_obj(morsel, Ellipsis), values,
msg='`...` should yield all values')
self.assertMaybeCountEqual(traverse_obj(morsel, lambda k, v: True), values,
msg='function key should yield all values')
self.assertIs(traverse_obj(morsel, [(None,), any]), morsel,
msg='Morsel should not be implicitly changed to dict on usage')
def test_get_first(self):
self.assertEqual(get_first([{'a': None}, {'a': 'spam'}], 'a'), 'spam')
def test_traversal_filter(self):
data = [None, False, True, 0, 1, 0.0, 1.1, '', 'str', {}, {0: 0}, [], [1]]
self.assertEqual(
traverse_obj(data, (Ellipsis, filter)),
[True, 1, 1.1, 'str', {0: 0}, [1]],
'`filter` should filter falsy values')
class TestTraversalHelpers(_TestCase):
def test_traversal_require(self):
with self.assertRaises(ExtractorError, msg='Missing `value` should raise'):
traverse_obj(_TEST_DATA, ('None', T(require('value'))))
self.assertEqual(
traverse_obj(_TEST_DATA, ('str', T(require('value')))), 'str',
'`require` should pass through non-`None` values')
def test_subs_list_to_dict(self):
self.assertEqual(traverse_obj([
{'name': 'de', 'url': 'https://example.com/subs/de.vtt'},
{'name': 'en', 'url': 'https://example.com/subs/en1.ass'},
{'name': 'en', 'url': 'https://example.com/subs/en2.ass'},
], [Ellipsis, {
'id': 'name',
'url': 'url',
}, all, T(subs_list_to_dict)]), {
'de': [{'url': 'https://example.com/subs/de.vtt'}],
'en': [
{'url': 'https://example.com/subs/en1.ass'},
{'url': 'https://example.com/subs/en2.ass'},
],
}, 'function should build subtitle dict from list of subtitles')
self.assertEqual(traverse_obj([
{'name': 'de', 'url': 'https://example.com/subs/de.ass'},
{'name': 'de'},
{'name': 'en', 'content': 'content'},
{'url': 'https://example.com/subs/en'},
], [Ellipsis, {
'id': 'name',
'data': 'content',
'url': 'url',
}, all, T(subs_list_to_dict(lang=None))]), {
'de': [{'url': 'https://example.com/subs/de.ass'}],
'en': [{'data': 'content'}],
}, 'subs with mandatory items missing should be filtered')
self.assertEqual(traverse_obj([
{'url': 'https://example.com/subs/de.ass', 'name': 'de'},
{'url': 'https://example.com/subs/en', 'name': 'en'},
], [Ellipsis, {
'id': 'name',
'ext': ['url', T(determine_ext(default_ext=None))],
'url': 'url',
}, all, T(subs_list_to_dict(ext='ext'))]), {
'de': [{'url': 'https://example.com/subs/de.ass', 'ext': 'ass'}],
'en': [{'url': 'https://example.com/subs/en', 'ext': 'ext'}],
}, '`ext` should set default ext but leave existing value untouched')
self.assertEqual(traverse_obj([
{'name': 'en', 'url': 'https://example.com/subs/en2', 'prio': True},
{'name': 'en', 'url': 'https://example.com/subs/en1', 'prio': False},
], [Ellipsis, {
'id': 'name',
'quality': ['prio', T(int)],
'url': 'url',
}, all, T(subs_list_to_dict(ext='ext'))]), {'en': [
{'url': 'https://example.com/subs/en1', 'ext': 'ext'},
{'url': 'https://example.com/subs/en2', 'ext': 'ext'},
]}, '`quality` key should sort subtitle list accordingly')
self.assertEqual(traverse_obj([
{'name': 'de', 'url': 'https://example.com/subs/de.ass'},
{'name': 'de'},
{'name': 'en', 'content': 'content'},
{'url': 'https://example.com/subs/en'},
], [Ellipsis, {
'id': 'name',
'url': 'url',
'data': 'content',
}, all, T(subs_list_to_dict(lang='en'))]), {
'de': [{'url': 'https://example.com/subs/de.ass'}],
'en': [
{'data': 'content'},
{'url': 'https://example.com/subs/en'},
],
}, 'optionally provided lang should be used if no id available')
self.assertEqual(traverse_obj([
{'name': 1, 'url': 'https://example.com/subs/de1'},
{'name': {}, 'url': 'https://example.com/subs/de2'},
{'name': 'de', 'ext': 1, 'url': 'https://example.com/subs/de3'},
{'name': 'de', 'ext': {}, 'url': 'https://example.com/subs/de4'},
], [Ellipsis, {
'id': 'name',
'url': 'url',
'ext': 'ext',
}, all, T(subs_list_to_dict(lang=None))]), {
'de': [
{'url': 'https://example.com/subs/de3'},
{'url': 'https://example.com/subs/de4'},
],
}, 'non str types should be ignored for id and ext')
self.assertEqual(traverse_obj([
{'name': 1, 'url': 'https://example.com/subs/de1'},
{'name': {}, 'url': 'https://example.com/subs/de2'},
{'name': 'de', 'ext': 1, 'url': 'https://example.com/subs/de3'},
{'name': 'de', 'ext': {}, 'url': 'https://example.com/subs/de4'},
], [Ellipsis, {
'id': 'name',
'url': 'url',
'ext': 'ext',
}, all, T(subs_list_to_dict(lang='de'))]), {
'de': [
{'url': 'https://example.com/subs/de1'},
{'url': 'https://example.com/subs/de2'},
{'url': 'https://example.com/subs/de3'},
{'url': 'https://example.com/subs/de4'},
],
}, 'non str types should be replaced by default id')
def test_unpack(self):
self.assertEqual(
unpack(lambda *x: ''.join(map(compat_str, x)))([1, 2, 3]), '123')
self.assertEqual(
unpack(join_nonempty)([1, 2, 3]), '1-2-3')
self.assertEqual(
unpack(join_nonempty, delim=' ')([1, 2, 3]), '1 2 3')
with self.assertRaises(TypeError):
unpack(join_nonempty)()
with self.assertRaises(TypeError):
unpack()
def test_value(self):
self.assertEqual(
traverse_obj(_TEST_DATA, ('str', T(value('other')))), 'other',
'`value` should substitute specified value')
class TestDictGet(_TestCase):
def test_dict_get(self):
FALSE_VALUES = {
'none': None,
@@ -504,6 +637,9 @@ class TestTraversal(_TestCase):
self.assertEqual(dict_get(d, ('b', 'c', key, )), None)
self.assertEqual(dict_get(d, ('b', 'c', key, ), skip_false_values=False), false_value)
def test_get_first(self):
self.assertEqual(get_first([{'a': None}, {'a': 'spam'}], 'a'), 'spam')
if __name__ == '__main__':
unittest.main()

View File

@@ -69,6 +69,7 @@ from youtube_dl.utils import (
parse_iso8601,
parse_resolution,
parse_qs,
partial_application,
pkcs1pad,
prepend_extension,
read_batch_urls,
@@ -664,6 +665,8 @@ class TestUtil(unittest.TestCase):
self.assertEqual(parse_duration('3h 11m 53s'), 11513)
self.assertEqual(parse_duration('3 hours 11 minutes 53 seconds'), 11513)
self.assertEqual(parse_duration('3 hours 11 mins 53 secs'), 11513)
self.assertEqual(parse_duration('3 hours, 11 minutes, 53 seconds'), 11513)
self.assertEqual(parse_duration('3 hours, 11 mins, 53 secs'), 11513)
self.assertEqual(parse_duration('62m45s'), 3765)
self.assertEqual(parse_duration('6m59s'), 419)
self.assertEqual(parse_duration('49s'), 49)
@@ -682,6 +685,10 @@ class TestUtil(unittest.TestCase):
self.assertEqual(parse_duration('PT1H0.040S'), 3600.04)
self.assertEqual(parse_duration('PT00H03M30SZ'), 210)
self.assertEqual(parse_duration('P0Y0M0DT0H4M20.880S'), 260.88)
self.assertEqual(parse_duration('01:02:03:050'), 3723.05)
self.assertEqual(parse_duration('103:050'), 103.05)
self.assertEqual(parse_duration('1HR 3MIN'), 3780)
self.assertEqual(parse_duration('2hrs 3mins'), 7380)
def test_fix_xml_ampersands(self):
self.assertEqual(
@@ -895,6 +902,30 @@ class TestUtil(unittest.TestCase):
'vcodec': 'av01.0.05M.08',
'acodec': 'none',
})
self.assertEqual(parse_codecs('vp9.2'), {
'vcodec': 'vp9.2',
'acodec': 'none',
'dynamic_range': 'HDR10',
})
self.assertEqual(parse_codecs('vp09.02.50.10.01.09.18.09.00'), {
'vcodec': 'vp09.02.50.10.01.09.18.09.00',
'acodec': 'none',
'dynamic_range': 'HDR10',
})
self.assertEqual(parse_codecs('av01.0.12M.10.0.110.09.16.09.0'), {
'vcodec': 'av01.0.12M.10.0.110.09.16.09.0',
'acodec': 'none',
'dynamic_range': 'HDR10',
})
self.assertEqual(parse_codecs('dvhe'), {
'vcodec': 'dvhe',
'acodec': 'none',
'dynamic_range': 'DV',
})
self.assertEqual(parse_codecs('fLaC'), {
'vcodec': 'none',
'acodec': 'flac',
})
self.assertEqual(parse_codecs('theora, vorbis'), {
'vcodec': 'theora',
'acodec': 'vorbis',
@@ -1723,6 +1754,21 @@ Line 1
'a', 'b', 'c', 'd',
from_dict={'a': 'c', 'c': [], 'b': 'd', 'd': None}), 'c-d')
def test_partial_application(self):
test_fn = partial_application(lambda x, kwarg=None: '{0}, kwarg={1!r}'.format(x, kwarg))
self.assertTrue(
callable(test_fn(kwarg=10)),
'missing positional parameter should apply partially')
self.assertEqual(
test_fn(10, kwarg=42), '10, kwarg=42',
'positionally passed argument should call function')
self.assertEqual(
test_fn(x=10), '10, kwarg=None',
'keyword passed positional should call function')
self.assertEqual(
test_fn(kwarg=42)(10), '10, kwarg=42',
'call after partial application should call the function')
if __name__ == '__main__':
unittest.main()

View File

@@ -357,7 +357,7 @@ class YoutubeDL(object):
_NUMERIC_FIELDS = set((
'width', 'height', 'tbr', 'abr', 'asr', 'vbr', 'fps', 'filesize', 'filesize_approx',
'timestamp', 'upload_year', 'upload_month', 'upload_day',
'timestamp', 'upload_year', 'upload_month', 'upload_day', 'available_at',
'duration', 'view_count', 'like_count', 'dislike_count', 'repost_count',
'average_rating', 'comment_count', 'age_limit',
'start_time', 'end_time',
@@ -2404,60 +2404,52 @@ class YoutubeDL(object):
return res
def _format_note(self, fdict):
res = ''
if fdict.get('ext') in ['f4f', 'f4m']:
res += '(unsupported) '
if fdict.get('language'):
if res:
res += ' '
res += '[%s] ' % fdict['language']
if fdict.get('format_note') is not None:
res += fdict['format_note'] + ' '
if fdict.get('tbr') is not None:
res += '%4dk ' % fdict['tbr']
def simplified_codec(f, field):
assert field in ('acodec', 'vcodec')
codec = f.get(field)
return (
'unknown' if not codec
else '.'.join(codec.split('.')[:4]) if codec != 'none'
else 'images' if field == 'vcodec' and f.get('acodec') == 'none'
else None if field == 'acodec' and f.get('vcodec') == 'none'
else 'audio only' if field == 'vcodec'
else 'video only')
res = join_nonempty(
fdict.get('ext') in ('f4f', 'f4m') and '(unsupported)',
fdict.get('language') and ('[%s]' % (fdict['language'],)),
fdict.get('format_note') is not None and fdict['format_note'],
fdict.get('tbr') is not None and ('%4dk' % fdict['tbr']),
delim=' ')
res = [res] if res else []
if fdict.get('container') is not None:
if res:
res += ', '
res += '%s container' % fdict['container']
if (fdict.get('vcodec') is not None
and fdict.get('vcodec') != 'none'):
if res:
res += ', '
res += fdict['vcodec']
if fdict.get('vbr') is not None:
res += '@'
res.append('%s container' % (fdict['container'],))
if fdict.get('vcodec') not in (None, 'none'):
codec = simplified_codec(fdict, 'vcodec')
if codec and fdict.get('vbr') is not None:
codec += '@'
elif fdict.get('vbr') is not None and fdict.get('abr') is not None:
res += 'video@'
if fdict.get('vbr') is not None:
res += '%4dk' % fdict['vbr']
codec = 'video@'
else:
codec = None
codec = join_nonempty(codec, fdict.get('vbr') is not None and ('%4dk' % fdict['vbr']))
if codec:
res.append(codec)
if fdict.get('fps') is not None:
if res:
res += ', '
res += '%sfps' % fdict['fps']
if fdict.get('acodec') is not None:
if res:
res += ', '
if fdict['acodec'] == 'none':
res += 'video only'
else:
res += '%-5s' % fdict['acodec']
elif fdict.get('abr') is not None:
if res:
res += ', '
res += 'audio'
if fdict.get('abr') is not None:
res += '@%3dk' % fdict['abr']
if fdict.get('asr') is not None:
res += ' (%5dHz)' % fdict['asr']
res.append('%sfps' % (fdict['fps'],))
codec = (
simplified_codec(fdict, 'acodec') if fdict.get('acodec') is not None
else 'audio' if fdict.get('abr') is not None else None)
if codec:
res.append(join_nonempty(
'%-4s' % (codec + (('@%3dk' % fdict['abr']) if fdict.get('abr') else ''),),
fdict.get('asr') and '(%5dHz)' % fdict['asr'], delim=' '))
if fdict.get('filesize') is not None:
if res:
res += ', '
res += format_bytes(fdict['filesize'])
res.append(format_bytes(fdict['filesize']))
elif fdict.get('filesize_approx') is not None:
if res:
res += ', '
res += '~' + format_bytes(fdict['filesize_approx'])
return res
res.append('~' + format_bytes(fdict['filesize_approx']))
return ', '.join(res)
def list_formats(self, info_dict):
formats = info_dict.get('formats', [info_dict])

View File

@@ -55,7 +55,7 @@ except AttributeError:
try:
import collections.abc as compat_collections_abc
except ImportError:
import collections as compat_collections_abc
compat_collections_abc = collections
# compat_urllib_request
@@ -3452,6 +3452,8 @@ except ImportError:
except ImportError:
compat_map = map
# compat_filter, compat_filter_fns
try:
from future_builtins import filter as compat_filter
except ImportError:
@@ -3459,6 +3461,9 @@ except ImportError:
from itertools import ifilter as compat_filter
except ImportError:
compat_filter = filter
# "Is this function one or maybe the other filter()?"
compat_filter_fns = tuple(set((filter, compat_filter)))
# compat_zip
try:
@@ -3478,6 +3483,40 @@ except ImportError:
from itertools import izip_longest as compat_itertools_zip_longest
# compat_abc_ABC
try:
from abc import ABC as compat_abc_ABC
except ImportError:
# Py < 3.4
from abc import ABCMeta as _ABCMeta
compat_abc_ABC = _ABCMeta(str('ABC'), (object,), {})
# dict mixin used here
# like UserDict.DictMixin, without methods created by MutableMapping
class _DictMixin(compat_abc_ABC):
def has_key(self, key):
return key in self
# get(), clear(), setdefault() in MM
def iterkeys(self):
return (k for k in self)
def itervalues(self):
return (self[k] for k in self)
def iteritems(self):
return ((k, self[k]) for k in self)
# pop(), popitem() in MM
def copy(self):
return type(self)(self)
# update() in MM
# compat_collections_chain_map
# collections.ChainMap: new class
try:
@@ -3632,6 +3671,129 @@ except ImportError:
compat_zstandard = None
# compat_thread
try:
import _thread as compat_thread
except ImportError:
try:
import thread as compat_thread
except ImportError:
import dummy_thread as compat_thread
# compat_dict
# compat_builtins_dict
# compat_dict_items
if sys.version_info >= (3, 6):
compat_dict = compat_builtins_dict = dict
compat_dict_items = dict.items
else:
_get_ident = compat_thread.get_ident
class compat_dict(compat_collections_abc.MutableMapping, _DictMixin, dict):
"""`dict` that preserves insertion order with interface like Py3.7+"""
_order = [] # default that should never be used
def __init__(self, *mappings_or_iterables, **kwargs):
# order an unordered dict using a list of keys: actual Py 2.7+
# OrderedDict uses a doubly linked list for better performance
self._order = []
for arg in mappings_or_iterables:
self.__update(arg)
if kwargs:
self.__update(kwargs)
def __getitem__(self, key):
return dict.__getitem__(self, key)
def __setitem__(self, key, value):
try:
if key not in self._order:
self._order.append(key)
dict.__setitem__(self, key, value)
except Exception:
if key in self._order[-1:] and key not in self:
del self._order[-1]
raise
def __len__(self):
return dict.__len__(self)
def __delitem__(self, key):
dict.__delitem__(self, key)
try:
# expected case, O(len(self)), but who dels anyway?
self._order.remove(key)
except ValueError:
pass
def __iter__(self):
for from_ in self._order:
if from_ in self:
yield from_
def __del__(self):
for attr in ('_order',):
try:
delattr(self, attr)
except Exception:
pass
def __repr__(self, _repr_running={}):
# skip recursive items ...
call_key = id(self), _get_ident()
if _repr_running.get(call_key):
return '...'
_repr_running[call_key] = True
try:
return '%s({%s})' % (
type(self).__name__,
','.join('%r: %r' % k_v for k_v in self.items()))
finally:
del _repr_running[call_key]
# merge/update (PEP 584)
def __or__(self, other):
if not isinstance(other, compat_collections_abc.Mapping):
return NotImplemented
new = type(self)(self)
new.update(other)
return new
def __ror__(self, other):
if not isinstance(other, compat_collections_abc.Mapping):
return NotImplemented
new = type(other)(other)
new.update(self)
return new
def __ior__(self, other):
self.update(other)
return self
# optimisations
def __reversed__(self):
for from_ in reversed(self._order):
if from_ in self:
yield from_
def __contains__(self, item):
return dict.__contains__(self, item)
# allow overriding update without breaking __init__
def __update(self, *args, **kwargs):
super(compat_dict, self).update(*args, **kwargs)
compat_builtins_dict = dict
# Using the object's method, not dict's:
# an ordered dict's items can be returned unstably by unordered
# dict.items as if the method was not ((k, self[k]) for k in self)
compat_dict_items = lambda d: d.items()
legacy = [
'compat_HTMLParseError',
'compat_HTMLParser',
@@ -3662,9 +3824,11 @@ legacy = [
__all__ = [
'compat_Struct',
'compat_abc_ABC',
'compat_base64_b64decode',
'compat_basestring',
'compat_brotli',
'compat_builtins_dict',
'compat_casefold',
'compat_chr',
'compat_collections_abc',
@@ -3672,9 +3836,12 @@ __all__ = [
'compat_contextlib_suppress',
'compat_ctypes_WINFUNCTYPE',
'compat_datetime_timedelta_total_seconds',
'compat_dict',
'compat_dict_items',
'compat_etree_fromstring',
'compat_etree_iterfind',
'compat_filter',
'compat_filter_fns',
'compat_get_terminal_size',
'compat_getenv',
'compat_getpass_getpass',
@@ -3716,6 +3883,7 @@ __all__ = [
'compat_struct_unpack',
'compat_subprocess_get_DEVNULL',
'compat_subprocess_Popen',
'compat_thread',
'compat_tokenize_tokenize',
'compat_urllib_error',
'compat_urllib_parse',

View File

@@ -214,6 +214,7 @@ class InfoExtractor(object):
width : height ratio as float.
* no_resume The server does not support resuming the
(HTTP or RTMP) download. Boolean.
* available_at Unix timestamp of when a format will be available to download
* downloader_options A dictionary of downloader options as
described in FileDownloader

View File

@@ -17,6 +17,8 @@ from ..compat import (
compat_chr,
compat_HTTPError,
compat_map as map,
compat_dict as o_dict,
compat_dict_items as dict_items,
compat_str,
compat_urllib_parse,
compat_urllib_parse_parse_qs as compat_parse_qs,
@@ -86,8 +88,24 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
_PLAYLIST_ID_RE = r'(?:(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}|RDMM)'
_INNERTUBE_CLIENTS = {
'ios': {
# priority order for now
_INNERTUBE_CLIENTS = o_dict((
# Doesn't require a PoToken for some reason: thx yt-dlp/yt-dlp#14693
('android_sdkless', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'ANDROID',
'clientVersion': '20.10.38',
'userAgent': 'com.google.android.youtube/20.10.38 (Linux; U; Android 11) gzip',
'osName': 'Android',
'osVersion': '11',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
'REQUIRE_JS_PLAYER': False,
'WITH_COOKIES': False,
}),
('ios', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'IOS',
@@ -100,12 +118,13 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
'REQUIRE_PO_TOKEN': False,
'REQUIRE_PO_TOKEN': True,
'REQUIRE_JS_PLAYER': False,
},
'WITH_COOKIES': False,
}),
# mweb has 'ultralow' formats
# See: https://github.com/yt-dlp/yt-dlp/pull/557
'mweb': {
('mweb', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'MWEB',
@@ -116,9 +135,19 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 2,
'REQUIRE_PO_TOKEN': True,
}),
('tv_downgraded', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'TVHTML5',
'clientVersion': '4', # avoids SABR formats, thx yt-dlp/yt-dlp#14887
'userAgent': 'Mozilla/5.0 (ChromiumStylePlatform) Cobalt/Version',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 7,
'SUPPORTS_COOKIES': True,
},
'tv': {
}),
('tv', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'TVHTML5',
@@ -128,10 +157,8 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 7,
'SUPPORTS_COOKIES': True,
},
'web': {
}),
('web', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB',
@@ -141,10 +168,20 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 1,
'REQUIRE_PO_TOKEN': True,
}),
('web_embedded', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB_EMBEDDED_PLAYER',
'clientVersion': '1.20250923.21.00',
'embedUrl': 'https://www.youtube.com/', # Can be any valid URL
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 56,
'SUPPORTS_COOKIES': True,
},
}),
# Safari UA returns pre-merged video+audio 144p/240p/360p/720p/1080p HLS formats
'web_safari': {
('web_safari', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB',
@@ -152,8 +189,24 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
'userAgent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15,gzip(gfe)',
},
},
},
}
'INNERTUBE_CONTEXT_CLIENT_NAME': 1,
'SUPPORTS_COOKIES': True,
'REQUIRE_PO': True,
}),
# This client now requires sign-in for every video
('web_creator', {
'INNERTUBE_CONTEXT': {
'client': {
'clientName': 'WEB_CREATOR',
'clientVersion': '1.20250922.03.00',
},
},
'INNERTUBE_CONTEXT_CLIENT_NAME': 62,
'REQUIRE_AUTH': True,
'SUPPORTS_COOKIES': True,
'WITH_COOKIES': True,
}),
))
def _login(self):
"""
@@ -430,6 +483,12 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
('responseContext', 'visitorData')),
T(compat_str)))
# @functools.cached_property
def is_authenticated(self, _cache={}):
if self not in _cache:
_cache[self] = bool(self._generate_sapisidhash_header())
return _cache[self]
def _extract_ytcfg(self, video_id, webpage):
ytcfg = self._search_json(
r'ytcfg\.set\s*\(', webpage, 'ytcfg', video_id,
@@ -474,6 +533,27 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
'uploader': uploader,
}
@staticmethod
def _get_text(data, *path_list, **kw_max_runs):
max_runs = kw_max_runs.get('max_runs')
for path in path_list or [None]:
if path is None:
obj = [data] # shortcut
else:
obj = traverse_obj(data, tuple(variadic(path) + (all,)))
for runs in traverse_obj(
obj, ('simpleText', {'text': T(compat_str)}, all, filter),
('runs', lambda _, r: isinstance(r.get('text'), compat_str), all, filter),
(T(list), lambda _, r: isinstance(r.get('text'), compat_str)),
default=[]):
max_runs = int_or_none(max_runs, default=len(runs))
if max_runs < len(runs):
runs = runs[:max_runs]
text = ''.join(traverse_obj(runs, (Ellipsis, 'text')))
if text:
return text
@staticmethod
def _extract_thumbnails(data, *path_list, **kw_final_key):
"""
@@ -1589,10 +1669,12 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'_rtmp': {'protocol': 'rtmp'},
# av01 video only formats sometimes served with "unknown" codecs
'394': {'acodec': 'none', 'vcodec': 'av01.0.05M.08'},
'395': {'acodec': 'none', 'vcodec': 'av01.0.05M.08'},
'396': {'acodec': 'none', 'vcodec': 'av01.0.05M.08'},
'397': {'acodec': 'none', 'vcodec': 'av01.0.05M.08'},
'394': {'acodec': 'none', 'vcodec': 'av01.0.00M.08'},
'395': {'acodec': 'none', 'vcodec': 'av01.0.00M.08'},
'396': {'acodec': 'none', 'vcodec': 'av01.0.01M.08'},
'397': {'acodec': 'none', 'vcodec': 'av01.0.04M.08'},
'398': {'acodec': 'none', 'vcodec': 'av01.0.05M.08'},
'399': {'acodec': 'none', 'vcodec': 'av01.0.08M.08'},
}
_PLAYER_JS_VARIANT_MAP = (
@@ -1619,16 +1701,17 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self._player_cache = {}
def _get_player_js_version(self):
player_js_version = self.get_param('youtube_player_js_version') or '20348@0004de42'
sts_hash = self._search_regex(
('^actual$(^)?(^)?', r'^([0-9]{5,})@([0-9a-f]{8,})$'),
player_js_version, 'player_js_version', group=(1, 2), default=None)
if sts_hash:
return sts_hash
self.report_warning(
'Invalid player JS version "{0}" specified. '
'It should be "{1}" or in the format of {2}'.format(
player_js_version, 'actual', 'SignatureTimeStamp@Hash'), only_once=True)
player_js_version = self.get_param('youtube_player_js_version')
if player_js_version:
sts_hash = self._search_regex(
('^actual$(^)?(^)?', r'^([0-9]{5,})@([0-9a-f]{8,})$'),
player_js_version, 'player_js_version', group=(1, 2), default=None)
if sts_hash:
return sts_hash
self.report_warning(
'Invalid player JS version "{0}" specified. '
'It should be "{1}" or in the format of {2}'.format(
player_js_version, 'actual', 'SignatureTimeStamp@Hash'), only_once=True)
return None, None
# *ytcfgs, webpage=None
@@ -1643,18 +1726,18 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
ytcfgs = ytcfgs + ({'PLAYER_JS_URL': player_url},)
player_url = traverse_obj(
ytcfgs, (Ellipsis, 'PLAYER_JS_URL'), (Ellipsis, 'WEB_PLAYER_CONTEXT_CONFIGS', Ellipsis, 'jsUrl'),
get_all=False, expected_type=lambda u: urljoin('https://www.youtube.com', u))
get_all=False, expected_type=self._yt_urljoin)
player_id_override = self._get_player_js_version()[1]
requested_js_variant = self.get_param('youtube_player_js_variant') or 'main'
requested_js_variant = self.get_param('youtube_player_js_variant')
variant_js = next(
(v for k, v in self._PLAYER_JS_VARIANT_MAP if k == requested_js_variant),
None)
if variant_js:
player_id_override = self._get_player_js_version()[1]
player_id = player_id_override or self._extract_player_info(player_url)
original_url = player_url
player_url = '/s/player/{0}/{1}'.format(player_id, variant_js)
player_url = self._yt_urljoin(
'/s/player/{0}/{1}'.format(player_id, variant_js))
if original_url != player_url:
self.write_debug(
'Forcing "{0}" player JS variant for player {1}\n'
@@ -1668,7 +1751,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
requested_js_variant, ','.join(k for k, _ in self._PLAYER_JS_VARIANT_MAP)),
only_once=True)
return urljoin('https://www.youtube.com', player_url)
return player_url
def _download_player_url(self, video_id, fatal=False):
res = self._download_webpage(
@@ -2048,8 +2131,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
return self._cached(self._decrypt_nsig, 'nsig', n, player_url)
for fmt in formats:
parsed_fmt_url = compat_urllib_parse.urlparse(fmt['url'])
n_param = compat_parse_qs(parsed_fmt_url.query).get('n')
n_param = parse_qs(fmt['url']).get('n')
if not n_param:
continue
n_param = n_param[-1]
@@ -2098,32 +2180,35 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
return sts
def _mark_watched(self, video_id, player_response):
playback_url = url_or_none(try_get(
player_response,
lambda x: x['playbackTracking']['videostatsPlaybackUrl']['baseUrl']))
if not playback_url:
return
# cpn generation algorithm is reverse engineered from base.js.
# In fact it works even with dummy cpn.
CPN_ALPHABET = string.ascii_letters + string.digits + '-_'
cpn = ''.join(CPN_ALPHABET[random.randint(0, 256) & 63] for _ in range(16))
# more consistent results setting it to right before the end
qs = parse_qs(playback_url)
video_length = '{0}'.format(float((qs.get('len') or ['1.5'])[0]) - 1)
for is_full, key in enumerate(('videostatsPlaybackUrl', 'videostatsWatchtimeUrl')):
label = 'fully ' if is_full > 0 else ''
playback_url = update_url_query(
playback_url, {
'ver': '2',
'cpn': cpn,
'cmt': video_length,
'el': 'detailpage', # otherwise defaults to "shorts"
})
playback_url = traverse_obj(player_response, (
'playbackTracking'. key, 'baseUrl', T(url_or_none)))
if not playback_url:
self.report_warning('Unable to mark {0}watched'.format(label))
continue
self._download_webpage(
playback_url, video_id, 'Marking watched',
'Unable to mark watched', fatal=False)
# more consistent results setting it to right before the end
qs = parse_qs(playback_url)
video_length = '{0}'.format(float((qs.get('len') or ['1.5'])[0]) - 1)
playback_url = update_url_query(
playback_url, {
'ver': '2',
'cpn': cpn,
'cmt': video_length,
'el': 'detailpage', # otherwise defaults to "shorts"
})
self._download_webpage(
playback_url, video_id, 'Marking {0}watched'.format(label),
'Unable to mark watched', fatal=False)
@staticmethod
def _extract_urls(webpage):
@@ -2215,6 +2300,49 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
(r'%s\s*%s' % (regex, self._YT_INITIAL_BOUNDARY_RE),
regex), webpage, name, default='{}'), video_id, fatal=False)
def _get_preroll_length(self, ad_slot_lists):
def parse_instream_ad_renderer(instream_renderer):
for skippable, path in (
('', ('skipOffsetMilliseconds', T(int))),
('non-', ('playerVars', T(compat_parse_qs),
'length_seconds', -1, T(int_or_none(invscale=1000))))):
length_ms = traverse_obj(instream_renderer, path)
if length_ms is not None:
self.write_debug('Detected a %ds %sskippable ad' % (
length_ms // 1000, skippable))
break
return length_ms
for slot_renderer in traverse_obj(ad_slot_lists, ('adSlots', Ellipsis, 'adSlotRenderer', T(dict))):
if traverse_obj(slot_renderer, ('adSlotMetadata', 'triggerEvent')) != 'SLOT_TRIGGER_EVENT_BEFORE_CONTENT':
continue
rendering_content = traverse_obj(slot_renderer, (
'fulfillmentContent', 'fulfilledLayout', 'playerBytesAdLayoutRenderer',
'renderingContent', 'instreamVideoAdRenderer', T(dict)))
length_ms = parse_instream_ad_renderer(rendering_content)
if length_ms is not None:
return length_ms
times = traverse_obj(rendering_content, ((
('playerBytesSequentialLayoutRenderer', 'sequentialLayouts'),
None), any, Ellipsis, 'playerBytesAdLayoutRenderer',
'renderingContent', 'instreamVideoAdRenderer',
T(parse_instream_ad_renderer)))
if times:
return sum(times)
return 0
def _is_premium_subscriber(self, initial_data):
if not self.is_authenticated or not initial_data:
return False
tlr = traverse_obj(
initial_data, ('topbar', 'desktopTopbarRenderer', 'logo', 'topbarLogoRenderer'))
return (
traverse_obj(tlr, ('iconImage', 'iconType')) == 'YOUTUBE_PREMIUM_LOGO'
or 'premium' in (self._get_text(tlr, 'tooltipText') or '').lower()
)
def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url, {})
video_id = self._match_id(url)
@@ -2242,32 +2370,36 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
if True or not player_response:
origin = 'https://www.youtube.com'
pb_context = {'html5Preference': 'HTML5_PREF_WANTS'}
fetched_timestamp = int(time.time())
player_url = self._extract_player_url(webpage)
ytcfg = self._extract_ytcfg(video_id, webpage or '')
sts = self._extract_signature_timestamp(video_id, player_url, ytcfg)
if sts:
pb_context['signatureTimestamp'] = sts
client_names = traverse_obj(self._INNERTUBE_CLIENTS, (
T(dict.items), lambda _, k_v: not k_v[1].get('REQUIRE_PO_TOKEN'),
0))[:1]
auth = self._generate_sapisidhash_header(origin)
client_names = []
if auth or self._is_premium_subscriber(player_response):
client_names = traverse_obj(self._INNERTUBE_CLIENTS, (
T(dict_items), lambda _, k_v: k_v[0] == 'web_safari', 0))[:1]
if not client_names:
client_names = traverse_obj(self._INNERTUBE_CLIENTS, (
T(dict_items), lambda _, k_v: not (
k_v[1].get('REQUIRE_PO_TOKEN')
or (bool(k_v[1].get('WITH_COOKIES', auth)) ^ bool(auth))
), 0))[:1]
if 'web' not in client_names:
# webpage links won't download: ignore links and playability
# only live HLS webpage links will download: ignore playability
player_response = filter_dict(
player_response or {},
lambda k, _: k not in ('streamingData', 'playabilityStatus'))
if is_live and 'ios' not in client_names:
client_names.append('ios')
lambda k, _: k != 'playabilityStatus')
headers = {
'Sec-Fetch-Mode': 'navigate',
'Origin': origin,
'X-Goog-Visitor-Id': self._extract_visitor_data(ytcfg) or '',
}
auth = self._generate_sapisidhash_header(origin)
if auth is not None:
headers['Authorization'] = auth
headers['X-Origin'] = origin
@@ -2297,7 +2429,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'INNERTUBE_CONTEXT', 'client', 'clientVersion'),
'User-Agent': (
'INNERTUBE_CONTEXT', 'client', 'userAgent'),
}))
}) or {})
api_player_response = self._call_api(
'player', query, video_id, fatal=False, headers=api_headers,
@@ -2306,19 +2438,22 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
'context', 'client', 'clientName')),
'API JSON', delim=' '))
hls = traverse_obj(
(player_response, api_player_response),
(Ellipsis, 'streamingData', 'hlsManifestUrl', T(url_or_none)))
# be sure to find HLS in case of is_live
hls = traverse_obj(player_response, (
'streamingData', 'hlsManifestUrl', T(url_or_none)))
fetched_timestamp = int(time.time())
if len(hls) == 2 and not hls[0] and hls[1]:
player_response['streamingData']['hlsManifestUrl'] = hls[1]
else:
video_details = merge_dicts(*traverse_obj(
(player_response, api_player_response),
(Ellipsis, 'videoDetails', T(dict))))
player_response.update(filter_dict(
api_player_response or {}, cndn=lambda k, _: k != 'captions'))
player_response['videoDetails'] = video_details
preroll_length_ms = (
self._get_preroll_length(api_player_response)
or self._get_preroll_length(player_response))
video_details = merge_dicts(*traverse_obj(
(player_response, api_player_response),
(Ellipsis, 'videoDetails', T(dict))))
player_response.update(filter_dict(
api_player_response or {}, cndn=lambda k, _: k != 'captions'))
player_response['videoDetails'] = video_details
if hls and not traverse_obj(player_response, (
'streamingData', 'hlsManifestUrl', T(url_or_none))):
player_response['streamingData']['hlsManifestUrl'] = hls
def is_agegated(playability):
# playability: dict
@@ -2385,10 +2520,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
return self.url_result(
trailer_video_id, self.ie_key(), trailer_video_id)
def get_text(x):
return ''.join(traverse_obj(
x, (('simpleText',),), ('runs', Ellipsis, 'text'),
expected_type=compat_str))
get_text = lambda x: self._get_text(x) or ''
search_meta = (
(lambda x: self._html_search_meta(x, webpage, default=None))
@@ -2476,7 +2608,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
elif fetched_timestamp is not None:
# Handle preroll waiting period
preroll_sleep = self.get_param('youtube_preroll_sleep')
preroll_sleep = int_or_none(preroll_sleep, default=6)
preroll_sleep = min(6, int_or_none(preroll_sleep, default=preroll_length_ms / 1000))
fetched_timestamp += preroll_sleep
for fmt in streaming_formats:
@@ -2522,6 +2654,10 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self.write_debug(error_to_compat_str(e), only_once=True)
continue
if parse_qs(fmt_url).get('n'):
# this and (we assume) all the formats here are n-scrambled
break
language_preference = (
10 if audio_track.get('audioIsDefault')
else -10 if 'descriptive' in (traverse_obj(audio_track, ('displayName', T(lower))) or '')
@@ -2654,7 +2790,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
subreason = pemr.get('subreason')
if subreason:
subreason = clean_html(get_text(subreason))
if subreason == 'The uploader has not made this video available in your country.':
if subreason.startswith('The uploader has not made this video available in your country'):
countries = microformat.get('availableCountries')
if not countries:
regions_allowed = search_meta('regionsAllowed')
@@ -2848,24 +2984,21 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
chapters = self._extract_chapters_from_json(
initial_data, video_id, duration)
if not chapters:
for engagment_pannel in (initial_data.get('engagementPanels') or []):
contents = try_get(
engagment_pannel, lambda x: x['engagementPanelSectionListRenderer']['content']['macroMarkersListRenderer']['contents'],
list)
if not contents:
continue
def chapter_time(mmlir):
return parse_duration(
get_text(mmlir.get('timeDescription')))
def chapter_time(mmlir):
return parse_duration(
get_text(mmlir.get('timeDescription')))
for markers in traverse_obj(initial_data, (
'engagementPanels', Ellipsis, 'engagementPanelSectionListRenderer',
'content', 'macroMarkersListRenderer', 'contents', T(list))):
chapters = []
for next_num, content in enumerate(contents, start=1):
for next_num, content in enumerate(markers, start=1):
mmlir = content.get('macroMarkersListItemRenderer') or {}
start_time = chapter_time(mmlir)
end_time = (traverse_obj(
contents, (next_num, 'macroMarkersListItemRenderer', T(chapter_time)))
if next_num < len(contents) else duration)
end_time = (traverse_obj(markers, (
next_num, 'macroMarkersListItemRenderer', T(chapter_time)))
if next_num < len(markers) else duration)
if start_time is None or end_time is None:
continue
chapters.append({
@@ -3424,12 +3557,6 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
T(dict.items), lambda _, k_v: k_v[0].startswith('grid') and k_v[0].endswith('Renderer'),
1, T(dict)), get_all=False)
@staticmethod
def _get_text(r, k):
return traverse_obj(
r, (k, 'runs', 0, 'text'), (k, 'simpleText'),
expected_type=txt_or_none)
def _grid_entries(self, grid_renderer):
for item in traverse_obj(grid_renderer, ('items', Ellipsis, T(dict))):
lockup_view_model = traverse_obj(item, ('lockupViewModel', T(dict)))
@@ -3540,15 +3667,25 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
'Unsupported lockup view model content type "{0}"{1}'.format(content_type, bug_reports_message()),
only_once=True)
return
thumb_keys = ('contentImage',) + thumb_keys + ('thumbnailViewModel', 'image')
return merge_dicts(self.url_result(
url, ie=ie.ie_key(), video_id=content_id), {
'title': traverse_obj(view_model, (
'metadata', 'lockupMetadataViewModel', 'title',
'content', T(compat_str))),
'thumbnails': self._extract_thumbnails(
view_model, thumb_keys, final_key='sources'),
})
url, ie=ie.ie_key(), video_id=content_id),
traverse_obj(view_model, {
'title': ('metadata', 'lockupMetadataViewModel', 'title',
'content', T(compat_str)),
'thumbnails': T(lambda vm: self._extract_thumbnails(
vm, thumb_keys, final_key='sources')),
'duration': (
'contentImage', 'thumbnailViewModel', 'overlays',
Ellipsis, (
('thumbnailBottomOverlayViewModel', 'badges'),
('thumbnailOverlayBadgeViewModel', 'thumbnailBadges')
), Ellipsis, 'thumbnailBadgeViewModel', 'text',
T(parse_duration), any),
})
)
def _extract_shorts_lockup_view_model(self, view_model):
content_id = traverse_obj(view_model, (
@@ -3676,7 +3813,7 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor):
continuation = None
for is_renderer in traverse_obj(slr_renderer, (
'contents', Ellipsis, 'itemSectionRenderer', T(dict))):
for isr_content in traverse_obj(slr_renderer, (
for isr_content in traverse_obj(is_renderer, (
'contents', Ellipsis, T(dict))):
renderer = isr_content.get('playlistVideoListRenderer')
if renderer:

View File

@@ -421,12 +421,12 @@ def parseOpts(overrideArguments=None):
action='store', dest='youtube_player_js_variant',
help='For YouTube, the player javascript variant to use for n/sig deciphering; `actual` to follow the site; default `%default`.',
choices=('actual', 'main', 'tcc', 'tce', 'es5', 'es6', 'tv', 'tv_es6', 'phone', 'tablet'),
default='main', metavar='VARIANT')
default='actual', metavar='VARIANT')
video_format.add_option(
'--youtube-player-js-version',
action='store', dest='youtube_player_js_version',
help='For YouTube, the player javascript version to use for n/sig deciphering, specified as `signature_timestamp@hash`, or `actual` to follow the site; default `%default`',
default='20348@0004de42', metavar='STS@HASH')
default='actual', metavar='STS@HASH')
video_format.add_option(
'--merge-output-format',
action='store', dest='merge_output_format', metavar='FORMAT', default=None,

View File

@@ -5,6 +5,10 @@
from .utils import (
dict_get,
get_first,
require,
subs_list_to_dict,
T,
traverse_obj,
unpack,
value,
)

View File

@@ -53,6 +53,8 @@ from .compat import (
compat_etree_fromstring,
compat_etree_iterfind,
compat_expanduser,
compat_filter as filter,
compat_filter_fns,
compat_html_entities,
compat_html_entities_html5,
compat_http_client,
@@ -1859,6 +1861,39 @@ def write_json_file(obj, fn):
raise
class partial_application(object):
"""Allow a function to use pre-set argument values"""
# see _try_bind_args()
try:
inspect.signature
@staticmethod
def required_args(fn):
return [
param.name for param in inspect.signature(fn).parameters.values()
if (param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
and param.default is inspect.Parameter.empty)]
except AttributeError:
# Py < 3.3
@staticmethod
def required_args(fn):
fn_args = inspect.getargspec(fn)
n_defaults = len(fn_args.defaults or [])
return (fn_args.args or [])[:-n_defaults if n_defaults > 0 else None]
def __new__(cls, func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
if set(cls.required_args(func)[len(args):]).difference(kwargs):
return functools.partial(func, *args, **kwargs)
return func(*args, **kwargs)
return wrapped
if sys.version_info >= (2, 7):
def find_xpath_attr(node, xpath, key, val=None):
""" Find the xpath xpath[@key=val] """
@@ -3152,6 +3187,7 @@ def extract_timezone(date_str):
return timezone, date_str
@partial_application
def parse_iso8601(date_str, delimiter='T', timezone=None):
""" Return a UNIX timestamp from the given date """
@@ -3229,6 +3265,7 @@ def unified_timestamp(date_str, day_first=True):
return calendar.timegm(timetuple) + pm_delta * 3600 - compat_datetime_timedelta_total_seconds(timezone)
@partial_application
def determine_ext(url, default_ext='unknown_video'):
if url is None or '.' not in url:
return default_ext
@@ -3807,6 +3844,7 @@ def base_url(url):
return re.match(r'https?://[^?#&]+/', url).group()
@partial_application
def urljoin(base, path):
path = _decode_compat_str(path, encoding='utf-8', or_none=True)
if not path:
@@ -3831,6 +3869,7 @@ class PUTRequest(compat_urllib_request.Request):
return 'PUT'
@partial_application
def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1, base=None):
if get_attr:
if v is not None:
@@ -3857,6 +3896,7 @@ def str_to_int(int_str):
return int_or_none(int_str)
@partial_application
def float_or_none(v, scale=1, invscale=1, default=None):
if v is None:
return default
@@ -3891,38 +3931,46 @@ def parse_duration(s):
return None
s = s.strip()
if not s:
return None
days, hours, mins, secs, ms = [None] * 5
m = re.match(r'(?:(?:(?:(?P<days>[0-9]+):)?(?P<hours>[0-9]+):)?(?P<mins>[0-9]+):)?(?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?Z?$', s)
m = re.match(r'''(?x)
(?P<before_secs>
(?:(?:(?P<days>[0-9]+):)?(?P<hours>[0-9]+):)?
(?P<mins>[0-9]+):)?
(?P<secs>(?(before_secs)[0-9]{1,2}|[0-9]+))
(?:[.:](?P<ms>[0-9]+))?Z?$
''', s)
if m:
days, hours, mins, secs, ms = m.groups()
days, hours, mins, secs, ms = m.group('days', 'hours', 'mins', 'secs', 'ms')
else:
m = re.match(
r'''(?ix)(?:P?
(?:
[0-9]+\s*y(?:ears?)?\s*
[0-9]+\s*y(?:ears?)?,?\s*
)?
(?:
[0-9]+\s*m(?:onths?)?\s*
[0-9]+\s*m(?:onths?)?,?\s*
)?
(?:
[0-9]+\s*w(?:eeks?)?\s*
[0-9]+\s*w(?:eeks?)?,?\s*
)?
(?:
(?P<days>[0-9]+)\s*d(?:ays?)?\s*
(?P<days>[0-9]+)\s*d(?:ays?)?,?\s*
)?
T)?
(?:
(?P<hours>[0-9]+)\s*h(?:ours?)?\s*
(?P<hours>[0-9]+)\s*h(?:(?:ou)?rs?)?,?\s*
)?
(?:
(?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?\s*
(?P<mins>[0-9]+)\s*m(?:in(?:ute)?s?)?,?\s*
)?
(?:
(?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*s(?:ec(?:ond)?s?)?\s*
(?P<secs>[0-9]+)(?:\.(?P<ms>[0-9]+))?\s*s(?:ec(?:ond)?s?)?\s*
)?Z?$''', s)
if m:
days, hours, mins, secs, ms = m.groups()
days, hours, mins, secs, ms = m.group('days', 'hours', 'mins', 'secs', 'ms')
else:
m = re.match(r'(?i)(?:(?P<hours>[0-9.]+)\s*(?:hours?)|(?P<mins>[0-9.]+)\s*(?:mins?\.?|minutes?)\s*)Z?$', s)
if m:
@@ -3930,17 +3978,13 @@ def parse_duration(s):
else:
return None
duration = 0
if secs:
duration += float(secs)
if mins:
duration += float(mins) * 60
if hours:
duration += float(hours) * 60 * 60
if days:
duration += float(days) * 24 * 60 * 60
if ms:
duration += float(ms)
duration = (
((((float(days) * 24) if days else 0)
+ (float(hours) if hours else 0)) * 60
+ (float(mins) if mins else 0)) * 60
+ (float(secs) if secs else 0)
+ (float(ms) / 10 ** len(ms) if ms else 0))
return duration
@@ -4251,6 +4295,7 @@ def urlencode_postdata(*args, **kargs):
return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii')
@partial_application
def update_url(url, **kwargs):
"""Replace URL components specified by kwargs
url: compat_str or parsed URL tuple
@@ -4272,6 +4317,7 @@ def update_url(url, **kwargs):
return compat_urllib_parse.urlunparse(url._replace(**kwargs))
@partial_application
def update_url_query(url, query):
return update_url(url, query_update=query)
@@ -4698,30 +4744,45 @@ def parse_codecs(codecs_str):
if not codecs_str:
return {}
split_codecs = list(filter(None, map(
lambda str: str.strip(), codecs_str.strip().strip(',').split(','))))
vcodec, acodec = None, None
lambda s: s.strip(), codecs_str.strip().split(','))))
vcodec, acodec, hdr = None, None, None
for full_codec in split_codecs:
codec = full_codec.split('.')[0]
if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora'):
if not vcodec:
vcodec = full_codec
elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
codec, rest = full_codec.partition('.')[::2]
codec = codec.lower()
full_codec = '.'.join((codec, rest)) if rest else codec
codec = re.sub(r'0+(?=\d)', '', codec)
if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
if vcodec:
continue
vcodec = full_codec
if codec in ('dvh1', 'dvhe'):
hdr = 'DV'
elif codec in ('av1', 'vp9'):
n, m = {
'av1': (2, '10'),
'vp9': (0, '2'),
}[codec]
if (rest.split('.', n + 1)[n:] or [''])[0].lstrip('0') == m:
hdr = 'HDR10'
elif codec in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-4',
'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
if not acodec:
acodec = full_codec
else:
write_string('WARNING: Unknown codec %s\n' % full_codec, sys.stderr)
if not vcodec and not acodec:
if len(split_codecs) == 2:
return {
'vcodec': split_codecs[0],
'acodec': split_codecs[1],
}
else:
return {
write_string('WARNING: Unknown codec %s\n' % (full_codec,), sys.stderr)
return (
filter_dict({
'vcodec': vcodec or 'none',
'acodec': acodec or 'none',
}
return {}
'dynamic_range': hdr,
}) if vcodec or acodec
else {
'vcodec': split_codecs[0],
'acodec': split_codecs[1],
} if len(split_codecs) == 2
else {})
def urlhandle_detect_ext(url_handle):
@@ -6283,6 +6344,7 @@ def traverse_obj(obj, *paths, **kwargs):
Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
- `any`-builtin: Take the first matching object and return it, resetting branching.
- `all`-builtin: Take all matching objects and return them as a list, resetting branching.
- `filter`-builtin: Return the value if it is truthy, `None` otherwise.
`tuple`, `list`, and `dict` all support nested paths and branches.
@@ -6324,6 +6386,11 @@ def traverse_obj(obj, *paths, **kwargs):
# instant compat
str = compat_str
from .compat import (
compat_builtins_dict as dict_, # the basic dict type
compat_dict as dict, # dict preserving imsertion order
)
casefold = lambda k: compat_casefold(k) if isinstance(k, str) else k
if isinstance(expected_type, type):
@@ -6406,7 +6473,7 @@ def traverse_obj(obj, *paths, **kwargs):
if not branching: # string traversal
result = ''.join(result)
elif isinstance(key, dict):
elif isinstance(key, dict_):
iter_obj = ((k, _traverse_obj(obj, v, False, is_last)) for k, v in key.items())
result = dict((k, v if v is not None else default) for k, v in iter_obj
if v is not None or default is not NO_DEFAULT) or None
@@ -6484,7 +6551,7 @@ def traverse_obj(obj, *paths, **kwargs):
has_branched = False
key = None
for last, key in lazy_last(variadic(path, (str, bytes, dict, set))):
for last, key in lazy_last(variadic(path, (str, bytes, dict_, set))):
if not casesense and isinstance(key, str):
key = compat_casefold(key)
@@ -6497,6 +6564,11 @@ def traverse_obj(obj, *paths, **kwargs):
objs = (list(filtered_objs),)
continue
# filter might be from __builtin__, future_builtins, or itertools.ifilter
if key in compat_filter_fns:
objs = filter(None, objs)
continue
if __debug__ and callable(key):
# Verify function signature
_try_bind_args(key, None, None)
@@ -6509,10 +6581,10 @@ def traverse_obj(obj, *paths, **kwargs):
objs = from_iterable(new_objs)
if test_type and not isinstance(key, (dict, list, tuple)):
if test_type and not isinstance(key, (dict_, list, tuple)):
objs = map(type_test, objs)
return objs, has_branched, isinstance(key, dict)
return objs, has_branched, isinstance(key, dict_)
def _traverse_obj(obj, path, allow_empty, test_type):
results, has_branched, is_dict = apply_path(obj, path, test_type)
@@ -6535,6 +6607,76 @@ def traverse_obj(obj, *paths, **kwargs):
return None if default is NO_DEFAULT else default
def value(value):
return lambda _: value
class require(ExtractorError):
def __init__(self, name, expected=False):
super(require, self).__init__(
'Unable to extract {0}'.format(name), expected=expected)
def __call__(self, value):
if value is None:
raise self
return value
@partial_application
# typing: (subs: list[dict], /, *, lang='und', ext=None) -> dict[str, list[dict]
def subs_list_to_dict(subs, lang='und', ext=None):
"""
Convert subtitles from a traversal into a subtitle dict.
The path should have an `all` immediately before this function.
Arguments:
`lang` The default language tag for subtitle dicts with no
`lang` (`und`: undefined)
`ext` The default value for `ext` in the subtitle dicts
In the dict you can set the following additional items:
`id` The language tag to which the subtitle dict should be added
`quality` The sort order for each subtitle dict
"""
result = collections.defaultdict(list)
for sub in subs:
tn_url = url_or_none(sub.pop('url', None))
if tn_url:
sub['url'] = tn_url
elif not sub.get('data'):
continue
sub_lang = sub.pop('id', None)
if not isinstance(sub_lang, compat_str):
if not lang:
continue
sub_lang = lang
sub_ext = sub.get('ext')
if not isinstance(sub_ext, compat_str):
if not ext:
sub.pop('ext', None)
else:
sub['ext'] = ext
result[sub_lang].append(sub)
result = dict(result)
for subs in result.values():
subs.sort(key=lambda x: x.pop('quality', 0) or 0)
return result
def unpack(func, **kwargs):
"""Make a function that applies `partial(func, **kwargs)` to its argument as *args"""
@functools.wraps(func)
def inner(items):
return func(*items, **kwargs)
return inner
def T(*x):
""" For use in yt-dl instead of {type, ...} or set((type, ...)) """
return set(x)