periodic queue size logging, and fix some db connection leaks

This commit is contained in:
Grant Limberg 2026-04-03 10:28:28 -07:00
parent af7eae5d9e
commit 1f3a04f303
No known key found for this signature in database
GPG key ID: 8F2F97D3BE8D7735
2 changed files with 20 additions and 4 deletions

View file

@ -1071,6 +1071,7 @@ void CentralDB::heartbeat()
}
catch (std::exception& e) {
fprintf(stderr, "%s: Heartbeat update failed: %s\n", controllerId, e.what());
_pool->unborrow(c);
span->End();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
continue;
@ -1532,6 +1533,7 @@ void CentralDB::onlineNotificationThread()
auto span = tracer->StartSpan("CentralDB::onlineNotificationThread");
auto scope = tracer->WithActiveSpan(span);
std::shared_ptr<PostgresConnection> c;
try {
std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher> lastOnline;
{
@ -1540,7 +1542,7 @@ void CentralDB::onlineNotificationThread()
}
uint64_t writtenCount = 0;
auto c = _pool->borrow();
c = _pool->borrow();
pqxx::work w(*c->c);
for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) {
uint64_t nwid_i = i->first.first;
@ -1610,11 +1612,13 @@ void CentralDB::onlineNotificationThread()
(unsigned long long)lastOnline.size(), (unsigned long long)writtenCount);
_statusWriter->writePending();
w.commit();
_pool->unborrow(c);
}
catch (std::exception& e) {
fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
}
if (c) {
_pool->unborrow(c);
}
}
std::this_thread::sleep_for(std::chrono::seconds(10));

View file

@ -2462,19 +2462,24 @@ void EmbeddedNetworkController::_startThreads()
}
const long hwc = std::max((long)std::thread::hardware_concurrency(), (long)1);
for (long t = 0; t < hwc; ++t) {
_threads.emplace_back([this]() {
_threads.emplace_back([this, t]() {
Metrics::network_config_request_threads++;
uint64_t processedCount = 0;
uint64_t idleIterations = 0;
for (;;) {
_RQEntry* qe = (_RQEntry*)0;
Metrics::network_config_request_queue_size = _queue.size();
auto queueSize = _queue.size();
Metrics::network_config_request_queue_size = queueSize;
auto timedWaitResult = _queue.get(qe, 1000);
if (timedWaitResult == BlockingQueue<_RQEntry*>::STOP) {
break;
}
else if (timedWaitResult == BlockingQueue<_RQEntry*>::OK) {
idleIterations = 0;
if (qe) {
try {
_request(qe->nwid, qe->fromAddr, qe->requestPacketId, qe->identity, qe->metaData);
processedCount++;
}
catch (std::exception& e) {
fprintf(
@ -2490,6 +2495,13 @@ void EmbeddedNetworkController::_startThreads()
qe = nullptr;
}
}
else {
// Periodic liveness log every ~30s (30 timeout iterations of 1000ms)
if (++idleIterations % 30 == 0) {
fprintf(stderr, "request worker %ld: alive, queue_size=%lu, total_processed=%llu\n",
t, (unsigned long)queueSize, (unsigned long long)processedCount);
}
}
}
Metrics::network_config_request_threads--;
});