Golang中通过sync.Mutex和sync.Cond结合container/list实现并发队列,确保多Goroutine下安全存取。

Golang中实现并发队列,核心在于确保多个Goroutine在同时存取数据时,不会发生竞态条件导致数据损坏或逻辑错误。我们通常会借助Go语言自带的并发原语,如
sync.Mutex
结合标准库的
container/list
,或者更Go风格的
chan
(通道)来实现。无论是哪种方式,目标都是提供一个线程安全的数据结构,让生产者和消费者可以安全地进行入队和出队操作。
一个基于
sync.Mutex
和
container/list
的并发队列实现,能提供更灵活的容量控制和阻塞/非阻塞行为,特别是当我们需要一个能明确判断队列空满状态,并能根据需要进行等待或立即返回的队列时。
package mainimport ( "container/list" "fmt" "sync" "time")// ConcurrentQueue 定义了一个并发安全的队列type ConcurrentQueue struct { queue *list.List // 实际存储数据的链表 mutex sync.Mutex // 保护队列的互斥锁 cond *sync.Cond // 条件变量,用于等待和通知 cap int // 队列容量,0表示无限制}// NewConcurrentQueue 创建一个新的并发队列// capacity为0表示无限制容量,否则为固定容量func NewConcurrentQueue(capacity int) *ConcurrentQueue { q := &ConcurrentQueue{ queue: list.New(), cap: capacity, } q.cond = sync.NewCond(&q.mutex) // 条件变量需要一个Locker return q}// Enqueue 将元素加入队列func (q *ConcurrentQueue) Enqueue(item interface{}) error { q.mutex.Lock() defer q.mutex.Unlock() // 如果有容量限制,且队列已满,则等待 for q.cap > 0 && q.queue.Len() >= q.cap { q.cond.Wait() // 释放锁并等待,被唤醒后重新获取锁 } q.queue.PushBack(item) q.cond.Signal() // 通知一个等待的消费者 return nil}// Dequeue 从队列中取出元素func (q *ConcurrentQueue) Dequeue() (interface{}, error) { q.mutex.Lock() defer q.mutex.Unlock() // 如果队列为空,则等待 for q.queue.Len() == 0 { q.cond.Wait() // 释放锁并等待,被唤醒后重新获取锁 } element := q.queue.Front() q.queue.Remove(element) q.cond.Signal() // 通知一个等待的生产者(如果队列曾满而等待) return element.Value, nil}// TryEnqueue 尝试将元素加入队列,如果队列满则立即返回错误func (q *ConcurrentQueue) TryEnqueue(item interface{}) error { q.mutex.Lock() defer q.mutex.Unlock() if q.cap > 0 && q.queue.Len() >= q.cap { return fmt.Errorf("queue is full") } q.queue.PushBack(item) q.cond.Signal() return nil}// TryDequeue 尝试从队列中取出元素,如果队列空则立即返回错误func (q *ConcurrentQueue) TryDequeue() (interface{}, error) { q.mutex.Lock() defer q.mutex.Unlock() if q.queue.Len() == 0 { return nil, fmt.Errorf("queue is empty") } element := q.queue.Front() q.queue.Remove(element) q.cond.Signal() return element.Value, nil}// Len 返回队列当前长度func (q *ConcurrentQueue) Len() int { q.mutex.Lock() defer q.mutex.Unlock() return q.queue.Len()}// IsEmpty 判断队列是否为空func (q *ConcurrentQueue) IsEmpty() bool { return q.Len() == 0}func main() { queue := NewConcurrentQueue(5) // 创建一个容量为5的并发队列 var wg sync.WaitGroup // 生产者 for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() item := fmt.Sprintf("数据-%d", id) if err := queue.Enqueue(item); err != nil { fmt.Printf("生产者%d: 尝试入队 %s 失败: %vn", id, item, err) } else { fmt.Printf("生产者%d: 入队 %s, 当前队列长度: %dn", id, item, queue.Len()) } }(i) } // 消费者 for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() // 稍微延迟一下,让生产者有机会先生产一些数据 time.Sleep(time.Millisecond * 50) item, err := queue.Dequeue() if err != nil { fmt.Printf("消费者%d: 尝试出队失败: %vn", id, err) } else { fmt.Printf("消费者%d: 出队 %v, 当前队列长度: %dn",
以上就是Golang并发队列实现与操作示例的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1403092.html
微信扫一扫
支付宝扫一扫