最近遇到一个很秀的并发问题:两个线程,一个生产者线程一个消费者线程,如果在消费者里面去生产会有什么问题(生产者和消费者是对一个先进先出有长度限制的队列进行操作)?
看你用的什么队列,一般受内存限制,如能正常生产、消费,不用考虑长度限制。消费者里面可以再生产,但要避免死循环,一般是消费处理失败的情况下,且有限次数的重新生产。
(a) 指针类型(Pointer)
(b) 数组类型
(c) 结构类型 (struct)
(d) Channel 类型
(e) 函数类型
(f) 切片类型
(g) 接口类型(interface)
(h) Map 类型
这些类型大多是python所没有的,可以找到数组、结构、函数、切片、Map等相近的用法。
生产者线程和消费者线程对同一个先进先出的有长度限制的队列进行操作的情况下,如果在消费者线程中去生产,可能会出现以下问题:
针对问题1,可以在生产者线程生产之前加锁,当队列满时就释放锁并阻塞当前线程,等待队列有空闲位置时再继续生产。当消费者线程消费完一个元素时,可以唤醒一个生产者线程,继续进行生产。这里需要用到条件变量(cond),具体实现代码如下:
package main
import (
"fmt"
"sync"
)
type Queue struct {
data []int
size int
lock *sync.Mutex
cond *sync.Cond
}
func NewQueue(size int) *Queue {
q := &Queue{
data: make([]int, 0, size),
size: size,
lock: &sync.Mutex{},
}
q.cond = sync.NewCond(q.lock)
return q
}
func (q *Queue) Push(n int) {
q.lock.Lock()
defer q.lock.Unlock()
for len(q.data) == q.size {
q.cond.Wait()
}
q.data = append(q.data, n)
q.cond.Signal()
}
func (q *Queue) Pop() int {
q.lock.Lock()
defer q.lock.Unlock()
for len(q.data) == 0 {
q.cond.Wait()
}
n := q.data[0]
q.data = q.data[1:]
q.cond.Signal()
return n
}
func main() {
q := NewQueue(10)
var wg sync.WaitGroup
wg.Add(2)
go func() {
for i := 0; i < 20; i++ {
q.Push(i)
fmt.Printf("生产者生产了: %d\n", i)
}
wg.Done()
}()
go func() {
for i := 0; i < 20; i++ {
n := q.Pop()
fmt.Printf("---消费者消费了: %d\n", n)
}
wg.Done()
}()
wg.Wait()
}
针对问题2,可以在对队列进行操作时加锁,防止多个线程同时访问。在上面的代码中,可以看到在Push和Pop方法中都有加锁操作,保证了线程安全。
在实际场景中,可以根据具体业务需求提出不同的优化方案。比如如果队列长度不变,可以使用无锁队列实现,提高并发性能。如果队列长度比较大,可以采用多个消费者和生产者,提高系统吞吐量。