undo and cleanup

This commit is contained in:
Jay Wren 2026-01-15 16:54:10 -06:00
parent 4228ec01a3
commit b7c3593edf
No known key found for this signature in database
3 changed files with 49 additions and 255 deletions

View file

@ -11,6 +11,7 @@ import (
"github.com/slackhq/nebula/cert"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/test"
"github.com/slackhq/nebula/test/device"
"github.com/slackhq/nebula/udp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -52,7 +53,7 @@ func Test_NewConnectionManagerTest(t *testing.T) {
lh := newTestLighthouse()
ifce := &Interface{
hostMap: hostMap,
inside: &test.NoopTun{},
inside: &device.NoopTun{},
outside: &udp.NoopConn{},
firewall: &Firewall{},
lightHouse: lh,
@ -135,7 +136,7 @@ func Test_NewConnectionManagerTest2(t *testing.T) {
lh := newTestLighthouse()
ifce := &Interface{
hostMap: hostMap,
inside: &test.NoopTun{},
inside: &device.NoopTun{},
outside: &udp.NoopConn{},
firewall: &Firewall{},
lightHouse: lh,
@ -220,7 +221,7 @@ func Test_NewConnectionManager_DisconnectInactive(t *testing.T) {
lh := newTestLighthouse()
ifce := &Interface{
hostMap: hostMap,
inside: &test.NoopTun{},
inside: &device.NoopTun{},
outside: &udp.NoopConn{},
firewall: &Firewall{},
lightHouse: lh,
@ -347,7 +348,7 @@ func Test_NewConnectionManagerTest_DisconnectInvalid(t *testing.T) {
lh := newTestLighthouse()
ifce := &Interface{
hostMap: hostMap,
inside: &test.NoopTun{},
inside: &device.NoopTun{},
outside: &udp.NoopConn{},
firewall: &Firewall{},
lightHouse: lh,

232
inside.go
View file

@ -2,7 +2,6 @@ package nebula
import (
"net/netip"
"time"
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/firewall"
@ -12,15 +11,6 @@ import (
"github.com/slackhq/nebula/routing"
)
// preEncryptionPacket holds packet data before batch encryption
type preEncryptionPacket struct {
hostinfo *HostInfo
ci *ConnectionState
packet []byte
out []byte
nb []byte
}
// consumeInsidePackets processes multiple packets in a batch for improved performance
// packets: slice of packet buffers to process
// sizes: slice of packet sizes
@ -105,108 +95,56 @@ func (f *Interface) consumeInsidePackets(packets [][]byte, sizes []int, count in
continue
}
// Collect all packets that need encryption for batched processing
preEncryptionBatch := make([]preEncryptionPacket, 0, count)
for i := 0; i < count; i++ {
packet := packets[i][:sizes[i]]
out := outs[i]
// Inline the consumeInsidePacket logic for better performance
err := newPacket(packet, false, fwPacket)
if err != nil {
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err)
}
continue
}
// Ignore local broadcast packets
if f.dropLocalBroadcast {
if f.myBroadcastAddrsTable.Contains(fwPacket.RemoteAddr) {
continue
}
}
if f.myVpnAddrsTable.Contains(fwPacket.RemoteAddr) {
// Immediately forward packets from self to self.
if immediatelyForwardToSelf {
_, err := f.readers[q].Write(packet)
if err != nil {
f.l.WithError(err).Error("Failed to forward to tun")
}
}
continue
}
// Ignore multicast packets
if f.dropMulticast && fwPacket.RemoteAddr.IsMulticast() {
continue
}
hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) {
hh.cachePacket(f.l, header.Message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics)
})
if hostinfo == nil {
f.rejectInside(packet, out, q)
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("vpnAddr", fwPacket.RemoteAddr).
WithField("fwPacket", fwPacket).
Debugln("dropping outbound packet, vpnAddr not in our vpn networks or in unsafe networks")
}
continue
}
if !ready {
continue
}
dropReason := f.firewall.Drop(*fwPacket, false, hostinfo, f.pki.GetCAPool(), localCache)
if dropReason != nil {
f.rejectInside(packet, out, q)
if f.l.Level >= logrus.DebugLevel {
hostinfo.logger(f.l).
WithField("fwPacket", fwPacket).
WithField("reason", dropReason).
Debugln("dropping outbound packet")
}
continue
}
ci := hostinfo.ConnectionState
if ci.eKey == nil {
continue
}
// Check if this needs relay - if so, send immediately and skip batching
useRelay := !hostinfo.remote.IsValid()
if useRelay {
// Handle relay sends individually (less common path)
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, packet, nb, out, q)
continue
}
// Collect for batched encryption
preEncryptionBatch = append(preEncryptionBatch, preEncryptionPacket{
hostinfo: hostinfo,
ci: ci,
packet: packet,
out: out,
nb: nb,
})
// Encrypt and prepare packet for batch sending
ci := hostinfo.ConnectionState
if ci.eKey == nil {
continue
}
// BATCH ENCRYPTION: Single lock for all packets
if len(preEncryptionBatch) > 0 {
f.encryptBatch(preEncryptionBatch, batchPackets, batchAddrs, q)
// Check if this needs relay - if so, send immediately and skip batching
useRelay := !hostinfo.remote.IsValid()
if useRelay {
// Handle relay sends individually (less common path)
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, packet, nb, out, q)
continue
}
// Encrypt the packet for batch sending
if noiseutil.EncryptLockNeeded {
ci.writeLock.Lock()
}
c := ci.messageCounter.Add(1)
out = header.Encode(out, header.Version, header.Message, 0, hostinfo.remoteIndexId, c)
f.connectionManager.Out(hostinfo)
// Query lighthouse if needed
if hostinfo.lastRebindCount != f.rebindCount {
f.lightHouse.QueryServer(hostinfo.vpnAddrs[0])
hostinfo.lastRebindCount = f.rebindCount
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("vpnAddrs", hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter")
}
}
out, err = ci.eKey.EncryptDanger(out, out, packet, c, nb)
if noiseutil.EncryptLockNeeded {
ci.writeLock.Unlock()
}
if err != nil {
hostinfo.logger(f.l).WithError(err).
WithField("counter", c).
Error("Failed to encrypt outgoing packet")
continue
}
// Add to batch
*batchPackets = append(*batchPackets, out)
*batchAddrs = append(*batchAddrs, hostinfo.remote)
}
// Send all accumulated packets in one batch
if len(*batchPackets) > 0 {
batchSize := len(*batchPackets)
n, err := f.writers[q].WriteMulti(*batchPackets, *batchAddrs)
if err != nil {
f.l.WithError(err).WithField("sent", n).WithField("total", batchSize).Error("Failed to send batch")
@ -214,92 +152,6 @@ func (f *Interface) consumeInsidePackets(packets [][]byte, sizes []int, count in
}
}
// encryptBatch processes multiple packets with a single lock acquisition
func (f *Interface) encryptBatch(batch []preEncryptionPacket, batchPackets *[][]byte, batchAddrs *[]netip.AddrPort, q int) {
lockStart := time.Now()
// Single lock for entire batch - 90%+ reduction in lock acquisitions
if noiseutil.EncryptLockNeeded {
for i := range batch {
ci := batch[i].ci
// Validate packet data to prevent nil pointer dereference
if ci == nil || batch[i].hostinfo == nil {
continue
}
ci.writeLock.Lock()
defer ci.writeLock.Unlock()
c := ci.messageCounter.Add(1)
out := header.Encode(batch[i].out, header.Version, header.Message, 0, batch[i].hostinfo.remoteIndexId, c)
f.connectionManager.Out(batch[i].hostinfo)
// Query lighthouse if needed
if batch[i].hostinfo.lastRebindCount != f.rebindCount {
f.lightHouse.QueryServer(batch[i].hostinfo.vpnAddrs[0])
batch[i].hostinfo.lastRebindCount = f.rebindCount
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("vpnAddrs", batch[i].hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter")
}
}
out, err := ci.eKey.EncryptDanger(out, out, batch[i].packet, c, batch[i].nb)
if err != nil {
batch[i].hostinfo.logger(f.l).WithError(err).
WithField("counter", c).
Error("Failed to encrypt outgoing packet")
continue
}
// Add to output batches
*batchPackets = append(*batchPackets, out)
*batchAddrs = append(*batchAddrs, batch[i].hostinfo.remote)
}
} else {
// Regular builds - no locks needed
for i := range batch {
ci := batch[i].ci
// Validate packet data to prevent nil pointer dereference
if ci == nil || batch[i].hostinfo == nil {
continue
}
c := ci.messageCounter.Add(1)
out := header.Encode(batch[i].out, header.Version, header.Message, 0, batch[i].hostinfo.remoteIndexId, c)
f.connectionManager.Out(batch[i].hostinfo)
// Query lighthouse if needed
if batch[i].hostinfo.lastRebindCount != f.rebindCount {
f.lightHouse.QueryServer(batch[i].hostinfo.vpnAddrs[0])
batch[i].hostinfo.lastRebindCount = f.rebindCount
if f.l.Level >= logrus.DebugLevel {
f.l.WithField("vpnAddrs", batch[i].hostinfo.vpnAddrs).Debug("Lighthouse update triggered for punch due to rebind counter")
}
}
out, err := ci.eKey.EncryptDanger(out, out, batch[i].packet, c, batch[i].nb)
if err != nil {
batch[i].hostinfo.logger(f.l).WithError(err).
WithField("counter", c).
Error("Failed to encrypt outgoing packet")
continue
}
// Add to output batches
*batchPackets = append(*batchPackets, out)
*batchAddrs = append(*batchAddrs, batch[i].hostinfo.remote)
}
}
// Record lock contention metrics
encryptionTime := time.Since(lockStart)
if noiseutil.EncryptLockNeeded {
f.batchMetrics.lockAcquisitions.Inc(1) // One lock acquisition per batch
}
f.batchMetrics.encryptionTime.Update(encryptionTime.Nanoseconds())
f.batchMetrics.batchSize.Update(int64(len(batch)))
}
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
err := newPacket(packet, false, fwPacket)
if err != nil {

View file

@ -1,63 +1,4 @@
package test
import (
"errors"
"io"
"net/netip"
"github.com/slackhq/nebula/overlay"
"github.com/slackhq/nebula/routing"
)
type NoopTun struct{}
func (NoopTun) RoutesFor(addr netip.Addr) routing.Gateways {
return routing.Gateways{}
}
func (NoopTun) Activate() error {
return nil
}
func (NoopTun) Networks() []netip.Prefix {
return []netip.Prefix{}
}
func (NoopTun) Name() string {
return "noop"
}
func (NoopTun) Read([]byte) (int, error) {
return 0, nil
}
func (NoopTun) Write([]byte) (int, error) {
return 0, nil
}
func (NoopTun) SupportsMultiqueue() bool {
return false
}
func (NoopTun) NewMultiQueueReader() (overlay.BatchReadWriter, error) {
return nil, errors.New("unsupported")
}
func (NoopTun) Close() error {
return nil
}
// BatchRead implements BatchReadWriter interface
func (NoopTun) BatchRead(bufs [][]byte, sizes []int) (int, error) {
return 0, io.EOF
}
// WriteBatch implements BatchReadWriter interface
func (NoopTun) WriteBatch(bufs [][]byte, offset int) (int, error) {
return len(bufs), nil
}
// BatchSize implements BatchReadWriter interface
func (NoopTun) BatchSize() int {
return 1
}
// NoopTun has been moved to github.com/slackhq/nebula/test/device
// Import that package for NoopTun.