mirror of
https://github.com/slackhq/nebula.git
synced 2026-05-10 22:22:27 -07:00
84 lines
2.6 KiB
Go
84 lines
2.6 KiB
Go
package nebula
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Scheduler is an allocation-conscious dispatch primitive for delayed work.
// Each pending item is handed to time.AfterFunc; once its timer fires, the
// item lands on a single worker channel so dispatch stays centralized and
// items are handled in fire-time order.
//
// Choose a Scheduler when fire timing matters (exact deadlines, no bucketing)
// or when the scheduling rate is uneven enough that idle CPU matters. Every
// fire runs the callback on a runtime-spawned goroutine before the item
// reaches the worker, which is fine at sparse rates but adds up at line rate.
//
// Choose a TimerWheel when scheduling is high-rate and uniform: its O(1)
// insert, internal item cache, and bucket-batched dispatch are cheaper at
// scale. With a TimerWheel the caller drives the tick loop (Advance/Purge) and
// pays for fires at bucket boundaries rather than at exact deadlines.
type Scheduler[T any] struct {
	// queue is the worker channel that fired items are delivered on and that
	// Run receives from.
	queue chan T

	// pool recycles schedItem values so a steady stream of Schedule calls can
	// reuse items, along with their timers and callbacks.
	pool sync.Pool
}
|
|
|
|
type schedItem[T any] struct {
|
|
val T
|
|
ctx context.Context
|
|
s *Scheduler[T]
|
|
timer *time.Timer
|
|
fire func()
|
|
}
|
|
|
|
// NewScheduler builds a Scheduler whose worker channel is sized to queueSize.
|
|
// The buffer absorbs bursts of timers firing close together without
|
|
// blocking the runtime's callback goroutines on the worker.
|
|
func NewScheduler[T any](queueSize int) *Scheduler[T] {
|
|
s := &Scheduler[T]{
|
|
queue: make(chan T, queueSize),
|
|
}
|
|
s.pool.New = func() any {
|
|
si := &schedItem[T]{s: s}
|
|
// fire is allocated exactly once per pool-resident item.
|
|
// The closure captures only `si`, which stays stable for the item's lifetime.
|
|
si.fire = func() {
|
|
select {
|
|
case si.s.queue <- si.val:
|
|
case <-si.ctx.Done():
|
|
}
|
|
var zero T
|
|
si.val = zero
|
|
si.ctx = nil
|
|
si.s.pool.Put(si)
|
|
}
|
|
return si
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Schedule arranges item to be delivered to the worker after delay.
|
|
// The runtime's timer heap handles the wait, so the scheduler itself burns no CPU while idle.
|
|
// The callback observes ctx: if ctx is cancelled before the timer fires, the item is dropped instead of queued.
|
|
func (s *Scheduler[T]) Schedule(ctx context.Context, item T, delay time.Duration) {
|
|
si := s.pool.Get().(*schedItem[T])
|
|
si.val = item
|
|
si.ctx = ctx
|
|
if si.timer == nil {
|
|
si.timer = time.AfterFunc(delay, si.fire)
|
|
} else {
|
|
si.timer.Reset(delay)
|
|
}
|
|
}
|
|
|
|
// Run drains the worker queue, calling fn for each item. Returns when ctx is cancelled.
|
|
// Tests that want deterministic timing should drive the queue directly rather than going through Schedule + Run.
|
|
func (s *Scheduler[T]) Run(ctx context.Context, fn func(T)) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case item := <-s.queue:
|
|
fn(item)
|
|
}
|
|
}
|
|
}
|