Compare commits

...

6 commits

Author SHA1 Message Date
MoojMidge
f5f616ce5d
Merge pull request #1335 from MoojMidge/v7.3
v7.3.0+beta.10
2025-11-16 10:52:46 +11:00
MoojMidge
b746b36d89 Version bump v7.3.0+beta.10 2025-11-16 10:50:54 +11:00
MoojMidge
36f1cc6048 Misc optimisations
- Avoid using dir()
- Remove custom url quote methods that are no longer faster than urllib.parse methods in newer Python versions
- Reduce polling intervals when checking if Kodi is busy
- Use custom requests.Session class to avoid creation of unused default https adapter and ssl context
2025-11-16 10:50:30 +11:00
MoojMidge
ef99864c19 Change output and sorting of profiling stats 2025-11-16 08:47:30 +11:00
MoojMidge
bbc2411130 Prune invalid entries from DB when closing connection #1331 2025-11-16 08:47:29 +11:00
MoojMidge
a8fa1606cb Fix regressions with SQLite db operations #1331 2025-11-16 08:47:29 +11:00
15 changed files with 480 additions and 273 deletions

View file

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="plugin.video.youtube" name="YouTube" version="7.3.0+beta.9" provider-name="anxdpanic, bromix, MoojMidge">
<addon id="plugin.video.youtube" name="YouTube" version="7.3.0+beta.10" provider-name="anxdpanic, bromix, MoojMidge">
<requires>
<import addon="xbmc.python" version="3.0.0"/>
<import addon="script.module.requests" version="2.27.1"/>

View file

@ -1,3 +1,8 @@
## v7.3.0+beta.10
### Fixed
- Prune invalid entries from DB when closing connection #1331
- Fix regressions with SQLite db operations #1331
## v7.3.0+beta.9
### Fixed
- Disable label masks being used in Kodi 18 #1327

View file

@ -375,7 +375,7 @@ class AbstractProvider(object):
self.log.warning('Multiple busy dialogs active'
' - Rerouting workaround')
return UriItem('command://{0}'.format(action))
context.sleep(1)
context.sleep(0.1)
else:
context.execute(
action,

View file

@ -15,8 +15,6 @@ __all__ = (
'available_cpu_count',
'byte_string_type',
'datetime_infolabel',
'default_quote',
'default_quote_plus',
'entity_escape',
'generate_hash',
'parse_qs',
@ -120,70 +118,6 @@ try:
for ordinal in range(128, 256)
})
# Fast replacement for urllib.parse.quote on CPython 3: percent-encodes
# via single-pass str.translate lookup tables instead of per-character
# quoting. NOTE(review): removed upstream because urllib.parse methods
# are no longer slower on newer Python versions.
def default_quote(string,
safe='',
encoding=None,
errors=None,
# The keyword defaults below are definition-time bindings of constants
# and str methods, so calls use fast local (LOAD_FAST) lookups instead
# of repeated global/attribute lookups — a hot-path micro-optimization.
_encoding='utf-8',
_errors='strict',
_reserved=reserved,
_non_ascii=non_ascii,
_encode=str.encode,
_is_ascii=str.isascii,
_replace=str.replace,
_old='\\x',
_new='%',
_slice=slice(2, -1),
_str=str,
_translate=str.translate):
# One C-level pass percent-encodes the reserved ASCII characters.
_string = _translate(string, _reserved)
if _is_ascii(_string):
return _string
# Non-ASCII remains: str(b'...') renders the UTF-8 bytes as "b'\xNN...'";
# slice(2, -1) strips the b' prefix and trailing quote, leaving backslash
# escapes that are rewritten to percent escapes below.
_string = _str(_encode(_string, _encoding, _errors))[_slice]
if _string == string:
if _is_ascii(_string):
return _string
return _translate(_string, _non_ascii)
if _is_ascii(_string):
return _replace(_string, _old, _new)
# Mixed case: convert "\xNN" escapes to "%NN", then map any remaining
# raw non-ASCII characters through the translation table.
return _translate(_replace(_string, _old, _new), _non_ascii)
# Fast replacement for urllib.parse.quote_plus: same translate-table
# approach as default_quote, but only on the common fast path (plain str
# with default safe/encoding/errors); anything else falls back to the
# standard quote_plus. NOTE(review): removed upstream because the
# urllib.parse methods are no longer slower on newer Python versions.
def default_quote_plus(string,
safe='',
encoding=None,
errors=None,
# Definition-time bindings of constants/str methods so each call uses
# fast local lookups (micro-optimization for a hot path).
_encoding='utf-8',
_errors='strict',
_reserved=reserved_plus,
_non_ascii=non_ascii,
_encode=str.encode,
_is_ascii=str.isascii,
_replace=str.replace,
_old='\\x',
_new='%',
_slice=slice(2, -1),
_str=str,
_translate=str.translate):
# Fast path applies only when no custom quoting options were requested.
if (not safe and encoding is None and errors is None
and isinstance(string, str)):
_string = _translate(string, _reserved)
if _is_ascii(_string):
return _string
# str(b'...')[2:-1] exposes backslash escapes of the UTF-8 bytes,
# which are rewritten into percent escapes below.
_string = _str(_encode(_string, _encoding, _errors))[_slice]
if _string == string:
if _is_ascii(_string):
return _string
return _translate(_string, _non_ascii)
if _is_ascii(_string):
return _replace(_string, _old, _new)
return _translate(_replace(_string, _old, _new), _non_ascii)
# Slow path: defer to urllib.parse.quote_plus for non-default options.
return quote_plus(string, safe, encoding, errors)
urlencode.__defaults__ = (False, '', None, None, default_quote_plus)
# Compatibility shims for Kodi v18 and Python v2.7
except ImportError:
import cPickle as pickle
@ -220,16 +154,10 @@ except ImportError:
return _quote(to_str(data), *args, **kwargs)
default_quote = quote
def quote_plus(data, *args, **kwargs):
return _quote_plus(to_str(data), *args, **kwargs)
default_quote_plus = quote_plus
def unquote(data):
return _unquote(to_str(data))

