mirror of
https://github.com/slackhq/nebula.git
synced 2025-12-06 02:30:57 -08:00
Generic timerwheel (#804)
This commit is contained in:
parent
c177126ed0
commit
5278b6f926
8 changed files with 116 additions and 431 deletions
95
timeout.go
95
timeout.go
|
|
@ -1,17 +1,14 @@
|
|||
package nebula
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/slackhq/nebula/firewall"
|
||||
)
|
||||
|
||||
// How many timer objects should be cached
|
||||
const timerCacheMax = 50000
|
||||
|
||||
var emptyFWPacket = firewall.Packet{}
|
||||
|
||||
type TimerWheel struct {
|
||||
type TimerWheel[T any] struct {
|
||||
// Current tick
|
||||
current int
|
||||
|
||||
|
|
@ -26,31 +23,38 @@ type TimerWheel struct {
|
|||
wheelDuration time.Duration
|
||||
|
||||
// The actual wheel which is just a set of singly linked lists, head/tail pointers
|
||||
wheel []*TimeoutList
|
||||
wheel []*TimeoutList[T]
|
||||
|
||||
// Singly linked list of items that have timed out of the wheel
|
||||
expired *TimeoutList
|
||||
expired *TimeoutList[T]
|
||||
|
||||
// Item cache to avoid garbage collect
|
||||
itemCache *TimeoutItem
|
||||
itemCache *TimeoutItem[T]
|
||||
itemsCached int
|
||||
}
|
||||
|
||||
type LockingTimerWheel[T any] struct {
|
||||
m sync.Mutex
|
||||
t *TimerWheel[T]
|
||||
}
|
||||
|
||||
// TimeoutList Represents a tick in the wheel
|
||||
type TimeoutList struct {
|
||||
Head *TimeoutItem
|
||||
Tail *TimeoutItem
|
||||
type TimeoutList[T any] struct {
|
||||
Head *TimeoutItem[T]
|
||||
Tail *TimeoutItem[T]
|
||||
}
|
||||
|
||||
// TimeoutItem Represents an item within a tick
|
||||
type TimeoutItem struct {
|
||||
Packet firewall.Packet
|
||||
Next *TimeoutItem
|
||||
type TimeoutItem[T any] struct {
|
||||
Item T
|
||||
Next *TimeoutItem[T]
|
||||
}
|
||||
|
||||
// NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
|
||||
// Purge must be called once per entry to actually remove anything
|
||||
func NewTimerWheel(min, max time.Duration) *TimerWheel {
|
||||
// The TimerWheel does not handle concurrency on its own.
|
||||
// Locks around access to it must be used if multiple routines are manipulating it.
|
||||
func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
|
||||
//TODO provide an error
|
||||
//if min >= max {
|
||||
// return nil
|
||||
|
|
@ -61,26 +65,31 @@ func NewTimerWheel(min, max time.Duration) *TimerWheel {
|
|||
// timeout
|
||||
wLen := int((max / min) + 2)
|
||||
|
||||
tw := TimerWheel{
|
||||
tw := TimerWheel[T]{
|
||||
wheelLen: wLen,
|
||||
wheel: make([]*TimeoutList, wLen),
|
||||
wheel: make([]*TimeoutList[T], wLen),
|
||||
tickDuration: min,
|
||||
wheelDuration: max,
|
||||
expired: &TimeoutList{},
|
||||
expired: &TimeoutList[T]{},
|
||||
}
|
||||
|
||||
for i := range tw.wheel {
|
||||
tw.wheel[i] = &TimeoutList{}
|
||||
tw.wheel[i] = &TimeoutList[T]{}
|
||||
}
|
||||
|
||||
return &tw
|
||||
}
|
||||
|
||||
// Add will add a firewall.Packet to the wheel in it's proper timeout
|
||||
func (tw *TimerWheel) Add(v firewall.Packet, timeout time.Duration) *TimeoutItem {
|
||||
// Check and see if we should progress the tick
|
||||
tw.advance(time.Now())
|
||||
// NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
|
||||
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
|
||||
return &LockingTimerWheel[T]{
|
||||
t: NewTimerWheel[T](min, max),
|
||||
}
|
||||
}
|
||||
|
||||
// Add will add an item to the wheel in its proper timeout.
|
||||
// Caller should Advance the wheel prior to ensure the proper slot is used.
|
||||
func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
|
||||
i := tw.findWheel(timeout)
|
||||
|
||||
// Try to fetch off the cache
|
||||
|
|
@ -90,11 +99,11 @@ func (tw *TimerWheel) Add(v firewall.Packet, timeout time.Duration) *TimeoutItem
|
|||
tw.itemsCached--
|
||||
ti.Next = nil
|
||||
} else {
|
||||
ti = &TimeoutItem{}
|
||||
ti = &TimeoutItem[T]{}
|
||||
}
|
||||
|
||||
// Relink and return
|
||||
ti.Packet = v
|
||||
ti.Item = v
|
||||
if tw.wheel[i].Tail == nil {
|
||||
tw.wheel[i].Head = ti
|
||||
tw.wheel[i].Tail = ti
|
||||
|
|
@ -106,9 +115,12 @@ func (tw *TimerWheel) Add(v firewall.Packet, timeout time.Duration) *TimeoutItem
|
|||
return ti
|
||||
}
|
||||
|
||||
func (tw *TimerWheel) Purge() (firewall.Packet, bool) {
|
||||
// Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
|
||||
// If no item is available then an empty T is returned and the 2nd argument is false.
|
||||
func (tw *TimerWheel[T]) Purge() (T, bool) {
|
||||
if tw.expired.Head == nil {
|
||||
return emptyFWPacket, false
|
||||
var na T
|
||||
return na, false
|
||||
}
|
||||
|
||||
ti := tw.expired.Head
|
||||
|
|
@ -128,11 +140,11 @@ func (tw *TimerWheel) Purge() (firewall.Packet, bool) {
|
|||
tw.itemsCached++
|
||||
}
|
||||
|
||||
return ti.Packet, true
|
||||
return ti.Item, true
|
||||
}
|
||||
|
||||
// advance will move the wheel forward by proper number of ticks. The caller _should_ lock the wheel before calling this
|
||||
func (tw *TimerWheel) findWheel(timeout time.Duration) (i int) {
|
||||
// findWheel find the next position in the wheel for the provided timeout given the current tick
|
||||
func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
|
||||
if timeout < tw.tickDuration {
|
||||
// Can't track anything below the set resolution
|
||||
timeout = tw.tickDuration
|
||||
|
|
@ -154,8 +166,9 @@ func (tw *TimerWheel) findWheel(timeout time.Duration) (i int) {
|
|||
return tick
|
||||
}
|
||||
|
||||
// advance will lock and move the wheel forward by proper number of ticks.
|
||||
func (tw *TimerWheel) advance(now time.Time) {
|
||||
// Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
|
||||
// passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
|
||||
func (tw *TimerWheel[T]) Advance(now time.Time) {
|
||||
if tw.lastTick == nil {
|
||||
tw.lastTick = &now
|
||||
}
|
||||
|
|
@ -192,3 +205,21 @@ func (tw *TimerWheel) advance(now time.Time) {
|
|||
newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
|
||||
tw.lastTick = &newTick
|
||||
}
|
||||
|
||||
func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
|
||||
lw.m.Lock()
|
||||
defer lw.m.Unlock()
|
||||
return lw.t.Add(v, timeout)
|
||||
}
|
||||
|
||||
func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
|
||||
lw.m.Lock()
|
||||
defer lw.m.Unlock()
|
||||
return lw.t.Purge()
|
||||
}
|
||||
|
||||
func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
|
||||
lw.m.Lock()
|
||||
defer lw.m.Unlock()
|
||||
lw.t.Advance(now)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue