From 75f09e77d93c508ace86119021206cde09fdc3d7 Mon Sep 17 00:00:00 2001 From: Tnze Date: Fri, 9 Dec 2022 22:35:00 +0800 Subject: [PATCH] move PacketQueue --- internal/queue/queue.go | 84 +++++++++++++++++++++++++++++++++++++++++ server/client.go | 81 +-------------------------------------- 2 files changed, 86 insertions(+), 79 deletions(-) create mode 100644 internal/queue/queue.go diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..51270fc --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,84 @@ +package queue + +import ( + "container/list" + "sync" + "sync/atomic" +) + +type Queue[T any] interface { + Push(v T) + Pull() (v T, ok bool) + Close() +} + +func NewLinkedQueue[T any]() (p Queue[T]) { + p = &LinkedListPacketQueue[T]{ + queue: list.New(), + cond: sync.Cond{L: new(sync.Mutex)}, + } + return p +} + +type LinkedListPacketQueue[T any] struct { + queue *list.List + closed bool + cond sync.Cond +} + +func (p *LinkedListPacketQueue[T]) Push(v T) { + p.cond.L.Lock() + if !p.closed { + p.queue.PushBack(v) + } + p.cond.Signal() + p.cond.L.Unlock() +} + +func (p *LinkedListPacketQueue[T]) Pull() (v T, ok bool) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + for p.queue.Front() == nil && !p.closed { + p.cond.Wait() + } + if p.closed { + return + } + v = p.queue.Remove(p.queue.Front()).(T) + ok = true + return +} + +func (p *LinkedListPacketQueue[T]) Close() { + p.cond.L.Lock() + p.closed = true + p.cond.Broadcast() + p.cond.L.Unlock() +} + +type ChannelPacketQueue[T any] struct { + c chan T + closed atomic.Bool +} + +func (c ChannelPacketQueue[T]) Push(v T) { + if c.closed.Load() { + return + } + select { + case c.c <- v: + default: + c.closed.Store(true) + } +} + +func (c ChannelPacketQueue[T]) Pull() (v T, ok bool) { + if !c.closed.Load() { + v, ok = <-c.c + } + return +} + +func (c ChannelPacketQueue[T]) Close() { + c.closed.Store(true) +} diff --git a/server/client.go b/server/client.go index 244d8bd..8b29207 100644 --- a/server/client.go +++ b/server/client.go @@ -1,11 +1,9 @@ package server import ( - "container/list" "strconv" - "sync" - "sync/atomic" + "github.com/Tnze/go-mc/internal/queue" pk "github.com/Tnze/go-mc/net/packet" ) @@ -29,79 +27,4 @@ func (s WritePacketError) Unwrap() error { return s.Err } -type PacketQueue interface { - Push(packet pk.Packet) - Pull() (packet pk.Packet, ok bool) - Close() -} - -func NewPacketQueue() (p PacketQueue) { - p = &LinkedListPacketQueue{ - queue: list.New(), - cond: sync.Cond{L: new(sync.Mutex)}, - } - return p -} - -type LinkedListPacketQueue struct { - queue *list.List - closed bool - cond sync.Cond -} - -func (p *LinkedListPacketQueue) Push(packet pk.Packet) { - p.cond.L.Lock() - if !p.closed { - p.queue.PushBack(packet) - } - p.cond.Signal() - p.cond.L.Unlock() -} - -func (p *LinkedListPacketQueue) Pull() (packet pk.Packet, ok bool) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - for p.queue.Front() == nil && !p.closed { - p.cond.Wait() - } - if p.closed { - return pk.Packet{}, false - } - packet = p.queue.Remove(p.queue.Front()).(pk.Packet) - ok = true - return -} - -func (p *LinkedListPacketQueue) Close() { - p.cond.L.Lock() - p.closed = true - p.cond.Broadcast() - p.cond.L.Unlock() -} - -type ChannelPacketQueue struct { - c chan pk.Packet - closed atomic.Bool -} - -func (c ChannelPacketQueue) Push(packet pk.Packet) { - if c.closed.Load() { - return - } - select { - case c.c <- packet: - default: - c.closed.Store(true) - } -} - -func (c ChannelPacketQueue) Pull() (packet pk.Packet, ok bool) { - if !c.closed.Load() { - packet, ok = <-c.c - } - return -} - -func (c ChannelPacketQueue) Close() { - c.closed.Store(true) -} +type PacketQueue = queue.Queue[pk.Packet]