mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-10 14:12:43 -07:00
Some checks failed
gofmt / Run gofmt (push) Has been cancelled
smoke-extra / Run extra smoke tests (push) Has been cancelled
smoke / Run multi node smoke test (push) Has been cancelled
Build and test / Build all and test on ubuntu-linux (push) Has been cancelled
Build and test / Build and test on linux with boringcrypto (push) Has been cancelled
Build and test / Build and test on linux with pkcs11 (push) Has been cancelled
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled
410 lines
9.4 KiB
Go
410 lines
9.4 KiB
Go
package nebula
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"log/slog"
|
|
"net"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/slackhq/nebula/config"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func newTestStatsServer(t *testing.T) (*statsServer, *config.C) {
|
|
t.Helper()
|
|
l := slog.New(slog.DiscardHandler)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
return &statsServer{
|
|
l: l,
|
|
ctx: ctx,
|
|
}, config.NewC(l)
|
|
}
|
|
|
|
func setStatsConfig(c *config.C, m map[string]any) {
|
|
c.Settings["stats"] = m
|
|
}
|
|
|
|
func currentRuntime(s *statsServer) *statsRuntime {
|
|
s.runMu.Lock()
|
|
defer s.runMu.Unlock()
|
|
return s.run
|
|
}
|
|
|
|
func TestStatsServer_reload_initial_disabled(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{"type": "none"})
|
|
|
|
require.NoError(t, s.reload(c, true))
|
|
assert.False(t, s.enabled.Load())
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_reload_initial_invalidInterval(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "graphite",
|
|
"host": "127.0.0.1:0",
|
|
"prefix": "test",
|
|
})
|
|
|
|
err := s.reload(c, true)
|
|
require.Error(t, err)
|
|
assert.False(t, s.enabled.Load())
|
|
}
|
|
|
|
func TestStatsServer_reload_initial_unknownType(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "carbon",
|
|
"interval": "1s",
|
|
})
|
|
|
|
err := s.reload(c, true)
|
|
require.Error(t, err)
|
|
assert.False(t, s.enabled.Load())
|
|
}
|
|
|
|
func TestStatsServer_reload_unchanged_noOp(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{"type": "none"})
|
|
|
|
require.NoError(t, s.reload(c, true))
|
|
require.NoError(t, s.reload(c, false))
|
|
assert.False(t, s.enabled.Load())
|
|
}
|
|
|
|
func TestStatsServer_reload_initial_graphite(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "graphite",
|
|
"interval": "1s",
|
|
"protocol": "tcp",
|
|
"host": "127.0.0.1:2003",
|
|
"prefix": "test",
|
|
})
|
|
|
|
require.NoError(t, s.reload(c, true))
|
|
assert.True(t, s.enabled.Load())
|
|
// reload only records config; Start builds the runtime.
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_reload_initial_prometheus(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:0",
|
|
"path": "/metrics",
|
|
})
|
|
|
|
require.NoError(t, s.reload(c, true))
|
|
assert.True(t, s.enabled.Load())
|
|
// reload only records config; Start builds the runtime.
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_Start_graphite_blocksUntilStop(t *testing.T) {
|
|
sink := newGraphiteSink(t)
|
|
defer sink.Close()
|
|
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "graphite",
|
|
"interval": "1s",
|
|
"protocol": "tcp",
|
|
"host": sink.Addr(),
|
|
"prefix": "test",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.Start()
|
|
close(done)
|
|
}()
|
|
|
|
// Wait for Start to publish runtime state.
|
|
waitFor(t, func() bool { return currentRuntime(s) != nil })
|
|
rt := currentRuntime(s)
|
|
require.NotNil(t, rt)
|
|
assert.Nil(t, rt.listener, "graphite has no listener")
|
|
|
|
s.Stop()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("graphite Start did not return after Stop")
|
|
}
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_StartStop_lifecycle(t *testing.T) {
|
|
port := freeTCPPort(t)
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:" + port,
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.Start()
|
|
close(done)
|
|
}()
|
|
|
|
waitForListening(t, "127.0.0.1:"+port)
|
|
rt := currentRuntime(s)
|
|
require.NotNil(t, rt)
|
|
require.NotNil(t, rt.listener)
|
|
|
|
s.Stop()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Start did not return after Stop")
|
|
}
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_reload_disable_stopsRunningRuntime(t *testing.T) {
|
|
port := freeTCPPort(t)
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:" + port,
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.Start()
|
|
close(done)
|
|
}()
|
|
waitForListening(t, "127.0.0.1:"+port)
|
|
|
|
setStatsConfig(c, map[string]any{"type": "none"})
|
|
require.NoError(t, s.reload(c, false))
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Start did not return after reload disabled stats")
|
|
}
|
|
assert.False(t, s.enabled.Load())
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_reload_changeListener_restartsListener(t *testing.T) {
|
|
port1 := freeTCPPort(t)
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:" + port1,
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
firstDone := make(chan struct{})
|
|
go func() {
|
|
s.Start()
|
|
close(firstDone)
|
|
}()
|
|
waitForListening(t, "127.0.0.1:"+port1)
|
|
first := currentRuntime(s)
|
|
require.NotNil(t, first)
|
|
|
|
port2 := freeTCPPort(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:" + port2,
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, false))
|
|
|
|
select {
|
|
case <-firstDone:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("old Start did not return after reload")
|
|
}
|
|
|
|
waitForListening(t, "127.0.0.1:"+port2)
|
|
second := currentRuntime(s)
|
|
require.NotNil(t, second)
|
|
assert.NotSame(t, first, second, "expected a new runtime after listen address change")
|
|
|
|
s.Stop()
|
|
}
|
|
|
|
func TestStatsServer_Stop_beforeStart_doesNotBlock(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:0",
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
stopped := make(chan struct{})
|
|
go func() {
|
|
s.Stop()
|
|
close(stopped)
|
|
}()
|
|
select {
|
|
case <-stopped:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Stop hung with no runtime started")
|
|
}
|
|
}
|
|
|
|
func TestStatsServer_configTest_validatesWithoutSpawning(t *testing.T) {
|
|
s, c := newTestStatsServer(t)
|
|
s.configTest = true
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:0",
|
|
"path": "/metrics",
|
|
})
|
|
|
|
require.NoError(t, s.reload(c, true))
|
|
s.Start()
|
|
assert.Nil(t, currentRuntime(s))
|
|
}
|
|
|
|
func TestStatsServer_ctxCancel_unblocksStart(t *testing.T) {
|
|
// Ensures ctx cancellation alone (no explicit Stop) tears down both
|
|
// graphite and prom Start invocations.
|
|
port := freeTCPPort(t)
|
|
l := slog.New(slog.DiscardHandler)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
s := &statsServer{l: l, ctx: ctx}
|
|
c := config.NewC(l)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:" + port,
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.Start()
|
|
close(done)
|
|
}()
|
|
waitForListening(t, "127.0.0.1:"+port)
|
|
|
|
cancel()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Start did not return after ctx cancel")
|
|
}
|
|
}
|
|
|
|
func TestStatsServer_listenerBindFailure_sameCfgReloadRetries(t *testing.T) {
|
|
// Hold the port so ListenAndServe will fail on first Start.
|
|
blocker, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
port := strconv.Itoa(blocker.Addr().(*net.TCPAddr).Port)
|
|
|
|
s, c := newTestStatsServer(t)
|
|
setStatsConfig(c, map[string]any{
|
|
"type": "prometheus",
|
|
"interval": "1s",
|
|
"listen": "127.0.0.1:" + port,
|
|
"path": "/metrics",
|
|
})
|
|
require.NoError(t, s.reload(c, true))
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.Start()
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Start did not return after bind failure")
|
|
}
|
|
// Bind failure should have dropped the cached config so a same-cfg
|
|
// SIGHUP can retry.
|
|
s.runMu.Lock()
|
|
cfgAfterFailure := s.runCfg
|
|
s.runMu.Unlock()
|
|
assert.Nil(t, cfgAfterFailure)
|
|
|
|
// Free the port and reload with the same config; Start should fire again.
|
|
require.NoError(t, blocker.Close())
|
|
require.NoError(t, s.reload(c, false))
|
|
|
|
waitForListening(t, "127.0.0.1:"+port)
|
|
require.NotNil(t, currentRuntime(s))
|
|
|
|
s.Stop()
|
|
}
|
|
|
|
func waitForListening(t *testing.T, addr string) {
|
|
t.Helper()
|
|
waitFor(t, func() bool {
|
|
conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
_ = conn.Close()
|
|
return true
|
|
})
|
|
}
|
|
|
|
// graphiteSink is a minimal TCP accept-and-discard server so graphite.Once
|
|
// calls in tests don't spam error logs or wedge on connection refused.
|
|
type graphiteSink struct {
|
|
ln net.Listener
|
|
}
|
|
|
|
func newGraphiteSink(t *testing.T) *graphiteSink {
|
|
t.Helper()
|
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
g := &graphiteSink{ln: ln}
|
|
go func() {
|
|
for {
|
|
conn, err := ln.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
go func(c net.Conn) {
|
|
_, _ = io.Copy(io.Discard, c)
|
|
_ = c.Close()
|
|
}(conn)
|
|
}
|
|
}()
|
|
return g
|
|
}
|
|
|
|
func (g *graphiteSink) Addr() string { return g.ln.Addr().String() }
|
|
func (g *graphiteSink) Close() { _ = g.ln.Close() }
|
|
|
|
func freeTCPPort(t *testing.T) string {
|
|
t.Helper()
|
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
port := ln.Addr().(*net.TCPAddr).Port
|
|
require.NoError(t, ln.Close())
|
|
return strconv.Itoa(port)
|
|
}
|