nebula/stats_test.go
Nate Brown 1ab1f71dba
Some checks failed
gofmt / Run gofmt (push) Has been cancelled
smoke-extra / Run extra smoke tests (push) Has been cancelled
smoke / Run multi node smoke test (push) Has been cancelled
Build and test / Build all and test on ubuntu-linux (push) Has been cancelled
Build and test / Build and test on linux with boringcrypto (push) Has been cancelled
Build and test / Build and test on linux with pkcs11 (push) Has been cancelled
Build and test / Build and test on macos-latest (push) Has been cancelled
Build and test / Build and test on windows-latest (push) Has been cancelled
Make stats a server we can reconfigure and start/stop (#1670)
2026-04-27 12:25:24 -05:00

410 lines
9.4 KiB
Go

package nebula
import (
"context"
"io"
"log/slog"
"net"
"strconv"
"testing"
"time"
"github.com/slackhq/nebula/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newTestStatsServer(t *testing.T) (*statsServer, *config.C) {
t.Helper()
l := slog.New(slog.DiscardHandler)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
return &statsServer{
l: l,
ctx: ctx,
}, config.NewC(l)
}
func setStatsConfig(c *config.C, m map[string]any) {
c.Settings["stats"] = m
}
func currentRuntime(s *statsServer) *statsRuntime {
s.runMu.Lock()
defer s.runMu.Unlock()
return s.run
}
func TestStatsServer_reload_initial_disabled(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{"type": "none"})
require.NoError(t, s.reload(c, true))
assert.False(t, s.enabled.Load())
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_reload_initial_invalidInterval(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "graphite",
"host": "127.0.0.1:0",
"prefix": "test",
})
err := s.reload(c, true)
require.Error(t, err)
assert.False(t, s.enabled.Load())
}
func TestStatsServer_reload_initial_unknownType(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "carbon",
"interval": "1s",
})
err := s.reload(c, true)
require.Error(t, err)
assert.False(t, s.enabled.Load())
}
func TestStatsServer_reload_unchanged_noOp(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{"type": "none"})
require.NoError(t, s.reload(c, true))
require.NoError(t, s.reload(c, false))
assert.False(t, s.enabled.Load())
}
func TestStatsServer_reload_initial_graphite(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "graphite",
"interval": "1s",
"protocol": "tcp",
"host": "127.0.0.1:2003",
"prefix": "test",
})
require.NoError(t, s.reload(c, true))
assert.True(t, s.enabled.Load())
// reload only records config; Start builds the runtime.
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_reload_initial_prometheus(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:0",
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
assert.True(t, s.enabled.Load())
// reload only records config; Start builds the runtime.
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_Start_graphite_blocksUntilStop(t *testing.T) {
sink := newGraphiteSink(t)
defer sink.Close()
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "graphite",
"interval": "1s",
"protocol": "tcp",
"host": sink.Addr(),
"prefix": "test",
})
require.NoError(t, s.reload(c, true))
done := make(chan struct{})
go func() {
s.Start()
close(done)
}()
// Wait for Start to publish runtime state.
waitFor(t, func() bool { return currentRuntime(s) != nil })
rt := currentRuntime(s)
require.NotNil(t, rt)
assert.Nil(t, rt.listener, "graphite has no listener")
s.Stop()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("graphite Start did not return after Stop")
}
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_StartStop_lifecycle(t *testing.T) {
port := freeTCPPort(t)
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:" + port,
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
done := make(chan struct{})
go func() {
s.Start()
close(done)
}()
waitForListening(t, "127.0.0.1:"+port)
rt := currentRuntime(s)
require.NotNil(t, rt)
require.NotNil(t, rt.listener)
s.Stop()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Start did not return after Stop")
}
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_reload_disable_stopsRunningRuntime(t *testing.T) {
port := freeTCPPort(t)
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:" + port,
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
done := make(chan struct{})
go func() {
s.Start()
close(done)
}()
waitForListening(t, "127.0.0.1:"+port)
setStatsConfig(c, map[string]any{"type": "none"})
require.NoError(t, s.reload(c, false))
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Start did not return after reload disabled stats")
}
assert.False(t, s.enabled.Load())
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_reload_changeListener_restartsListener(t *testing.T) {
port1 := freeTCPPort(t)
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:" + port1,
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
firstDone := make(chan struct{})
go func() {
s.Start()
close(firstDone)
}()
waitForListening(t, "127.0.0.1:"+port1)
first := currentRuntime(s)
require.NotNil(t, first)
port2 := freeTCPPort(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:" + port2,
"path": "/metrics",
})
require.NoError(t, s.reload(c, false))
select {
case <-firstDone:
case <-time.After(5 * time.Second):
t.Fatal("old Start did not return after reload")
}
waitForListening(t, "127.0.0.1:"+port2)
second := currentRuntime(s)
require.NotNil(t, second)
assert.NotSame(t, first, second, "expected a new runtime after listen address change")
s.Stop()
}
func TestStatsServer_Stop_beforeStart_doesNotBlock(t *testing.T) {
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:0",
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
stopped := make(chan struct{})
go func() {
s.Stop()
close(stopped)
}()
select {
case <-stopped:
case <-time.After(time.Second):
t.Fatal("Stop hung with no runtime started")
}
}
func TestStatsServer_configTest_validatesWithoutSpawning(t *testing.T) {
s, c := newTestStatsServer(t)
s.configTest = true
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:0",
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
s.Start()
assert.Nil(t, currentRuntime(s))
}
func TestStatsServer_ctxCancel_unblocksStart(t *testing.T) {
// Ensures ctx cancellation alone (no explicit Stop) tears down both
// graphite and prom Start invocations.
port := freeTCPPort(t)
l := slog.New(slog.DiscardHandler)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := &statsServer{l: l, ctx: ctx}
c := config.NewC(l)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:" + port,
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
done := make(chan struct{})
go func() {
s.Start()
close(done)
}()
waitForListening(t, "127.0.0.1:"+port)
cancel()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Start did not return after ctx cancel")
}
}
func TestStatsServer_listenerBindFailure_sameCfgReloadRetries(t *testing.T) {
// Hold the port so ListenAndServe will fail on first Start.
blocker, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := strconv.Itoa(blocker.Addr().(*net.TCPAddr).Port)
s, c := newTestStatsServer(t)
setStatsConfig(c, map[string]any{
"type": "prometheus",
"interval": "1s",
"listen": "127.0.0.1:" + port,
"path": "/metrics",
})
require.NoError(t, s.reload(c, true))
done := make(chan struct{})
go func() {
s.Start()
close(done)
}()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Start did not return after bind failure")
}
// Bind failure should have dropped the cached config so a same-cfg
// SIGHUP can retry.
s.runMu.Lock()
cfgAfterFailure := s.runCfg
s.runMu.Unlock()
assert.Nil(t, cfgAfterFailure)
// Free the port and reload with the same config; Start should fire again.
require.NoError(t, blocker.Close())
require.NoError(t, s.reload(c, false))
waitForListening(t, "127.0.0.1:"+port)
require.NotNil(t, currentRuntime(s))
s.Stop()
}
func waitForListening(t *testing.T, addr string) {
t.Helper()
waitFor(t, func() bool {
conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
if err != nil {
return false
}
_ = conn.Close()
return true
})
}
// graphiteSink is a minimal TCP accept-and-discard server so graphite.Once
// calls in tests don't spam error logs or wedge on connection refused.
type graphiteSink struct {
ln net.Listener
}
func newGraphiteSink(t *testing.T) *graphiteSink {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
g := &graphiteSink{ln: ln}
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go func(c net.Conn) {
_, _ = io.Copy(io.Discard, c)
_ = c.Close()
}(conn)
}
}()
return g
}
func (g *graphiteSink) Addr() string { return g.ln.Addr().String() }
func (g *graphiteSink) Close() { _ = g.ln.Close() }
func freeTCPPort(t *testing.T) string {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := ln.Addr().(*net.TCPAddr).Port
require.NoError(t, ln.Close())
return strconv.Itoa(port)
}