Compare commits

...

7 commits

Author SHA1 Message Date
MoojMidge
6ec507d4b4
Merge pull request #1336 from MoojMidge/nexus-unofficial
Nexus unofficial v7.3.0+beta.10
2025-11-16 10:53:48 +11:00
MoojMidge
9d391ff998 Add ViewManager
- Updated to be more self-contained and work better with unsupported skins
- TODO: Add support for setting default sort order and sort direction

Fix content type not being set to episodes

- Fix #586, #589

Update for restructure of xbmc_plugin

- Only set view mode if directory items successfully added

Fix preselect on view_manager view lists

Update to match new setup wizard

Update for reorganised/renamed constants fca610c

Update for updated XbmcContext.apply_content 8a8247a

Update for new localize and logging methods

Update to fix setting view mode not working if container is still updating

Update to handle sort method and order and workaround #1243

Update to handle localised sort order #1309
2025-11-16 10:50:54 +11:00
MoojMidge
b746b36d89 Version bump v7.3.0+beta.10 2025-11-16 10:50:54 +11:00
MoojMidge
36f1cc6048 Misc optimisations
- Avoid using dir()
- Remove custom url quote methods that are no longer faster than urllib.parse methods in newer Python versions
- Reduce polling intervals when checking if Kodi is busy
- Use custom requests.Session class to avoid creation of unused default https adapter and ssl context
2025-11-16 10:50:30 +11:00
MoojMidge
ef99864c19 Change output and sorting of profiling stats 2025-11-16 08:47:30 +11:00
MoojMidge
bbc2411130 Prune invalid entries from DB when closing connection #1331 2025-11-16 08:47:29 +11:00
MoojMidge
a8fa1606cb Fix regressions with SQLite db operations #1331 2025-11-16 08:47:29 +11:00
15 changed files with 480 additions and 273 deletions

View file

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="plugin.video.youtube" name="YouTube" version="7.3.0+beta.9" provider-name="anxdpanic, bromix, MoojMidge">
<addon id="plugin.video.youtube" name="YouTube" version="7.3.0+beta.10" provider-name="anxdpanic, bromix, MoojMidge">
<requires>
<import addon="xbmc.python" version="3.0.0"/>
<import addon="script.module.requests" version="2.27.1"/>

View file

@ -1,3 +1,8 @@
## v7.3.0+beta.10
### Fixed
- Prune invalid entries from DB when closing connection #1331
- Fix regressions with SQLite db operations #1331
## v7.3.0+beta.9
### Fixed
- Disable label masks being used in Kodi 18 #1327

View file

@ -376,7 +376,7 @@ class AbstractProvider(object):
self.log.warning('Multiple busy dialogs active'
' - Rerouting workaround')
return UriItem('command://{0}'.format(action))
context.sleep(1)
context.sleep(0.1)
else:
context.execute(
action,

View file

@ -15,8 +15,6 @@ __all__ = (
'available_cpu_count',
'byte_string_type',
'datetime_infolabel',
'default_quote',
'default_quote_plus',
'entity_escape',
'generate_hash',
'parse_qs',
@ -120,70 +118,6 @@ try:
for ordinal in range(128, 256)
})
def default_quote(string,
safe='',
encoding=None,
errors=None,
_encoding='utf-8',
_errors='strict',
_reserved=reserved,
_non_ascii=non_ascii,
_encode=str.encode,
_is_ascii=str.isascii,
_replace=str.replace,
_old='\\x',
_new='%',
_slice=slice(2, -1),
_str=str,
_translate=str.translate):
_string = _translate(string, _reserved)
if _is_ascii(_string):
return _string
_string = _str(_encode(_string, _encoding, _errors))[_slice]
if _string == string:
if _is_ascii(_string):
return _string
return _translate(_string, _non_ascii)
if _is_ascii(_string):
return _replace(_string, _old, _new)
return _translate(_replace(_string, _old, _new), _non_ascii)
def default_quote_plus(string,
safe='',
encoding=None,
errors=None,
_encoding='utf-8',
_errors='strict',
_reserved=reserved_plus,
_non_ascii=non_ascii,
_encode=str.encode,
_is_ascii=str.isascii,
_replace=str.replace,
_old='\\x',
_new='%',
_slice=slice(2, -1),
_str=str,
_translate=str.translate):
if (not safe and encoding is None and errors is None
and isinstance(string, str)):
_string = _translate(string, _reserved)
if _is_ascii(_string):
return _string
_string = _str(_encode(_string, _encoding, _errors))[_slice]
if _string == string:
if _is_ascii(_string):
return _string
return _translate(_string, _non_ascii)
if _is_ascii(_string):
return _replace(_string, _old, _new)
return _translate(_replace(_string, _old, _new), _non_ascii)
return quote_plus(string, safe, encoding, errors)
urlencode.__defaults__ = (False, '', None, None, default_quote_plus)
# Compatibility shims for Kodi v18 and Python v2.7
except ImportError:
import cPickle as pickle
@ -220,16 +154,10 @@ except ImportError:
return _quote(to_str(data), *args, **kwargs)
default_quote = quote
def quote_plus(data, *args, **kwargs):
return _quote_plus(to_str(data), *args, **kwargs)
default_quote_plus = quote_plus
def unquote(data):
return _unquote(to_str(data))

