mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2026-05-10 22:13:11 -07:00
Fix silent PubSub message loss in controller subscription loop
The 10-second session.cancel() loop raced with in-flight acks — when cancel fired while the GCP client was processing messages, acks were lost before reaching the server. With message ordering enabled, an unacked message blocks all subsequent messages on that ordering key, causing silent stalls with no error output. Two fixes: - Replace the cancel/reconnect timer with a blocking session.get(), storing the session future so the destructor can cancel on shutdown. - Always ack messages even when onNotification fails — permanent errors (bad protobuf, missing fields) will never succeed on retry and would otherwise poison the ordering key indefinitely.
This commit is contained in:
parent
4ca5c9b820
commit
910334c2a5
2 changed files with 26 additions and 7 deletions
|
|
@ -67,6 +67,12 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
|
|||
PubSubListener::~PubSubListener()
|
||||
{
|
||||
_run = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_sessionMutex);
|
||||
if (_hasSession) {
|
||||
_session.cancel();
|
||||
}
|
||||
}
|
||||
if (_subscriberThread.joinable()) {
|
||||
_subscriberThread.join();
|
||||
}
|
||||
|
|
@ -121,15 +127,25 @@ void PubSubListener::subscribe()
|
|||
}
|
||||
});
|
||||
|
||||
auto result = session.wait_for(std::chrono::seconds(10));
|
||||
if (result == std::future_status::timeout) {
|
||||
session.cancel();
|
||||
continue;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_sessionMutex);
|
||||
_session = std::move(session);
|
||||
_hasSession = true;
|
||||
}
|
||||
|
||||
if (! session.valid()) {
|
||||
fprintf(stderr, "Subscription session no longer valid\n");
|
||||
continue;
|
||||
// Block until the session ends (server disconnect, error, or
|
||||
// cancel from destructor). Do NOT cancel on a timer — cancelling
|
||||
// races with in-flight acks and silently drops messages,
|
||||
// especially with ordering enabled.
|
||||
auto status = _session.get();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_sessionMutex);
|
||||
_hasSession = false;
|
||||
}
|
||||
|
||||
if (! status.ok()) {
|
||||
fprintf(stderr, "Subscription session ended: %s\n", status.message().c_str());
|
||||
}
|
||||
}
|
||||
catch (google::cloud::Status const& status) {
|
||||
|
|
|
|||
|
|
@ -36,6 +36,9 @@ class PubSubListener : public NotificationListener {
|
|||
private:
|
||||
void subscribe();
|
||||
bool _run = false;
|
||||
std::mutex _sessionMutex;
|
||||
google::cloud::future<google::cloud::Status> _session;
|
||||
bool _hasSession = false;
|
||||
google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient;
|
||||
google::cloud::pubsub::Subscription* _subscription;
|
||||
std::shared_ptr<google::cloud::pubsub::Subscriber> _subscriber;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue