diff --git a/nonfree/controller/CentralDB.cpp b/nonfree/controller/CentralDB.cpp index 77f96c6e7..8df064c8c 100644 --- a/nonfree/controller/CentralDB.cpp +++ b/nonfree/controller/CentralDB.cpp @@ -1071,6 +1071,7 @@ void CentralDB::heartbeat() } catch (std::exception& e) { fprintf(stderr, "%s: Heartbeat update failed: %s\n", controllerId, e.what()); + _pool->unborrow(c); span->End(); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); continue; @@ -1532,6 +1533,7 @@ void CentralDB::onlineNotificationThread() auto span = tracer->StartSpan("CentralDB::onlineNotificationThread"); auto scope = tracer->WithActiveSpan(span); + std::shared_ptr c; try { std::unordered_map, NodeOnlineRecord, _PairHasher> lastOnline; { @@ -1540,7 +1542,7 @@ void CentralDB::onlineNotificationThread() } uint64_t writtenCount = 0; - auto c = _pool->borrow(); + c = _pool->borrow(); pqxx::work w(*c->c); for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) { uint64_t nwid_i = i->first.first; @@ -1610,11 +1612,13 @@ void CentralDB::onlineNotificationThread() (unsigned long long)lastOnline.size(), (unsigned long long)writtenCount); _statusWriter->writePending(); w.commit(); - _pool->unborrow(c); } catch (std::exception& e) { fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } + if (c) { + _pool->unborrow(c); + } } std::this_thread::sleep_for(std::chrono::seconds(10)); diff --git a/nonfree/controller/EmbeddedNetworkController.cpp b/nonfree/controller/EmbeddedNetworkController.cpp index 48134c036..f0213d27a 100644 --- a/nonfree/controller/EmbeddedNetworkController.cpp +++ b/nonfree/controller/EmbeddedNetworkController.cpp @@ -2462,19 +2462,24 @@ void EmbeddedNetworkController::_startThreads() } const long hwc = std::max((long)std::thread::hardware_concurrency(), (long)1); for (long t = 0; t < hwc; ++t) { - _threads.emplace_back([this]() { + _threads.emplace_back([this, t]() { Metrics::network_config_request_threads++; + uint64_t processedCount = 0; + uint64_t idleIterations = 0; for (;;) { _RQEntry* qe = (_RQEntry*)0; - Metrics::network_config_request_queue_size = _queue.size(); + auto queueSize = _queue.size(); + Metrics::network_config_request_queue_size = queueSize; auto timedWaitResult = _queue.get(qe, 1000); if (timedWaitResult == BlockingQueue<_RQEntry*>::STOP) { break; } else if (timedWaitResult == BlockingQueue<_RQEntry*>::OK) { + idleIterations = 0; if (qe) { try { _request(qe->nwid, qe->fromAddr, qe->requestPacketId, qe->identity, qe->metaData); + processedCount++; } catch (std::exception& e) { fprintf( @@ -2490,6 +2495,13 @@ void EmbeddedNetworkController::_startThreads() qe = nullptr; } } + else { + // Periodic liveness log every ~30s (30 timeout iterations of 1000ms) + if (++idleIterations % 30 == 0) { + fprintf(stderr, "request worker %ld: alive, queue_size=%lu, total_processed=%llu\n", + t, (unsigned long)queueSize, (unsigned long long)processedCount); + } + } } Metrics::network_config_request_threads--; });