mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-09 21:51:39 -07:00
67 lines
1.3 KiB
Go
67 lines
1.3 KiB
Go
package firewall
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// ConntrackCache is used as a local routine cache to know if a given flow
|
|
// has been seen in the conntrack table.
|
|
type ConntrackCache map[Packet]struct{}
|
|
|
|
type ConntrackCacheTicker struct {
|
|
cacheV uint64
|
|
cacheTick atomic.Uint64
|
|
|
|
l *slog.Logger
|
|
cache ConntrackCache
|
|
}
|
|
|
|
func NewConntrackCacheTicker(ctx context.Context, l *slog.Logger, d time.Duration) *ConntrackCacheTicker {
|
|
if d == 0 {
|
|
return nil
|
|
}
|
|
|
|
c := &ConntrackCacheTicker{
|
|
l: l,
|
|
cache: ConntrackCache{},
|
|
}
|
|
|
|
go c.tick(ctx, d)
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *ConntrackCacheTicker) tick(ctx context.Context, d time.Duration) {
|
|
t := time.NewTicker(d)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
c.cacheTick.Add(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get checks if the cache ticker has moved to the next version before returning
|
|
// the map. If it has moved, we reset the map.
|
|
func (c *ConntrackCacheTicker) Get() ConntrackCache {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
if tick := c.cacheTick.Load(); tick != c.cacheV {
|
|
c.cacheV = tick
|
|
if ll := len(c.cache); ll > 0 {
|
|
if c.l.Enabled(context.Background(), slog.LevelDebug) {
|
|
c.l.Debug("resetting conntrack cache", "len", ll)
|
|
}
|
|
c.cache = make(ConntrackCache, ll)
|
|
}
|
|
}
|
|
|
|
return c.cache
|
|
}
|