Remove possibly invalid access token if an authentication error occurs

- Allows user to sign-in again and refresh missing access tokens without having to sign-out first
This commit is contained in:
MoojMidge 2025-11-08 12:44:26 +11:00
parent 419c37ddd1
commit a710a77576
3 changed files with 46 additions and 13 deletions

View file

@ -2956,22 +2956,34 @@ class YouTubeDataClient(YouTubeLoginClient):
message = strip_html_from_text(details.get('message', 'Unknown error'))
if getattr(exc, 'notify', True):
context = self._context
ok_dialog = False
if reason in {'accessNotConfigured', 'forbidden'}:
notification = self._context.localize('key.requirement')
notification = context.localize('key.requirement')
ok_dialog = True
elif reason == 'keyInvalid' and message == 'Bad Request':
notification = self._context.localize('api.key.incorrect')
notification = context.localize('api.key.incorrect')
elif reason in {'quotaExceeded', 'dailyLimitExceeded'}:
notification = message
elif reason == 'authError':
auth_type = kwargs.get('_auth_type')
if auth_type:
if auth_type in self._access_tokens:
self._access_tokens[auth_type] = None
self.set_access_token(self._access_tokens)
context.get_access_manager().update_access_token(
context.get_param('addon_id'),
access_token=self.convert_access_tokens(to_list=True),
)
notification = message
else:
notification = message
title = ': '.join((self._context.get_name(), reason))
title = ': '.join((context.get_name(), reason))
if ok_dialog:
self._context.get_ui().on_ok(title, notification)
context.get_ui().on_ok(title, notification)
else:
self._context.get_ui().show_notification(notification, title)
context.get_ui().show_notification(notification, title)
info = (
'Reason: {error_reason}',

View file

@ -72,9 +72,37 @@ class YouTubeLoginClient(YouTubeRequestClient):
def reinit(self, **kwargs):
super(YouTubeLoginClient, self).reinit(**kwargs)
@classmethod
def convert_access_tokens(cls,
access_tokens=None,
to_dict=False,
to_list=False):
if access_tokens is None:
access_tokens = cls._access_tokens
if to_dict or isinstance(access_tokens, (list, tuple)):
access_tokens = {
cls.TOKEN_TYPES[token_idx]: token
for token_idx, token in enumerate(access_tokens)
if token and token_idx in cls.TOKEN_TYPES
}
elif to_list or isinstance(access_tokens, dict):
_access_tokens = [None, None, None, None]
for token_type, token in access_tokens.items():
token_idx = cls.TOKEN_TYPES.get(token_type)
if token_idx is None:
continue
_access_tokens[token_idx] = token
access_tokens = _access_tokens
return access_tokens
def set_access_token(self, access_tokens=None):
existing_access_tokens = type(self)._access_tokens
if access_tokens:
if isinstance(access_tokens, (list, tuple)):
access_tokens = self.convert_access_tokens(
access_tokens,
to_dict=True,
)
token_status = 0
for token_type, token in existing_access_tokens.items():
if token_type in access_tokens:

View file

@ -313,14 +313,7 @@ class Provider(AbstractProvider):
access_token='',
refresh_token=refresh_token,
)
client.set_access_token({
client.TOKEN_TYPES[idx]: token
for idx, token in enumerate(access_tokens)
if token
})
client.set_access_token(access_tokens)
return client
def get_resource_manager(self, context, progress_dialog=None):