From 0e8ec661c8eff027653d1653bee20df7e06d62a2 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Sat, 11 Apr 2026 13:42:44 -0700 Subject: [PATCH] make sure subscribe pulls stay running --- nonfree/controller/PubSubListener.cpp | 38 ++++++++++++++++++++------- nonfree/controller/PubSubListener.hpp | 1 + 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index c8ddcebc4..e3b02f1f9 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -88,10 +88,11 @@ void PubSubListener::subscribe() while (_run) { try { fprintf(stderr, "PubSubListener::subscribe: starting session for subscription %s\n", _subscription_id.c_str()); + _lastMessageTime.store(std::chrono::steady_clock::now()); + auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) { + _lastMessageTime.store(std::chrono::steady_clock::now()); try { - fprintf(stderr, "PubSubListener callback invoked: subscription=%s message_id=%s ordering_key=%s\n", - _subscription_id.c_str(), m.message_id().c_str(), m.ordering_key().c_str()); auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("PubSubListener"); @@ -143,19 +144,38 @@ void PubSubListener::subscribe() _hasSession = true; } - // Block until the session ends (server disconnect, error, or - // cancel from destructor). Do NOT cancel on a timer — cancelling - // races with in-flight acks and silently drops messages, - // especially with ordering enabled. - auto status = _session.get(); + // Poll the session with a timeout. If no messages have been + // received for 60 seconds, assume the pull stream is frozen + // and cancel so we can reconnect. This avoids the old + // 10-second blind cancel that raced with in-flight acks. + while (_run) { + auto result = _session.wait_for(std::chrono::seconds(15)); + if (result == std::future_status::ready) { + break; // session ended naturally + } + auto idle = std::chrono::steady_clock::now() - _lastMessageTime.load(); + if (idle > std::chrono::seconds(60)) { + fprintf(stderr, "PubSubListener: no messages for 60s on %s, reconnecting\n", + _subscription_id.c_str()); + _session.cancel(); + break; + } + } { std::lock_guard lock(_sessionMutex); _hasSession = false; } - if (! status.ok()) { - fprintf(stderr, "Subscription session ended: %s\n", status.message().c_str()); + // Retrieve the session status (blocks until cancel or natural end completes) + try { + auto status = _session.get(); + if (! status.ok()) { + fprintf(stderr, "Subscription session ended: %s\n", status.message().c_str()); + } + } + catch (...) { + fprintf(stderr, "Subscription session ended with exception on %s\n", _subscription_id.c_str()); } } catch (google::cloud::Status const& status) { diff --git a/nonfree/controller/PubSubListener.hpp b/nonfree/controller/PubSubListener.hpp index ddde76423..026e956b8 100644 --- a/nonfree/controller/PubSubListener.hpp +++ b/nonfree/controller/PubSubListener.hpp @@ -39,6 +39,7 @@ class PubSubListener : public NotificationListener { std::mutex _sessionMutex; google::cloud::future _session; bool _hasSession = false; + std::atomic _lastMessageTime; google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient; google::cloud::pubsub::Subscription* _subscription; std::shared_ptr _subscriber;