View file

@ -14,8 +14,8 @@ import os
from .. import logging
from ..compatibility import (
default_quote,
parse_qsl,
quote,
string_type,
to_str,
unquote,
@ -387,7 +387,7 @@ class AbstractContext(object):
params = urlencode([
(
('%' + param,
','.join([default_quote(item) for item in value]))
','.join([quote(item) for item in value]))
if len(value) > 1 else
(param, value[0])
)
@ -482,7 +482,7 @@ class AbstractContext(object):
return ('/', parts) if include_parts else '/'
if kwargs.get('is_uri'):
path = default_quote(path)
path = quote(path)
return (path, parts) if include_parts else path
def get_path(self):

View file

@ -18,7 +18,7 @@ from cProfile import Profile
from functools import wraps
from inspect import getargvalues
from os.path import normpath
from pstats import Stats
import pstats
from traceback import extract_stack, format_list
from weakref import ref
@ -99,7 +99,6 @@ class Profiler(object):
'_print_callees',
'_profiler',
'_reuse',
'_sort_by',
'_timer',
)
@ -121,14 +120,12 @@ class Profiler(object):
num_lines=20,
print_callees=False,
reuse=False,
sort_by=('cumulative', 'time'),
timer=None):
self._enabled = enabled
self._num_lines = num_lines
self._print_callees = print_callees
self._profiler = None
self._reuse = reuse
self._sort_by = sort_by
self._timer = timer
if enabled and not lazy:
@ -205,8 +202,7 @@ class Profiler(object):
flush=True,
num_lines=20,
print_callees=False,
reuse=False,
sort_by=('cumulative', 'time')):
reuse=False):
if not (self._enabled and self._profiler):
return None
@ -218,10 +214,14 @@ class Profiler(object):
self._profiler,
stream=output_stream
)
stats.strip_dirs().sort_stats(*sort_by)
stats.strip_dirs()
if print_callees:
stats.sort_stats('cumulative')
stats.print_callees(num_lines)
else:
stats.sort_stats('cumpercall')
stats.print_stats(num_lines)
stats.sort_stats('totalpercall')
stats.print_stats(num_lines)
output = output_stream.getvalue()
# Occurs when no stats were able to be generated from profiler
@ -242,7 +242,6 @@ class Profiler(object):
num_lines=self._num_lines,
print_callees=self._print_callees,
reuse=self._reuse,
sort_by=self._sort_by,
),
stacklevel=3)
@ -250,6 +249,82 @@ class Profiler(object):
self.__class__._instances.discard(self)
class Stats(pstats.Stats):
    """Custom Stats class that adds functionality to sort by
    - Cumulative time per call ("cumpercall")
    - Total time per call ("totalpercall")

    Based on code by alexnvdias from https://bugs.python.org/issue18795
    """

    # Sort keys map to indices of the extended per-function tuple built in
    # sort_stats(): (cc, nc, tt, npc, ct, cpc, filename, line, name, stdname).
    # Direction -1 sorts descending (largest first), 1 ascending.
    sort_arg_dict_default = {
        'calls':        (((1, -1),), 'call count'),
        'ncalls':       (((1, -1),), 'call count'),
        'cumtime':      (((4, -1),), 'cumulative time'),
        'cumulative':   (((4, -1),), 'cumulative time'),
        'filename':     (((6, 1),), 'file name'),
        'line':         (((7, 1),), 'line number'),
        'module':       (((6, 1),), 'file name'),
        'name':         (((8, 1),), 'function name'),
        'nfl':          (((8, 1), (6, 1), (7, 1)), 'name/file/line'),
        'pcalls':       (((0, -1),), 'primitive call count'),
        'stdname':      (((9, 1),), 'standard name'),
        'time':         (((2, -1),), 'internal time'),
        'tottime':      (((2, -1),), 'internal time'),
        'cumpercall':   (((5, -1),), 'cumulative time per call'),
        'totalpercall': (((3, -1),), 'total time per call'),
    }

    def sort_stats(self, *field):
        """Sort profile entries, supporting the extra per-call sort keys.

        Accepts the same arguments as pstats.Stats.sort_stats (key names,
        SortKey enum members, or a single legacy int), plus "cumpercall"
        and "totalpercall". Returns self for chaining.
        """
        if not field:
            self.fcn_list = 0
            return self
        if len(field) == 1 and isinstance(field[0], int):
            # Be compatible with the old profiler's integer sort arguments
            field = [{-1: 'stdname',
                      0: 'calls',
                      1: 'time',
                      2: 'cumulative'}[field[0]]]
        elif len(field) >= 2:
            # Mixing str and SortKey arguments is disallowed, matching
            # the stdlib pstats behavior (deliberate exact-type check).
            for arg in field[1:]:
                if type(arg) != type(field[0]):
                    raise TypeError("Can't have mixed argument type")

        # Resolve key names/abbreviations to (index, direction) tuples and
        # build the human-readable description of the active sort order.
        sort_arg_defs = self.get_sort_arg_defs()
        sort_tuple = ()
        self.sort_type = ''
        connector = ''
        for word in field:
            if isinstance(word, pstats.SortKey):
                word = word.value
            sort_tuple = sort_tuple + sort_arg_defs[word][0]
            self.sort_type += connector + sort_arg_defs[word][1]
            connector = ', '

        # Extend each raw stats entry with the derived per-call averages
        # (npc = internal time per call, cpc = cumulative time per call)
        # so they are available as sortable columns.
        stats_list = []
        for func, (cc, nc, tt, ct, callers) in self.stats.items():
            npc = float(tt) / nc if nc else 0
            cpc = float(ct) / cc if cc else 0
            stats_list.append((cc, nc, tt, npc, ct, cpc)
                              + func
                              + (pstats.func_std_string(func), func))

        stats_list.sort(
            key=pstats.cmp_to_key(pstats.TupleComp(sort_tuple).compare)
        )
        # The last element of each extended entry is the original func key;
        # fcn_list drives the print order used by print_stats()/callees().
        self.fcn_list = [entry[-1] for entry in stats_list]
        return self
