mirror of
https://github.com/slackhq/nebula.git
synced 2025-12-06 02:30:57 -08:00
Implement ECMP for unsafe_routes (#1332)
Some checks are pending
gofmt / Run gofmt (push) Waiting to run
smoke-extra / Run extra smoke tests (push) Waiting to run
smoke / Run multi node smoke test (push) Waiting to run
Build and test / Build all and test on ubuntu-linux (push) Waiting to run
Build and test / Build and test on linux with boringcrypto (push) Waiting to run
Build and test / Build and test on linux with pkcs11 (push) Waiting to run
Build and test / Build and test on ${{ matrix.os }} (macos-latest) (push) Waiting to run
Build and test / Build and test on ${{ matrix.os }} (windows-latest) (push) Waiting to run
Some checks are pending
gofmt / Run gofmt (push) Waiting to run
smoke-extra / Run extra smoke tests (push) Waiting to run
smoke / Run multi node smoke test (push) Waiting to run
Build and test / Build all and test on ubuntu-linux (push) Waiting to run
Build and test / Build and test on linux with boringcrypto (push) Waiting to run
Build and test / Build and test on linux with pkcs11 (push) Waiting to run
Build and test / Build and test on ${{ matrix.os }} (macos-latest) (push) Waiting to run
Build and test / Build and test on ${{ matrix.os }} (windows-latest) (push) Waiting to run
This commit is contained in:
parent
3de36c99b6
commit
f86953ca56
21 changed files with 690 additions and 82 deletions
95
inside.go
95
inside.go
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/slackhq/nebula/header"
|
||||
"github.com/slackhq/nebula/iputil"
|
||||
"github.com/slackhq/nebula/noiseutil"
|
||||
"github.com/slackhq/nebula/routing"
|
||||
)
|
||||
|
||||
func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
|
||||
|
|
@ -49,7 +50,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet
|
|||
return
|
||||
}
|
||||
|
||||
hostinfo, ready := f.getOrHandshake(fwPacket.RemoteAddr, func(hh *HandshakeHostInfo) {
|
||||
hostinfo, ready := f.getOrHandshakeConsiderRouting(fwPacket, func(hh *HandshakeHostInfo) {
|
||||
hh.cachePacket(f.l, header.Message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics)
|
||||
})
|
||||
|
||||
|
|
@ -121,22 +122,94 @@ func (f *Interface) rejectOutside(packet []byte, ci *ConnectionState, hostinfo *
|
|||
f.sendNoMetrics(header.Message, 0, ci, hostinfo, netip.AddrPort{}, out, nb, packet, q)
|
||||
}
|
||||
|
||||
// Handshake will attempt to initiate a tunnel with the provided vpn address if it is within our vpn networks. This is a no-op if the tunnel is already established or being established
|
||||
func (f *Interface) Handshake(vpnAddr netip.Addr) {
|
||||
f.getOrHandshake(vpnAddr, nil)
|
||||
f.getOrHandshakeNoRouting(vpnAddr, nil)
|
||||
}
|
||||
|
||||
// getOrHandshake returns nil if the vpnAddr is not routable.
|
||||
// getOrHandshakeNoRouting returns nil if the vpnAddr is not within our vpn networks; unsafe routes are not consulted.
|
||||
// If the 2nd return var is false then the hostinfo is not ready to be used in a tunnel
|
||||
func (f *Interface) getOrHandshake(vpnAddr netip.Addr, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
|
||||
func (f *Interface) getOrHandshakeNoRouting(vpnAddr netip.Addr, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
|
||||
_, found := f.myVpnNetworksTable.Lookup(vpnAddr)
|
||||
if !found {
|
||||
vpnAddr = f.inside.RouteFor(vpnAddr)
|
||||
if !vpnAddr.IsValid() {
|
||||
return nil, false
|
||||
}
|
||||
if found {
|
||||
return f.handshakeManager.GetOrHandshake(vpnAddr, cacheCallback)
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// getOrHandshakeConsiderRouting will try to find the HostInfo to handle this packet, starting a handshake if necessary.
|
||||
// If the 2nd return var is false then the hostinfo is not ready to be used in a tunnel.
|
||||
func (f *Interface) getOrHandshakeConsiderRouting(fwPacket *firewall.Packet, cacheCallback func(*HandshakeHostInfo)) (*HostInfo, bool) {
|
||||
|
||||
destinationAddr := fwPacket.RemoteAddr
|
||||
|
||||
hostinfo, ready := f.getOrHandshakeNoRouting(destinationAddr, cacheCallback)
|
||||
|
||||
// Host is inside the mesh, no routing required
|
||||
if hostinfo != nil {
|
||||
return hostinfo, ready
|
||||
}
|
||||
|
||||
gateways := f.inside.RoutesFor(destinationAddr)
|
||||
|
||||
switch len(gateways) {
|
||||
case 0:
|
||||
return nil, false
|
||||
case 1:
|
||||
// Single gateway route
|
||||
return f.handshakeManager.GetOrHandshake(gateways[0].Addr(), cacheCallback)
|
||||
default:
|
||||
// Multi gateway route, perform ECMP categorization
|
||||
gatewayAddr, balancingOk := routing.BalancePacket(fwPacket, gateways)
|
||||
|
||||
if !balancingOk {
|
||||
// This happens if the gateway buckets were not calculated, this _should_ never happen
|
||||
f.l.Error("Gateway buckets not calculated, fallback from ECMP to random routing. Please report this bug.")
|
||||
}
|
||||
|
||||
var handshakeInfoForChosenGateway *HandshakeHostInfo
|
||||
var hhReceiver = func(hh *HandshakeHostInfo) {
|
||||
handshakeInfoForChosenGateway = hh
|
||||
}
|
||||
|
||||
// Store the handshakeHostInfo for later.
|
||||
// If this node is not reachable we will attempt other nodes, if none are reachable we will
|
||||
// cache the packet for this gateway.
|
||||
if hostinfo, ready = f.handshakeManager.GetOrHandshake(gatewayAddr, hhReceiver); ready {
|
||||
return hostinfo, true
|
||||
}
|
||||
|
||||
// It appears the selected gateway cannot be reached; find another gateway to fall back on.
|
||||
// The current implementation breaks ECMP but that seems better than no connectivity.
|
||||
// If ECMP is also required when a gateway is down then connectivity status
|
||||
// for each gateway needs to be kept and the weights recalculated when they go up or down.
|
||||
// This would also need to interact with unsafe_route updates through reloading the config or
|
||||
// use of the use_system_route_table option
|
||||
|
||||
if f.l.Level >= logrus.DebugLevel {
|
||||
f.l.WithField("destination", destinationAddr).
|
||||
WithField("originalGateway", gatewayAddr).
|
||||
Debugln("Calculated gateway for ECMP not available, attempting other gateways")
|
||||
}
|
||||
|
||||
for i := range gateways {
|
||||
// Skip the gateway that failed previously
|
||||
if gateways[i].Addr() == gatewayAddr {
|
||||
continue
|
||||
}
|
||||
|
||||
// We do not need the HandshakeHostInfo since we cache the packet in the originally chosen gateway
|
||||
if hostinfo, ready = f.handshakeManager.GetOrHandshake(gateways[i].Addr(), nil); ready {
|
||||
return hostinfo, true
|
||||
}
|
||||
}
|
||||
|
||||
// No gateways reachable, cache the packet in the originally chosen gateway
|
||||
cacheCallback(handshakeInfoForChosenGateway)
|
||||
return hostinfo, false
|
||||
}
|
||||
|
||||
return f.handshakeManager.GetOrHandshake(vpnAddr, cacheCallback)
|
||||
}
|
||||
|
||||
func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubType, hostinfo *HostInfo, p, nb, out []byte) {
|
||||
|
|
@ -163,7 +236,7 @@ func (f *Interface) sendMessageNow(t header.MessageType, st header.MessageSubTyp
|
|||
|
||||
// SendMessageToVpnAddr handles real addr:port lookup and sends to the current best known address for vpnAddr
|
||||
func (f *Interface) SendMessageToVpnAddr(t header.MessageType, st header.MessageSubType, vpnAddr netip.Addr, p, nb, out []byte) {
|
||||
hostInfo, ready := f.getOrHandshake(vpnAddr, func(hh *HandshakeHostInfo) {
|
||||
hostInfo, ready := f.getOrHandshakeNoRouting(vpnAddr, func(hh *HandshakeHostInfo) {
|
||||
hh.cachePacket(f.l, t, st, p, f.SendMessageToHostInfo, f.cachedPacketMetrics)
|
||||
})
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue