From 9bfdfbafc1e1ab9fbb77c59a510d8f7560d6f3e4 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Wed, 20 Nov 2024 22:49:53 -0500 Subject: [PATCH 01/15] Backport reestablish relays from cert-v2 to release-1.9 (#1277) --- e2e/handshakes_test.go | 134 ++++++++++++++++++++++++++++++++++++++++ handshake_ix.go | 3 + handshake_manager.go | 90 ++++++++++++++------------- hostmap.go | 51 ++++++++++++++++ relay_manager.go | 136 ++++++++++++++++------------------------- 5 files changed, 288 insertions(+), 126 deletions(-) diff --git a/e2e/handshakes_test.go b/e2e/handshakes_test.go index 3d42a560..7267dd26 100644 --- a/e2e/handshakes_test.go +++ b/e2e/handshakes_test.go @@ -6,9 +6,12 @@ package e2e import ( "fmt" "net/netip" + "slices" "testing" "time" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" "github.com/sirupsen/logrus" "github.com/slackhq/nebula" "github.com/slackhq/nebula/e2e/router" @@ -369,6 +372,137 @@ func TestRelays(t *testing.T) { //TODO: assert we actually used the relay even though it should be impossible for a tunnel to have occurred without it } +func TestReestablishRelays(t *testing.T) { + ca, _, caKey, _ := NewTestCaCert(time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{}) + myControl, myVpnIpNet, _, _ := newSimpleServer(ca, caKey, "me ", "10.128.0.1/24", m{"relay": m{"use_relays": true}}) + relayControl, relayVpnIpNet, relayUdpAddr, _ := newSimpleServer(ca, caKey, "relay ", "10.128.0.128/24", m{"relay": m{"am_relay": true}}) + theirControl, theirVpnIpNet, theirUdpAddr, _ := newSimpleServer(ca, caKey, "them ", "10.128.0.2/24", m{"relay": m{"use_relays": true}}) + + // Teach my how to get to the relay and that their can be reached via the relay + myControl.InjectLightHouseAddr(relayVpnIpNet.Addr(), relayUdpAddr) + myControl.InjectRelays(theirVpnIpNet.Addr(), []netip.Addr{relayVpnIpNet.Addr()}) + relayControl.InjectLightHouseAddr(theirVpnIpNet.Addr(), theirUdpAddr) + + // Build a router so we don't have to reason who gets which packet + r := router.NewR(t, myControl, relayControl, theirControl) + defer r.RenderFlow() + + // Start the servers + myControl.Start() + relayControl.Start() + theirControl.Start() + + t.Log("Trigger a handshake from me to them via the relay") + myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me")) + + p := r.RouteForAllUntilTxTun(theirControl) + r.Log("Assert the tunnel works") + assertUdpPacket(t, []byte("Hi from me"), p, myVpnIpNet.Addr(), theirVpnIpNet.Addr(), 80, 80) + + t.Log("Ensure packet traversal from them to me via the relay") + theirControl.InjectTunUDPPacket(myVpnIpNet.Addr(), 80, 80, []byte("Hi from them")) + + p = r.RouteForAllUntilTxTun(myControl) + r.Log("Assert the tunnel works") + assertUdpPacket(t, []byte("Hi from them"), p, theirVpnIpNet.Addr(), myVpnIpNet.Addr(), 80, 80) + + // If we break the relay's connection to 'them', 'me' needs to detect and recover the connection + r.Log("Close the tunnel") + relayControl.CloseTunnel(theirVpnIpNet.Addr(), true) + + start := len(myControl.GetHostmap().Indexes) + curIndexes := len(myControl.GetHostmap().Indexes) + for curIndexes >= start { + curIndexes = len(myControl.GetHostmap().Indexes) + r.Logf("Wait for the dead index to go away:start=%v indexes, currnet=%v indexes", start, curIndexes) + myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me should fail")) + + r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { + return 
router.RouteAndExit + }) + time.Sleep(2 * time.Second) + } + r.Log("Dead index went away. Woot!") + r.RenderHostmaps("Me removed hostinfo", myControl, relayControl, theirControl) + // Next packet should re-establish a relayed connection and work just great. + + t.Logf("Assert the tunnel...") + for { + t.Log("RouteForAllUntilTxTun") + myControl.InjectLightHouseAddr(relayVpnIpNet.Addr(), relayUdpAddr) + myControl.InjectRelays(theirVpnIpNet.Addr(), []netip.Addr{relayVpnIpNet.Addr()}) + relayControl.InjectLightHouseAddr(theirVpnIpNet.Addr(), theirUdpAddr) + myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me")) + + p = r.RouteForAllUntilTxTun(theirControl) + r.Log("Assert the tunnel works") + packet := gopacket.NewPacket(p, layers.LayerTypeIPv4, gopacket.Lazy) + v4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + if slices.Compare(v4.SrcIP, myVpnIpNet.Addr().AsSlice()) != 0 { + t.Logf("SrcIP is unexpected...this is not the packet I'm looking for. Keep looking") + continue + } + if slices.Compare(v4.DstIP, theirVpnIpNet.Addr().AsSlice()) != 0 { + t.Logf("DstIP is unexpected...this is not the packet I'm looking for. Keep looking") + continue + } + + udp := packet.Layer(layers.LayerTypeUDP).(*layers.UDP) + if udp == nil { + t.Log("Not a UDP packet. This is not the packet I'm looking for. Keep looking") + continue + } + data := packet.ApplicationLayer() + if data == nil { + t.Log("No data found in packet. This is not the packet I'm looking for. Keep looking.") + continue + } + if string(data.Payload()) != "Hi from me" { + t.Logf("Unexpected payload: '%v', keep looking", string(data.Payload())) + continue + } + t.Log("I found my lost packet. I am so happy.") + break + } + t.Log("Assert the tunnel works the other way, too") + for { + t.Log("RouteForAllUntilTxTun") + theirControl.InjectTunUDPPacket(myVpnIpNet.Addr(), 80, 80, []byte("Hi from them")) + + p = r.RouteForAllUntilTxTun(myControl) + r.Log("Assert the tunnel works") + packet := gopacket.NewPacket(p, layers.LayerTypeIPv4, gopacket.Lazy) + v4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + if slices.Compare(v4.DstIP, myVpnIpNet.Addr().AsSlice()) != 0 { + t.Logf("Dst is unexpected...this is not the packet I'm looking for. Keep looking") + continue + } + if slices.Compare(v4.SrcIP, theirVpnIpNet.Addr().AsSlice()) != 0 { + t.Logf("SrcIP is unexpected...this is not the packet I'm looking for. Keep looking") + continue + } + + udp := packet.Layer(layers.LayerTypeUDP).(*layers.UDP) + if udp == nil { + t.Log("Not a UDP packet. This is not the packet I'm looking for. Keep looking") + continue + } + data := packet.ApplicationLayer() + if data == nil { + t.Log("No data found in packet. This is not the packet I'm looking for. Keep looking.") + continue + } + if string(data.Payload()) != "Hi from them" { + t.Logf("Unexpected payload: '%v', keep looking", string(data.Payload())) + continue + } + t.Log("I found my lost packet. 
I am so happy.") + break + } + r.RenderHostmaps("Final hostmaps", myControl, relayControl, theirControl) + +} + func TestStage1RaceRelays(t *testing.T) { //NOTE: this is a race between me and relay resulting in a full tunnel from me to them via relay ca, _, caKey, _ := NewTestCaCert(time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{}) diff --git a/handshake_ix.go b/handshake_ix.go index 8cf53411..3e701b0d 100644 --- a/handshake_ix.go +++ b/handshake_ix.go @@ -322,6 +322,9 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet return } hostinfo.relayState.InsertRelayTo(via.relayHI.vpnIp) + // I successfully received a handshake. Just in case I marked this tunnel as 'Disestablished', ensure + // it's correctly marked as working. + via.relayHI.relayState.UpdateRelayForByIdxState(via.remoteIdx, Established) f.SendVia(via.relayHI, via.relay, msg, make([]byte, 12), make([]byte, mtu), false) f.l.WithField("vpnIp", vpnIp).WithField("relay", via.relayHI.vpnIp). WithField("certName", certName). diff --git a/handshake_manager.go b/handshake_manager.go index 1df37bdb..d87ff02a 100644 --- a/handshake_manager.go +++ b/handshake_manager.go @@ -278,48 +278,8 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered continue } // Check the relay HostInfo to see if we already established a relay through it - if existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp); ok { - switch existingRelay.State { - case Established: - hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Send handshake via relay") - hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[0], make([]byte, 12), make([]byte, mtu), false) - case Requested: - hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Re-send CreateRelay request") - - //TODO: IPV6-WORK - myVpnIpB := hm.f.myVpnNet.Addr().As4() - theirVpnIpB := vpnIp.As4() - - // Re-send the CreateRelay request, in case the previous one was lost. - m := NebulaControl{ - Type: NebulaControl_CreateRelayRequest, - InitiatorRelayIndex: existingRelay.LocalIndex, - RelayFromIp: binary.BigEndian.Uint32(myVpnIpB[:]), - RelayToIp: binary.BigEndian.Uint32(theirVpnIpB[:]), - } - msg, err := m.Marshal() - if err != nil { - hostinfo.logger(hm.l). - WithError(err). - Error("Failed to marshal Control message to create relay") - } else { - // This must send over the hostinfo, not over hm.Hosts[ip] - hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu)) - hm.l.WithFields(logrus.Fields{ - "relayFrom": hm.f.myVpnNet.Addr(), - "relayTo": vpnIp, - "initiatorRelayIndex": existingRelay.LocalIndex, - "relay": relay}). - Info("send CreateRelayRequest") - } - default: - hostinfo.logger(hm.l). - WithField("vpnIp", vpnIp). - WithField("state", existingRelay.State). - WithField("relay", relayHostInfo.vpnIp). - Errorf("Relay unexpected state") - } - } else { + existingRelay, ok := relayHostInfo.relayState.QueryRelayForByIp(vpnIp) + if !ok { // No relays exist or requested yet. 
if relayHostInfo.remote.IsValid() { idx, err := AddRelay(hm.l, relayHostInfo, hm.mainHostMap, vpnIp, nil, TerminalType, Requested) @@ -352,6 +312,52 @@ func (hm *HandshakeManager) handleOutbound(vpnIp netip.Addr, lighthouseTriggered Info("send CreateRelayRequest") } } + continue + } + switch existingRelay.State { + case Established: + hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Send handshake via relay") + hm.f.SendVia(relayHostInfo, existingRelay, hostinfo.HandshakePacket[0], make([]byte, 12), make([]byte, mtu), false) + case Disestablished: + // Mark this relay as 'requested' + relayHostInfo.relayState.UpdateRelayForByIpState(vpnIp, Requested) + fallthrough + case Requested: + hostinfo.logger(hm.l).WithField("relay", relay.String()).Info("Re-send CreateRelay request") + // Re-send the CreateRelay request, in case the previous one was lost. + relayFrom := hm.f.myVpnNet.Addr().As4() + relayTo := vpnIp.As4() + m := NebulaControl{ + Type: NebulaControl_CreateRelayRequest, + InitiatorRelayIndex: existingRelay.LocalIndex, + RelayFromIp: binary.BigEndian.Uint32(relayFrom[:]), + RelayToIp: binary.BigEndian.Uint32(relayTo[:]), + } + + msg, err := m.Marshal() + if err != nil { + hostinfo.logger(hm.l). + WithError(err). + Error("Failed to marshal Control message to create relay") + } else { + // This must send over the hostinfo, not over hm.Hosts[ip] + hm.f.SendMessageToHostInfo(header.Control, 0, relayHostInfo, msg, make([]byte, 12), make([]byte, mtu)) + hm.l.WithFields(logrus.Fields{ + "relayFrom": hm.f.myVpnNet, + "relayTo": vpnIp, + "initiatorRelayIndex": existingRelay.LocalIndex, + "relay": relay}). + Info("send CreateRelayRequest") + } + case PeerRequested: + // PeerRequested only occurs in Forwarding relays, not Terminal relays, and this is a Terminal relay case. + fallthrough + default: + hostinfo.logger(hm.l). + WithField("vpnIp", vpnIp). + WithField("state", existingRelay.State). + WithField("relay", relay). 
+ Errorf("Relay unexpected state") } } } diff --git a/hostmap.go b/hostmap.go index fb97b76d..7258282d 100644 --- a/hostmap.go +++ b/hostmap.go @@ -35,6 +35,7 @@ const ( Requested = iota PeerRequested Established + Disestablished ) const ( @@ -79,6 +80,28 @@ func (rs *RelayState) DeleteRelay(ip netip.Addr) { delete(rs.relays, ip) } +func (rs *RelayState) UpdateRelayForByIpState(vpnIp netip.Addr, state int) { + rs.Lock() + defer rs.Unlock() + if r, ok := rs.relayForByIp[vpnIp]; ok { + newRelay := *r + newRelay.State = state + rs.relayForByIp[newRelay.PeerIp] = &newRelay + rs.relayForByIdx[newRelay.LocalIndex] = &newRelay + } +} + +func (rs *RelayState) UpdateRelayForByIdxState(idx uint32, state int) { + rs.Lock() + defer rs.Unlock() + if r, ok := rs.relayForByIdx[idx]; ok { + newRelay := *r + newRelay.State = state + rs.relayForByIp[newRelay.PeerIp] = &newRelay + rs.relayForByIdx[newRelay.LocalIndex] = &newRelay + } +} + func (rs *RelayState) CopyAllRelayFor() []*Relay { rs.RLock() defer rs.RUnlock() @@ -361,6 +384,7 @@ func (hm *HostMap) unlockedMakePrimary(hostinfo *HostInfo) { func (hm *HostMap) unlockedDeleteHostInfo(hostinfo *HostInfo) { primary, ok := hm.Hosts[hostinfo.vpnIp] + isLastHostinfo := hostinfo.next == nil && hostinfo.prev == nil if ok && primary == hostinfo { // The vpnIp pointer points to the same hostinfo as the local index id, we can remove it delete(hm.Hosts, hostinfo.vpnIp) @@ -410,6 +434,12 @@ func (hm *HostMap) unlockedDeleteHostInfo(hostinfo *HostInfo) { Debug("Hostmap hostInfo deleted") } + if isLastHostinfo { + // I have lost connectivity to my peers. My relay tunnel is likely broken. Mark the next + // hops as 'Disestablished' so that new relay tunnels are created in the future. + hm.unlockedDisestablishVpnAddrRelayFor(hostinfo) + } + // Clean up any local relay indexes for which I am acting as a relay hop for _, localRelayIdx := range hostinfo.relayState.CopyRelayForIdxs() { delete(hm.Relays, localRelayIdx) } @@ -470,6 +500,27 @@ func (hm *HostMap) QueryVpnIpRelayFor(targetIp, relayHostIp netip.Addr) (*HostIn return nil, nil, errors.New("unable to find host with relay") } +func (hm *HostMap) unlockedDisestablishVpnAddrRelayFor(hi *HostInfo) { + for _, relayHostIp := range hi.relayState.CopyRelayIps() { + if h, ok := hm.Hosts[relayHostIp]; ok { + for h != nil { + h.relayState.UpdateRelayForByIpState(hi.vpnIp, Disestablished) + h = h.next + } + } + } + for _, rs := range hi.relayState.CopyAllRelayFor() { + if rs.Type == ForwardingType { + if h, ok := hm.Hosts[rs.PeerIp]; ok { + for h != nil { + h.relayState.UpdateRelayForByIpState(hi.vpnIp, Disestablished) + h = h.next + } + } + } + } +} + func (hm *HostMap) queryVpnIp(vpnIp netip.Addr, promoteIfce *Interface) *HostInfo { hm.RLock() if h, ok := hm.Hosts[vpnIp]; ok { diff --git a/relay_manager.go b/relay_manager.go index 1a3a4d48..375b4223 100644 --- a/relay_manager.go +++ b/relay_manager.go @@ -146,10 +146,14 @@ func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m * rm.l.WithField("relayTo", peerHostInfo.vpnIp).Error("peerRelay does not have Relay state for relayTo") return } - if peerRelay.State == PeerRequested { + switch peerRelay.State { + case Requested: + // I initiated the request to this peer, but haven't heard back from the peer yet. I must wait for this peer + // to respond to complete the connection. 
+ case PeerRequested, Disestablished, Established: + peerHostInfo.relayState.UpdateRelayForByIpState(targetAddr, Established) //TODO: IPV6-WORK b = peerHostInfo.vpnIp.As4() - peerRelay.State = Established resp := NebulaControl{ Type: NebulaControl_CreateRelayResponse, ResponderRelayIndex: peerRelay.LocalIndex, @@ -215,6 +219,21 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest") return } + case Disestablished: + if existingRelay.RemoteIndex != m.InitiatorRelayIndex { + // We got a brand new Relay request, because its index is different than what we saw before. + // This should never happen. The peer should never change an index, once created. + logMsg.WithFields(logrus.Fields{ + "existingRemoteIndex": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest") + return + } + // Mark the relay as 'Established' because it's safe to use again + h.relayState.UpdateRelayForByIpState(from, Established) + case PeerRequested: + // I should never be in this state, because I am terminal, not forwarding. + logMsg.WithFields(logrus.Fields{ + "existingRemoteIndex": existingRelay.RemoteIndex, + "state": existingRelay.State}).Error("Unexpected Relay State found") } } else { _, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established) @@ -226,7 +245,7 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N relay, ok := h.relayState.QueryRelayForByIp(from) if !ok { - logMsg.Error("Relay State not found") + logMsg.WithField("from", from).Error("Relay State not found") return } @@ -273,103 +292,52 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N // Only create relays to peers for whom I have a direct connection return } - sendCreateRequest := false var index uint32 var err error targetRelay, ok := peer.relayState.QueryRelayForByIp(from) if ok { index = targetRelay.LocalIndex - if targetRelay.State == Requested { - sendCreateRequest = true - } } else { // Allocate an index in the hostMap for this relay peer index, err = AddRelay(rm.l, peer, f.hostMap, from, nil, ForwardingType, Requested) if err != nil { return } - sendCreateRequest = true } - if sendCreateRequest { - //TODO: IPV6-WORK - fromB := h.vpnIp.As4() - targetB := target.As4() - - // Send a CreateRelayRequest to the peer. - req := NebulaControl{ - Type: NebulaControl_CreateRelayRequest, - InitiatorRelayIndex: index, - RelayFromIp: binary.BigEndian.Uint32(fromB[:]), - RelayToIp: binary.BigEndian.Uint32(targetB[:]), - } - msg, err := req.Marshal() - if err != nil { - logMsg. - WithError(err).Error("relayManager Failed to marshal Control message to create relay") - } else { - f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu)) - rm.l.WithFields(logrus.Fields{ - //TODO: IPV6-WORK another lazy used to use the req object - "relayFrom": h.vpnIp, - "relayTo": target, - "initiatorRelayIndex": req.InitiatorRelayIndex, - "responderRelayIndex": req.ResponderRelayIndex, - "vpnIp": target}). - Info("send CreateRelayRequest") - } + peer.relayState.UpdateRelayForByIpState(from, Requested) + // Send a CreateRelayRequest to the peer. 
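+	// The control message carries IPv4 addresses only (uint32 fields); see the TODO: IPV6-WORK markers.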
+ fromB := from.As4() + targetB := target.As4() + req := NebulaControl{ + Type: NebulaControl_CreateRelayRequest, + InitiatorRelayIndex: index, + RelayFromIp: binary.BigEndian.Uint32(fromB[:]), + RelayToIp: binary.BigEndian.Uint32(targetB[:]), } - // Also track the half-created Relay state just received - relay, ok := h.relayState.QueryRelayForByIp(target) - if !ok { - // Add the relay - state := PeerRequested - if targetRelay != nil && targetRelay.State == Established { - state = Established - } - _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state) - if err != nil { - logMsg. - WithError(err).Error("relayManager Failed to allocate a local index for relay") - return - } + msg, err := req.Marshal() + if err != nil { + logMsg. + WithError(err).Error("relayManager Failed to marshal Control message to create relay") } else { - switch relay.State { - case Established: - if relay.RemoteIndex != m.InitiatorRelayIndex { - // We got a brand new Relay request, because its index is different than what we saw before. - // This should never happen. The peer should never change an index, once created. - logMsg.WithFields(logrus.Fields{ - "existingRemoteIndex": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest") + f.SendMessageToHostInfo(header.Control, 0, peer, msg, make([]byte, 12), make([]byte, mtu)) + rm.l.WithFields(logrus.Fields{ + //TODO: IPV6-WORK another lazy used to use the req object + "relayFrom": h.vpnIp, + "relayTo": target, + "initiatorRelayIndex": req.InitiatorRelayIndex, + "responderRelayIndex": req.ResponderRelayIndex, + "vpnAddr": target}). + Info("send CreateRelayRequest") + // Also track the half-created Relay state just received + _, ok := h.relayState.QueryRelayForByIp(target) + if !ok { + // Add the relay + _, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, PeerRequested) + if err != nil { + logMsg. + WithError(err).Error("relayManager Failed to allocate a local index for relay") return } - //TODO: IPV6-WORK - fromB := h.vpnIp.As4() - targetB := target.As4() - resp := NebulaControl{ - Type: NebulaControl_CreateRelayResponse, - ResponderRelayIndex: relay.LocalIndex, - InitiatorRelayIndex: relay.RemoteIndex, - RelayFromIp: binary.BigEndian.Uint32(fromB[:]), - RelayToIp: binary.BigEndian.Uint32(targetB[:]), - } - msg, err := resp.Marshal() - if err != nil { - rm.l. - WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay") - } else { - f.SendMessageToHostInfo(header.Control, 0, h, msg, make([]byte, 12), make([]byte, mtu)) - rm.l.WithFields(logrus.Fields{ - //TODO: IPV6-WORK more lazy, used to use resp object - "relayFrom": h.vpnIp, - "relayTo": target, - "initiatorRelayIndex": resp.InitiatorRelayIndex, - "responderRelayIndex": resp.ResponderRelayIndex, - "vpnIp": h.vpnIp}). 
- Info("send CreateRelayResponse") - } - - case Requested: - // Keep waiting for the other relay to complete } } } From 2e85d138cdd731934a981e67ad3fcbaf4c35b168 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Tue, 3 Dec 2024 09:49:54 -0600 Subject: [PATCH 02/15] [v1.9.x] do not panic when loading a V2 CA certificate (#1282) Co-authored-by: Jack Doan --- cert/ca.go | 28 ++++++++++++++++++---------- cert/cert.go | 4 ++++ cert/cert_test.go | 37 +++++++++++++++++++++++++++++-------- cert/errors.go | 13 +++++++------ pki.go | 21 ++++++--------------- 5 files changed, 64 insertions(+), 39 deletions(-) diff --git a/cert/ca.go b/cert/ca.go index 0ffbd879..cfb99c22 100644 --- a/cert/ca.go +++ b/cert/ca.go @@ -24,31 +24,39 @@ func NewCAPool() *NebulaCAPool { // NewCAPoolFromBytes will create a new CA pool from the provided // input bytes, which must be a PEM-encoded set of nebula certificates. +// If the pool contains unsupported certificates, they will generate warnings +// in the []error return arg. // If the pool contains any expired certificates, an ErrExpired will be // returned along with the pool. The caller must handle any such errors. -func NewCAPoolFromBytes(caPEMs []byte) (*NebulaCAPool, error) { +func NewCAPoolFromBytes(caPEMs []byte) (*NebulaCAPool, []error, error) { pool := NewCAPool() var err error - var expired bool + var warnings []error + good := 0 + for { caPEMs, err = pool.AddCACertificate(caPEMs) if errors.Is(err, ErrExpired) { - expired = true - err = nil - } - if err != nil { - return nil, err + warnings = append(warnings, err) + } else if errors.Is(err, ErrInvalidPEMCertificateUnsupported) { + warnings = append(warnings, err) + } else if err != nil { + return nil, warnings, err + } else { + // Only consider a good certificate if there were no errors present + good++ } + if len(caPEMs) == 0 || strings.TrimSpace(string(caPEMs)) == "" { break } } - if expired { - return pool, ErrExpired + if good == 0 { + return nil, warnings, errors.New("no valid CA certificates present") } - return pool, nil + return pool, warnings, nil } // AddCACertificate verifies a Nebula CA certificate and adds it to the pool diff --git a/cert/cert.go b/cert/cert.go index a0164f7b..3cb50ddb 100644 --- a/cert/cert.go +++ b/cert/cert.go @@ -28,6 +28,7 @@ const publicKeyLen = 32 const ( CertBanner = "NEBULA CERTIFICATE" + CertificateV2Banner = "NEBULA CERTIFICATE V2" X25519PrivateKeyBanner = "NEBULA X25519 PRIVATE KEY" X25519PublicKeyBanner = "NEBULA X25519 PUBLIC KEY" EncryptedEd25519PrivateKeyBanner = "NEBULA ED25519 ENCRYPTED PRIVATE KEY" @@ -163,6 +164,9 @@ func UnmarshalNebulaCertificateFromPEM(b []byte) (*NebulaCertificate, []byte, er if p == nil { return nil, r, fmt.Errorf("input did not contain a valid PEM encoded block") } + if p.Type == CertificateV2Banner { + return nil, r, fmt.Errorf("%w: %s", ErrInvalidPEMCertificateUnsupported, p.Type) + } if p.Type != CertBanner { return nil, r, fmt.Errorf("bytes did not contain a proper nebula certificate banner") } diff --git a/cert/cert_test.go b/cert/cert_test.go index 30e99eca..3acc8ded 100644 --- a/cert/cert_test.go +++ b/cert/cert_test.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" + "errors" "fmt" "io" "net" @@ -572,6 +573,13 @@ CmYKEG5lYnVsYSBQMjU2IHRlc3Qo4s+7mgYw4tXrsAc6QQRkaW2jFmllYvN4+/k2 76gvQAGgBgESRzBFAiEAib0/te6eMiZOKD8gdDeloMTS0wGuX2t0C7TFdUhAQzgC IBNWYMep3ysx9zCgknfG5dKtwGTaqF++BWKDYdyl34KX -----END NEBULA CERTIFICATE----- +` + + v2 := ` +# valid PEM with the V2 header +-----BEGIN NEBULA CERTIFICATE V2----- 
+CmYKEG5lYnVsYSBQMjU2IHRlc3Qo4s+7mgYw4tXrsAc6QQRkaW2jFmllYvN4+/k2 +-----END NEBULA CERTIFICATE V2----- ` rootCA := NebulaCertificate{ @@ -592,33 +600,46 @@ IBNWYMep3ysx9zCgknfG5dKtwGTaqF++BWKDYdyl34KX }, } - p, err := NewCAPoolFromBytes([]byte(noNewLines)) + p, warn, err := NewCAPoolFromBytes([]byte(noNewLines)) assert.Nil(t, err) + assert.Nil(t, warn) assert.Equal(t, p.CAs[string("c9bfaf7ce8e84b2eeda2e27b469f4b9617bde192efd214b68891ecda6ed49522")].Details.Name, rootCA.Details.Name) assert.Equal(t, p.CAs[string("5c9c3f23e7ee7fe97637cbd3a0a5b854154d1d9aaaf7b566a51f4a88f76b64cd")].Details.Name, rootCA01.Details.Name) - pp, err := NewCAPoolFromBytes([]byte(withNewLines)) + pp, warn, err := NewCAPoolFromBytes([]byte(withNewLines)) assert.Nil(t, err) + assert.Nil(t, warn) assert.Equal(t, pp.CAs[string("c9bfaf7ce8e84b2eeda2e27b469f4b9617bde192efd214b68891ecda6ed49522")].Details.Name, rootCA.Details.Name) assert.Equal(t, pp.CAs[string("5c9c3f23e7ee7fe97637cbd3a0a5b854154d1d9aaaf7b566a51f4a88f76b64cd")].Details.Name, rootCA01.Details.Name) // expired cert, no valid certs - ppp, err := NewCAPoolFromBytes([]byte(expired)) - assert.Equal(t, ErrExpired, err) - assert.Equal(t, ppp.CAs[string("152070be6bb19bc9e3bde4c2f0e7d8f4ff5448b4c9856b8eccb314fade0229b0")].Details.Name, "expired") + ppp, warn, err := NewCAPoolFromBytes([]byte(expired)) + assert.Error(t, err, "no valid CA certificates present") + assert.Len(t, warn, 1) + assert.Error(t, warn[0], ErrExpired) + assert.Nil(t, ppp) // expired cert, with valid certs - pppp, err := NewCAPoolFromBytes(append([]byte(expired), noNewLines...)) - assert.Equal(t, ErrExpired, err) + pppp, warn, err := NewCAPoolFromBytes(append([]byte(expired), noNewLines...)) + assert.Len(t, warn, 1) + assert.Nil(t, err) + assert.Error(t, warn[0], ErrExpired) assert.Equal(t, pppp.CAs[string("c9bfaf7ce8e84b2eeda2e27b469f4b9617bde192efd214b68891ecda6ed49522")].Details.Name, rootCA.Details.Name) assert.Equal(t, pppp.CAs[string("5c9c3f23e7ee7fe97637cbd3a0a5b854154d1d9aaaf7b566a51f4a88f76b64cd")].Details.Name, rootCA01.Details.Name) assert.Equal(t, pppp.CAs[string("152070be6bb19bc9e3bde4c2f0e7d8f4ff5448b4c9856b8eccb314fade0229b0")].Details.Name, "expired") assert.Equal(t, len(pppp.CAs), 3) - ppppp, err := NewCAPoolFromBytes([]byte(p256)) + ppppp, warn, err := NewCAPoolFromBytes([]byte(p256)) assert.Nil(t, err) + assert.Nil(t, warn) assert.Equal(t, ppppp.CAs[string("a7938893ec8c4ef769b06d7f425e5e46f7a7f5ffa49c3bcf4a86b608caba9159")].Details.Name, rootCAP256.Details.Name) assert.Equal(t, len(ppppp.CAs), 1) + + pppppp, warn, err := NewCAPoolFromBytes(append([]byte(p256), []byte(v2)...)) + assert.Nil(t, err) + assert.True(t, errors.Is(warn[0], ErrInvalidPEMCertificateUnsupported)) + assert.Equal(t, pppppp.CAs[string("a7938893ec8c4ef769b06d7f425e5e46f7a7f5ffa49c3bcf4a86b608caba9159")].Details.Name, rootCAP256.Details.Name) + assert.Equal(t, len(pppppp.CAs), 1) } func appendByteSlices(b ...[]byte) []byte { diff --git a/cert/errors.go b/cert/errors.go index 05b42d10..df2dd4e5 100644 --- a/cert/errors.go +++ b/cert/errors.go @@ -5,10 +5,11 @@ import ( ) var ( - ErrRootExpired = errors.New("root certificate is expired") - ErrExpired = errors.New("certificate is expired") - ErrNotCA = errors.New("certificate is not a CA") - ErrNotSelfSigned = errors.New("certificate is not self-signed") - ErrBlockListed = errors.New("certificate is in the block list") - ErrSignatureMismatch = errors.New("certificate signature did not match") + ErrRootExpired = errors.New("root certificate is expired") + 
ErrExpired = errors.New("certificate is expired") + ErrNotCA = errors.New("certificate is not a CA") + ErrNotSelfSigned = errors.New("certificate is not self-signed") + ErrBlockListed = errors.New("certificate is in the block list") + ErrSignatureMismatch = errors.New("certificate signature did not match") + ErrInvalidPEMCertificateUnsupported = errors.New("bytes contain an unsupported certificate format") ) diff --git a/pki.go b/pki.go index ab95a047..e5845d1c 100644 --- a/pki.go +++ b/pki.go @@ -223,22 +223,13 @@ func loadCAPoolFromConfig(l *logrus.Logger, c *config.C) (*cert.NebulaCAPool, er } } - caPool, err := cert.NewCAPoolFromBytes(rawCA) - if errors.Is(err, cert.ErrExpired) { - var expired int - for _, crt := range caPool.CAs { - if crt.Expired(time.Now()) { - expired++ - l.WithField("cert", crt).Warn("expired certificate present in CA pool") - } - } + caPool, warnings, err := cert.NewCAPoolFromBytes(rawCA) + for _, w := range warnings { + l.WithError(w).Warn("parsing a CA certificate failed") + } - if expired >= len(caPool.CAs) { - return nil, errors.New("no valid CA certificates present") - } - - } else if err != nil { - return nil, fmt.Errorf("error while adding CA certificate to CA trust store: %s", err) + if err != nil { + return nil, fmt.Errorf("could not create CA certificate pool: %s", err) } for _, fp := range c.GetStringSlice("pki.blocklist", []string{}) { From b55b9019a72c7320b5c6dd54d2c31ad89f1e8990 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Fri, 6 Dec 2024 08:50:24 -0600 Subject: [PATCH 03/15] v1.9.5 (#1285) Update CHANGELOG for Nebula v1.9.5 --- CHANGELOG.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad171478..b33b0ca7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.9.5] - 2024-12-05 + +### Added + +- Gracefully ignore v2 certificates. (#1282) + +### Fixed + +- Fix relays that refuse to re-establish after one of the remote tunnel pairs breaks. (#1277) + ## [1.9.4] - 2024-09-09 ### Added @@ -664,7 +674,8 @@ created.) - Initial public release. 
-[Unreleased]: https://github.com/slackhq/nebula/compare/v1.9.4...HEAD +[Unreleased]: https://github.com/slackhq/nebula/compare/v1.9.5...HEAD +[1.9.5]: https://github.com/slackhq/nebula/releases/tag/v1.9.5 [1.9.4]: https://github.com/slackhq/nebula/releases/tag/v1.9.4 [1.9.3]: https://github.com/slackhq/nebula/releases/tag/v1.9.3 [1.9.2]: https://github.com/slackhq/nebula/releases/tag/v1.9.2 From 04d7a8ccba01974f1d476923eb38747bb9a5ad61 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Tue, 13 May 2025 14:58:37 -0400 Subject: [PATCH 04/15] Retry UDP receive on Windows in some receive error cases (#1404) --- udp/udp_rio_windows.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go index ee7e1e00..87cf655b 100644 --- a/udp/udp_rio_windows.go +++ b/udp/udp_rio_windows.go @@ -14,6 +14,7 @@ import ( "sync" "sync/atomic" "syscall" + "time" "unsafe" "github.com/sirupsen/logrus" @@ -125,14 +126,28 @@ func (u *RIOConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firew fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) + consecutiveErrors := 0 for { // Just read one packet at a time n, rua, err := u.receive(buffer) if err != nil { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + if errors.Is(err, net.ErrClosed) { + u.l.WithError(err).Debug("udp socket is closed, exiting read loop") + return + } + // Try to suss out whether this is a transient error or something more permanent + consecutiveErrors++ + u.l.WithError(err).WithField("consecutiveErrors", consecutiveErrors).Error("unexpected udp socket recieve error") + if consecutiveErrors > 15 { + panic("too many consecutive UDP receive errors") + } else if consecutiveErrors > 10 { + time.Sleep(100 * time.Millisecond) + } + continue } + consecutiveErrors = 0 + r( netip.AddrPortFrom(netip.AddrFrom16(rua.Addr).Unmap(), (rua.Port>>8)|((rua.Port&0xff)<<8)), plaintext[:0], From 8c29b15c6da49bd2c671d85f3b6447706ce36b33 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Tue, 13 May 2025 14:58:58 -0400 Subject: [PATCH 05/15] fix relay migration panic (#1403) --- connection_manager.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/connection_manager.go b/connection_manager.go index d2e86164..3b099f0b 100644 --- a/connection_manager.go +++ b/connection_manager.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "fmt" "net/netip" "sync" "time" @@ -227,21 +228,25 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) var relayFrom netip.Addr var relayTo netip.Addr switch { - case ok && existing.State == Established: - // This relay already exists in newhostinfo, then do nothing. - continue - case ok && existing.State == Requested: - // The relay exists in a Requested state; re-send the request - index = existing.LocalIndex - switch r.Type { - case TerminalType: - relayFrom = n.intf.myVpnNet.Addr() - relayTo = existing.PeerIp - case ForwardingType: - relayFrom = existing.PeerIp - relayTo = newhostinfo.vpnIp - default: - // should never happen + case ok: + switch existing.State { + case Established, PeerRequested, Disestablished: + // This relay already exists in newhostinfo, then do nothing. 
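+				// A Disestablished relay keeps its local index, so it can be re-established in place
+				// later instead of being migrated here.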
+ continue + case Requested: + // The relayed connection exists in a Requested state; re-send the request + index = existing.LocalIndex + switch r.Type { + case TerminalType: + relayFrom = n.intf.myVpnNet.Addr() + relayTo = existing.PeerIp + case ForwardingType: + relayFrom = existing.PeerIp + relayTo = newhostinfo.vpnIp + default: + // should never happen + panic(fmt.Sprintf("Migrating unknown relay type: %v", r.Type)) + } } case !ok: n.relayUsedLock.RLock() @@ -267,6 +272,7 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) relayTo = newhostinfo.vpnIp default: // should never happen + panic(fmt.Sprintf("Migrating unknown relay type: %v", r.Type)) } } From 8e0a7bcbb7d24a3a3b123d47a2e2e65f17aef352 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Thu, 22 May 2025 08:55:45 -0400 Subject: [PATCH 06/15] Disable UDP receive error returns due to ICMP messages on Windows. (#1412) --- udp/udp_rio_windows.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go index 87cf655b..045ae8e7 100644 --- a/udp/udp_rio_windows.go +++ b/udp/udp_rio_windows.go @@ -14,7 +14,6 @@ import ( "sync" "sync/atomic" "syscall" - "time" "unsafe" "github.com/sirupsen/logrus" @@ -96,6 +95,25 @@ func (u *RIOConn) bind(sa windows.Sockaddr) error { // Enable v4 for this socket syscall.SetsockoptInt(syscall.Handle(u.sock), syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, 0) + // Disable reporting of PORT_UNREACHABLE and NET_UNREACHABLE errors from the UDP socket receive call. + // These errors are returned on Windows during UDP receives based on the receipt of ICMP packets. Disable + // the UDP receive error returns with these ioctl calls. 
+ ret := uint32(0) + flag := uint32(0) + size := uint32(unsafe.Sizeof(flag)) + err = syscall.WSAIoctl(syscall.Handle(u.sock), syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) + if err != nil { + return err + } + ret = 0 + flag = 0 + size = uint32(unsafe.Sizeof(flag)) + SIO_UDP_NETRESET := uint32(syscall.IOC_IN | syscall.IOC_VENDOR | 15) + err = syscall.WSAIoctl(syscall.Handle(u.sock), SIO_UDP_NETRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) + if err != nil { + return err + } + err = u.rx.Open() if err != nil { return err @@ -126,7 +144,6 @@ func (u *RIOConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firew fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) - consecutiveErrors := 0 for { // Just read one packet at a time n, rua, err := u.receive(buffer) @@ -135,19 +152,10 @@ func (u *RIOConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firew u.l.WithError(err).Debug("udp socket is closed, exiting read loop") return } - // Try to suss out whether this is a transient error or something more permanent - consecutiveErrors++ - u.l.WithError(err).WithField("consecutiveErrors", consecutiveErrors).Error("unexpected udp socket recieve error") - if consecutiveErrors > 15 { - panic("too many consecutive UDP receive errors") - } else if consecutiveErrors > 10 { - time.Sleep(100 * time.Millisecond) - } + u.l.WithError(err).Error("unexpected udp socket receive error") continue } - consecutiveErrors = 0 - r( netip.AddrPortFrom(netip.AddrFrom16(rua.Addr).Unmap(), (rua.Port>>8)|((rua.Port&0xff)<<8)), plaintext[:0], From 9877648da91c3b34ea8e2ba2ae1aa2e9b2043262 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Mon, 23 Jun 2025 11:32:50 -0500 Subject: [PATCH 07/15] Drop inactive tunnels (#1413) --- connection_manager.go | 377 ++++++++++++++++++++----------------- connection_manager_test.go | 175 +++++++++++++---- control.go | 20 +- e2e/handshakes_test.go | 8 +- e2e/router/router.go | 1 + e2e/tunnels_test.go | 55 ++++++ examples/config.yml | 12 ++ handshake_ix.go | 4 +- hostmap.go | 8 + inside.go | 4 +- interface.go | 41 ++-- main.go | 46 +++-- outside.go | 6 +- 13 files changed, 480 insertions(+), 277 deletions(-) create mode 100644 e2e/tunnels_test.go diff --git a/connection_manager.go b/connection_manager.go index 3b099f0b..b22dfc08 100644 --- a/connection_manager.go +++ b/connection_manager.go @@ -7,11 +7,13 @@ import ( "fmt" "net/netip" "sync" + "sync/atomic" "time" "github.com/rcrowley/go-metrics" "github.com/sirupsen/logrus" "github.com/slackhq/nebula/cert" + "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/header" ) @@ -28,130 +30,124 @@ const ( ) type connectionManager struct { - in map[uint32]struct{} - inLock *sync.RWMutex - - out map[uint32]struct{} - outLock *sync.RWMutex - // relayUsed holds which relay localIndexs are in use relayUsed map[uint32]struct{} relayUsedLock *sync.RWMutex - hostMap *HostMap - trafficTimer *LockingTimerWheel[uint32] - intf *Interface - pendingDeletion map[uint32]struct{} - punchy *Punchy + hostMap *HostMap + trafficTimer *LockingTimerWheel[uint32] + intf *Interface + punchy *Punchy + + // Configuration settings checkInterval time.Duration pendingDeletionInterval time.Duration - metricsTxPunchy metrics.Counter + inactivityTimeout atomic.Int64 + dropInactive atomic.Bool + + metricsTxPunchy metrics.Counter l *logrus.Logger } -func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval time.Duration, punchy 
*Punchy) *connectionManager { - var max time.Duration - if checkInterval < pendingDeletionInterval { - max = pendingDeletionInterval - } else { - max = checkInterval +func newConnectionManagerFromConfig(l *logrus.Logger, c *config.C, hm *HostMap, p *Punchy) *connectionManager { + cm := &connectionManager{ + hostMap: hm, + l: l, + punchy: p, + relayUsed: make(map[uint32]struct{}), + relayUsedLock: &sync.RWMutex{}, + metricsTxPunchy: metrics.GetOrRegisterCounter("messages.tx.punchy", nil), } - nc := &connectionManager{ - hostMap: intf.hostMap, - in: make(map[uint32]struct{}), - inLock: &sync.RWMutex{}, - out: make(map[uint32]struct{}), - outLock: &sync.RWMutex{}, - relayUsed: make(map[uint32]struct{}), - relayUsedLock: &sync.RWMutex{}, - trafficTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, max), - intf: intf, - pendingDeletion: make(map[uint32]struct{}), - checkInterval: checkInterval, - pendingDeletionInterval: pendingDeletionInterval, - punchy: punchy, - metricsTxPunchy: metrics.GetOrRegisterCounter("messages.tx.punchy", nil), - l: l, - } + cm.reload(c, true) + c.RegisterReloadCallback(func(c *config.C) { + cm.reload(c, false) + }) - nc.Start(ctx) - return nc + return cm } -func (n *connectionManager) In(localIndex uint32) { - n.inLock.RLock() - // If this already exists, return - if _, ok := n.in[localIndex]; ok { - n.inLock.RUnlock() - return +func (cm *connectionManager) reload(c *config.C, initial bool) { + if initial { + cm.checkInterval = time.Duration(c.GetInt("timers.connection_alive_interval", 5)) * time.Second + cm.pendingDeletionInterval = time.Duration(c.GetInt("timers.pending_deletion_interval", 10)) * time.Second + + // We want at least a minimum resolution of 500ms per tick so that we can hit these intervals + // pretty close to their configured duration. + // The inactivity duration is checked each time a hostinfo ticks through so we don't need the wheel to contain it. + minDuration := min(time.Millisecond*500, cm.checkInterval, cm.pendingDeletionInterval) + maxDuration := max(cm.checkInterval, cm.pendingDeletionInterval) + cm.trafficTimer = NewLockingTimerWheel[uint32](minDuration, maxDuration) + } + + if initial || c.HasChanged("tunnels.inactivity_timeout") { + old := cm.getInactivityTimeout() + cm.inactivityTimeout.Store((int64)(c.GetDuration("tunnels.inactivity_timeout", 10*time.Minute))) + if !initial { + cm.l.WithField("oldDuration", old). + WithField("newDuration", cm.getInactivityTimeout()). + Info("Inactivity timeout has changed") + } + } + + if initial || c.HasChanged("tunnels.drop_inactive") { + old := cm.dropInactive.Load() + cm.dropInactive.Store(c.GetBool("tunnels.drop_inactive", false)) + if !initial { + cm.l.WithField("oldBool", old). + WithField("newBool", cm.dropInactive.Load()). 
+ Info("Drop inactive setting has changed") + } } - n.inLock.RUnlock() - n.inLock.Lock() - n.in[localIndex] = struct{}{} - n.inLock.Unlock() } -func (n *connectionManager) Out(localIndex uint32) { - n.outLock.RLock() - // If this already exists, return - if _, ok := n.out[localIndex]; ok { - n.outLock.RUnlock() - return - } - n.outLock.RUnlock() - n.outLock.Lock() - n.out[localIndex] = struct{}{} - n.outLock.Unlock() +func (cm *connectionManager) getInactivityTimeout() time.Duration { + return (time.Duration)(cm.inactivityTimeout.Load()) } -func (n *connectionManager) RelayUsed(localIndex uint32) { - n.relayUsedLock.RLock() +func (cm *connectionManager) In(h *HostInfo) { + h.in.Store(true) +} + +func (cm *connectionManager) Out(h *HostInfo) { + h.out.Store(true) +} + +func (cm *connectionManager) RelayUsed(localIndex uint32) { + cm.relayUsedLock.RLock() // If this already exists, return - if _, ok := n.relayUsed[localIndex]; ok { - n.relayUsedLock.RUnlock() + if _, ok := cm.relayUsed[localIndex]; ok { + cm.relayUsedLock.RUnlock() return } - n.relayUsedLock.RUnlock() - n.relayUsedLock.Lock() - n.relayUsed[localIndex] = struct{}{} - n.relayUsedLock.Unlock() + cm.relayUsedLock.RUnlock() + cm.relayUsedLock.Lock() + cm.relayUsed[localIndex] = struct{}{} + cm.relayUsedLock.Unlock() } // getAndResetTrafficCheck returns if there was any inbound or outbound traffic within the last tick and // resets the state for this local index -func (n *connectionManager) getAndResetTrafficCheck(localIndex uint32) (bool, bool) { - n.inLock.Lock() - n.outLock.Lock() - _, in := n.in[localIndex] - _, out := n.out[localIndex] - delete(n.in, localIndex) - delete(n.out, localIndex) - n.inLock.Unlock() - n.outLock.Unlock() +func (cm *connectionManager) getAndResetTrafficCheck(h *HostInfo, now time.Time) (bool, bool) { + in := h.in.Swap(false) + out := h.out.Swap(false) + if in || out { + h.lastUsed = now + } return in, out } -func (n *connectionManager) AddTrafficWatch(localIndex uint32) { - // Use a write lock directly because it should be incredibly rare that we are ever already tracking this index - n.outLock.Lock() - if _, ok := n.out[localIndex]; ok { - n.outLock.Unlock() - return +// AddTrafficWatch must be called for every new HostInfo. +// We will continue to monitor the HostInfo until the tunnel is dropped. +func (cm *connectionManager) AddTrafficWatch(h *HostInfo) { + if h.out.Swap(true) == false { + cm.trafficTimer.Add(h.localIndexId, cm.checkInterval) } - n.out[localIndex] = struct{}{} - n.trafficTimer.Add(localIndex, n.checkInterval) - n.outLock.Unlock() } -func (n *connectionManager) Start(ctx context.Context) { - go n.Run(ctx) -} - -func (n *connectionManager) Run(ctx context.Context) { - //TODO: this tick should be based on the min wheel tick? 
Check firewall - clockSource := time.NewTicker(500 * time.Millisecond) +func (cm *connectionManager) Start(ctx context.Context) { + clockSource := time.NewTicker(cm.trafficTimer.t.tickDuration) defer clockSource.Stop() p := []byte("") @@ -164,61 +160,61 @@ func (n *connectionManager) Run(ctx context.Context) { return case now := <-clockSource.C: - n.trafficTimer.Advance(now) + cm.trafficTimer.Advance(now) for { - localIndex, has := n.trafficTimer.Purge() + localIndex, has := cm.trafficTimer.Purge() if !has { break } - n.doTrafficCheck(localIndex, p, nb, out, now) + cm.doTrafficCheck(localIndex, p, nb, out, now) } } } } -func (n *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) { - decision, hostinfo, primary := n.makeTrafficDecision(localIndex, now) +func (cm *connectionManager) doTrafficCheck(localIndex uint32, p, nb, out []byte, now time.Time) { + decision, hostinfo, primary := cm.makeTrafficDecision(localIndex, now) switch decision { case deleteTunnel: - if n.hostMap.DeleteHostInfo(hostinfo) { + if cm.hostMap.DeleteHostInfo(hostinfo) { // Only clearing the lighthouse cache if this is the last hostinfo for this vpn ip in the hostmap - n.intf.lightHouse.DeleteVpnIp(hostinfo.vpnIp) + cm.intf.lightHouse.DeleteVpnIp(hostinfo.vpnIp) } case closeTunnel: - n.intf.sendCloseTunnel(hostinfo) - n.intf.closeTunnel(hostinfo) + cm.intf.sendCloseTunnel(hostinfo) + cm.intf.closeTunnel(hostinfo) case swapPrimary: - n.swapPrimary(hostinfo, primary) + cm.swapPrimary(hostinfo, primary) case migrateRelays: - n.migrateRelayUsed(hostinfo, primary) + cm.migrateRelayUsed(hostinfo, primary) case tryRehandshake: - n.tryRehandshake(hostinfo) + cm.tryRehandshake(hostinfo) case sendTestPacket: - n.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out) + cm.intf.SendMessageToHostInfo(header.Test, header.TestRequest, hostinfo, p, nb, out) } - n.resetRelayTrafficCheck(hostinfo) + cm.resetRelayTrafficCheck(hostinfo) } -func (n *connectionManager) resetRelayTrafficCheck(hostinfo *HostInfo) { +func (cm *connectionManager) resetRelayTrafficCheck(hostinfo *HostInfo) { if hostinfo != nil { - n.relayUsedLock.Lock() - defer n.relayUsedLock.Unlock() + cm.relayUsedLock.Lock() + defer cm.relayUsedLock.Unlock() // No need to migrate any relays, delete usage info now. for _, idx := range hostinfo.relayState.CopyRelayForIdxs() { - delete(n.relayUsed, idx) + delete(cm.relayUsed, idx) } } } -func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) { +func (cm *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) { relayFor := oldhostinfo.relayState.CopyAllRelayFor() for _, r := range relayFor { @@ -238,7 +234,7 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) index = existing.LocalIndex switch r.Type { case TerminalType: - relayFrom = n.intf.myVpnNet.Addr() + relayFrom = cm.intf.myVpnNet.Addr() relayTo = existing.PeerIp case ForwardingType: relayFrom = existing.PeerIp @@ -249,23 +245,23 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) } } case !ok: - n.relayUsedLock.RLock() - if _, relayUsed := n.relayUsed[r.LocalIndex]; !relayUsed { + cm.relayUsedLock.RLock() + if _, relayUsed := cm.relayUsed[r.LocalIndex]; !relayUsed { // The relay hasn't been used; don't migrate it. 
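+				// A replacement relay can be requested on demand later if this path is needed again.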
- n.relayUsedLock.RUnlock() + cm.relayUsedLock.RUnlock() continue } - n.relayUsedLock.RUnlock() + cm.relayUsedLock.RUnlock() // The relay doesn't exist at all; create some relay state and send the request. var err error - index, err = AddRelay(n.l, newhostinfo, n.hostMap, r.PeerIp, nil, r.Type, Requested) + index, err = AddRelay(cm.l, newhostinfo, cm.hostMap, r.PeerIp, nil, r.Type, Requested) if err != nil { - n.l.WithError(err).Error("failed to migrate relay to new hostinfo") + cm.l.WithError(err).Error("failed to migrate relay to new hostinfo") continue } switch r.Type { case TerminalType: - relayFrom = n.intf.myVpnNet.Addr() + relayFrom = cm.intf.myVpnNet.Addr() relayTo = r.PeerIp case ForwardingType: relayFrom = r.PeerIp @@ -289,10 +285,10 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) } msg, err := req.Marshal() if err != nil { - n.l.WithError(err).Error("failed to marshal Control message to migrate relay") + cm.l.WithError(err).Error("failed to marshal Control message to migrate relay") } else { - n.intf.SendMessageToHostInfo(header.Control, 0, newhostinfo, msg, make([]byte, 12), make([]byte, mtu)) - n.l.WithFields(logrus.Fields{ + cm.intf.SendMessageToHostInfo(header.Control, 0, newhostinfo, msg, make([]byte, 12), make([]byte, mtu)) + cm.l.WithFields(logrus.Fields{ "relayFrom": req.RelayFromIp, "relayTo": req.RelayToIp, "initiatorRelayIndex": req.InitiatorRelayIndex, @@ -303,46 +299,45 @@ func (n *connectionManager) migrateRelayUsed(oldhostinfo, newhostinfo *HostInfo) } } -func (n *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) { - n.hostMap.RLock() - defer n.hostMap.RUnlock() +func (cm *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time) (trafficDecision, *HostInfo, *HostInfo) { + // Read lock the main hostmap to order decisions based on tunnels being the primary tunnel + cm.hostMap.RLock() + defer cm.hostMap.RUnlock() - hostinfo := n.hostMap.Indexes[localIndex] + hostinfo := cm.hostMap.Indexes[localIndex] if hostinfo == nil { - n.l.WithField("localIndex", localIndex).Debugf("Not found in hostmap") - delete(n.pendingDeletion, localIndex) + cm.l.WithField("localIndex", localIndex).Debugln("Not found in hostmap") return doNothing, nil, nil } - if n.isInvalidCertificate(now, hostinfo) { - delete(n.pendingDeletion, hostinfo.localIndexId) + if cm.isInvalidCertificate(now, hostinfo) { return closeTunnel, hostinfo, nil } - primary := n.hostMap.Hosts[hostinfo.vpnIp] + primary := cm.hostMap.Hosts[hostinfo.vpnIp] mainHostInfo := true if primary != nil && primary != hostinfo { mainHostInfo = false } // Check for traffic on this hostinfo - inTraffic, outTraffic := n.getAndResetTrafficCheck(localIndex) + inTraffic, outTraffic := cm.getAndResetTrafficCheck(hostinfo, now) // A hostinfo is determined alive if there is incoming traffic if inTraffic { decision := doNothing - if n.l.Level >= logrus.DebugLevel { - hostinfo.logger(n.l). + if cm.l.Level >= logrus.DebugLevel { + hostinfo.logger(cm.l). WithField("tunnelCheck", m{"state": "alive", "method": "passive"}). Debug("Tunnel status") } - delete(n.pendingDeletion, hostinfo.localIndexId) + hostinfo.pendingDeletion.Store(false) if mainHostInfo { decision = tryRehandshake } else { - if n.shouldSwapPrimary(hostinfo, primary) { + if cm.shouldSwapPrimary(hostinfo, primary) { decision = swapPrimary } else { // migrate the relays to the primary, if in use. 
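+				// In summary, with inbound traffic present: a primary tunnel attempts a re-handshake
+				// (to pick up local certificate changes), while a secondary tunnel either becomes the
+				// new primary or hands its in-use relays over to the existing one.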
@@ -350,46 +345,55 @@ func (n *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time } } - n.trafficTimer.Add(hostinfo.localIndexId, n.checkInterval) + cm.trafficTimer.Add(hostinfo.localIndexId, cm.checkInterval) if !outTraffic { // Send a punch packet to keep the NAT state alive - n.sendPunch(hostinfo) + cm.sendPunch(hostinfo) } return decision, hostinfo, primary } - if _, ok := n.pendingDeletion[hostinfo.localIndexId]; ok { + if hostinfo.pendingDeletion.Load() { // We have already sent a test packet and nothing was returned, this hostinfo is dead - hostinfo.logger(n.l). + hostinfo.logger(cm.l). WithField("tunnelCheck", m{"state": "dead", "method": "active"}). Info("Tunnel status") - delete(n.pendingDeletion, hostinfo.localIndexId) return deleteTunnel, hostinfo, nil } decision := doNothing if hostinfo != nil && hostinfo.ConnectionState != nil && mainHostInfo { if !outTraffic { + inactiveFor, isInactive := cm.isInactive(hostinfo, now) + if isInactive { + // Tunnel is inactive, tear it down + hostinfo.logger(cm.l). + WithField("inactiveDuration", inactiveFor). + WithField("primary", mainHostInfo). + Info("Dropping tunnel due to inactivity") + + return closeTunnel, hostinfo, primary + } + // If we aren't sending or receiving traffic then its an unused tunnel and we don't to test the tunnel. // Just maintain NAT state if configured to do so. - n.sendPunch(hostinfo) - n.trafficTimer.Add(hostinfo.localIndexId, n.checkInterval) + cm.sendPunch(hostinfo) + cm.trafficTimer.Add(hostinfo.localIndexId, cm.checkInterval) return doNothing, nil, nil - } - if n.punchy.GetTargetEverything() { + if cm.punchy.GetTargetEverything() { // This is similar to the old punchy behavior with a slight optimization. // We aren't receiving traffic but we are sending it, punch on all known // ips in case we need to re-prime NAT state - n.sendPunch(hostinfo) + cm.sendPunch(hostinfo) } - if n.l.Level >= logrus.DebugLevel { - hostinfo.logger(n.l). + if cm.l.Level >= logrus.DebugLevel { + hostinfo.logger(cm.l). WithField("tunnelCheck", m{"state": "testing", "method": "active"}). Debug("Tunnel status") } @@ -398,95 +402,118 @@ func (n *connectionManager) makeTrafficDecision(localIndex uint32, now time.Time decision = sendTestPacket } else { - if n.l.Level >= logrus.DebugLevel { - hostinfo.logger(n.l).Debugf("Hostinfo sadness") + if cm.l.Level >= logrus.DebugLevel { + hostinfo.logger(cm.l).Debugf("Hostinfo sadness") } } - n.pendingDeletion[hostinfo.localIndexId] = struct{}{} - n.trafficTimer.Add(hostinfo.localIndexId, n.pendingDeletionInterval) + hostinfo.pendingDeletion.Store(true) + cm.trafficTimer.Add(hostinfo.localIndexId, cm.pendingDeletionInterval) return decision, hostinfo, nil } -func (n *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool { +func (cm *connectionManager) isInactive(hostinfo *HostInfo, now time.Time) (time.Duration, bool) { + if cm.dropInactive.Load() == false { + // We aren't configured to drop inactive tunnels + return 0, false + } + + inactiveDuration := now.Sub(hostinfo.lastUsed) + if inactiveDuration < cm.getInactivityTimeout() { + // It's not considered inactive + return inactiveDuration, false + } + + // The tunnel is inactive + return inactiveDuration, true +} + +func (cm *connectionManager) shouldSwapPrimary(current, primary *HostInfo) bool { // The primary tunnel is the most recent handshake to complete locally and should work entirely fine. // If we are here then we have multiple tunnels for a host pair and neither side believes the same tunnel is primary. 
// Let's sort this out. - if current.vpnIp.Compare(n.intf.myVpnNet.Addr()) < 0 { + if current.vpnIp.Compare(cm.intf.myVpnNet.Addr()) < 0 { // Only one side should flip primary because if both flip then we may never resolve to a single tunnel. // vpn ip is static across all tunnels for this host pair so lets use that to determine who is flipping. // The remotes vpn ip is lower than mine. I will not flip. return false } - certState := n.intf.pki.GetCertState() + certState := cm.intf.pki.GetCertState() return bytes.Equal(current.ConnectionState.myCert.Signature, certState.Certificate.Signature) } -func (n *connectionManager) swapPrimary(current, primary *HostInfo) { - n.hostMap.Lock() +func (cm *connectionManager) swapPrimary(current, primary *HostInfo) { + cm.hostMap.Lock() // Make sure the primary is still the same after the write lock. This avoids a race with a rehandshake. - if n.hostMap.Hosts[current.vpnIp] == primary { - n.hostMap.unlockedMakePrimary(current) + if cm.hostMap.Hosts[current.vpnIp] == primary { + cm.hostMap.unlockedMakePrimary(current) } - n.hostMap.Unlock() + cm.hostMap.Unlock() } // isInvalidCertificate will check if we should destroy a tunnel if pki.disconnect_invalid is true and // the certificate is no longer valid. Block listed certificates will skip the pki.disconnect_invalid // check and return true. -func (n *connectionManager) isInvalidCertificate(now time.Time, hostinfo *HostInfo) bool { +func (cm *connectionManager) isInvalidCertificate(now time.Time, hostinfo *HostInfo) bool { remoteCert := hostinfo.GetCert() if remoteCert == nil { return false } - valid, err := remoteCert.VerifyWithCache(now, n.intf.pki.GetCAPool()) + valid, err := remoteCert.VerifyWithCache(now, cm.intf.pki.GetCAPool()) if valid { return false } - if !n.intf.disconnectInvalid.Load() && err != cert.ErrBlockListed { + if !cm.intf.disconnectInvalid.Load() && err != cert.ErrBlockListed { // Block listed certificates should always be disconnected return false } fingerprint, _ := remoteCert.Sha256Sum() - hostinfo.logger(n.l).WithError(err). + hostinfo.logger(cm.l).WithError(err). WithField("fingerprint", fingerprint). Info("Remote certificate is no longer valid, tearing down the tunnel") return true } -func (n *connectionManager) sendPunch(hostinfo *HostInfo) { - if !n.punchy.GetPunch() { +func (cm *connectionManager) sendPunch(hostinfo *HostInfo) { + if !cm.punchy.GetPunch() { // Punching is disabled return } - if n.punchy.GetTargetEverything() { - hostinfo.remotes.ForEach(n.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) { - n.metricsTxPunchy.Inc(1) - n.intf.outside.WriteTo([]byte{1}, addr) + if cm.intf.lightHouse.IsLighthouseIP(hostinfo.vpnIp) { + // Do not punch to lighthouses, we assume our lighthouse update interval is good enough. + // In the event the update interval is not sufficient to maintain NAT state then a publicly available lighthouse + // would lose the ability to notify us and punchy.respond would become unreliable. 
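+		// The periodic report to the lighthouse (config option lighthouse.interval) is what keeps
+		// this NAT mapping warm instead.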
+ return + } + + if cm.punchy.GetTargetEverything() { + hostinfo.remotes.ForEach(cm.hostMap.GetPreferredRanges(), func(addr netip.AddrPort, preferred bool) { + cm.metricsTxPunchy.Inc(1) + cm.intf.outside.WriteTo([]byte{1}, addr) }) } else if hostinfo.remote.IsValid() { - n.metricsTxPunchy.Inc(1) - n.intf.outside.WriteTo([]byte{1}, hostinfo.remote) + cm.metricsTxPunchy.Inc(1) + cm.intf.outside.WriteTo([]byte{1}, hostinfo.remote) } } -func (n *connectionManager) tryRehandshake(hostinfo *HostInfo) { - certState := n.intf.pki.GetCertState() +func (cm *connectionManager) tryRehandshake(hostinfo *HostInfo) { + certState := cm.intf.pki.GetCertState() if bytes.Equal(hostinfo.ConnectionState.myCert.Signature, certState.Certificate.Signature) { return } - n.l.WithField("vpnIp", hostinfo.vpnIp). + cm.l.WithField("vpnIp", hostinfo.vpnIp). WithField("reason", "local certificate is not current"). Info("Re-handshaking with remote") - n.intf.handshakeManager.StartHandshake(hostinfo.vpnIp, nil) + cm.intf.handshakeManager.StartHandshake(hostinfo.vpnIp, nil) } diff --git a/connection_manager_test.go b/connection_manager_test.go index 5f97cad9..1916f892 100644 --- a/connection_manager_test.go +++ b/connection_manager_test.go @@ -1,7 +1,6 @@ package nebula import ( - "context" "crypto/ed25519" "crypto/rand" "net" @@ -65,10 +64,10 @@ func Test_NewConnectionManagerTest(t *testing.T) { ifce.pki.cs.Store(cs) // Create manager - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - punchy := NewPunchyFromConfig(l, config.NewC(l)) - nc := newConnectionManager(ctx, l, ifce, 5, 10, punchy) + conf := config.NewC(l) + punchy := NewPunchyFromConfig(l, conf) + nc := newConnectionManagerFromConfig(l, conf, hostMap, punchy) + nc.intf = ifce p := []byte("") nb := make([]byte, 12, 12) out := make([]byte, mtu) @@ -86,31 +85,32 @@ func Test_NewConnectionManagerTest(t *testing.T) { nc.hostMap.unlockedAddHostInfo(hostinfo, ifce) // We saw traffic out to vpnIp - nc.Out(hostinfo.localIndexId) - nc.In(hostinfo.localIndexId) - assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId) + nc.Out(hostinfo) + nc.In(hostinfo) + assert.False(t, hostinfo.pendingDeletion.Load()) assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) - assert.Contains(t, nc.out, hostinfo.localIndexId) + assert.True(t, hostinfo.out.Load()) + assert.True(t, hostinfo.in.Load()) // Do a traffic check tick, should not be pending deletion but should not have any in/out packets recorded nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now()) - assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId) - assert.NotContains(t, nc.out, hostinfo.localIndexId) - assert.NotContains(t, nc.in, hostinfo.localIndexId) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) // Do another traffic check tick, this host should be pending deletion now - nc.Out(hostinfo.localIndexId) + nc.Out(hostinfo) + assert.True(t, hostinfo.out.Load()) nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now()) - assert.Contains(t, nc.pendingDeletion, hostinfo.localIndexId) - assert.NotContains(t, nc.out, hostinfo.localIndexId) - assert.NotContains(t, nc.in, hostinfo.localIndexId) + assert.True(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) 
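
	// For reference, a minimal sketch of the per-tick bookkeeping these assertions
	// exercise, using the new atomic.Bool flags on HostInfo. This helper is
	// illustrative only; the Swap-based reset is an assumption, not the literal
	// doTrafficCheck implementation:
	//
	//	func quietTick(h *HostInfo) (dead bool) {
	//		if h.in.Swap(false) {
	//			h.out.Store(false)
	//			h.pendingDeletion.Store(false) // inbound traffic proves the tunnel is alive
	//			return false
	//		}
	//		if h.pendingDeletion.Load() {
	//			return true // we probed on the previous tick and heard nothing back
	//		}
	//		if h.out.Swap(false) {
	//			h.pendingDeletion.Store(true) // outbound only: probe and re-check next tick
	//		}
	//		return false
	//	}
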
// Do a final traffic check tick, the host should now be removed nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now()) - assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId) assert.NotContains(t, nc.hostMap.Hosts, hostinfo.vpnIp) assert.NotContains(t, nc.hostMap.Indexes, hostinfo.localIndexId) } @@ -148,10 +148,10 @@ func Test_NewConnectionManagerTest2(t *testing.T) { ifce.pki.cs.Store(cs) // Create manager - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - punchy := NewPunchyFromConfig(l, config.NewC(l)) - nc := newConnectionManager(ctx, l, ifce, 5, 10, punchy) + conf := config.NewC(l) + punchy := NewPunchyFromConfig(l, conf) + nc := newConnectionManagerFromConfig(l, conf, hostMap, punchy) + nc.intf = ifce p := []byte("") nb := make([]byte, 12, 12) out := make([]byte, mtu) @@ -169,33 +169,130 @@ func Test_NewConnectionManagerTest2(t *testing.T) { nc.hostMap.unlockedAddHostInfo(hostinfo, ifce) // We saw traffic out to vpnIp - nc.Out(hostinfo.localIndexId) - nc.In(hostinfo.localIndexId) - assert.NotContains(t, nc.pendingDeletion, hostinfo.vpnIp) + nc.Out(hostinfo) + nc.In(hostinfo) + assert.True(t, hostinfo.in.Load()) + assert.True(t, hostinfo.out.Load()) + assert.False(t, hostinfo.pendingDeletion.Load()) assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) // Do a traffic check tick, should not be pending deletion but should not have any in/out packets recorded nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now()) - assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId) - assert.NotContains(t, nc.out, hostinfo.localIndexId) - assert.NotContains(t, nc.in, hostinfo.localIndexId) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) // Do another traffic check tick, this host should be pending deletion now - nc.Out(hostinfo.localIndexId) + nc.Out(hostinfo) nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now()) - assert.Contains(t, nc.pendingDeletion, hostinfo.localIndexId) - assert.NotContains(t, nc.out, hostinfo.localIndexId) - assert.NotContains(t, nc.in, hostinfo.localIndexId) + assert.True(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) // We saw traffic, should no longer be pending deletion - nc.In(hostinfo.localIndexId) + nc.In(hostinfo) nc.doTrafficCheck(hostinfo.localIndexId, p, nb, out, time.Now()) - assert.NotContains(t, nc.pendingDeletion, hostinfo.localIndexId) - assert.NotContains(t, nc.out, hostinfo.localIndexId) - assert.NotContains(t, nc.in, hostinfo.localIndexId) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) + assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) + assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) +} + +func Test_NewConnectionManager_DisconnectInactive(t *testing.T) { + l := test.NewLogger() + vpncidr := netip.MustParsePrefix("172.1.1.1/24") + localrange := netip.MustParsePrefix("10.1.1.1/24") + vpnIp := netip.MustParseAddr("172.1.1.2") + preferredRanges := []netip.Prefix{localrange} + + // Very incomplete mock objects + hostMap := newHostMap(l, vpncidr) + hostMap.preferredRanges.Store(&preferredRanges) + + cs := &CertState{ + RawCertificate: []byte{}, + PrivateKey: []byte{}, + 
Certificate: &cert.NebulaCertificate{}, + RawCertificateNoKey: []byte{}, + } + + lh := newTestLighthouse() + ifce := &Interface{ + hostMap: hostMap, + inside: &test.NoopTun{}, + outside: &udp.NoopConn{}, + firewall: &Firewall{}, + lightHouse: lh, + pki: &PKI{}, + handshakeManager: NewHandshakeManager(l, hostMap, lh, &udp.NoopConn{}, defaultHandshakeConfig), + l: l, + } + ifce.pki.cs.Store(cs) + + // Create manager + conf := config.NewC(l) + conf.Settings["tunnels"] = map[interface{}]interface{}{ + "drop_inactive": true, + } + punchy := NewPunchyFromConfig(l, conf) + nc := newConnectionManagerFromConfig(l, conf, hostMap, punchy) + assert.True(t, nc.dropInactive.Load()) + nc.intf = ifce + + // Add an ip we have established a connection w/ to hostmap + hostinfo := &HostInfo{ + vpnIp: vpnIp, + localIndexId: 1099, + remoteIndexId: 9901, + } + hostinfo.ConnectionState = &ConnectionState{ + myCert: &cert.NebulaCertificate{}, + H: &noise.HandshakeState{}, + } + nc.hostMap.unlockedAddHostInfo(hostinfo, ifce) + + // Do a traffic check tick, in and out should be cleared but should not be pending deletion + nc.Out(hostinfo) + nc.In(hostinfo) + assert.True(t, hostinfo.out.Load()) + assert.True(t, hostinfo.in.Load()) + + now := time.Now() + decision, _, _ := nc.makeTrafficDecision(hostinfo.localIndexId, now) + assert.Equal(t, tryRehandshake, decision) + assert.Equal(t, now, hostinfo.lastUsed) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) + + decision, _, _ = nc.makeTrafficDecision(hostinfo.localIndexId, now.Add(time.Second*5)) + assert.Equal(t, doNothing, decision) + assert.Equal(t, now, hostinfo.lastUsed) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) + + // Do another traffic check tick, should still not be pending deletion + decision, _, _ = nc.makeTrafficDecision(hostinfo.localIndexId, now.Add(time.Second*10)) + assert.Equal(t, doNothing, decision) + assert.Equal(t, now, hostinfo.lastUsed) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) + assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) + assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) + + // Finally advance beyond the inactivity timeout + decision, _, _ = nc.makeTrafficDecision(hostinfo.localIndexId, now.Add(time.Minute*10)) + assert.Equal(t, closeTunnel, decision) + assert.Equal(t, now, hostinfo.lastUsed) + assert.False(t, hostinfo.pendingDeletion.Load()) + assert.False(t, hostinfo.out.Load()) + assert.False(t, hostinfo.in.Load()) assert.Contains(t, nc.hostMap.Indexes, hostinfo.localIndexId) assert.Contains(t, nc.hostMap.Hosts, hostinfo.vpnIp) } @@ -273,10 +370,10 @@ func Test_NewConnectionManagerTest_DisconnectInvalid(t *testing.T) { ifce.disconnectInvalid.Store(true) // Create manager - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - punchy := NewPunchyFromConfig(l, config.NewC(l)) - nc := newConnectionManager(ctx, l, ifce, 5, 10, punchy) + conf := config.NewC(l) + punchy := NewPunchyFromConfig(l, conf) + nc := newConnectionManagerFromConfig(l, conf, hostMap, punchy) + nc.intf = ifce ifce.connectionManager = nc hostinfo := &HostInfo{ diff --git a/control.go b/control.go index 3468b353..8c9123e0 100644 --- a/control.go +++ b/control.go @@ -26,14 +26,15 @@ type controlHostLister interface { } type Control struct { - f *Interface - l *logrus.Logger - ctx 
context.Context - cancel context.CancelFunc - sshStart func() - statsStart func() - dnsStart func() - lighthouseStart func() + f *Interface + l *logrus.Logger + ctx context.Context + cancel context.CancelFunc + sshStart func() + statsStart func() + dnsStart func() + lighthouseStart func() + connectionManagerStart func(context.Context) } type ControlHostInfo struct { @@ -63,6 +64,9 @@ func (c *Control) Start() { if c.dnsStart != nil { go c.dnsStart() } + if c.connectionManagerStart != nil { + go c.connectionManagerStart(c.ctx) + } if c.lighthouseStart != nil { c.lighthouseStart() } diff --git a/e2e/handshakes_test.go b/e2e/handshakes_test.go index 7267dd26..58705a77 100644 --- a/e2e/handshakes_test.go +++ b/e2e/handshakes_test.go @@ -4,7 +4,6 @@ package e2e import ( - "fmt" "net/netip" "slices" "testing" @@ -414,7 +413,7 @@ func TestReestablishRelays(t *testing.T) { curIndexes := len(myControl.GetHostmap().Indexes) for curIndexes >= start { curIndexes = len(myControl.GetHostmap().Indexes) - r.Logf("Wait for the dead index to go away:start=%v indexes, currnet=%v indexes", start, curIndexes) + r.Logf("Wait for the dead index to go away:start=%v indexes, current=%v indexes", start, curIndexes) myControl.InjectTunUDPPacket(theirVpnIpNet.Addr(), 80, 80, []byte("Hi from me should fail")) r.RouteForAllExitFunc(func(p *udp.Packet, c *nebula.Control) router.ExitType { @@ -964,9 +963,8 @@ func TestRehandshakingLoser(t *testing.T) { t.Log("Stand up a tunnel between me and them") assertTunnel(t, myVpnIpNet.Addr(), theirVpnIpNet.Addr(), myControl, theirControl, r) - tt1 := myControl.GetHostInfoByVpnIp(theirVpnIpNet.Addr(), false) - tt2 := theirControl.GetHostInfoByVpnIp(myVpnIpNet.Addr(), false) - fmt.Println(tt1.LocalIndex, tt2.LocalIndex) + myControl.GetHostInfoByVpnIp(theirVpnIpNet.Addr(), false) + theirControl.GetHostInfoByVpnIp(myVpnIpNet.Addr(), false) r.RenderHostmaps("Starting hostmaps", myControl, theirControl) diff --git a/e2e/router/router.go b/e2e/router/router.go index 08905705..1e679401 100644 --- a/e2e/router/router.go +++ b/e2e/router/router.go @@ -690,6 +690,7 @@ func (r *R) FlushAll() { r.Unlock() panic("Can't FlushAll for host: " + p.To.String()) } + receiver.InjectUDPPacket(p) r.Unlock() } } diff --git a/e2e/tunnels_test.go b/e2e/tunnels_test.go new file mode 100644 index 00000000..b7a6b04d --- /dev/null +++ b/e2e/tunnels_test.go @@ -0,0 +1,55 @@ +//go:build e2e_testing +// +build e2e_testing + +package e2e + +import ( + "testing" + "time" + + "github.com/slackhq/nebula/e2e/router" +) + +func TestDropInactiveTunnels(t *testing.T) { + // The goal of this test is to ensure the shortest inactivity timeout will close the tunnel on both sides + // under ideal conditions + ca, _, caKey, _ := NewTestCaCert(time.Now(), time.Now().Add(10*time.Minute), nil, nil, []string{}) + myControl, myVpnIpNet, myUdpAddr, _ := newSimpleServer(ca, caKey, "me", "10.128.0.1/24", m{"tunnels": m{"drop_inactive": true, "inactivity_timeout": "5s"}}) + theirControl, theirVpnIpNet, theirUdpAddr, _ := newSimpleServer(ca, caKey, "them", "10.128.0.2/24", m{"tunnels": m{"drop_inactive": true, "inactivity_timeout": "10m"}}) + + // Share our underlay information + myControl.InjectLightHouseAddr(theirVpnIpNet.Addr(), theirUdpAddr) + theirControl.InjectLightHouseAddr(myVpnIpNet.Addr(), myUdpAddr) + + // Start the servers + myControl.Start() + theirControl.Start() + + r := router.NewR(t, myControl, theirControl) + + r.Log("Assert the tunnel between me and them works") + assertTunnel(t, myVpnIpNet.Addr(), 
theirVpnIpNet.Addr(), myControl, theirControl, r)
+
+	r.Log("Go inactive and wait for the tunnels to get dropped")
+	waitStart := time.Now()
+	for {
+		myIndexes := len(myControl.GetHostmap().Indexes)
+		theirIndexes := len(theirControl.GetHostmap().Indexes)
+		if myIndexes == 0 && theirIndexes == 0 {
+			break
+		}
+
+		since := time.Since(waitStart)
+		r.Logf("my tunnels: %v; their tunnels: %v; duration: %v", myIndexes, theirIndexes, since)
+		if since > time.Second*30 {
+			t.Fatal("Tunnel should have been declared inactive after 5 seconds and before 30 seconds")
+		}
+
+		time.Sleep(1 * time.Second)
+		r.FlushAll()
+	}
+
+	r.Logf("Inactive tunnels were dropped within %v", time.Since(waitStart))
+	myControl.Stop()
+	theirControl.Stop()
+}
diff --git a/examples/config.yml b/examples/config.yml
index c74ffc68..b5be25f3 100644
--- a/examples/config.yml
+++ b/examples/config.yml
@@ -303,6 +303,18 @@ logging:
   # after receiving the response for lighthouse queries
   #trigger_buffer: 64
 
+# Tunnel manager settings
+#tunnels:
+  # drop_inactive controls whether inactive tunnels are maintained or dropped after the inactivity_timeout period has
+  # elapsed.
+  # In general, it is a good idea to enable this setting. It will be enabled by default in a future release.
+  # This setting is reloadable
+  #drop_inactive: false
+
+  # inactivity_timeout controls how long a tunnel MUST NOT see any inbound or outbound traffic before being considered
+  # inactive and eligible to be dropped.
+  # This setting is reloadable
+  #inactivity_timeout: 10m
 
 # Nebula security group configuration
 firewall:
diff --git a/handshake_ix.go b/handshake_ix.go
index 3e701b0d..150e1290 100644
--- a/handshake_ix.go
+++ b/handshake_ix.go
@@ -335,7 +335,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet
 			Info("Handshake message sent")
 	}
 
-	f.connectionManager.AddTrafficWatch(hostinfo.localIndexId)
+	f.connectionManager.AddTrafficWatch(hostinfo)
 
 	hostinfo.remotes.ResetBlockedRemotes()
 
@@ -493,7 +493,7 @@ func ixHandshakeStage2(f *Interface, addr netip.AddrPort, via *ViaSender, hh *Ha
 
 	// Complete our handshake and update metrics, this will replace any existing tunnels for this vpnIp
 	f.handshakeManager.Complete(hostinfo, f)
-	f.connectionManager.AddTrafficWatch(hostinfo.localIndexId)
+	f.connectionManager.AddTrafficWatch(hostinfo)
 
 	if f.l.Level >= logrus.DebugLevel {
 		hostinfo.logger(f.l).Debugf("Sending %d stored packets", len(hh.packetStore))
diff --git a/hostmap.go b/hostmap.go
index 7258282d..f7da0ad1 100644
--- a/hostmap.go
+++ b/hostmap.go
@@ -242,6 +242,14 @@ type HostInfo struct {
 	// Used to track other hostinfos for this vpn ip since only 1 can be primary
 	// Synchronised via hostmap lock and not the hostinfo lock.
 	next, prev *HostInfo
+
+	//TODO: in, out, and others might benefit from being an atomic.Int32. We could collapse connectionManager pendingDeletion, relayUsed, and in/out into this 1 thing
+	in, out, pendingDeletion atomic.Bool
+
+	// lastUsed tracks the last time ConnectionManager checked the tunnel and it was in use.
+	// This value will lag behind actual tunnel utilization in the hot path.
+	// This should only be used by the ConnectionManager's ticker routine.
+ lastUsed time.Time } type ViaSender struct { diff --git a/inside.go b/inside.go index 0ccd1790..3da3fa09 100644 --- a/inside.go +++ b/inside.go @@ -213,7 +213,7 @@ func (f *Interface) SendVia(via *HostInfo, c := via.ConnectionState.messageCounter.Add(1) out = header.Encode(out, header.Version, header.Message, header.MessageRelay, relay.RemoteIndex, c) - f.connectionManager.Out(via.localIndexId) + f.connectionManager.Out(via) // Authenticate the header and payload, but do not encrypt for this message type. // The payload consists of the inner, unencrypted Nebula header, as well as the end-to-end encrypted payload. @@ -282,7 +282,7 @@ func (f *Interface) sendNoMetrics(t header.MessageType, st header.MessageSubType //l.WithField("trace", string(debug.Stack())).Error("out Header ", &Header{Version, t, st, 0, hostinfo.remoteIndexId, c}, p) out = header.Encode(out, header.Version, t, st, hostinfo.remoteIndexId, c) - f.connectionManager.Out(hostinfo.localIndexId) + f.connectionManager.Out(hostinfo) // Query our LH if we haven't since the last time we've been rebound, this will cause the remote to punch against // all our IPs and enable a faster roaming. diff --git a/interface.go b/interface.go index f2519076..578e83e3 100644 --- a/interface.go +++ b/interface.go @@ -24,24 +24,23 @@ import ( const mtu = 9001 type InterfaceConfig struct { - HostMap *HostMap - Outside udp.Conn - Inside overlay.Device - pki *PKI - Cipher string - Firewall *Firewall - ServeDns bool - HandshakeManager *HandshakeManager - lightHouse *LightHouse - checkInterval time.Duration - pendingDeletionInterval time.Duration - DropLocalBroadcast bool - DropMulticast bool - routines int - MessageMetrics *MessageMetrics - version string - relayManager *relayManager - punchy *Punchy + HostMap *HostMap + Outside udp.Conn + Inside overlay.Device + pki *PKI + Cipher string + Firewall *Firewall + ServeDns bool + HandshakeManager *HandshakeManager + lightHouse *LightHouse + connectionManager *connectionManager + DropLocalBroadcast bool + DropMulticast bool + routines int + MessageMetrics *MessageMetrics + version string + relayManager *relayManager + punchy *Punchy tryPromoteEvery uint32 reQueryEvery uint32 @@ -154,6 +153,9 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { if c.Firewall == nil { return nil, errors.New("no firewall rules") } + if c.connectionManager == nil { + return nil, errors.New("no connection manager") + } certificate := c.pki.GetCertState().Certificate @@ -196,6 +198,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { readers: make([]io.ReadWriteCloser, c.routines), myVpnNet: myVpnNet, relayManager: c.relayManager, + connectionManager: c.connectionManager, conntrackCacheTimeout: c.ConntrackCacheTimeout, @@ -219,7 +222,7 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) { ifce.reQueryEvery.Store(c.reQueryEvery) ifce.reQueryWait.Store(int64(c.reQueryWait)) - ifce.connectionManager = newConnectionManager(ctx, c.l, ifce, c.checkInterval, c.pendingDeletionInterval, c.punchy) + ifce.connectionManager.intf = ifce return ifce, nil } diff --git a/main.go b/main.go index c6edc913..02d02f5d 100644 --- a/main.go +++ b/main.go @@ -199,6 +199,7 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg hostMap := NewHostMapFromConfig(l, tunCidr, c) punchy := NewPunchyFromConfig(l, c) + connManager := newConnectionManagerFromConfig(l, c, hostMap, punchy) lightHouse, err := NewLightHouseFromConfig(ctx, l, c, 
tunCidr, udpConns[0], punchy) if err != nil { return nil, util.ContextualizeIfNeeded("Failed to initialize lighthouse handler", err) @@ -234,31 +235,27 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg } } - checkInterval := c.GetInt("timers.connection_alive_interval", 5) - pendingDeletionInterval := c.GetInt("timers.pending_deletion_interval", 10) - ifConfig := &InterfaceConfig{ - HostMap: hostMap, - Inside: tun, - Outside: udpConns[0], - pki: pki, - Cipher: c.GetString("cipher", "aes"), - Firewall: fw, - ServeDns: serveDns, - HandshakeManager: handshakeManager, - lightHouse: lightHouse, - checkInterval: time.Second * time.Duration(checkInterval), - pendingDeletionInterval: time.Second * time.Duration(pendingDeletionInterval), - tryPromoteEvery: c.GetUint32("counters.try_promote", defaultPromoteEvery), - reQueryEvery: c.GetUint32("counters.requery_every_packets", defaultReQueryEvery), - reQueryWait: c.GetDuration("timers.requery_wait_duration", defaultReQueryWait), - DropLocalBroadcast: c.GetBool("tun.drop_local_broadcast", false), - DropMulticast: c.GetBool("tun.drop_multicast", false), - routines: routines, - MessageMetrics: messageMetrics, - version: buildVersion, - relayManager: NewRelayManager(ctx, l, hostMap, c), - punchy: punchy, + HostMap: hostMap, + Inside: tun, + Outside: udpConns[0], + pki: pki, + Cipher: c.GetString("cipher", "aes"), + Firewall: fw, + ServeDns: serveDns, + HandshakeManager: handshakeManager, + connectionManager: connManager, + lightHouse: lightHouse, + tryPromoteEvery: c.GetUint32("counters.try_promote", defaultPromoteEvery), + reQueryEvery: c.GetUint32("counters.requery_every_packets", defaultReQueryEvery), + reQueryWait: c.GetDuration("timers.requery_wait_duration", defaultReQueryWait), + DropLocalBroadcast: c.GetBool("tun.drop_local_broadcast", false), + DropMulticast: c.GetBool("tun.drop_multicast", false), + routines: routines, + MessageMetrics: messageMetrics, + version: buildVersion, + relayManager: NewRelayManager(ctx, l, hostMap, c), + punchy: punchy, ConntrackCacheTimeout: conntrackCacheTimeout, l: l, @@ -325,5 +322,6 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg statsStart, dnsStart, lightHouse.StartUpdateWorker, + connManager.Start, }, nil } diff --git a/outside.go b/outside.go index be60294d..3da37f98 100644 --- a/outside.go +++ b/outside.go @@ -102,7 +102,7 @@ func (f *Interface) readOutsidePackets(ip netip.AddrPort, via *ViaSender, out [] // Pull the Roaming parts up here, and return in all call paths. 
f.handleHostRoaming(hostinfo, ip) // Track usage of both the HostInfo and the Relay for the received & authenticated packet - f.connectionManager.In(hostinfo.localIndexId) + f.connectionManager.In(hostinfo) f.connectionManager.RelayUsed(h.RemoteIndex) relay, ok := hostinfo.relayState.QueryRelayForByIdx(h.RemoteIndex) @@ -246,7 +246,7 @@ func (f *Interface) readOutsidePackets(ip netip.AddrPort, via *ViaSender, out [] f.handleHostRoaming(hostinfo, ip) - f.connectionManager.In(hostinfo.localIndexId) + f.connectionManager.In(hostinfo) } // closeTunnel closes a tunnel locally, it does not send a closeTunnel packet to the remote @@ -418,7 +418,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out return false } - f.connectionManager.In(hostinfo.localIndexId) + f.connectionManager.In(hostinfo) _, err = f.readers[q].Write(out) if err != nil { f.l.WithError(err).Error("Failed to write to tun") From a1498ca8f8f4d98dd0af9f089f6e09a26851b3d6 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Tue, 24 Jun 2025 12:04:00 -0400 Subject: [PATCH 08/15] Store relay states in a slice for consistent ordering (#1422) --- control_test.go | 4 ++-- handshake_ix.go | 2 +- handshake_manager.go | 2 +- hostmap.go | 24 +++++++++++++++--------- hostmap_test.go | 29 +++++++++++++++++++++++++++++ 5 files changed, 48 insertions(+), 13 deletions(-) diff --git a/control_test.go b/control_test.go index fbf29c06..66a118a8 100644 --- a/control_test.go +++ b/control_test.go @@ -66,7 +66,7 @@ func TestControl_GetHostInfoByVpnIp(t *testing.T) { localIndexId: 201, vpnIp: vpnIp, relayState: RelayState{ - relays: map[netip.Addr]struct{}{}, + relays: nil, relayForByIp: map[netip.Addr]*Relay{}, relayForByIdx: map[uint32]*Relay{}, }, @@ -85,7 +85,7 @@ func TestControl_GetHostInfoByVpnIp(t *testing.T) { localIndexId: 201, vpnIp: vpnIp2, relayState: RelayState{ - relays: map[netip.Addr]struct{}{}, + relays: nil, relayForByIp: map[netip.Addr]*Relay{}, relayForByIdx: map[uint32]*Relay{}, }, diff --git a/handshake_ix.go b/handshake_ix.go index 150e1290..0e9c62ab 100644 --- a/handshake_ix.go +++ b/handshake_ix.go @@ -151,7 +151,7 @@ func ixHandshakeStage1(f *Interface, addr netip.AddrPort, via *ViaSender, packet HandshakePacket: make(map[uint8][]byte, 0), lastHandshakeTime: hs.Details.Time, relayState: RelayState{ - relays: map[netip.Addr]struct{}{}, + relays: nil, relayForByIp: map[netip.Addr]*Relay{}, relayForByIdx: map[uint32]*Relay{}, }, diff --git a/handshake_manager.go b/handshake_manager.go index d87ff02a..56472dd4 100644 --- a/handshake_manager.go +++ b/handshake_manager.go @@ -403,7 +403,7 @@ func (hm *HandshakeManager) StartHandshake(vpnIp netip.Addr, cacheCb func(*Hands vpnIp: vpnIp, HandshakePacket: make(map[uint8][]byte, 0), relayState: RelayState{ - relays: map[netip.Addr]struct{}{}, + relays: nil, relayForByIp: map[netip.Addr]*Relay{}, relayForByIdx: map[uint32]*Relay{}, }, diff --git a/hostmap.go b/hostmap.go index f7da0ad1..0b34de4c 100644 --- a/hostmap.go +++ b/hostmap.go @@ -4,6 +4,7 @@ import ( "errors" "net" "net/netip" + "slices" "sync" "sync/atomic" "time" @@ -69,15 +70,20 @@ type HostMap struct { type RelayState struct { sync.RWMutex - relays map[netip.Addr]struct{} // Set of VpnIp's of Hosts to use as relays to access this peer - relayForByIp map[netip.Addr]*Relay // Maps VpnIps of peers for which this HostInfo is a relay to some Relay info - relayForByIdx map[uint32]*Relay // Maps a local index to some Relay info + relays []netip.Addr // 
Ordered set of VpnIp's of Hosts to use as relays to access this peer + relayForByIp map[netip.Addr]*Relay // Maps VpnIps of peers for which this HostInfo is a relay to some Relay info + relayForByIdx map[uint32]*Relay // Maps a local index to some Relay info } func (rs *RelayState) DeleteRelay(ip netip.Addr) { rs.Lock() defer rs.Unlock() - delete(rs.relays, ip) + for idx, val := range rs.relays { + if val == ip { + rs.relays = append(rs.relays[:idx], rs.relays[idx+1:]...) + return + } + } } func (rs *RelayState) UpdateRelayForByIpState(vpnIp netip.Addr, state int) { @@ -122,16 +128,16 @@ func (rs *RelayState) GetRelayForByIp(ip netip.Addr) (*Relay, bool) { func (rs *RelayState) InsertRelayTo(ip netip.Addr) { rs.Lock() defer rs.Unlock() - rs.relays[ip] = struct{}{} + if !slices.Contains(rs.relays, ip) { + rs.relays = append(rs.relays, ip) + } } func (rs *RelayState) CopyRelayIps() []netip.Addr { + ret := make([]netip.Addr, len(rs.relays)) rs.RLock() defer rs.RUnlock() - ret := make([]netip.Addr, 0, len(rs.relays)) - for ip := range rs.relays { - ret = append(ret, ip) - } + copy(ret, rs.relays) return ret } diff --git a/hostmap_test.go b/hostmap_test.go index 7e2feb81..6eb8751c 100644 --- a/hostmap_test.go +++ b/hostmap_test.go @@ -7,6 +7,7 @@ import ( "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/test" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestHostMap_MakePrimary(t *testing.T) { @@ -225,3 +226,31 @@ func TestHostMap_reload(t *testing.T) { c.ReloadConfigString("preferred_ranges: [1.1.1.1/32]") assert.EqualValues(t, []string{"1.1.1.1/32"}, toS(hm.GetPreferredRanges())) } + +func TestHostMap_RelayState(t *testing.T) { + h1 := &HostInfo{vpnIp: netip.MustParseAddr("0.0.0.1"), localIndexId: 1} + a1 := netip.MustParseAddr("::1") + a2 := netip.MustParseAddr("2001::1") + + h1.relayState.InsertRelayTo(a1) + assert.Equal(t, h1.relayState.relays, []netip.Addr{a1}) + h1.relayState.InsertRelayTo(a2) + assert.Equal(t, h1.relayState.relays, []netip.Addr{a1, a2}) + // Ensure that the first relay added is the first one returned in the copy + currentRelays := h1.relayState.CopyRelayIps() + require.Len(t, currentRelays, 2) + assert.Equal(t, currentRelays[0], a1) + + // Deleting the last one in the list works ok + h1.relayState.DeleteRelay(a2) + assert.Equal(t, h1.relayState.relays, []netip.Addr{a1}) + + // Deleting an element not in the list works ok + h1.relayState.DeleteRelay(a2) + assert.Equal(t, h1.relayState.relays, []netip.Addr{a1}) + + // Deleting the only element in the list works ok + h1.relayState.DeleteRelay(a1) + assert.Equal(t, h1.relayState.relays, []netip.Addr{}) + +} From 4870bb680d45997f57995085c62b56a351a18b3e Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Tue, 1 Jul 2025 16:41:29 -0500 Subject: [PATCH 09/15] Darwin udp fix (#1426) --- udp/errors.go | 5 ++ udp/udp_darwin.go | 180 ++++++++++++++++++++++++++++++++++++++++++--- udp/udp_generic.go | 3 +- udp/udp_linux.go | 2 +- 4 files changed, 178 insertions(+), 12 deletions(-) create mode 100644 udp/errors.go diff --git a/udp/errors.go b/udp/errors.go new file mode 100644 index 00000000..12a84879 --- /dev/null +++ b/udp/errors.go @@ -0,0 +1,5 @@ +package udp + +import "errors" + +var ErrInvalidIPv6RemoteForSocket = errors.New("listener is IPv4, but writing to IPv6 remote") diff --git a/udp/udp_darwin.go b/udp/udp_darwin.go index 183ac7af..74041ca5 100644 --- a/udp/udp_darwin.go +++ b/udp/udp_darwin.go @@ -6,17 +6,63 @@ package udp // Darwin support is primarily implemented in 
udp_generic, besides NewListenConfig
 
 import (
+	"context"
+	"encoding/binary"
+	"errors"
 	"fmt"
 	"net"
 	"net/netip"
 	"syscall"
+	"unsafe"
 
 	"github.com/sirupsen/logrus"
+	"github.com/slackhq/nebula/config"
+	"github.com/slackhq/nebula/firewall"
+	"github.com/slackhq/nebula/header"
 	"golang.org/x/sys/unix"
 )
 
+type StdConn struct {
+	*net.UDPConn
+	isV4  bool
+	sysFd uintptr
+	l     *logrus.Logger
+}
+
+var _ Conn = &StdConn{}
+
 func NewListener(l *logrus.Logger, ip netip.Addr, port int, multi bool, batch int) (Conn, error) {
-	return NewGenericListener(l, ip, port, multi, batch)
+	lc := NewListenConfig(multi)
+	pc, err := lc.ListenPacket(context.TODO(), "udp", net.JoinHostPort(ip.String(), fmt.Sprintf("%v", port)))
+	if err != nil {
+		return nil, err
+	}
+
+	if uc, ok := pc.(*net.UDPConn); ok {
+		c := &StdConn{UDPConn: uc, l: l}
+
+		rc, err := uc.SyscallConn()
+		if err != nil {
+			return nil, fmt.Errorf("failed to open udp socket: %w", err)
+		}
+
+		err = rc.Control(func(fd uintptr) {
+			c.sysFd = fd
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to get udp fd: %w", err)
+		}
+
+		la, err := c.LocalAddr()
+		if err != nil {
+			return nil, err
+		}
+		c.isV4 = la.Addr().Is4()
+
+		return c, nil
+	}
+
+	return nil, fmt.Errorf("unexpected PacketConn: %T %#v", pc, pc)
 }
 
 func NewListenConfig(multi bool) net.ListenConfig {
@@ -43,16 +89,130 @@ func NewListenConfig(multi bool) net.ListenConfig {
 	}
 }
 
-func (u *GenericConn) Rebind() error {
-	rc, err := u.UDPConn.SyscallConn()
-	if err != nil {
-		return err
+//go:linkname sendto golang.org/x/sys/unix.sendto
+//go:noescape
+func sendto(s int, buf []byte, flags int, to unsafe.Pointer, addrlen int32) (err error)
+
+func (u *StdConn) WriteTo(b []byte, ap netip.AddrPort) error {
+	var sa unsafe.Pointer
+	var addrLen int32
+
+	if u.isV4 {
+		if ap.Addr().Is6() {
+			return ErrInvalidIPv6RemoteForSocket
+		}
+
+		// The socket is IPv4, so build a sockaddr_in; this mirrors writeTo4 in udp_linux.go
+		var rsa unix.RawSockaddrInet4
+		rsa.Family = unix.AF_INET
+		rsa.Addr = ap.Addr().As4()
+		binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ap.Port())
+		sa = unsafe.Pointer(&rsa)
+		addrLen = syscall.SizeofSockaddrInet4
+	} else {
+		var rsa unix.RawSockaddrInet6
+		rsa.Family = unix.AF_INET6
+		rsa.Addr = ap.Addr().As16()
+		binary.BigEndian.PutUint16((*[2]byte)(unsafe.Pointer(&rsa.Port))[:], ap.Port())
+		sa = unsafe.Pointer(&rsa)
+		addrLen = syscall.SizeofSockaddrInet6
 	}
 
-	return rc.Control(func(fd uintptr) {
-		err := syscall.SetsockoptInt(int(fd), unix.IPPROTO_IPV6, unix.IPV6_BOUND_IF, 0)
-		if err != nil {
-			u.l.WithError(err).Error("Failed to rebind udp socket")
+	// Golang stdlib doesn't handle EAGAIN correctly in some situations so we do writes ourselves
+	// See https://github.com/golang/go/issues/73919
+	for {
+		//_, _, err := unix.Syscall6(unix.SYS_SENDTO, u.sysFd, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), 0, sa, addrLen)
+		err := sendto(int(u.sysFd), b, 0, sa, addrLen)
+		if err == nil {
+			// Written, get out before the error handling
+			return nil
 		}
-	})
+
+		if errors.Is(err, syscall.EINTR) {
+			// Write was interrupted, retry
+			continue
+		}
+
+		if errors.Is(err, syscall.EAGAIN) {
+			return &net.OpError{Op: "sendto", Err: unix.EWOULDBLOCK}
+		}
+
+		if errors.Is(err, syscall.EBADF) {
+			return net.ErrClosed
+		}
+
+		return &net.OpError{Op: "sendto", Err: err}
+	}
+}
+
+func (u *StdConn) LocalAddr() (netip.AddrPort, error) {
+	a := u.UDPConn.LocalAddr()
+
+	switch v := a.(type) {
+	case *net.UDPAddr:
+		addr, ok := netip.AddrFromSlice(v.IP)
+		if !ok {
+			return netip.AddrPort{}, fmt.Errorf("LocalAddr returned invalid IP address: %s", v.IP)
+		}
+ return netip.AddrPortFrom(addr, uint16(v.Port)), nil + + default: + return netip.AddrPort{}, fmt.Errorf("LocalAddr returned: %#v", a) + } +} + +func (u *StdConn) ReloadConfig(c *config.C) { + // TODO +} + +func NewUDPStatsEmitter(udpConns []Conn) func() { + // No UDP stats for non-linux + return func() {} +} + +func (u *StdConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firewall.ConntrackCacheTicker, q int) { + plaintext := make([]byte, MTU) + buffer := make([]byte, MTU) + h := &header.H{} + fwPacket := &firewall.Packet{} + nb := make([]byte, 12, 12) + + for { + // Just read one packet at a time + n, rua, err := u.ReadFromUDPAddrPort(buffer) + if err != nil { + if errors.Is(err, net.ErrClosed) { + u.l.WithError(err).Debug("udp socket is closed, exiting read loop") + return + } + + u.l.WithError(err).Error("unexpected udp socket receive error") + } + + r( + netip.AddrPortFrom(rua.Addr().Unmap(), rua.Port()), + plaintext[:0], + buffer[:n], + h, + fwPacket, + lhf, + nb, + q, + cache.Get(u.l), + ) + } +} + +func (u *StdConn) Rebind() error { + var err error + if u.isV4 { + err = syscall.SetsockoptInt(int(u.sysFd), syscall.IPPROTO_IP, syscall.IP_BOUND_IF, 0) + } else { + err = syscall.SetsockoptInt(int(u.sysFd), syscall.IPPROTO_IPV6, syscall.IPV6_BOUND_IF, 0) + } + + if err != nil { + u.l.WithError(err).Error("Failed to rebind udp socket") + } + + return nil } diff --git a/udp/udp_generic.go b/udp/udp_generic.go index 2d845369..74b7d29c 100644 --- a/udp/udp_generic.go +++ b/udp/udp_generic.go @@ -1,6 +1,7 @@ -//go:build (!linux || android) && !e2e_testing +//go:build (!linux || android) && !e2e_testing && !darwin // +build !linux android // +build !e2e_testing +// +build !darwin // udp_generic implements the nebula UDP interface in pure Go stdlib. This // means it can be used on platforms like Darwin and Windows. diff --git a/udp/udp_linux.go b/udp/udp_linux.go index 2eee76ee..eee83cf2 100644 --- a/udp/udp_linux.go +++ b/udp/udp_linux.go @@ -243,7 +243,7 @@ func (u *StdConn) writeTo6(b []byte, ip netip.AddrPort) error { func (u *StdConn) writeTo4(b []byte, ip netip.AddrPort) error { if !ip.Addr().Is4() { - return fmt.Errorf("Listener is IPv4, but writing to IPv6 remote") + return ErrInvalidIPv6RemoteForSocket } var rsa unix.RawSockaddrInet4 From 105e0ec66c11d57c06c5f1c52a064be3b50e4d25 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Fri, 18 Jul 2025 08:39:33 -0400 Subject: [PATCH 10/15] v1.9.6 (#1434) Update CHANGELOG for Nebula v1.9.6 --- CHANGELOG.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b33b0ca7..4311a6f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.9.6] - 2025-7-15 + +### Added + +- Support dropping inactive tunnels. This is disabled by default in this release but can be enabled with `tunnels.drop_inactive`. See example config for more details. (#1413) + +### Fixed + +- Fix Darwin freeze due to presence of some Network Extensions (#1426) +- Ensure the same relay tunnel is always used when multiple relay tunnels are present (#1422) +- Fix Windows freeze due to ICMP error handling (#1412) +- Fix relay migration panic (#1403) + ## [1.9.5] - 2024-12-05 ### Added @@ -674,7 +687,8 @@ created.) - Initial public release. 
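
For context on the `tunnels.drop_inactive` entry above: the keys live in the `tunnels` config section shown earlier in this series. A minimal sketch of how a connection manager could consume them through this repo's `config.C` getters follows; the `GetBool`/`GetDuration` calls match usage elsewhere in this document, but the atomic field types on `connectionManager` are assumptions made for illustration, not the literal implementation:

	// Sketch only; assumes dropInactive is an atomic.Bool and inactivityTimeout an atomic.Int64.
	func (cm *connectionManager) reloadSettings(c *config.C) {
		cm.dropInactive.Store(c.GetBool("tunnels.drop_inactive", false))
		// Stored as nanoseconds so getInactivityTimeout() can hand back a time.Duration.
		cm.inactivityTimeout.Store(int64(c.GetDuration("tunnels.inactivity_timeout", 10*time.Minute)))
	}

Both keys are marked "This setting is reloadable" in examples/config.yml, so re-running a function like this from a config reload callback would pick up changes without restarting tunnels.
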
-[Unreleased]: https://github.com/slackhq/nebula/compare/v1.9.5...HEAD +[Unreleased]: https://github.com/slackhq/nebula/compare/v1.9.6...HEAD +[1.9.6]: https://github.com/slackhq/nebula/releases/tag/v1.9.6 [1.9.5]: https://github.com/slackhq/nebula/releases/tag/v1.9.5 [1.9.4]: https://github.com/slackhq/nebula/releases/tag/v1.9.4 [1.9.3]: https://github.com/slackhq/nebula/releases/tag/v1.9.3 From 1d73e463cd32840130a5244bf28475e12168f8a4 Mon Sep 17 00:00:00 2001 From: brad-defined <77982333+brad-defined@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:33:31 -0400 Subject: [PATCH 11/15] Quietly log error on UDP_NETRESET ioctl on Windows. (#1453) * Quietly log error on UDP_NETRESET ioctl on Windows. * dampen unexpected error warnings --- udp/udp_generic.go | 16 ++++++++++++++-- udp/udp_rio_windows.go | 33 +++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/udp/udp_generic.go b/udp/udp_generic.go index 74b7d29c..eebabd33 100644 --- a/udp/udp_generic.go +++ b/udp/udp_generic.go @@ -10,9 +10,11 @@ package udp import ( "context" + "errors" "fmt" "net" "net/netip" + "time" "github.com/sirupsen/logrus" "github.com/slackhq/nebula/config" @@ -80,12 +82,22 @@ func (u *GenericConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *f fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) + var lastRecvErr time.Time + for { // Just read one packet at a time n, rua, err := u.ReadFromUDPAddrPort(buffer) if err != nil { - u.l.WithError(err).Debug("udp socket is closed, exiting read loop") - return + if errors.Is(err, net.ErrClosed) { + u.l.WithError(err).Debug("udp socket is closed, exiting read loop") + return + } + // Dampen unexpected message warns to once per minute + if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute { + lastRecvErr = time.Now() + u.l.WithError(err).Warn("unexpected udp socket receive error") + } + continue } r( diff --git a/udp/udp_rio_windows.go b/udp/udp_rio_windows.go index 045ae8e7..cd5f750c 100644 --- a/udp/udp_rio_windows.go +++ b/udp/udp_rio_windows.go @@ -14,6 +14,7 @@ import ( "sync" "sync/atomic" "syscall" + "time" "unsafe" "github.com/sirupsen/logrus" @@ -69,7 +70,7 @@ func NewRIOListener(l *logrus.Logger, addr netip.Addr, port int) (*RIOConn, erro u := &RIOConn{l: l} - err := u.bind(&windows.SockaddrInet6{Addr: addr.As16(), Port: port}) + err := u.bind(l, &windows.SockaddrInet6{Addr: addr.As16(), Port: port}) if err != nil { return nil, fmt.Errorf("bind: %w", err) } @@ -85,11 +86,11 @@ func NewRIOListener(l *logrus.Logger, addr netip.Addr, port int) (*RIOConn, erro return u, nil } -func (u *RIOConn) bind(sa windows.Sockaddr) error { +func (u *RIOConn) bind(l *logrus.Logger, sa windows.Sockaddr) error { var err error u.sock, err = winrio.Socket(windows.AF_INET6, windows.SOCK_DGRAM, windows.IPPROTO_UDP) if err != nil { - return err + return fmt.Errorf("winrio.Socket error: %w", err) } // Enable v4 for this socket @@ -103,35 +104,40 @@ func (u *RIOConn) bind(sa windows.Sockaddr) error { size := uint32(unsafe.Sizeof(flag)) err = syscall.WSAIoctl(syscall.Handle(u.sock), syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) if err != nil { - return err + // This is a best-effort to prevent errors from being returned by the udp recv operation. + // Quietly log a failure and continue. 
+ l.WithError(err).Debug("failed to set UDP_CONNRESET ioctl") } + ret = 0 flag = 0 size = uint32(unsafe.Sizeof(flag)) SIO_UDP_NETRESET := uint32(syscall.IOC_IN | syscall.IOC_VENDOR | 15) err = syscall.WSAIoctl(syscall.Handle(u.sock), SIO_UDP_NETRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) if err != nil { - return err + // This is a best-effort to prevent errors from being returned by the udp recv operation. + // Quietly log a failure and continue. + l.WithError(err).Debug("failed to set UDP_NETRESET ioctl") } err = u.rx.Open() if err != nil { - return err + return fmt.Errorf("error rx.Open(): %w", err) } err = u.tx.Open() if err != nil { - return err + return fmt.Errorf("error tx.Open(): %w", err) } u.rq, err = winrio.CreateRequestQueue(u.sock, packetsPerRing, 1, packetsPerRing, 1, u.rx.cq, u.tx.cq, 0) if err != nil { - return err + return fmt.Errorf("error CreateRequestQueue: %w", err) } err = windows.Bind(u.sock, sa) if err != nil { - return err + return fmt.Errorf("error windows.Bind(): %w", err) } return nil @@ -144,15 +150,22 @@ func (u *RIOConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firew fwPacket := &firewall.Packet{} nb := make([]byte, 12, 12) + var lastRecvErr time.Time + for { // Just read one packet at a time n, rua, err := u.receive(buffer) + if err != nil { if errors.Is(err, net.ErrClosed) { u.l.WithError(err).Debug("udp socket is closed, exiting read loop") return } - u.l.WithError(err).Error("unexpected udp socket receive error") + // Dampen unexpected message warns to once per minute + if lastRecvErr.IsZero() || time.Since(lastRecvErr) > time.Minute { + lastRecvErr = time.Now() + u.l.WithError(err).Warn("unexpected udp socket receive error") + } continue } From 22af56f1568347bea187d03207eff22d420a1938 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Thu, 4 Sep 2025 15:52:32 -0500 Subject: [PATCH 12/15] Fix `recv_error` receipt limit allowance for v1.9.x (#1459) * Fix recv_error receipt limit allowance * backport #1463 recv_error behavior changes --------- Co-authored-by: JackDoan --- hostmap.go | 9 --------- outside.go | 18 ++++++++---------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/hostmap.go b/hostmap.go index 0b34de4c..9731b019 100644 --- a/hostmap.go +++ b/hostmap.go @@ -22,7 +22,6 @@ const defaultPromoteEvery = 1000 // Count of packets sent before we try mo const defaultReQueryEvery = 5000 // Count of packets sent before re-querying a hostinfo to the lighthouse const defaultReQueryWait = time.Minute // Minimum amount of seconds to wait before re-querying a hostinfo the lighthouse. 
Evaluated every ReQueryEvery const MaxRemotes = 10 -const maxRecvError = 4 // MaxHostInfosPerVpnIp is the max number of hostinfos we will track for a given vpn ip // 5 allows for an initial handshake and each host pair re-handshaking twice @@ -220,7 +219,6 @@ type HostInfo struct { remoteIndexId uint32 localIndexId uint32 vpnIp netip.Addr - recvError atomic.Uint32 remoteCidr *bart.Table[struct{}] relayState RelayState @@ -705,13 +703,6 @@ func (i *HostInfo) SetRemoteIfPreferred(hm *HostMap, newRemote netip.AddrPort) b return false } -func (i *HostInfo) RecvErrorExceeded() bool { - if i.recvError.Add(1) >= maxRecvError { - return true - } - return true -} - func (i *HostInfo) CreateRemoteCIDR(c *cert.NebulaCertificate) { if len(c.Details.Ips) == 1 && len(c.Details.Subnets) == 0 { // Simple case, no CIDRTree needed diff --git a/outside.go b/outside.go index 3da37f98..c5c15c52 100644 --- a/outside.go +++ b/outside.go @@ -286,16 +286,18 @@ func (f *Interface) handleHostRoaming(hostinfo *HostInfo, ip netip.AddrPort) { } +// handleEncrypted returns true if a packet should be processed, false otherwise func (f *Interface) handleEncrypted(ci *ConnectionState, addr netip.AddrPort, h *header.H) bool { - // If connectionstate exists and the replay protector allows, process packet - // Else, send recv errors for 300 seconds after a restart to allow fast reconnection. - if ci == nil || !ci.window.Check(f.l, h.MessageCounter) { + // If connectionstate does not exist, send a recv error, if possible, to encourage a fast reconnect + if ci == nil { if addr.IsValid() { f.maybeSendRecvError(addr, h.RemoteIndex) - return false - } else { - return false } + return false + } + // If the window check fails, refuse to process the packet, but don't send a recv error + if !ci.window.Check(f.l, h.MessageCounter) { + return false } return true @@ -458,10 +460,6 @@ func (f *Interface) handleRecvError(addr netip.AddrPort, h *header.H) { return } - if !hostinfo.RecvErrorExceeded() { - return - } - if hostinfo.remote.IsValid() && hostinfo.remote != addr { f.l.Infoln("Someone spoofing recv_errors? ", addr, hostinfo.remote) return From 9f692175e17ed6981a720f9ab70928e41d07f618 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Tue, 7 Oct 2025 18:35:58 -0400 Subject: [PATCH 13/15] HostInfo.remoteCidr should only be populated with the entire vpn ip address issued in the certificate (#1494) --- hostmap.go | 3 +-- hostmap_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/hostmap.go b/hostmap.go index 9731b019..b9bdd4c1 100644 --- a/hostmap.go +++ b/hostmap.go @@ -714,8 +714,7 @@ func (i *HostInfo) CreateRemoteCIDR(c *cert.NebulaCertificate) { //TODO: IPV6-WORK what to do when ip is invalid? 
nip, _ := netip.AddrFromSlice(ip.IP) nip = nip.Unmap() - bits, _ := ip.Mask.Size() - remoteCidr.Insert(netip.PrefixFrom(nip, bits), struct{}{}) + remoteCidr.Insert(netip.PrefixFrom(nip, nip.BitLen()), struct{}{}) } for _, n := range c.Details.Subnets { diff --git a/hostmap_test.go b/hostmap_test.go index 6eb8751c..7f19fe4d 100644 --- a/hostmap_test.go +++ b/hostmap_test.go @@ -1,9 +1,11 @@ package nebula import ( + "net" "net/netip" "testing" + "github.com/slackhq/nebula/cert" "github.com/slackhq/nebula/config" "github.com/slackhq/nebula/test" "github.com/stretchr/testify/assert" @@ -87,6 +89,40 @@ func TestHostMap_MakePrimary(t *testing.T) { assert.Nil(t, h2.next) } +func TestHostInfo_CreateRemoteCIDR(t *testing.T) { + h := HostInfo{} + c := &cert.NebulaCertificate{ + Details: cert.NebulaCertificateDetails{ + Ips: []*net.IPNet{ + { + IP: net.IPv4(1, 2, 3, 4), + Mask: net.IPv4Mask(255, 255, 255, 0), + }, + }, + }, + } + + // remoteCidr should be empty with only 1 ip address present in the certificate + h.CreateRemoteCIDR(c) + assert.Empty(t, h.remoteCidr) + + // remoteCidr should be populated if there is also a subnet in the certificate + c.Details.Subnets = []*net.IPNet{ + { + IP: net.IPv4(9, 2, 3, 4), + Mask: net.IPv4Mask(255, 255, 255, 0), + }, + } + h.CreateRemoteCIDR(c) + assert.NotEmpty(t, h.remoteCidr) + _, ok := h.remoteCidr.Lookup(netip.MustParseAddr("1.2.3.0")) + assert.False(t, ok, "An ip address within the certificates network should not be found") + _, ok = h.remoteCidr.Lookup(netip.MustParseAddr("1.2.3.4")) + assert.True(t, ok, "An exact ip address match should be found") + _, ok = h.remoteCidr.Lookup(netip.MustParseAddr("9.2.3.4")) + assert.True(t, ok, "An ip address within the subnets should be found") +} + func TestHostMap_DeleteHostInfo(t *testing.T) { l := test.NewLogger() hm := newHostMap( From 824cd3f0d6bc2c36a6515bc93a2c56c3fa8b90b5 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Tue, 7 Oct 2025 21:10:16 -0500 Subject: [PATCH 14/15] Update CHANGELOG for Nebula v1.9.7 --- CHANGELOG.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4311a6f4..318981d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.9.7] - 2025-10-8 + +### Security + +- Fix an opportunity for emitting a packet that was sent with a spoofed source address within the configured vpn network. (#1494) + +### Changed + +- Disable sending `recv_error` messages when a packet is received outside the allowable counter window. (#1459) +- Improve error messages and remove some unnecessary fatal conditions in the Windows and generic udp listener. (#1543) + ## [1.9.6] - 2025-7-15 ### Added @@ -687,7 +698,8 @@ created.) - Initial public release. 
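
To make the scope of that fix concrete: before #1494, CreateRemoteCIDR (patched earlier in this series) inserted the certificate's entire network, so any source address inside the peer's issued prefix passed validation. Now only the exact issued address (a /32 or /128) plus any certificate subnets are inserted. A minimal sketch of the resulting check, using a hypothetical helper around the bart table from hostmap.go:

	// Hypothetical helper, not part of the patch: how source validation behaves after #1494.
	func allowedSource(h *HostInfo, src netip.Addr) bool {
		if h.remoteCidr == nil {
			// Simple single-IP cert with no subnets: CreateRemoteCIDR leaves remoteCidr empty.
			return src == h.vpnIp
		}
		// Matches only the exact issued address or a subnet from the certificate.
		_, ok := h.remoteCidr.Lookup(src)
		return ok
	}
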
-[Unreleased]: https://github.com/slackhq/nebula/compare/v1.9.6...HEAD +[Unreleased]: https://github.com/slackhq/nebula/compare/v1.9.7...HEAD +[1.9.7]: https://github.com/slackhq/nebula/releases/tag/v1.9.7 [1.9.6]: https://github.com/slackhq/nebula/releases/tag/v1.9.6 [1.9.5]: https://github.com/slackhq/nebula/releases/tag/v1.9.5 [1.9.4]: https://github.com/slackhq/nebula/releases/tag/v1.9.4 From 7c3f5339508c7dbc74eea7f925d31b28bafa1b45 Mon Sep 17 00:00:00 2001 From: Nate Brown Date: Fri, 10 Oct 2025 11:31:46 -0400 Subject: [PATCH 15/15] Better words (#1497) --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 318981d7..79790a12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,11 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## [1.9.7] - 2025-10-8 +## [1.9.7] - 2025-10-10 ### Security -- Fix an opportunity for emitting a packet that was sent with a spoofed source address within the configured vpn network. (#1494) +- Fix an issue where Nebula could incorrectly accept and process a packet from an erroneous source IP when the sender's + certificate is configured with unsafe_routes (cert v1/v2) or multiple IPs (cert v2). (#1494) ### Changed