make sure subscribe pulls stay running

This commit is contained in:
Grant Limberg 2026-04-11 13:42:44 -07:00
parent d2361a9a66
commit 0e8ec661c8
No known key found for this signature in database
GPG key ID: 8F2F97D3BE8D7735
2 changed files with 30 additions and 9 deletions

View file

@ -88,10 +88,11 @@ void PubSubListener::subscribe()
while (_run) {
try {
fprintf(stderr, "PubSubListener::subscribe: starting session for subscription %s\n", _subscription_id.c_str());
_lastMessageTime.store(std::chrono::steady_clock::now());
auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
_lastMessageTime.store(std::chrono::steady_clock::now());
try {
fprintf(stderr, "PubSubListener callback invoked: subscription=%s message_id=%s ordering_key=%s\n",
_subscription_id.c_str(), m.message_id().c_str(), m.ordering_key().c_str());
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("PubSubListener");
@ -143,19 +144,38 @@ void PubSubListener::subscribe()
_hasSession = true;
}
// Block until the session ends (server disconnect, error, or
// cancel from destructor). Do NOT cancel on a timer — cancelling
// races with in-flight acks and silently drops messages,
// especially with ordering enabled.
auto status = _session.get();
// Poll the session with a timeout. If no messages have been
// received for 60 seconds, assume the pull stream is frozen
// and cancel so we can reconnect. This avoids the old
// 10-second blind cancel that raced with in-flight acks.
while (_run) {
auto result = _session.wait_for(std::chrono::seconds(15));
if (result == std::future_status::ready) {
break; // session ended naturally
}
auto idle = std::chrono::steady_clock::now() - _lastMessageTime.load();
if (idle > std::chrono::seconds(60)) {
fprintf(stderr, "PubSubListener: no messages for 60s on %s, reconnecting\n",
_subscription_id.c_str());
_session.cancel();
break;
}
}
{
std::lock_guard<std::mutex> lock(_sessionMutex);
_hasSession = false;
}
if (! status.ok()) {
fprintf(stderr, "Subscription session ended: %s\n", status.message().c_str());
// Retrieve the session status (blocks until cancel or natural end completes)
try {
auto status = _session.get();
if (! status.ok()) {
fprintf(stderr, "Subscription session ended: %s\n", status.message().c_str());
}
}
catch (...) {
fprintf(stderr, "Subscription session ended with exception on %s\n", _subscription_id.c_str());
}
}
catch (google::cloud::Status const& status) {

View file

@ -39,6 +39,7 @@ class PubSubListener : public NotificationListener {
std::mutex _sessionMutex;
google::cloud::future<google::cloud::Status> _session;
bool _hasSession = false;
std::atomic<std::chrono::steady_clock::time_point> _lastMessageTime;
google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient;
google::cloud::pubsub::Subscription* _subscription;
std::shared_ptr<google::cloud::pubsub::Subscriber> _subscriber;