diff --git a/node/PacketMultiplexer.cpp b/node/PacketMultiplexer.cpp index 673b9a3c1..945f0aa45 100644 --- a/node/PacketMultiplexer.cpp +++ b/node/PacketMultiplexer.cpp @@ -12,8 +12,15 @@ #include "Node.hpp" #include "RuntimeEnvironment.hpp" +#include #include #include +#if defined(__linux__) +#include +#include +#include +#include +#endif namespace ZeroTier { @@ -67,7 +74,6 @@ void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, #endif _enabled = true; _concurrency = concurrency; - bool _enablePinning = cpuPinningEnabled; for (unsigned int i = 0; i < _concurrency; ++i) { fprintf(stderr, "Reserved queue for thread %d\n", i); @@ -76,8 +82,23 @@ void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, // Each thread picks from its own queue to feed into the core for (unsigned int i = 0; i < _concurrency; ++i) { - _rxThreads.push_back(std::thread([this, i, _enablePinning]() { + _rxThreads.push_back(std::thread([this, i, cpuPinningEnabled]() { fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i); +#if defined(__linux__) + if (cpuPinningEnabled) { + const unsigned int cpuCount = std::max(1u, std::thread::hardware_concurrency()); + const int pinCore = static_cast(i % cpuCount); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(pinCore, &cpuset); + int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + fprintf(stderr, "Failed to pin packet thread %d to core %d: %s\n", i, pinCore, strerror(errno)); + } + } +#else + (void)cpuPinningEnabled; +#endif PacketRecord* packet = nullptr; for (;;) {