View file

@ -14,8 +14,8 @@ import os
from .. import logging
from ..compatibility import (
default_quote,
parse_qsl,
quote,
string_type,
to_str,
unquote,
@ -391,7 +391,7 @@ class AbstractContext(object):
params = urlencode([
(
('%' + param,
','.join([default_quote(item) for item in value]))
','.join([quote(item) for item in value]))
if len(value) > 1 else
(param, value[0])
)
@ -486,7 +486,7 @@ class AbstractContext(object):
return ('/', parts) if include_parts else '/'
if kwargs.get('is_uri'):
path = default_quote(path)
path = quote(path)
return (path, parts) if include_parts else path
def get_path(self):

View file

@ -18,7 +18,7 @@ from cProfile import Profile
from functools import wraps
from inspect import getargvalues
from os.path import normpath
from pstats import Stats
import pstats
from traceback import extract_stack, format_list
from weakref import ref
@ -99,7 +99,6 @@ class Profiler(object):
'_print_callees',
'_profiler',
'_reuse',
'_sort_by',
'_timer',
)
@ -121,14 +120,12 @@ class Profiler(object):
num_lines=20,
print_callees=False,
reuse=False,
sort_by=('cumulative', 'time'),
timer=None):
self._enabled = enabled
self._num_lines = num_lines
self._print_callees = print_callees
self._profiler = None
self._reuse = reuse
self._sort_by = sort_by
self._timer = timer
if enabled and not lazy:
@ -205,8 +202,7 @@ class Profiler(object):
flush=True,
num_lines=20,
print_callees=False,
reuse=False,
sort_by=('cumulative', 'time')):
reuse=False):
if not (self._enabled and self._profiler):
return None
@ -218,10 +214,14 @@ class Profiler(object):
self._profiler,
stream=output_stream
)
stats.strip_dirs().sort_stats(*sort_by)
stats.strip_dirs()
if print_callees:
stats.sort_stats('cumulative')
stats.print_callees(num_lines)
else:
stats.sort_stats('cumpercall')
stats.print_stats(num_lines)
stats.sort_stats('totalpercall')
stats.print_stats(num_lines)
output = output_stream.getvalue()
# Occurs when no stats were able to be generated from profiler
@ -242,7 +242,6 @@ class Profiler(object):
num_lines=self._num_lines,
print_callees=self._print_callees,
reuse=self._reuse,
sort_by=self._sort_by,
),
stacklevel=3)
@ -250,6 +249,82 @@ class Profiler(object):
self.__class__._instances.discard(self)
class Stats(pstats.Stats):
"""
Custom Stats class that adds functionality to sort by
- Cumulative time per call ("cumpercall")
- Total time per call ("totalpercall")
Code by alexnvdias from https://bugs.python.org/issue18795
"""
sort_arg_dict_default = {
"calls" : (((1,-1), ), "call count"),
"ncalls" : (((1,-1), ), "call count"),
"cumtime" : (((4,-1), ), "cumulative time"),
"cumulative" : (((4,-1), ), "cumulative time"),
"filename" : (((6, 1), ), "file name"),
"line" : (((7, 1), ), "line number"),
"module" : (((6, 1), ), "file name"),
"name" : (((8, 1), ), "function name"),
"nfl" : (((8, 1),(6, 1),(7, 1),), "name/file/line"),
"pcalls" : (((0,-1), ), "primitive call count"),
"stdname" : (((9, 1), ), "standard name"),
"time" : (((2,-1), ), "internal time"),
"tottime" : (((2,-1), ), "internal time"),
"cumpercall" : (((5,-1), ), "cumulative time per call"),
"totalpercall": (((3,-1), ), "total time per call"),
}
def sort_stats(self, *field):
if not field:
self.fcn_list = 0
return self
if len(field) == 1 and isinstance(field[0], int):
# Be compatible with old profiler
field = [{-1: "stdname",
0: "calls",
1: "time",
2: "cumulative"}[field[0]]]
elif len(field) >= 2:
for arg in field[1:]:
if type(arg) != type(field[0]):
raise TypeError("Can't have mixed argument type")
sort_arg_defs = self.get_sort_arg_defs()
sort_tuple = ()
self.sort_type = ""
connector = ""
for word in field:
if isinstance(word, pstats.SortKey):
word = word.value
sort_tuple = sort_tuple + sort_arg_defs[word][0]
self.sort_type += connector + sort_arg_defs[word][1]
connector = ", "
stats_list = []
for func, (cc, nc, tt, ct, callers) in self.stats.items():
if nc == 0:
npc = 0
else:
npc = float(tt) / nc
if cc == 0:
cpc = 0
else:
cpc = float(ct) / cc
stats_list.append((cc, nc, tt, npc, ct, cpc) + func +
(pstats.func_std_string(func), func))
stats_list.sort(key=pstats.cmp_to_key(pstats.TupleComp(sort_tuple).compare))
self.fcn_list = fcn_list = []
for tuple in stats_list:
fcn_list.append(tuple[-1])
return self
class ExecTimeout(object):
log = logging.getLogger('__name__')
src_file = None

