// nebula/scheduler.go
package nebula
import (
"context"
"sync"
"time"
)
// Scheduler is an allocation-conscious dispatch primitive for delayed work.
// Pending items are handed to time.AfterFunc, and ready items land on a worker
// channel for centralized dispatch in fire-time order.
//
// Pick a Scheduler when fire timing matters (exact deadlines, no bucketing) or when the scheduling
// rate is uneven enough that idle CPU matters. Each fire is a runtime-spawned goroutine running the callback before
// delivering to the worker, which is fine at sparse rates but adds up at line rate.
//
// Pick a TimerWheel when scheduling is high-rate and uniform: its O(1) insert, internal item cache,
// and bucket-batched dispatch are cheaper at scale.
// The caller drives the tick loop (Advance/Purge) and pays for fires at bucket boundaries rather than exact deadlines.
type Scheduler[T any] struct {
	queue chan T    // worker channel: timer callbacks send ready items here; Run receives them.
	pool  sync.Pool // recycles *schedItem[T] so steady-state scheduling allocates nothing per call.
}
// schedItem is a pooled carrier pairing one scheduled value with its timer.
// An item is exclusively owned by the scheduling caller from pool.Get until
// its fire callback hands it back to the pool, so its fields are never
// accessed concurrently.
type schedItem[T any] struct {
	val   T               // the scheduled value; zeroed before the item is pooled to avoid retention.
	ctx   context.Context // caller's context; cancellation drops the item instead of queueing it.
	s     *Scheduler[T]   // owning scheduler, for queue delivery and pool return.
	timer *time.Timer     // created lazily on the item's first Schedule, then Reset on reuse.
	fire  func()          // allocated once per item; sends val to s.queue or drops on ctx cancel.
}
// NewScheduler builds a Scheduler whose worker channel is sized to queueSize.
// The buffer absorbs bursts of timers firing close together without blocking
// the runtime's callback goroutines on the worker.
func NewScheduler[T any](queueSize int) *Scheduler[T] {
	sched := &Scheduler[T]{queue: make(chan T, queueSize)}
	sched.pool.New = func() any {
		item := &schedItem[T]{s: sched}
		// Each pool-resident item carries exactly one fire closure. The
		// closure captures only `item`, whose address stays stable for the
		// item's entire lifetime, so this allocation happens once per item.
		item.fire = func() {
			// Deliver the value unless the caller's context was cancelled
			// before delivery could complete.
			select {
			case item.s.queue <- item.val:
			case <-item.ctx.Done():
			}
			// Scrub references so a pooled item doesn't pin the value or the
			// context, then hand the item back for reuse.
			var zero T
			item.val = zero
			item.ctx = nil
			item.s.pool.Put(item)
		}
		return item
	}
	return sched
}
// Schedule arranges item to be delivered to the worker after delay.
// The runtime's timer heap handles the wait, so the scheduler itself burns no
// CPU while idle. The callback observes ctx: if ctx is cancelled before the
// timer fires, the item is dropped instead of queued.
func (s *Scheduler[T]) Schedule(ctx context.Context, item T, delay time.Duration) {
	it := s.pool.Get().(*schedItem[T])
	it.val = item
	it.ctx = ctx
	// A recycled item already owns an expired timer (it only re-enters the
	// pool after its callback has run); rearming it avoids a fresh
	// time.AfterFunc allocation.
	if t := it.timer; t != nil {
		t.Reset(delay)
		return
	}
	it.timer = time.AfterFunc(delay, it.fire)
}
// Run drains the worker queue, calling fn for each item. It returns when ctx
// is cancelled. Tests that want deterministic timing should drive the queue
// directly rather than going through Schedule + Run.
func (s *Scheduler[T]) Run(ctx context.Context, fn func(T)) {
	done := ctx.Done()
	for {
		select {
		case v := <-s.queue:
			fn(v)
		case <-done:
			return
		}
	}
}