class ExecTimeout(object):
log = logging.getLogger('__name__')
src_file = None

View file

@ -348,12 +348,15 @@ class _Encoder(json.JSONEncoder):
def encode(self, obj, nested=False):
if isinstance(obj, (date, datetime)):
class_name = obj.__class__.__name__
if 'fromisoformat' in dir(obj):
obj = {
'__class__': class_name,
'__isoformat__': obj.isoformat(),
}
else:
try:
if obj.fromisoformat:
obj = {
'__class__': class_name,
'__isoformat__': obj.isoformat(),
}
else:
raise AttributeError
except AttributeError:
if class_name == 'datetime':
if obj.tzinfo:
format_string = '%Y-%m-%dT%H:%M:%S%z'

View file

@ -11,11 +11,19 @@ from __future__ import absolute_import, division, unicode_literals
import socket
from atexit import register as atexit_register
from collections import OrderedDict
from requests import Request, Session
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import InvalidJSONError, RequestException, URLRequired
from requests.utils import DEFAULT_CA_BUNDLE_PATH, extract_zipped_paths
from requests.hooks import default_hooks
from requests.models import DEFAULT_REDIRECT_LIMIT, Request
from requests.sessions import Session
from requests.utils import (
DEFAULT_CA_BUNDLE_PATH,
cookiejar_from_dict,
default_headers,
extract_zipped_paths,
)
from urllib3.util.ssl_ import create_urllib3_context
from .. import logging
@ -64,21 +72,83 @@ class SSLHTTPAdapter(HTTPAdapter):
return super(SSLHTTPAdapter, self).cert_verify(conn, url, verify, cert)
# Trimmed replacement for requests.Session: replicates Session.__init__'s
# attribute setup but skips mounting the stock adapters, so no unused
# default https adapter (and its SSL context) is created at startup.
# NOTE(review): the attribute list mirrors requests' Session.__init__ —
# re-verify whenever the bundled requests version changes.
class CustomSession(Session):
def __init__(self):
#: A case-insensitive dictionary of headers to be sent on each
#: :class:`Request <Request>` sent from this
#: :class:`Session <Session>`.
self.headers = default_headers()
#: Default Authentication tuple or object to attach to
#: :class:`Request <Request>`.
self.auth = None
#: Dictionary mapping protocol or protocol and host to the URL of the proxy
#: (e.g. {'http': 'foo.bar:3128', 'http://host.name': 'foo.bar:4012'}) to
#: be used on each :class:`Request <Request>`.
self.proxies = {}
#: Event-handling hooks.
self.hooks = default_hooks()
#: Dictionary of querystring data to attach to each
#: :class:`Request <Request>`. The dictionary values may be lists for
#: representing multivalued query parameters.
self.params = {}
#: Stream response content default.
self.stream = False
#: SSL Verification default.
#: Defaults to `True`, requiring requests to verify the TLS certificate at the
#: remote end.
#: If verify is set to `False`, requests will accept any TLS certificate
#: presented by the server, and will ignore hostname mismatches and/or
#: expired certificates, which will make your application vulnerable to
#: man-in-the-middle (MitM) attacks.
#: Only set this to `False` for testing.
self.verify = True
#: SSL client certificate default, if String, path to ssl client
#: cert file (.pem). If Tuple, ('cert', 'key') pair.
self.cert = None
#: Maximum number of redirects allowed. If the request exceeds this
#: limit, a :class:`TooManyRedirects` exception is raised.
#: This defaults to requests.models.DEFAULT_REDIRECT_LIMIT, which is
#: 30.
self.max_redirects = DEFAULT_REDIRECT_LIMIT
#: Trust environment settings for proxy configuration, default
#: authentication and similar.
#: CustomSession.trust_env is False
# Deliberate deviation from requests.Session (which defaults to True):
# environment proxies/netrc are ignored for plugin requests.
self.trust_env = False
#: A CookieJar containing all currently outstanding cookies set on this
#: session. By default it is a
#: :class:`RequestsCookieJar <requests.cookies.RequestsCookieJar>`, but
#: may be any other ``cookielib.CookieJar`` compatible object.
self.cookies = cookiejar_from_dict({})
# Default connection adapters.
# Mount only what is needed: a hardened SSL adapter with retry/backoff
# for https, and a plain adapter for http — this replaces the stock
# Session behavior of creating default adapters for both schemes.
self.adapters = OrderedDict()
self.mount('https://', SSLHTTPAdapter(
pool_maxsize=20,
pool_block=True,
max_retries=Retry(
total=3,
backoff_factor=0.1,
status_forcelist={500, 502, 503, 504},
allowed_methods=None,
)
))
self.mount('http://', HTTPAdapter())
class BaseRequestsClass(object):
log = logging.getLogger(__name__)
_session = Session()
_session.trust_env = False
_session.mount('https://', SSLHTTPAdapter(
pool_maxsize=10,
pool_block=True,
max_retries=Retry(
total=3,
backoff_factor=0.1,
status_forcelist={500, 502, 503, 504},
allowed_methods=None,
)
))
_session = CustomSession()
atexit_register(_session.close)
_context = None