View file

@ -348,12 +348,15 @@ class _Encoder(json.JSONEncoder):
def encode(self, obj, nested=False):
if isinstance(obj, (date, datetime)):
class_name = obj.__class__.__name__
if 'fromisoformat' in dir(obj):
obj = {
'__class__': class_name,
'__isoformat__': obj.isoformat(),
}
else:
try:
if obj.fromisoformat:
obj = {
'__class__': class_name,
'__isoformat__': obj.isoformat(),
}
else:
raise AttributeError
except AttributeError:
if class_name == 'datetime':
if obj.tzinfo:
format_string = '%Y-%m-%dT%H:%M:%S%z'

View file

@ -11,11 +11,19 @@ from __future__ import absolute_import, division, unicode_literals
import socket
from atexit import register as atexit_register
from collections import OrderedDict
from requests import Request, Session
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import InvalidJSONError, RequestException, URLRequired
from requests.utils import DEFAULT_CA_BUNDLE_PATH, extract_zipped_paths
from requests.hooks import default_hooks
from requests.models import DEFAULT_REDIRECT_LIMIT, Request
from requests.sessions import Session
from requests.utils import (
DEFAULT_CA_BUNDLE_PATH,
cookiejar_from_dict,
default_headers,
extract_zipped_paths,
)
from urllib3.util.ssl_ import create_urllib3_context
from .. import logging
@ -64,21 +72,83 @@ class SSLHTTPAdapter(HTTPAdapter):
return super(SSLHTTPAdapter, self).cert_verify(conn, url, verify, cert)
class CustomSession(Session):
def __init__(self):
#: A case-insensitive dictionary of headers to be sent on each
#: :class:`Request <Request>` sent from this
#: :class:`Session <Session>`.
self.headers = default_headers()
#: Default Authentication tuple or object to attach to
#: :class:`Request <Request>`.
self.auth = None
#: Dictionary mapping protocol or protocol and host to the URL of the proxy
#: (e.g. {'http': 'foo.bar:3128', 'http://host.name': 'foo.bar:4012'}) to
#: be used on each :class:`Request <Request>`.
self.proxies = {}
#: Event-handling hooks.
self.hooks = default_hooks()
#: Dictionary of querystring data to attach to each
#: :class:`Request <Request>`. The dictionary values may be lists for
#: representing multivalued query parameters.
self.params = {}
#: Stream response content default.
self.stream = False
#: SSL Verification default.
#: Defaults to `True`, requiring requests to verify the TLS certificate at the
#: remote end.
#: If verify is set to `False`, requests will accept any TLS certificate
#: presented by the server, and will ignore hostname mismatches and/or
#: expired certificates, which will make your application vulnerable to
#: man-in-the-middle (MitM) attacks.
#: Only set this to `False` for testing.
self.verify = True
#: SSL client certificate default, if String, path to ssl client
#: cert file (.pem). If Tuple, ('cert', 'key') pair.
self.cert = None
#: Maximum number of redirects allowed. If the request exceeds this
#: limit, a :class:`TooManyRedirects` exception is raised.
#: This defaults to requests.models.DEFAULT_REDIRECT_LIMIT, which is
#: 30.
self.max_redirects = DEFAULT_REDIRECT_LIMIT
#: Trust environment settings for proxy configuration, default
#: authentication and similar.
#: CustomSession.trust_env is False
self.trust_env = False
#: A CookieJar containing all currently outstanding cookies set on this
#: session. By default it is a
#: :class:`RequestsCookieJar <requests.cookies.RequestsCookieJar>`, but
#: may be any other ``cookielib.CookieJar`` compatible object.
self.cookies = cookiejar_from_dict({})
# Default connection adapters.
self.adapters = OrderedDict()
self.mount('https://', SSLHTTPAdapter(
pool_maxsize=20,
pool_block=True,
max_retries=Retry(
total=3,
backoff_factor=0.1,
status_forcelist={500, 502, 503, 504},
allowed_methods=None,
)
))
self.mount('http://', HTTPAdapter())
class BaseRequestsClass(object):
log = logging.getLogger(__name__)
_session = Session()
_session.trust_env = False
_session.mount('https://', SSLHTTPAdapter(
pool_maxsize=10,
pool_block=True,
max_retries=Retry(
total=3,
backoff_factor=0.1,
status_forcelist={500, 502, 503, 504},
allowed_methods=None,
)
))
_session = CustomSession()
atexit_register(_session.close)
_context = None

