diff --git a/internal/queue/queue.go b/internal/queue/queue.go deleted file mode 100644 index 51270fc..0000000 --- a/internal/queue/queue.go +++ /dev/null @@ -1,84 +0,0 @@ -package queue - -import ( - "container/list" - "sync" - "sync/atomic" -) - -type Queue[T any] interface { - Push(v T) - Pull() (v T, ok bool) - Close() -} - -func NewLinkedQueue[T any]() (p Queue[T]) { - p = &LinkedListPacketQueue[T]{ - queue: list.New(), - cond: sync.Cond{L: new(sync.Mutex)}, - } - return p -} - -type LinkedListPacketQueue[T any] struct { - queue *list.List - closed bool - cond sync.Cond -} - -func (p *LinkedListPacketQueue[T]) Push(v T) { - p.cond.L.Lock() - if !p.closed { - p.queue.PushBack(v) - } - p.cond.Signal() - p.cond.L.Unlock() -} - -func (p *LinkedListPacketQueue[T]) Pull() (v T, ok bool) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - for p.queue.Front() == nil && !p.closed { - p.cond.Wait() - } - if p.closed { - return - } - v = p.queue.Remove(p.queue.Front()).(T) - ok = true - return -} - -func (p *LinkedListPacketQueue[T]) Close() { - p.cond.L.Lock() - p.closed = true - p.cond.Broadcast() - p.cond.L.Unlock() -} - -type ChannelPacketQueue[T any] struct { - c chan T - closed atomic.Bool -} - -func (c ChannelPacketQueue[T]) Push(v T) { - if c.closed.Load() { - return - } - select { - case c.c <- v: - default: - c.closed.Store(true) - } -} - -func (c ChannelPacketQueue[T]) Pull() (v T, ok bool) { - if !c.closed.Load() { - v, ok = <-c.c - } - return -} - -func (c ChannelPacketQueue[T]) Close() { - c.closed.Store(true) -} diff --git a/net/queue/queue.go b/net/queue/queue.go new file mode 100644 index 0000000..2e2a678 --- /dev/null +++ b/net/queue/queue.go @@ -0,0 +1,83 @@ +package queue + +import ( + "container/list" + "sync" +) + +type Queue[T any] interface { + Push(v T) (ok bool) + Pull() (v T, ok bool) + Close() +} + +func NewLinkedQueue[T any]() (q Queue[T]) { + return &LinkedListQueue[T]{ + queue: list.New(), + cond: sync.Cond{L: new(sync.Mutex)}, + } +} + +type LinkedListQueue[T any] struct { + queue *list.List + closed bool + cond sync.Cond +} + +func (p *LinkedListQueue[T]) Push(v T) bool { + p.cond.L.Lock() + if p.closed { + panic("push on closed queue") + } + p.queue.PushBack(v) + p.cond.Signal() + p.cond.L.Unlock() + return true +} + +func (p *LinkedListQueue[T]) Pull() (v T, ok bool) { + p.cond.L.Lock() + for { + if elem := p.queue.Front(); elem != nil { + v = p.queue.Remove(elem).(T) + ok = true + break + } else if p.closed { + break + } + p.cond.Wait() + } + p.cond.L.Unlock() + return +} + +func (p *LinkedListQueue[T]) Close() { + p.cond.L.Lock() + p.closed = true + p.cond.Broadcast() + p.cond.L.Unlock() +} + +func NewChannelQueue[T any](n int) (q Queue[T]) { + return make(ChannelQueue[T], n) +} + +type ChannelQueue[T any] chan T + +func (c ChannelQueue[T]) Push(v T) bool { + select { + case c <- v: + return true + default: + return false + } +} + +func (c ChannelQueue[T]) Pull() (v T, ok bool) { + v, ok = <-c + return +} + +func (c ChannelQueue[T]) Close() { + close(c) +}