生产者与消费者是一个广义的概念,可以代表一类具有相同属性的进程。
生产者和消费者进程共享一个大小固定的缓冲区,其中,一个或多个生产者生产数据,并将生产的数据存入缓冲区,并有一个消费者从缓冲区中取数据。
假设缓冲区的大小为n(存储单元的个数),它可以被生产者和消费者循环使用
分别设置两个指针in和out,指向生产者将存放数据的存储单元和消费者将取数据的存储单元,如图
生产者-消费者执行流程图
变量定义、所需函数及主程序如下:
typedef struct {int value;struct process_control_block * list;
} semaphore;int in = 0;
int out = 0;
semaphore mutex, empty, full;
mutex->value = 1; //由于缓冲池是一种临界资源,因此使用互斥型信号量
empty->value = n; //代表空缓冲区的数量,最初存储单元均为空
full->value = 0; //代表满缓冲区的数量,最初存储单元均为空void producer(); //生产者进程
void consumer(); //消费者进程void main() {cobegin //并发执行producer(); consumer();coend
}
wait() 和 signal() 描述如下:
wait(semaphore * S) {S->value--;if(S->value < 0) block(S->list);//S->value == 0时代表刚好取走最后一个资源
}signal(semaphore * S) {S->value++;if(S->value <= 0) wakeup(S->list);//S->value == 0时代表刚好还有一个进程被阻塞
}
生产者进程如下:
void producer() {do {//produce an item in nextpwait(&empty); //判断是否有空存储单元wait(&mutex); //判断是否可用缓冲池buffer[in] = nextp;in = (in + 1) % n;signal(&mutex); //唤醒一个使用者signal(&full); //唤醒一个消费者} while(true);
}
消费者进程如下:
void producer() {do {wait(&full); //判断是否有空存储单元wait(&mutex); //判断是否可用缓冲池nextc = buffer[out];out = (out + 1) % n;signal(&mutex); //唤醒一个使用者signal(&empty); //唤醒一个消费者//consume the item in nextc} while(true);
}
1)wait(&mutex) 必须放在 wait(&full) 后面,避免抢占到了缓冲池的使用权,结果发现没有空存储单元可用,从而导致资源浪费。
2)mutex 是互斥型信号量,因此对其的 wait 和 signal 操作必须成对出现。
3)进程一旦被阻塞,不管这个时间片是否用完,都立即进入下一个时间片。
t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8 | t9 | |
running | c1: wait(full) | p1: item->nextp p1: wait(empty) p1: wait(mutex) | p2: item->nextp p2: wait(empty) p2: wait(mutex) | p3: item->nextp p3: wait(empty) | p1: CS p1: signal(mutex) p1: signal(full) finish | p2: CS p2: signal(mutex) p2: signal(full) finish | c1: wait(mutex) c1: CS | c1: signal(mutex) c1: signal(empty) finish | p3: wait(mutex) p3: CS p3: signal(mutex) p3: signal(full) finish |
full->value | 0-1=-1(<0?) | -1+1=0(<=0?) | 0+1=1(<=0?) | 1+1=2(<=0?) | |||||
full->list | {c1} | wakeup(full->list) | {} | {} | |||||
empty->value | 2-1=1(<0?) | 1-1=0(<0?) | 0-1=-1(<0?) | -1+1=0(<=0?) | |||||
empty->list | {} | {} | {p3} | wakeup(empty->list) | |||||
mutex->value | 1-1=0(<0?) | 0-1=-1(<0?) | -1+1=0(<=0?) | 0+1=1(<=0?) | 1-1=0(<0?) | 0+1=1(<0?) | 1-1=0(<0?) 0+1=0(<=0?) | ||
mutex->list | {} | {p2} | wakeup(mutex->list) | {} | {} | {} | {} | ||
ready | {p1, p2, p3} | {p2, p3, p1} | {p3, p1} | {p1} | {p2, c1} | {c1} | {} | {p3} | {} |