From b7c3593edfca9649df9a077dffa1ef910f09fa96 Mon Sep 17 00:00:00 2001 From: Jay Wren Date: Thu, 15 Jan 2026 16:54:10 -0600 Subject: [PATCH] undo and cleanup --- connection_manager_test.go | 9 +- inside.go | 232 +++++++------------------------------ test/tun.go | 63 +--------- 3 files changed, 49 insertions(+), 255 deletions(-) diff --git a/connection_manager_test.go b/connection_manager_test.go index 647dd72b..f8b15b6c 100644 --- a/connection_manager_test.go +++ b/connection_manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/test" + "github.com/slackhq/nebula/test/device" "github.com/slackhq/nebula/udp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -52,7 +53,7 @@ func Test_NewConnectionManagerTest(t *testing.T) { lh := newTestLighthouse() ifce := &Interface{ hostMap: hostMap, - inside: &test.NoopTun{}, + inside: &device.NoopTun{}, outside: &udp.NoopConn{}, firewall: &Firewall{}, lightHouse: lh, @@ -135,7 +136,7 @@ func Test_NewConnectionManagerTest2(t *testing.T) { lh := newTestLighthouse() ifce := &Interface{ hostMap: hostMap, - inside: &test.NoopTun{}, + inside: &device.NoopTun{}, outside: &udp.NoopConn{}, firewall: &Firewall{}, lightHouse: lh, @@ -220,7 +221,7 @@ func Test_NewConnectionManager_DisconnectInactive(t *testing.T) { lh := newTestLighthouse() ifce := &Interface{ hostMap: hostMap, - inside: &test.NoopTun{}, + inside: &device.NoopTun{}, outside: &udp.NoopConn{}, firewall: &Firewall{}, lightHouse: lh, @@ -347,7 +348,7 @@ func Test_NewConnectionManagerTest_DisconnectInvalid(t *testing.T) { lh := newTestLighthouse() ifce := &Interface{ hostMap: hostMap, - inside: &test.NoopTun{}, + inside: &device.NoopTun{}, outside: &udp.NoopConn{}, firewall: &Firewall{}, lightHouse: lh, diff --git a/inside.go b/inside.go index cbe54425..91ab22a4 100644 --- a/inside.go +++ b/inside.go @@ -2,7 +2,6 @@ package nebula import ( "net/netip" - "time" "github.com/sirupsen/logrus" "github.com/slackhq/nebula/firewall" @@ -12,15 +11,6 @@ import ( "github.com/slackhq/nebula/routing" ) -// preEncryptionPacket holds packet data before batch encryption -type preEncryptionPacket struct { - hostinfo *HostInfo - ci *ConnectionState - packet []byte - out []byte - nb []byte -} - // consumeInsidePackets processes multiple packets in a batch for improved performance // packets: slice of packet buffers to process // sizes: slice of packet sizes @@ -105,108 +95,56 @@ func (f *Interface) consumeInsidePackets(packets [][]byte, sizes []int, count in continue } - // Collect all packets that need encryption for batched processing - preEncryptionBatch := make([]preEncryptionPacket, 0, count) - - for i := 0; i < count; i++ { - packet := packets[i][:sizes[i]] - out := outs[i] - - // Inline the consumeInsidePacket logic for better performance - err := newPacket(packet, false, fwPacket) - if err != nil { - if f.l.Level >= logrus.DebugLevel { - f.l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err) - } - continue - } - - // Ignore local broadcast packets - if f.dropLocalBroadcast { - if f.myBroadcastAddrsTable.Contains(fwPacket.RemoteAddr) { - continue - } - } - - if f.myVpnAddrsTable.Contains(fwPacket.RemoteAddr) { - // Immediately forward packets from self to self. - if immediatelyForwardToSelf { - _, err := f.readers[q].Write(packet) - if err != nil { - f.l.WithError(err).Error("Failed to forward to tun") - } - } - continue - } - - // Ignore multicast packets - if f.dropMulticast && fwPacket.RemoteAddr.IsMulticast() { - continue - } - - hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) { - hh.cachePacket(f.l, header.Message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics) - }) - - if hostinfo == nil { - f.rejectInside(packet, out, q) - if f.l.Level >= logrus.DebugLevel { - f.l.WithField("vpnAddr", fwPacket.RemoteAddr). - WithField("fwPacket", fwPacket). - Debugln("dropping outbound packet, vpnAddr not in our vpn networks or in unsafe networks") - } - continue - } - - if !ready { - continue - } - - dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache) - if dropReason != nil { - f.rejectInside(packet, out, q) - if f.l.Level >= logrus.DebugLevel { - hostinfo.logger(f.l). - WithField("fwPacket", fwPacket). - WithField("reason", dropReason). - Debugln("dropping outbound packet") - } - continue - } - - ci := hostinfo.ConnectionState - if ci.eKey == nil { - continue - } - - // Check if this needs relay - if so, send immediately and skip batching - useRelay := !hostinfo.remote.IsValid() - if useRelay { - // Handle relay sends individually (less common path) - f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, packet, nb, out, q) - continue - } - - // Collect for batched encryption - preEncryptionBatch = append(preEncryptionBatch, preEncryptionPacket{ - hostinfo: hostinfo, - ci: ci, - packet: packet, - out: out, - nb: nb, - }) + // Encrypt and prepare packet for batch sending + ci := hostinfo.ConnectionState + if ci.eKey == nil { + continue } - // BATCH ENCRYPTION: Single lock for all packets - if len(preEncryptionBatch) > 0 { - f.encryptBatch(preEncryptionBatch, batchPackets, batchAddrs, q) + // Check if this needs relay - if so, send immediately and skip batching + useRelay := !hostinfo.remote.IsValid() + if useRelay { + // Handle relay sends individually (less common path) + f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, packet, nb, out, q) + continue } + + // Encrypt the packet for batch sending + if noiseutil.EncryptLockNeeded { + ci.writeLock.Lock() + } + c := ci.messageCounter.Add(1) + out = header.Encode(out, header.Version, header.Message, 0, hostinfo.remoteIndexId, c) + f.connectionManager.Out(hostinfo) + + // Query lighthouse if needed + if hostinfo.lastRebindCount != f.rebindCount { + f.lightHouse.QueryServer(hostinfo.vpnAddrs[0]) + hostinfo.lastRebindCount = f.rebindCount + if f.l.Level >= logrus.DebugLevel { + f.l.WithField("vpnAddrs", hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter") + } + } + + out, err = ci.eKey.EncryptDanger(out, out, packet, c, nb) + if noiseutil.EncryptLockNeeded { + ci.writeLock.Unlock() + } + if err != nil { + hostinfo.logger(f.l).WithError(err). + WithField("counter", c). + Error("Failed to encrypt outgoing packet") + continue + } + + // Add to batch + *batchPackets = append(*batchPackets, out) + *batchAddrs = append(*batchAddrs, hostinfo.remote) } // Send all accumulated packets in one batch if len(*batchPackets) > 0 { batchSize := len(*batchPackets) - n, err := f.writers[q].WriteMulti(*batchPackets, *batchAddrs) if err != nil { f.l.WithError(err).WithField("sent", n).WithField("total", batchSize).Error("Failed to send batch") @@ -214,92 +152,6 @@ func (f *Interface) consumeInsidePackets(packets [][]byte, sizes []int, count in } } -// encryptBatch processes multiple packets with a single lock acquisition -func (f *Interface) encryptBatch(batch []preEncryptionPacket, batchPackets *[][]byte, batchAddrs *[]netip.AddrPort, q int) { - lockStart := time.Now() - - // Single lock for entire batch - 90%+ reduction in lock acquisitions - if noiseutil.EncryptLockNeeded { - for i := range batch { - ci := batch[i].ci - // Validate packet data to prevent nil pointer dereference - if ci == nil || batch[i].hostinfo == nil { - continue - } - - ci.writeLock.Lock() - defer ci.writeLock.Unlock() - - c := ci.messageCounter.Add(1) - out := header.Encode(batch[i].out, header.Version, header.Message, 0, batch[i].hostinfo.remoteIndexId, c) - f.connectionManager.Out(batch[i].hostinfo) - - // Query lighthouse if needed - if batch[i].hostinfo.lastRebindCount != f.rebindCount { - f.lightHouse.QueryServer(batch[i].hostinfo.vpnAddrs[0]) - batch[i].hostinfo.lastRebindCount = f.rebindCount - if f.l.Level >= logrus.DebugLevel { - f.l.WithField("vpnAddrs", batch[i].hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter") - } - } - - out, err := ci.eKey.EncryptDanger(out, out, batch[i].packet, c, batch[i].nb) - if err != nil { - batch[i].hostinfo.logger(f.l).WithError(err). - WithField("counter", c). - Error("Failed to encrypt outgoing packet") - continue - } - - // Add to output batches - *batchPackets = append(*batchPackets, out) - *batchAddrs = append(*batchAddrs, batch[i].hostinfo.remote) - } - } else { - // Regular builds - no locks needed - for i := range batch { - ci := batch[i].ci - // Validate packet data to prevent nil pointer dereference - if ci == nil || batch[i].hostinfo == nil { - continue - } - - c := ci.messageCounter.Add(1) - out := header.Encode(batch[i].out, header.Version, header.Message, 0, batch[i].hostinfo.remoteIndexId, c) - f.connectionManager.Out(batch[i].hostinfo) - - // Query lighthouse if needed - if batch[i].hostinfo.lastRebindCount != f.rebindCount { - f.lightHouse.QueryServer(batch[i].hostinfo.vpnAddrs[0]) - batch[i].hostinfo.lastRebindCount = f.rebindCount - if f.l.Level >= logrus.DebugLevel { - f.l.WithField("vpnAddrs", batch[i].hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter") - } - } - - out, err := ci.eKey.EncryptDanger(out, out, batch[i].packet, c, batch[i].nb) - if err != nil { - batch[i].hostinfo.logger(f.l).WithError(err). - WithField("counter", c). - Error("Failed to encrypt outgoing packet") - continue - } - - // Add to output batches - *batchPackets = append(*batchPackets, out) - *batchAddrs = append(*batchAddrs, batch[i].hostinfo.remote) - } - } - - // Record lock contention metrics - encryptionTime := time.Since(lockStart) - if noiseutil.EncryptLockNeeded { - f.batchMetrics.lockAcquisitions.Inc(1) // One lock acquisition per batch - } - f.batchMetrics.encryptionTime.Update(encryptionTime.Nanoseconds()) - f.batchMetrics.batchSize.Update(int64(len(batch))) -} - func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) { err := newPacket(packet, false, fwPacket) if err != nil { diff --git a/test/tun.go b/test/tun.go index 65eb3f3a..b8fe8954 100644 --- a/test/tun.go +++ b/test/tun.go @@ -1,63 +1,4 @@ package test -import ( - "errors" - "io" - "net/netip" - - "github.com/slackhq/nebula/overlay" - "github.com/slackhq/nebula/routing" -) - -type NoopTun struct{} - -func (NoopTun) RoutesFor(addr netip.Addr) routing.Gateways { - return routing.Gateways{} -} - -func (NoopTun) Activate() error { - return nil -} - -func (NoopTun) Networks() []netip.Prefix { - return []netip.Prefix{} -} - -func (NoopTun) Name() string { - return "noop" -} - -func (NoopTun) Read([]byte) (int, error) { - return 0, nil -} - -func (NoopTun) Write([]byte) (int, error) { - return 0, nil -} - -func (NoopTun) SupportsMultiqueue() bool { - return false -} - -func (NoopTun) NewMultiQueueReader() (overlay.BatchReadWriter, error) { - return nil, errors.New("unsupported") -} - -func (NoopTun) Close() error { - return nil -} - -// BatchRead implements BatchReadWriter interface -func (NoopTun) BatchRead(bufs [][]byte, sizes []int) (int, error) { - return 0, io.EOF -} - -// WriteBatch implements BatchReadWriter interface -func (NoopTun) WriteBatch(bufs [][]byte, offset int) (int, error) { - return len(bufs), nil -} - -// BatchSize implements BatchReadWriter interface -func (NoopTun) BatchSize() int { - return 1 -} +// NoopTun has been moved to github.com/slackhq/nebula/test/device +// Import that package for NoopTun.