C Producer-Consumer Using PThreads
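Problem description:
I am working on a program that mimics the producer-consumer paradigm. The code works when I have only one producer and one consumer, but it fails as soon as I add a second producer and a second consumer.
I have spent a while on this and cannot figure out why I get the error "Synchronization Error: Producer x Just overwrote x from Slot x". Through various tests I have traced the problem to this: a producer is not blocked when another producer is already in the critical section.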
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
void *producer (void *) ;
void *consumer(void *) ;
sem_t empty, full, mutex ;
int buffer[10] /*note extra long space!*/ ;
int ID[10] ;
int in = 0 ; int out = 0 ;
int BUFFER_SIZE = 10 ;
int nextProduced = 0 ;
main() {
int i ;
pthread_t TID[10] ;
sem_init(&empty, 0, 10) ;
sem_init(&full, 0, 0) ;
sem_init(&mutex, 0, 1) ;
for(i = 0; i < 10; i++) {
ID[i] = i ;
buffer[i] = -1 ;
}
//for(i = 0; i < 5000; i += 2) {
pthread_create(&TID[0], NULL, producer, (void *) &ID[0]) ;
printf("Producer ID = %d created!\n", 0) ;
pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]) ;
printf("Consumer ID = %d created!\n", 1) ;
pthread_create(&TID[2], NULL, producer, (void *) &ID[2]) ;
printf("Producer ID = %d created!\n", 2) ;
pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]) ;
printf("Consumer ID = %d created!\n", 3) ;
//}
for(i = 0; i < 10 ; i++) {
pthread_join(TID[i], NULL) ;
}
}
void *producer(void *Boo) {
int *ptr;
int ID;
ptr = (int *) Boo;
ID = *ptr;
while (1) {
nextProduced++; //Producing Integers
/* Check to see if Overwriting unread slot */
sem_wait(&empty);
sem_wait(&mutex);
if (buffer[in] != -1) {
printf("Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in);
exit(0);
}
/* Looks like we are OK */
buffer[in] = nextProduced;
printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in);
in = (in + 1) % BUFFER_SIZE;
printf("incremented in!\n");
sem_post(&mutex);
sem_post(&full);
}
}
void *consumer (void *Boo) {
static int nextConsumed = 0 ;
int *ptr ;
int ID ;
ptr = (int *) Boo ;
ID = *ptr ;
while (1) {
sem_wait(&full);
sem_wait(&mutex);
nextConsumed = buffer[out];
/*Check to make sure we did not read from an empty slot*/
if (nextConsumed == -1) {
printf("Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out) ;
exit(0) ;
}
/* We must be OK */
printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out) ;
buffer[out] = -1 ;
out = (out + 1) % BUFFER_SIZE;
sem_post(&mutex);
sem_post(&empty);
}
}
Output:
Producer ID = 0 created!
Producer 0. Put 1 in slot 0
Consumer ID = 1 created!
incremented in!
Consumer 1 Just consumed item 1 from slot 0
Producer ID = 2 created!
Producer 0. Put 2 in slot 1
Synchronization Error: Producer 2 Just overwrote 2 from Slot 1
Consumer 1 Just consumed item 2 from slot 1
Consumer ID = 3 created!
incremented in!
Consumer 3 Just consumed item 2 from slot 1
Synch Error: Consumer 1 Just Read from empty slot 2
Producer 0. Put 4 in slot 2
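As you can see, Producer 0 managed to put 2 in slot 1. However, before Producer 0 could increment in, Producer 2 tried to write to slot 1, because in had not yet been incremented.
For some reason, it seems my sem_wait() calls are not working. Can someone help me?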
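One way to test the suspicion that the semaphore calls are not working is to check their return values, which the code above ignores. The sketch below is only an illustration: try_sem_init and checked_sem_wait are hypothetical helper names, not part of the original program. On a platform where unnamed semaphores are not implemented, sem_init() returns -1 and sets errno, and every later sem_wait() on that semaphore would fail instead of blocking.

```c
#include <errno.h>
#include <semaphore.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: initialise an unnamed semaphore and report failure.
 * Returns 0 on success, or the errno value set by sem_init() on failure. */
int try_sem_init(sem_t *sem, unsigned int value) {
    if (sem_init(sem, 0, value) == -1) {
        int err = errno;   /* save errno before calling anything else */
        fprintf(stderr, "sem_init failed: %s\n", strerror(err));
        return err;
    }
    return 0;
}

/* Hypothetical helper: wait on a semaphore, retrying if a signal
 * interrupts the call. Returns 0 on success, or the errno value on
 * any other failure. */
int checked_sem_wait(sem_t *sem) {
    while (sem_wait(sem) == -1) {
        if (errno != EINTR) {
            int err = errno;
            fprintf(stderr, "sem_wait failed: %s\n", strerror(err));
            return err;
        }
    }
    return 0;
}
```

Calling try_sem_init(&mutex, 1) from main() in place of the bare sem_init() call, and stopping when it returns non-zero, would show right away whether the semaphores were ever created.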
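Answer:
I rewrote your code to run on my system, and it appears to work correctly.
Specific changes: because I am on OS X, I had to switch from sem_init() to sem_open() and sem_unlink(); to accommodate that change, and for the code in general, I added an interrupt handler so that typing ^C lets the consumers and producers finish and allows pthread_join() and any cleanup code after it to run; you tied the number of threads to the seemingly unrelated number of buffer slots (see your ID and buffer initialization code), so I separated them; I moved nextProduced++ inside the mutex; and I made various miscellaneous style tweaks: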
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <signal.h>
#include <fcntl.h> /* O_CREAT for sem_open(); some platforms do not pull it in via semaphore.h */
#define MAX_THREADS 5
#define BUFFER_SIZE 10
sem_t *empty, *full, *mutex;
int buffer[BUFFER_SIZE];
int in = 0, out = 0;
static volatile int keepRunning = 1;
void intHandler(int dummy) {
keepRunning = 0;
}
void *producer(void * id_ptr) {
int ID = *((int *) id_ptr);
static int nextProduced = 0;
while (keepRunning) {
(void) sem_wait(empty);
(void) sem_wait(mutex);
/* Check to see if Overwriting unread slot */
if (buffer[in] != -1) {
fprintf(stderr, "Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in);
exit(1);
}
nextProduced++; // Producing Integers
/* Looks like we are OK */
buffer[in] = nextProduced;
printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in);
in = (in + 1) % BUFFER_SIZE;
printf("incremented in!\n");
(void) sem_post(mutex);
(void) sem_post(full);
}
return NULL;
}
void *consumer (void *id_ptr) {
int ID = *((int *) id_ptr);
static int nextConsumed = 0;
while (keepRunning) {
(void) sem_wait(full);
(void) sem_wait(mutex);
nextConsumed = buffer[out];
/* Check to make sure we did not read from an empty slot */
if (nextConsumed == -1) {
fprintf(stderr, "Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out);
exit(1);
}
/* We must be OK */
printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out);
buffer[out] = -1;
out = (out + 1) % BUFFER_SIZE;
printf("incremented out!\n");
(void) sem_post(mutex);
(void) sem_post(empty);
}
return NULL;
}
int main() {
int ID[MAX_THREADS];
pthread_t TID[MAX_THREADS];
empty = sem_open("/empty", O_CREAT, 0644, 10);
full = sem_open("/full", O_CREAT, 0644, 0);
mutex = sem_open("/mutex", O_CREAT, 0644, 1);
signal(SIGINT, intHandler);
for (int i = 0; i < MAX_THREADS; i++) {
ID[i] = i;
}
for (int i = 0; i < BUFFER_SIZE; i++) {
buffer[i] = -1;
}
pthread_create(&TID[0], NULL, producer, (void *) &ID[0]);
printf("Producer ID = %d created!\n", 0);
pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]);
printf("Consumer ID = %d created!\n", 1);
pthread_create(&TID[2], NULL, producer, (void *) &ID[2]);
printf("Producer ID = %d created!\n", 2);
pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]);
printf("Consumer ID = %d created!\n", 3);
for (int i = 0; i < 4; i++) {
pthread_join(TID[i], NULL);
}
(void) sem_unlink("/empty");
(void) sem_unlink("/full");
(void) sem_unlink("/mutex");
return 0;
}
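A caveat with named semaphores is that they outlive the process. If a run ends through one of the exit(1) paths above, sem_unlink() is never reached, and the next sem_open() with plain O_CREAT may reopen the old semaphore with whatever count it was left at. The following is a minimal sketch of one way to guard against that, assuming it is acceptable to discard any semaphore already registered under the same name; open_fresh_sem and close_fresh_sem are hypothetical helpers, not part of the answer's code.

```c
#include <fcntl.h>      /* O_CREAT, O_EXCL */
#include <semaphore.h>
#include <stdio.h>

/* Hypothetical helper: open a named semaphore that is guaranteed to
 * start at `value`, even if an earlier run left one behind. */
sem_t *open_fresh_sem(const char *name, unsigned int value) {
    /* Remove any stale semaphore; failing because none exists is fine. */
    (void) sem_unlink(name);

    /* O_EXCL makes sem_open() fail rather than silently reuse an
     * existing semaphore with an unknown count. */
    sem_t *sem = sem_open(name, O_CREAT | O_EXCL, 0644, value);
    if (sem == SEM_FAILED) {
        perror("sem_open");
    }
    return sem;
}

/* Close the handle and remove the name so nothing persists after exit. */
void close_fresh_sem(sem_t *sem, const char *name) {
    (void) sem_close(sem);
    (void) sem_unlink(name);
}
```

With these helpers, the three sem_open() calls in main() would become, for example, empty = open_fresh_sem("/empty", BUFFER_SIZE), followed by a check against SEM_FAILED.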
Output:
> ./a.out
Producer ID = 0 created!
Producer 0. Put 1 in slot 0
Consumer ID = 1 created!
incremented in!
Producer ID = 2 created!
Producer 0. Put 2 in slot 1
incremented in!
Producer 2. Put 3 in slot 2
incremented in!
Consumer ID = 3 created!
Producer 0. Put 4 in slot 3
incremented in!
Consumer 1 Just consumed item 1 from slot 0
incremented out!
Consumer 3 Just consumed item 2 from slot 1
incremented out!
Producer 2. Put 5 in slot 4
incremented in!
Producer 0. Put 6 in slot 5
incremented in!
Consumer 1 Just consumed item 3 from slot 2
incremented out!
Consumer 3 Just consumed item 4 from slot 3
incremented out!
Producer 2. Put 7 in slot 6
incremented in!
Producer 0. Put 8 in slot 7
incremented in!
Consumer 1 Just consumed item 5 from slot 4
incremented out!
Consumer 3 Just consumed item 6 from slot 5
...
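Because the program above runs until interrupted, its correctness can only be judged by reading the log. The sketch below is a bounded variant that terminates on its own and can be checked by its result: two producers and two consumers move a fixed number of items through the buffer, and the sum of everything consumed is returned. The names, sizes, and item counts are assumptions made for illustration. It uses sem_init(), so it assumes a platform where unnamed semaphores are implemented (Linux, for example); on OS X the sem_open() approach from the answer would have to be swapped in. As in the answer, the shared item counter is incremented only while the mutex semaphore is held.

```c
#include <pthread.h>
#include <semaphore.h>

#define SLOTS 4                 /* buffer size assumed for this sketch */
#define PAIRS 2                 /* number of producer/consumer pairs */
#define ITEMS_PER_THREAD 1000   /* items each thread produces or consumes */

static sem_t empty_slots, full_slots, lock;
static int ring[SLOTS];
static int in_pos, out_pos;
static int next_item;           /* shared by producers, guarded by lock */
static long consumed_sum;       /* shared by consumers, guarded by lock */

static void *produce(void *arg) {
    (void) arg;
    for (int i = 0; i < ITEMS_PER_THREAD; i++) {
        sem_wait(&empty_slots);         /* wait for a free slot */
        sem_wait(&lock);                /* enter the critical section */
        ring[in_pos] = ++next_item;     /* increment while holding lock */
        in_pos = (in_pos + 1) % SLOTS;
        sem_post(&lock);
        sem_post(&full_slots);          /* announce a filled slot */
    }
    return NULL;
}

static void *consume(void *arg) {
    (void) arg;
    for (int i = 0; i < ITEMS_PER_THREAD; i++) {
        sem_wait(&full_slots);          /* wait for a filled slot */
        sem_wait(&lock);
        consumed_sum += ring[out_pos];
        out_pos = (out_pos + 1) % SLOTS;
        sem_post(&lock);
        sem_post(&empty_slots);         /* hand the slot back */
    }
    return NULL;
}

/* Runs the bounded workload and returns the sum of all consumed items,
 * or -1 if the semaphores could not be initialised. */
long run_bounded_demo(void) {
    pthread_t tid[2 * PAIRS];
    in_pos = out_pos = next_item = 0;
    consumed_sum = 0;
    if (sem_init(&empty_slots, 0, SLOTS) == -1 ||
        sem_init(&full_slots, 0, 0) == -1 ||
        sem_init(&lock, 0, 1) == -1) {
        return -1;
    }
    for (int i = 0; i < PAIRS; i++) {
        pthread_create(&tid[2 * i], NULL, produce, NULL);
        pthread_create(&tid[2 * i + 1], NULL, consume, NULL);
    }
    for (int i = 0; i < 2 * PAIRS; i++) {
        pthread_join(tid[i], NULL);
    }
    sem_destroy(&empty_slots);
    sem_destroy(&full_slots);
    sem_destroy(&lock);
    return consumed_sum;
}
```

If every item from 1 to PAIRS * ITEMS_PER_THREAD is produced once and consumed once, the returned sum equals N(N+1)/2 for N = 2000. A lost or duplicated item would change that total.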