View file

@ -465,7 +465,7 @@ class XbmcPlugin(AbstractPlugin):
@staticmethod
def post_run(context, ui, *actions, **kwargs):
timeout = kwargs.get('timeout', 30)
interval = kwargs.get('interval', 0.1)
interval = kwargs.get('interval', 0.01)
for action in actions:
while not ui.get_container(container_type=None, check_ready=True):
timeout -= interval

View file

@ -18,7 +18,7 @@ class DataCache(Storage):
_table_updated = False
_sql = {}
memory_store = {}
_memory_store = {}
def __init__(self, filepath, max_file_size_mb=5):
max_file_size_kb = max_file_size_mb * 1024

View file

@ -17,6 +17,8 @@ class FeedHistory(Storage):
_table_updated = False
_sql = {}
_memory_store = {}
def __init__(self, filepath):
super(FeedHistory, self).__init__(filepath)
@ -32,7 +34,7 @@ class FeedHistory(Storage):
return result
def set_items(self, items):
self._set_many(items)
self._set_many(items, defer=True)
def _optimize_item_count(self, limit=-1, defer=False):
return False

View file

@ -22,8 +22,6 @@ class FunctionCache(Storage):
_table_updated = False
_sql = {}
memory_store = {}
_BUILTIN = str.__module__
SCOPE_NONE = 0
SCOPE_BUILTINS = 1
@ -136,7 +134,7 @@ class FunctionCache(Storage):
if callable(process):
data = process(data, _data)
if data != ignore_value:
self._set(cache_id, data, defer=True)
self._set(cache_id, data)
elif oneshot:
self._remove(cache_id)