View file

@ -439,7 +439,7 @@ class XbmcPlugin(AbstractPlugin):
@staticmethod
def post_run(context, ui, *actions, **kwargs):
timeout = kwargs.get('timeout', 30)
interval = kwargs.get('interval', 0.1)
interval = kwargs.get('interval', 0.01)
for action in actions:
while not ui.get_container(container_type=None, check_ready=True):
timeout -= interval

View file

@ -18,7 +18,7 @@ class DataCache(Storage):
_table_updated = False
_sql = {}
memory_store = {}
_memory_store = {}
def __init__(self, filepath, max_file_size_mb=5):
max_file_size_kb = max_file_size_mb * 1024

View file

@ -17,6 +17,8 @@ class FeedHistory(Storage):
_table_updated = False
_sql = {}
_memory_store = {}
def __init__(self, filepath):
super(FeedHistory, self).__init__(filepath)
@ -32,7 +34,7 @@ class FeedHistory(Storage):
return result
def set_items(self, items):
self._set_many(items)
self._set_many(items, defer=True)
def _optimize_item_count(self, limit=-1, defer=False):
return False

View file

@ -22,8 +22,6 @@ class FunctionCache(Storage):
_table_updated = False
_sql = {}
memory_store = {}
_BUILTIN = str.__module__
SCOPE_NONE = 0
SCOPE_BUILTINS = 1
@ -136,7 +134,7 @@ class FunctionCache(Storage):
if callable(process):
data = process(data, _data)
if data != ignore_value:
self._set(cache_id, data, defer=True)
self._set(cache_id, data)
elif oneshot:
self._remove(cache_id)

