diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index 2e777c9be..1313310bf 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -67,6 +67,12 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s PubSubListener::~PubSubListener() { _run = false; + { + std::lock_guard lock(_sessionMutex); + if (_hasSession) { + _session.cancel(); + } + } if (_subscriberThread.joinable()) { _subscriberThread.join(); } @@ -121,15 +127,25 @@ void PubSubListener::subscribe() } }); - auto result = session.wait_for(std::chrono::seconds(10)); - if (result == std::future_status::timeout) { - session.cancel(); - continue; + { + std::lock_guard lock(_sessionMutex); + _session = std::move(session); + _hasSession = true; } - if (! session.valid()) { - fprintf(stderr, "Subscription session no longer valid\n"); - continue; + // Block until the session ends (server disconnect, error, or + // cancel from destructor). Do NOT cancel on a timer — cancelling + // races with in-flight acks and silently drops messages, + // especially with ordering enabled. + auto status = _session.get(); + + { + std::lock_guard lock(_sessionMutex); + _hasSession = false; + } + + if (! status.ok()) { + fprintf(stderr, "Subscription session ended: %s\n", status.message().c_str()); } } catch (google::cloud::Status const& status) { diff --git a/nonfree/controller/PubSubListener.hpp b/nonfree/controller/PubSubListener.hpp index 3fe52543b..ddde76423 100644 --- a/nonfree/controller/PubSubListener.hpp +++ b/nonfree/controller/PubSubListener.hpp @@ -36,6 +36,9 @@ class PubSubListener : public NotificationListener { private: void subscribe(); bool _run = false; + std::mutex _sessionMutex; + google::cloud::future _session; + bool _hasSession = false; google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient; google::cloud::pubsub::Subscription* _subscription; std::shared_ptr _subscriber;