View file

@ -18,7 +18,7 @@ class RequestCache(Storage):
_table_updated = False
_sql = {}
memory_store = {}
_memory_store = {}
def __init__(self, filepath, max_file_size_mb=20):
max_file_size_kb = max_file_size_mb * 1024
@ -36,11 +36,11 @@ class RequestCache(Storage):
if response:
item = (etag, response)
if timestamp:
self._update(request_id, item, timestamp)
self._update(request_id, item, timestamp, defer=True)
else:
self._set(request_id, item, defer=True)
else:
self._refresh(request_id, timestamp)
self._refresh(request_id, timestamp, defer=True)
def _optimize_item_count(self, limit=-1, defer=False):
return False

View file

@ -185,6 +185,11 @@ class Storage(object):
' ) <= {{0}}'
' );'
),
'prune_invalid': (
'DELETE'
' FROM {table}'
' WHERE key IS NULL;'
),
'refresh': (
'UPDATE'
' {table}'
@ -230,6 +235,7 @@ class Storage(object):
self._filepath = os.path.join(*filepath)
self._db = None
self._lock = StorageLock()
self._memory_store = getattr(self.__class__, '_memory_store', None)
self._close_timer = None
self._close_actions = False
self._max_item_count = -1 if migrate else max_item_count
@ -307,10 +313,10 @@ class Storage(object):
self._close_timer = close_timer
def _open(self):
statements = []
table_queries = []
if not os.path.exists(self._filepath):
make_dirs(os.path.dirname(self._filepath))
statements.extend((
table_queries.extend((
self._sql['create_table'],
))
self._base._table_updated = True
@ -336,7 +342,7 @@ class Storage(object):
cursor = db.cursor()
sql_script = [
queries = [
'PRAGMA busy_timeout = 1000;',
'PRAGMA read_uncommitted = TRUE;',
'PRAGMA secure_delete = FALSE;',
@ -356,18 +362,18 @@ class Storage(object):
if not self._table_updated:
for result in self._execute(cursor, self._sql['has_old_table']):
if result[0] == 1:
statements.extend((
table_queries.extend((
'PRAGMA writable_schema = 1;',
self._sql['drop_old_table'],
'PRAGMA writable_schema = 0;',
))
break
if statements:
transaction_begin = len(sql_script) + 1
sql_script.extend(('BEGIN;', 'COMMIT;', 'VACUUM;'))
sql_script[transaction_begin:transaction_begin] = statements
self._execute(cursor, '\n'.join(sql_script), script=True)
if table_queries:
transaction_begin = len(queries) + 1
queries.extend(('BEGIN IMMEDIATE;', 'COMMIT;', 'VACUUM;'))
queries[transaction_begin:transaction_begin] = table_queries
self._execute(cursor, queries)
self._base._table_updated = True
self._db = db
@ -382,20 +388,42 @@ class Storage(object):
return False
db = self._db
if not db and self._close_actions:
db = self._open()
else:
return None
if not db:
if self._close_actions:
db = self._open()
else:
return None
if self._close_actions:
memory_store = getattr(self, 'memory_store', None)
if memory_store:
self._set_many(items=None, memory_store=memory_store)
self._optimize_item_count()
self._optimize_file_size()
self._close_actions = False
self._execute(db.cursor(), 'PRAGMA optimize')
if event or self._close_actions:
if not event:
queries = (
'BEGIN IMMEDIATE;',
self._set_many(items=None, defer=True, flush=True),
self._optimize_item_count(defer=True),
self._optimize_file_size(defer=True),
'COMMIT;',
'VACUUM;',
)
elif self._close_actions:
queries = (
'BEGIN IMMEDIATE;',
self._sql['prune_invalid'],
self._set_many(items=None, defer=True, flush=True),
self._optimize_item_count(defer=True),
self._optimize_file_size(defer=True),
'COMMIT;',
'VACUUM;',
'PRAGMA optimize;',
)
else:
queries = (
'BEGIN IMMEDIATE;',
self._sql['prune_invalid'],
'COMMIT;',
'VACUUM;',
'PRAGMA optimize;',
)
self._execute(db.cursor(), queries)
# Not needed if using db as a context manager
if commit:
@ -404,42 +432,69 @@ class Storage(object):
if event:
db.close()
self._db = None
self._close_actions = False
self._close_timer = None
return True
def _execute(self, cursor, query, values=None, many=False, script=False):
def _execute(self, cursor, queries, values=None, many=False, script=False):
result = []
if not cursor:
self.log.error_trace('Database not available')
return []
if values is None:
values = ()
"""
Tests revealed that sqlite has problems to release the database in time
This happens no so often, but just to be sure, we try at least 3 times
to execute our statement.
"""
for attempt in range(1, 4):
try:
if many:
return cursor.executemany(query, values)
if script:
return cursor.executescript(query)
return cursor.execute(query, values)
except (sqlite3.Error, sqlite3.OperationalError) as exc:
if attempt < 3:
if isinstance(exc, sqlite3.OperationalError):
return result
if isinstance(queries, (list, tuple)):
if script:
queries = ('\n'.join(queries),)
else:
queries = (queries,)
for query in queries:
if not query:
continue
if isinstance(query, tuple):
query, _values, _many = query
else:
_many = many
_values = values or ()
# Retry DB operation 3 times in case DB is locked or busy
abort = False
for attempt in range(1, 4):
try:
if _many:
result = cursor.executemany(query, _values)
elif script:
result = cursor.executescript(query)
else:
result = cursor.execute(query, _values)
break
except (sqlite3.Error, sqlite3.OperationalError) as exc:
if attempt >= 3:
abort = True
elif isinstance(exc, sqlite3.OperationalError):
time.sleep(0.1)
elif isinstance(exc, sqlite3.InterfaceError):
cursor = self._db.cursor()
else:
self.log.exception('Failed')
abort = True
if abort:
self.log.exception(('Failed',
'Query: {query!r}',
'Values: {values!r}'),
attempt=attempt,
query=query,
values=values)
break
self.log.warning_trace('Attempt %d of 3',
attempt,
self.log.warning_trace(('Attempt {attempt} of 3',
'Query: {query!r}',
'Values: {values!r}'),
attempt=attempt,
query=query,
values=values,
exc_info=True)
else:
self.log.exception('Failed')
return []
if abort:
break
return result
def _optimize_file_size(self, defer=False):
# do nothing - optimize only if max size limit has been set
@ -468,13 +523,12 @@ class Storage(object):
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
script=True,
),
)
return None
@ -497,149 +551,194 @@ class Storage(object):
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
script=True,
),
)
return None
def _set(self, item_id, item, defer=False, flush=False, memory_store=None):
if memory_store is None:
memory_store = getattr(self, 'memory_store', None)
def _set(self, item_id, item, defer=False, flush=False):
memory_store = self._memory_store
if memory_store is not None:
key = to_str(item_id)
if defer:
memory_store[item_id] = item
memory_store[key] = (
item_id,
since_epoch(),
item,
)
self._close_actions = True
return None
if flush:
memory_store.clear()
return False
if memory_store:
memory_store[item_id] = item
return self._set_many(items=None, memory_store=memory_store)
memory_store[key] = (
item_id,
since_epoch(),
item,
)
return self._set_many(items=None)
values = self._encode(item_id, item)
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
'BEGIN IMMEDIATE;',
self._sql['set'],
'COMMIT;',
)),
values,
script=True,
self._sql['set'],
self._encode(item_id, item),
)
self._close_actions = True
return True
def _set_many(self,
items,
flatten=False,
defer=False,
flush=False,
memory_store=None):
if memory_store is None:
memory_store = getattr(self, 'memory_store', None)
def _set_many(self, items, flatten=False, defer=False, flush=False):
memory_store = self._memory_store
if memory_store is not None:
if defer:
memory_store.update(items)
if defer and not flush:
now = since_epoch()
memory_store.update({
to_str(item_id): (
item_id,
now,
item,
)
for item_id, item in items.items()
})
self._close_actions = True
return None
if flush:
if flush and not defer:
memory_store.clear()
return False
if memory_store:
if items:
memory_store.update(items)
items = memory_store
flush = True
now = since_epoch()
num_items = len(items)
values = []
if flatten:
values = [enc_part
for item in items.items()
for enc_part in self._encode(*item, timestamp=now)]
num_item = 0
if items:
values.extend([
part
for item_id, item in items.items()
for part in self._encode(item_id, item, now)
])
num_item += len(items)
if memory_store:
values.extend([
part
for item_id, timestamp, item in memory_store.values()
for part in self._encode(item_id, item, timestamp)
])
num_item += len(memory_store)
query = self._sql['set_flat'].format(
'(?,?,?,?),' * (num_items - 1) + '(?,?,?,?)'
'(?,?,?,?),' * (num_item - 1) + '(?,?,?,?)'
)
many = False
else:
values = [self._encode(*item, timestamp=now)
for item in items.items()]
if items:
values.extend([
self._encode(item_id, item, now)
for item_id, item in items.items()
])
if memory_store:
values.extend([
self._encode(item_id, item, timestamp)
for item_id, timestamp, item in memory_store.values()
])
query = self._sql['set']
many = True
with self as (db, cursor), db:
if flatten:
if flush and memory_store:
memory_store.clear()
if values:
if defer:
return query, values, many
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
)),
values,
script=True,
(query, values, many),
),
)
else:
self._execute(cursor, 'BEGIN IMMEDIATE')
self._execute(cursor, query, many=True, values=values)
self._close_actions = True
self._close_actions = True
return None
if flush:
memory_store.clear()
return True
def _refresh(self, item_id, timestamp=None, defer=False):
key = to_str(item_id)
if not timestamp:
timestamp = since_epoch()
def _refresh(self, item_id, timestamp=None):
values = (timestamp or since_epoch(), to_str(item_id))
memory_store = self._memory_store
if memory_store and key in memory_store:
if defer:
item = memory_store[key]
memory_store[key] = (
item_id,
timestamp,
item[2],
)
self._close_actions = True
return None
del memory_store[key]
values = (timestamp, key)
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
self._sql['refresh'],
'COMMIT;',
)),
values,
script=True,
(self._sql['refresh'], values, False),
),
)
return True
def _update(self, item_id, item, timestamp=None):
def _update(self, item_id, item, timestamp=None, defer=False):
key = to_str(item_id)
if not timestamp:
timestamp = since_epoch()
memory_store = self._memory_store
if memory_store and key in memory_store:
if defer:
memory_store[key] = (
item_id,
timestamp,
item,
)
self._close_actions = True
return None
del memory_store[key]
values = self._encode(item_id, item, timestamp, for_update=True)
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
self._sql['update'],
'COMMIT;',
)),
values,
script=True,
(self._sql['update'], values, False),
),
)
return True
def clear(self, defer=False):
memory_store = self._memory_store
if memory_store:
memory_store.clear()
query = self._sql['clear']
if defer:
return query
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
script=True,
query,
)
self._close_actions = True
return None
def is_empty(self):
@ -683,11 +782,27 @@ class Storage(object):
seconds=None,
as_dict=False,
with_timestamp=False):
with self as (db, cursor):
result = self._execute(cursor, self._sql['get'], (to_str(item_id),))
item = result.fetchone() if result else None
if not item or not all(item):
return None
key = to_str(item_id)
memory_store = self._memory_store
if memory_store and key in memory_store:
item = memory_store[key]
item = (
item_id,
item[1], # timestamp from memory store item
item[2], # object from memory store item
None,
)
else:
with self as (db, cursor):
result = self._execute(
cursor,
self._sql['get'],
(key,),
)
item = result.fetchone() if result else None
if not item or not all(item):
return None
cut_off = since_epoch() - seconds if seconds else 0
if not cut_off or item[1] >= cut_off:
if as_dict:
@ -705,9 +820,8 @@ class Storage(object):
def _get_by_ids(self, item_ids=None, oldest_first=True, limit=-1,
wildcard=False, seconds=None, process=None,
as_dict=False, values_only=True, excluding=None):
epoch = since_epoch()
cut_off = epoch - seconds if seconds else 0
in_memory_result = None
result = None
if not item_ids:
if oldest_first:
@ -729,29 +843,29 @@ class Storage(object):
)
item_ids = tuple(item_ids) + tuple(excluding)
else:
memory_store = getattr(self, 'memory_store', None)
memory_store = self._memory_store
if memory_store:
in_memory_result = []
_item_ids = []
for key in item_ids:
for item_id in item_ids:
key = to_str(item_id)
if key in memory_store:
item = memory_store[key]
in_memory_result.append((
key,
epoch,
memory_store[key],
item_id,
item[1], # timestamp from memory store item
item[2], # object from memory store item
None,
))
else:
_item_ids.append(key)
_item_ids.append(item_id)
item_ids = _item_ids
else:
in_memory_result = None
if item_ids:
query = self._sql['get_by_key'].format(
'?,' * (len(item_ids) - 1) + '?'
)
item_ids = tuple(item_ids)
item_ids = tuple(map(to_str, item_ids))
else:
query = None
@ -760,14 +874,15 @@ class Storage(object):
result = self._execute(cursor, query, item_ids)
if result:
result = result.fetchall()
else:
result = None
if in_memory_result:
if result:
in_memory_result.extend(result)
result = in_memory_result
now = since_epoch()
cut_off = now - seconds if seconds else 0
if as_dict:
if values_only:
result = {
@ -777,7 +892,7 @@ class Storage(object):
else:
result = {
item[0]: {
'age': epoch - item[1],
'age': now - item[1],
'value': self._decode(item[2], process, item),
}
for item in result if not cut_off or item[1] >= cut_off
@ -797,32 +912,43 @@ class Storage(object):
return result
def _remove(self, item_id):
key = to_str(item_id)
memory_store = self._memory_store
if memory_store and key in memory_store:
del memory_store[key]
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
self._sql['remove'],
'COMMIT;',
)),
[item_id],
script=True,
(self._sql['remove'], (key,), False),
),
)
self._close_actions = True
return True
def _remove_many(self, item_ids):
memory_store = self._memory_store
if memory_store:
_item_ids = []
for item_id in item_ids:
key = to_str(item_id)
if key in memory_store:
del memory_store[key]
else:
_item_ids.append(item_id)
item_ids = _item_ids
num_ids = len(item_ids)
query = self._sql['remove_by_key'].format('?,' * (num_ids - 1) + '?')
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
tuple(item_ids),
script=True,
(query, tuple(map(to_str, item_ids)), False),
),
)
self._close_actions = True
return True

View file

@ -481,7 +481,7 @@ def process_items_for_playlist(context,
command = playlist_player.play_playlist_item(position,
defer=True)
return UriItem(command)
context.sleep(1)
context.sleep(0.1)
else:
playlist_player.play_playlist_item(position)
return items[position - 1]