View file

@ -18,7 +18,7 @@ class RequestCache(Storage):
_table_updated = False
_sql = {}
memory_store = {}
_memory_store = {}
def __init__(self, filepath, max_file_size_mb=20):
max_file_size_kb = max_file_size_mb * 1024
@ -36,11 +36,11 @@ class RequestCache(Storage):
if response:
item = (etag, response)
if timestamp:
self._update(request_id, item, timestamp)
self._update(request_id, item, timestamp, defer=True)
else:
self._set(request_id, item, defer=True)
else:
self._refresh(request_id, timestamp)
self._refresh(request_id, timestamp, defer=True)
def _optimize_item_count(self, limit=-1, defer=False):
return False

View file

@ -185,6 +185,11 @@ class Storage(object):
' ) <= {{0}}'
' );'
),
'prune_invalid': (
'DELETE'
' FROM {table}'
' WHERE key IS NULL;'
),
'refresh': (
'UPDATE'
' {table}'
@ -230,6 +235,7 @@ class Storage(object):
self._filepath = os.path.join(*filepath)
self._db = None
self._lock = StorageLock()
self._memory_store = getattr(self.__class__, '_memory_store', None)
self._close_timer = None
self._close_actions = False
self._max_item_count = -1 if migrate else max_item_count
@ -307,10 +313,10 @@ class Storage(object):
self._close_timer = close_timer
def _open(self):
statements = []
table_queries = []
if not os.path.exists(self._filepath):
make_dirs(os.path.dirname(self._filepath))
statements.extend((
table_queries.extend((
self._sql['create_table'],
))
self._base._table_updated = True
@ -336,7 +342,7 @@ class Storage(object):
cursor = db.cursor()
sql_script = [
queries = [
'PRAGMA busy_timeout = 1000;',
'PRAGMA read_uncommitted = TRUE;',
'PRAGMA secure_delete = FALSE;',
@ -356,18 +362,18 @@ class Storage(object):
if not self._table_updated:
for result in self._execute(cursor, self._sql['has_old_table']):
if result[0] == 1:
statements.extend((
table_queries.extend((
'PRAGMA writable_schema = 1;',
self._sql['drop_old_table'],
'PRAGMA writable_schema = 0;',
))
break
if statements:
transaction_begin = len(sql_script) + 1
sql_script.extend(('BEGIN;', 'COMMIT;', 'VACUUM;'))
sql_script[transaction_begin:transaction_begin] = statements
self._execute(cursor, '\n'.join(sql_script), script=True)
if table_queries:
transaction_begin = len(queries) + 1
queries.extend(('BEGIN IMMEDIATE;', 'COMMIT;', 'VACUUM;'))
queries[transaction_begin:transaction_begin] = table_queries
self._execute(cursor, queries)
self._base._table_updated = True
self._db = db
@ -382,20 +388,42 @@ class Storage(object):
return False
db = self._db
if not db and self._close_actions:
db = self._open()
else:
return None
if not db:
if self._close_actions:
db = self._open()
else:
return None
if self._close_actions:
memory_store = getattr(self, 'memory_store', None)
if memory_store:
self._set_many(items=None, memory_store=memory_store)
self._optimize_item_count()
self._optimize_file_size()
self._close_actions = False
self._execute(db.cursor(), 'PRAGMA optimize')
if event or self._close_actions:
if not event:
queries = (
'BEGIN IMMEDIATE;',
self._set_many(items=None, defer=True, flush=True),
self._optimize_item_count(defer=True),
self._optimize_file_size(defer=True),
'COMMIT;',
'VACUUM;',
)
elif self._close_actions:
queries = (
'BEGIN IMMEDIATE;',
self._sql['prune_invalid'],
self._set_many(items=None, defer=True, flush=True),
self._optimize_item_count(defer=True),
self._optimize_file_size(defer=True),
'COMMIT;',
'VACUUM;',
'PRAGMA optimize;',
)
else:
queries = (
'BEGIN IMMEDIATE;',
self._sql['prune_invalid'],
'COMMIT;',
'VACUUM;',
'PRAGMA optimize;',
)
self._execute(db.cursor(), queries)
# Not needed if using db as a context manager
if commit:
@ -404,42 +432,69 @@ class Storage(object):
if event:
db.close()
self._db = None
self._close_actions = False
self._close_timer = None
return True
def _execute(self, cursor, query, values=None, many=False, script=False):
def _execute(self, cursor, queries, values=None, many=False, script=False):
result = []
if not cursor:
self.log.error_trace('Database not available')
return []
if values is None:
values = ()
"""
Tests revealed that sqlite has problems to release the database in time
This happens no so often, but just to be sure, we try at least 3 times
to execute our statement.
"""
for attempt in range(1, 4):
try:
if many:
return cursor.executemany(query, values)
if script:
return cursor.executescript(query)
return cursor.execute(query, values)
except (sqlite3.Error, sqlite3.OperationalError) as exc:
if attempt < 3:
if isinstance(exc, sqlite3.OperationalError):
return result
if isinstance(queries, (list, tuple)):
if script:
queries = ('\n'.join(queries),)
else:
queries = (queries,)
for query in queries:
if not query:
continue
if isinstance(query, tuple):
query, _values, _many = query
else:
_many = many
_values = values or ()
# Retry DB operation 3 times in case DB is locked or busy
abort = False
for attempt in range(1, 4):
try:
if _many:
result = cursor.executemany(query, _values)
elif script:
result = cursor.executescript(query)
else:
result = cursor.execute(query, _values)
break
except (sqlite3.Error, sqlite3.OperationalError) as exc:
if attempt >= 3:
abort = True
elif isinstance(exc, sqlite3.OperationalError):
time.sleep(0.1)
elif isinstance(exc, sqlite3.InterfaceError):
cursor = self._db.cursor()
else:
self.log.exception('Failed')
abort = True
if abort:
self.log.exception(('Failed',
'Query: {query!r}',
'Values: {values!r}'),
attempt=attempt,
query=query,
values=values)
break
self.log.warning_trace('Attempt %d of 3',
attempt,
self.log.warning_trace(('Attempt {attempt} of 3',
'Query: {query!r}',
'Values: {values!r}'),
attempt=attempt,
query=query,
values=values,
exc_info=True)
else:
self.log.exception('Failed')
return []
if abort:
break
return result
def _optimize_file_size(self, defer=False):
# do nothing - optimize only if max size limit has been set
@ -468,13 +523,12 @@ class Storage(object):
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
script=True,
),
)
return None
@ -497,149 +551,194 @@ class Storage(object):
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
script=True,
),
)
return None
def _set(self, item_id, item, defer=False, flush=False, memory_store=None):
if memory_store is None:
memory_store = getattr(self, 'memory_store', None)
def _set(self, item_id, item, defer=False, flush=False):
memory_store = self._memory_store
if memory_store is not None:
key = to_str(item_id)
if defer:
memory_store[item_id] = item
memory_store[key] = (
item_id,
since_epoch(),
item,
)
self._close_actions = True
return None
if flush:
memory_store.clear()
return False
if memory_store:
memory_store[item_id] = item
return self._set_many(items=None, memory_store=memory_store)
memory_store[key] = (
item_id,
since_epoch(),
item,
)
return self._set_many(items=None)
values = self._encode(item_id, item)
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
'BEGIN IMMEDIATE;',
self._sql['set'],
'COMMIT;',
)),
values,
script=True,
self._sql['set'],
self._encode(item_id, item),
)
self._close_actions = True
return True
def _set_many(self,
items,
flatten=False,
defer=False,
flush=False,
memory_store=None):
if memory_store is None:
memory_store = getattr(self, 'memory_store', None)
def _set_many(self, items, flatten=False, defer=False, flush=False):
memory_store = self._memory_store
if memory_store is not None:
if defer:
memory_store.update(items)
if defer and not flush:
now = since_epoch()
memory_store.update({
to_str(item_id): (
item_id,
now,
item,
)
for item_id, item in items.items()
})
self._close_actions = True
return None
if flush:
if flush and not defer:
memory_store.clear()
return False
if memory_store:
if items:
memory_store.update(items)
items = memory_store
flush = True
now = since_epoch()
num_items = len(items)
values = []
if flatten:
values = [enc_part
for item in items.items()
for enc_part in self._encode(*item, timestamp=now)]
num_item = 0
if items:
values.extend([
part
for item_id, item in items.items()
for part in self._encode(item_id, item, now)
])
num_item += len(items)
if memory_store:
values.extend([
part
for item_id, timestamp, item in memory_store.values()
for part in self._encode(item_id, item, timestamp)
])
num_item += len(memory_store)
query = self._sql['set_flat'].format(
'(?,?,?,?),' * (num_items - 1) + '(?,?,?,?)'
'(?,?,?,?),' * (num_item - 1) + '(?,?,?,?)'
)
many = False
else:
values = [self._encode(*item, timestamp=now)
for item in items.items()]
if items:
values.extend([
self._encode(item_id, item, now)
for item_id, item in items.items()
])
if memory_store:
values.extend([
self._encode(item_id, item, timestamp)
for item_id, timestamp, item in memory_store.values()
])
query = self._sql['set']
many = True
with self as (db, cursor), db:
if flatten:
if flush and memory_store:
memory_store.clear()
if values:
if defer:
return query, values, many
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
)),
values,
script=True,
(query, values, many),
),
)
else:
self._execute(cursor, 'BEGIN IMMEDIATE')
self._execute(cursor, query, many=True, values=values)
self._close_actions = True
self._close_actions = True
return None
if flush:
memory_store.clear()
return True
def _refresh(self, item_id, timestamp=None, defer=False):
key = to_str(item_id)
if not timestamp:
timestamp = since_epoch()
def _refresh(self, item_id, timestamp=None):
values = (timestamp or since_epoch(), to_str(item_id))
memory_store = self._memory_store
if memory_store and key in memory_store:
if defer:
item = memory_store[key]
memory_store[key] = (
item_id,
timestamp,
item[2],
)
self._close_actions = True
return None
del memory_store[key]
values = (timestamp, key)
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
self._sql['refresh'],
'COMMIT;',
)),
values,
script=True,
(self._sql['refresh'], values, False),
),
)
return True
def _update(self, item_id, item, timestamp=None):
def _update(self, item_id, item, timestamp=None, defer=False):
key = to_str(item_id)
if not timestamp:
timestamp = since_epoch()
memory_store = self._memory_store
if memory_store and key in memory_store:
if defer:
memory_store[key] = (
item_id,
timestamp,
item,
)
self._close_actions = True
return None
del memory_store[key]
values = self._encode(item_id, item, timestamp, for_update=True)
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
self._sql['update'],
'COMMIT;',
)),
values,
script=True,
(self._sql['update'], values, False),
),
)
return True
def clear(self, defer=False):
memory_store = self._memory_store
if memory_store:
memory_store.clear()
query = self._sql['clear']
if defer:
return query
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
script=True,
query,
)
self._close_actions = True
return None
def is_empty(self):
@ -683,11 +782,27 @@ class Storage(object):
seconds=None,
as_dict=False,
with_timestamp=False):
with self as (db, cursor):
result = self._execute(cursor, self._sql['get'], (to_str(item_id),))
item = result.fetchone() if result else None
if not item or not all(item):
return None
key = to_str(item_id)
memory_store = self._memory_store
if memory_store and key in memory_store:
item = memory_store[key]
item = (
item_id,
item[1], # timestamp from memory store item
item[2], # object from memory store item
None,
)
else:
with self as (db, cursor):
result = self._execute(
cursor,
self._sql['get'],
(key,),
)
item = result.fetchone() if result else None
if not item or not all(item):
return None
cut_off = since_epoch() - seconds if seconds else 0
if not cut_off or item[1] >= cut_off:
if as_dict:
@ -705,9 +820,8 @@ class Storage(object):
def _get_by_ids(self, item_ids=None, oldest_first=True, limit=-1,
wildcard=False, seconds=None, process=None,
as_dict=False, values_only=True, excluding=None):
epoch = since_epoch()
cut_off = epoch - seconds if seconds else 0
in_memory_result = None
result = None
if not item_ids:
if oldest_first:
@ -729,29 +843,29 @@ class Storage(object):
)
item_ids = tuple(item_ids) + tuple(excluding)
else:
memory_store = getattr(self, 'memory_store', None)
memory_store = self._memory_store
if memory_store:
in_memory_result = []
_item_ids = []
for key in item_ids:
for item_id in item_ids:
key = to_str(item_id)
if key in memory_store:
item = memory_store[key]
in_memory_result.append((
key,
epoch,
memory_store[key],
item_id,
item[1], # timestamp from memory store item
item[2], # object from memory store item
None,
))
else:
_item_ids.append(key)
_item_ids.append(item_id)
item_ids = _item_ids
else:
in_memory_result = None
if item_ids:
query = self._sql['get_by_key'].format(
'?,' * (len(item_ids) - 1) + '?'
)
item_ids = tuple(item_ids)
item_ids = tuple(map(to_str, item_ids))
else:
query = None
@ -760,14 +874,15 @@ class Storage(object):
result = self._execute(cursor, query, item_ids)
if result:
result = result.fetchall()
else:
result = None
if in_memory_result:
if result:
in_memory_result.extend(result)
result = in_memory_result
now = since_epoch()
cut_off = now - seconds if seconds else 0
if as_dict:
if values_only:
result = {
@ -777,7 +892,7 @@ class Storage(object):
else:
result = {
item[0]: {
'age': epoch - item[1],
'age': now - item[1],
'value': self._decode(item[2], process, item),
}
for item in result if not cut_off or item[1] >= cut_off
@ -797,32 +912,43 @@ class Storage(object):
return result
def _remove(self, item_id):
key = to_str(item_id)
memory_store = self._memory_store
if memory_store and key in memory_store:
del memory_store[key]
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
self._sql['remove'],
'COMMIT;',
)),
[item_id],
script=True,
(self._sql['remove'], (key,), False),
),
)
self._close_actions = True
return True
def _remove_many(self, item_ids):
memory_store = self._memory_store
if memory_store:
_item_ids = []
for item_id in item_ids:
key = to_str(item_id)
if key in memory_store:
del memory_store[key]
else:
_item_ids.append(item_id)
item_ids = _item_ids
num_ids = len(item_ids)
query = self._sql['remove_by_key'].format('?,' * (num_ids - 1) + '?')
with self as (db, cursor), db:
self._execute(
cursor,
'\n'.join((
(
'BEGIN IMMEDIATE;',
query,
'COMMIT;',
'VACUUM;',
)),
tuple(item_ids),
script=True,
(query, tuple(map(to_str, item_ids)), False),
),
)
self._close_actions = True
return True

View file

@ -481,7 +481,7 @@ def process_items_for_playlist(context,
command = playlist_player.play_playlist_item(position,
defer=True)
return UriItem(command)
context.sleep(1)
context.sleep(0.1)
else:
playlist_player.play_playlist_item(position)
return items[position - 1]