linux通过有名信号量和消息队列机制实现三个线程间通信

以下是我的代码
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>
#include<sys/msg.h>
#include<semaphore.h>

pthread_t s,r1,r2;
sem_t Write, Read1, Read2;
int flag1=0,flag2=0;
struct msgbuf{
long mtype ;
char mtext[100];
};

struct MyMsqids{
int id1;
int id2;
};
int mymsgrcv( int msqid, struct msgbuf *msgp, int msgsz, long mtype,int msgflg)
{

}
int mymsgsnd( int msqid, struct msgbuf *msgp, int msgsz, int msgflg )
{

}
void *receiver1(void *arg)
{
printf("receiver1 is running\n");
struct msgbuf buf;
int ret;
while(1)
{
sem_wait(&Read1);

    memset(&buf,0,sizeof(buf));
    sem_wait(&Write);
    ret=msgrcv(*(int *)arg,&buf,sizeof(buf.mtext),2,0);
    if (ret==-1)
    {
        perror("msgrcv error");
        sem_post(&Write);
        exit(1);
    }
            else if(!strncmp(buf.mtext,"exit",4))
    {
                    flag1++;
        printf("receiver 1 exit !!\n");
        sleep(1);
                   sem_post(&Write);
        pthread_cancel(r1);
        sleep(1);
    }
    else
    {
        printf("receiver 1 :%s\n",buf.mtext);
        sem_post(&Write);
        sleep(1);
    }
}

}

void *receiver2(void arg)
{
printf("receiver2 is running\n");
struct msgbuf buf;
int ret;
while(1)
{
sem_wait(&Read2);
memset(&buf,0,sizeof(buf));
sem_wait(&Write);
ret=msgrcv(
(int *)arg,&buf,sizeof(buf.mtext),2,0);
if (ret==-1)
{
printf("msgrcv error");
sem_post(&Write);
exit(1);
}
else if(!strncmp(buf.mtext,"exit",4))
{
flag2++;
printf("receiver 2 exit !!\n");
sleep(1);
sem_post(&Write);
pthread_cancel(r2);
sleep(1);
}
else
{
printf("receiver 2 :%s\n",buf.mtext);
sem_post(&Write);
sleep(1);
}
}
}

void *sender(void *arg)
{
printf("sender is running\n");
char t[100];
struct msgbuf buf;
int choose;
int ret, msqid1, msqid2;
int x=1;
struct MyMsqids *msgids;
msgids = (struct MyMsqids *)arg;
msqid1=(*msgids).id1;
msqid2=(*msgids).id2;
printf("sender-msqid1 : %d\n",msqid1);
printf("sender-msqid2 : %d\n",msqid2);

while(x==1)
{
            if(flag1+flag2==2)
    {
                    printf("both receiver exit, sender exit !!\n");
                    sem_post(&Write);
                    pthread_cancel(s);
                    x=0;
    }
            else if(flag1+flag2!=2)
            {
        memset(&buf,0,sizeof(buf));
        sem_wait(&Write);
        printf("please enter some words to send !!\n");
        scanf("%s",t);
        buf.mtype=2;

        strcat(buf.mtext,t);

                    choose=rand()%2;

                    if(flag2==1)choose=0;
                    else if(flag1==1)choose=1;

                    if(choose==0)
                    {
                           ret=msgsnd(msqid1,&buf,sizeof(buf.mtext),0);
                        printf("send to receiver 1 !!\n");
                        sem_post(&Read1);
                    }
                    else if(choose==1)
                    {
                        ret=msgsnd(msqid2,&buf,sizeof(buf.mtext),0);
                        printf("send to receiver 2 !!\n");
                        sem_post(&Read2);
                    }
        if(ret==-1)
        {
            perror("msgsnd error");
        }
        sem_post(&Write);
        sleep(1);
    }
    }

}
int main()
{
printf("main is running\n");
int ret,msqid1,msqid2;
struct MyMsqids msqids;
int x;

sem_init(&Write,0,1);
sem_getvalue(&Write,&x);
if(x==0) sem_post(&Write);
printf("---semaphore_Write: %d\n",x);
sem_init(&Read1,0,0);
sem_getvalue(&Read1,&x);
if(x==1) sem_wait(&Read1);
printf("---semaphore_Read1: %d\n",x);
sem_init(&Read2,0,0);
sem_getvalue(&Read2,&x);
if(x==1) sem_wait(&Read2);
printf("---semaphore_Read2: %d\n",x);
key_t key;
key=100;
msqid1=msgget(key,IPC_CREAT|0666);
key=101;
msqid2=msgget(key,IPC_CREAT|0666);
msqids.id1=msqid1;
msqids.id2=msqid2;
printf("main-msqid1 : %d\n",msqid1);
printf("main-msqid2 : %d\n",msqid2);
if(msqid1==-1)
{
    perror("msgid 1 error");
    printf("---msgid 1 error\n");
    exit(1);
}
if(msqid2==-1)
{
    perror("msgid 2 error");
    printf("---msgid 2 error\n");
    exit(1);
}

ret=pthread_create(&s,NULL,sender,(void *)&msqids);
if (ret!=0)
{
    perror("sender create error");
    printf("sender create error\n");
    exit(1);
}

ret=pthread_create(&r1,NULL,receiver1,(void *)&msqid1);
if (ret!=0)
{
    perror("receiver 1 create error");
    printf("---receiver 1 create error\n");
    exit(1);
}

ret=pthread_create(&r2,NULL,receiver2,(void *)&msqid2);
if (ret!=0)
{
    perror("receiver 2 create error");
            printf("receiver 2 create error\n");
    exit(1);
}

pthread_join(s,NULL);
pthread_join(r1,NULL);
pthread_join(r2,NULL);

printf("MsgQ Done !!\n");
msgctl(msqid1,IPC_RMID,NULL);
    msgctl(msqid2,IPC_RMID,NULL);
return 0;

}
但是要求说是不能使用自带的msgrcv函数和msgsnd函数,需要自己写.有谁能帮帮我吗

msgrcv.c可以修改为其它名称

#include <sys/msg.h>
#include <ipc_priv.h>
#include <sysdep-cancel.h>
ssize_t
__libc_msgrcv (int msqid, void *msgp, size_t msgsz, long int msgtyp,
               int msgflg)
{
#ifdef __ASSUME_DIRECT_SYSVIPC_SYSCALLS
  return SYSCALL_CANCEL (msgrcv, msqid, msgp, msgsz, msgtyp, msgflg);
#else
  return SYSCALL_CANCEL (ipc, IPCOP_msgrcv, msqid, msgsz, msgflg,
                         MSGRCV_ARGS (msgp, msgtyp));
#endif
}
weak_alias (__libc_msgrcv, msgrcv)

msgsnd.c可以修改为其它名称

#include <sys/msg.h>
#include <ipc_priv.h>
#include <sysdep-cancel.h>
int
__libc_msgsnd (int msqid, const void *msgp, size_t msgsz, int msgflg)
{
#ifdef __ASSUME_DIRECT_SYSVIPC_SYSCALLS
  return SYSCALL_CANCEL (msgsnd, msqid, msgp, msgsz, msgflg);
#else
  return SYSCALL_CANCEL (ipc, IPCOP_msgsnd, msqid, msgsz, msgflg,
                         msgp);
#endif
}
weak_alias (__libc_msgsnd, msgsnd)


1.c

#include "common.h"

/**
 * @brief Create mq and send message to receiver.
 * @return
 */
void *sender1() {
    int mq;
    struct msg_st buf;
    ssize_t bytes_read;

    /* open the mail queue */
    /*
Linux的消息队列(queue)实质上是一个链表, 它有消息队列标识符(queue ID). 
msgget创建一个新队列或打开一个存在的队列; msgsnd向队列末端添加一条新消息;
msgrcv从队列中取消息, 取消息是不一定遵循先进先出的, 也可以按消息的类型字段取消息.

      int msgget(key_t key, int msgflg); 获得消息队列的特征标识符。
函数需要两个参数,key 和 msgflg。
key 是该消息队列的全局唯一标识符,通过该标识符可以定位消息队列,对其进行相关操作。
msgflg 为权限控制,决定对消息队列进行的操作,
一般使用 0666 | IPC_CREAT,熟悉文件系统的可以知道,0666 表示文件所有者、同组用户和其他用户对于该消息队列的权限均为可读可写,
后面对常量 IPC_CREAT 进行的位运算作用是 “若该队列未被创建则创建它”。
(对于该函数可以简单理解为创建消息队列)
      */
    mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
    CHECK((key_t) -1 != mq);

    do {
        P(w_mutex);
        P(snd_dp);
        printf("sender1> ");
        V(rcv_dp);
        /*
        头文件:#include<stdio.h>
 
定义函数:int fflush(FILE * stream);
函数说明:fflush()会强迫将缓冲区内的数据写回参数stream指定的文件中,如果参数stream为NULL,fflush()会将所有打开的文件数据更新。
返回值:成功返回0,失败返回EOF,错误代码存于errno中。
fflush()也可用于标准输入(stdin)和标准输出(stdout),用来清空标准输入输出缓冲区。
stdin是standard input的缩写,即标准输入,一般是指键盘;标准输入缓冲区即是用来暂存从键盘输入的内容的缓冲区。
stdout是standard output 的缩写,即标准输出,一般是指显示器;标准输出缓冲区即是用来暂存将要显示的内容的缓冲区。
 
 
清空标准输出缓冲区,
刷新输出缓冲区,即将缓冲区的东西输出到屏幕上 
如果圆括号里是已写打开的文件的指针,则将输出缓冲区的内容写入该指针指向的文件,否则清除输出缓冲区。
这里的stdout是系统定义的标准输出文件指针,默认情况下指屏幕,那就是把缓冲区的内容写到屏幕上。
可是从代码中看不出缓冲区会有什么内容,所以它实际上没有起什么作用
        */
        fflush(stdout);
        // 接收终端输入,置入buf.buffer 
        fgets(buf.buffer, BUFSIZ, stdin);
        // 设置消息类别,接收线程据此判断消息来源 
        buf.message_type = snd_to_rcv1;
        /* send the message */
        P(empty);
        /*
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 消息队列的发送操作。success return 0;
函数需要四个参数,msqid,msgp,msgsz 和 msgflg。
msqid 是函数 msgget 的返回值,用于表示对哪一个消息队列进行操作。
msgp 是接收消息的指针,指向消息结构体 msg_st。
msgsz 是接收消息的大小,这里可以看作结构体 msg_st 中数据段的大小。
msgflg 同函数 msgget 中的 msgflg,这里可以直接使用 0。
        */
        CHECK(0 <= msgsnd(mq, (void*)&buf, MAX_SIZE, 0));
        V(full);
        V(w_mutex);
        printf("\n");
        /*
        strncmp函数为字符串比较函数,字符串大小的比较是以ASCII 码表上的顺序来决定,此顺序亦为字符的值。
        其函数声明为int strncmp ( const char * str1, const char * str2, size_t n );
        功能是把 str1 和 str2 进行比较,最多比较前 n 个字节,若str1与str2的前n个字符相同,则返回0;若s1大于s2,则返回大于0的值;
        若s1 小于s2,则返回小于0的值。 [
        */
     } while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP)));

    /* wait for response */
    P(over);
    /*
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); 消息队列的接收操作。
函数需要五个参数,msqid,msgp,msgsz,msgtyp 和 msgflg。
msqid 是函数 msgget 的返回值,用于表示对哪一个消息队列进行操作。
msgp 是接收消息的指针,指向消息结构体 msg_st。
msgsz 是接收消息的大小,这里可以看作结构体 msg_st 中数据段的大小。
msgtyp 是接收消息的类别,函数可以接收指定类别的消息,默认为 0,忽视类别,接收队首消息,正值和负值有不同含义,详情查看附录。
msgflg 同函数 msgget 中的 msgflg,这里可以直接使用 0。
函数的返回值为实际接收到的消息字节数。
    */
    bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, rcv_to_snd1, 0);
    CHECK(bytes_read >= 0);
    printf("%s", buf.buffer);
    printf("--------------------------------------------\n");
    V(snd_dp);
    pthread_exit(NULL);
}

/**
 * @brief Send message to receiver.
 * @return
 */
void *sender2() {
    int mq;
    struct msg_st buf;
    ssize_t bytes_read;

    /* open the mail queue */
    mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
    CHECK((key_t) -1 != mq);

    do {
        P(w_mutex);
        P(snd_dp);
        printf("sender2> ");
        V(rcv_dp);
        fflush(stdout);
        fgets(buf.buffer, BUFSIZ, stdin);
        buf.message_type = snd_to_rcv2;
        /* send the message */
        P(empty);
        CHECK(0 <= msgsnd(mq, (void *) &buf, MAX_SIZE, 0));
        V(full);
        V(w_mutex);
        printf("\n");
     } while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP)));

    /* wait for response */
    //由receiver接收到send1、2两个进程的exit后会分别对over--,即v(over)。让send1、2一直在 wait for response 
    P(over);
    bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, rcv_to_snd2, 0);
    CHECK(bytes_read >= 0);
    printf("%s", buf.buffer);
    printf("--------------------------------------------\n");
    V(snd_dp);
    pthread_exit(NULL);
}

/**
 * @brief Receive message from sender, and response when sender exit.
 * @return
 */
void *receiver() {
    struct msg_st buf, over1, over2;
    int mq, must_stop = 2;
    //消息队列状态:struct msqid_ds
    struct msqid_ds t;
    //定义两个结束信号 over1,over2
    over1.message_type = 3;
    /*
    strcpy,即string copy(字符串复制)的缩写。
strcpy是一种C语言的标准库函数,strcpy把含有'\0'结束符的字符串复制到另一个地址空间,返回值的类型为char**/
    strcpy(over1.buffer, "over1\n");
    over2.message_type = 4;
    strcpy(over2.buffer, "over2\n");

    /* open the mail queue */
    mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
    CHECK((key_t) -1 != mq);

    do {
        /*
        size_t是无符号整型,至于是long型,还是int型,可能不同的编译器有不同的定义,我这里没有64位的机器,无法验证。这也验证了我们上面所说的ssize_t其实就是一个long*/
        ssize_t bytes_read, bytes_write;
        /* receive the message */
        P(full);
        //接收send进程发来的信息
        bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
        V(empty);
        CHECK(bytes_read >= 0);
        //接收MSG_STOP:exit 通过type找到send,over--
        if (!strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))) {
            if (buf.message_type == 1) {//type=1,指向send1
                bytes_write = msgsnd(mq, (void *) &over1, MAX_SIZE, 0);
                CHECK(bytes_write >= 0);
                V(over);
                must_stop--;
            } else if (buf.message_type == 2) {//type=2,指向send2
                bytes_write = msgsnd(mq, (void *) &over2, MAX_SIZE, 0);
                CHECK(bytes_write >= 0);
                V(over);
                must_stop--;
            }
        } else {
            P(rcv_dp);
            printf("Received%d: %s", buf.message_type, buf.buffer);
            printf("--------------------------------------------\n");
            V(snd_dp);
        }
    } while (must_stop);//将must_stop设为二,
    


    /* cleanup */
    P(snd_dp);
    CHECK(!msgctl(mq, IPC_RMID, &t));
    pthread_exit(NULL);
}

/**
 * Create three thread to test functions
 * @param argc Argument count
 * @param argv Argument vector
 * @return Always 0
 */
int main(int argc, char **argv) {
    pthread_t t1, t2, t3;
    int state;
//sem_init函数是Posix信号量操作中的函数。sem_init() 初始化一个定位在 sem 的匿名信号量。value 参数指定信号量的初始值。
    sem_init(&snd_dp, 1, 1);
    sem_init(&rcv_dp, 1, 0);
    sem_init(&empty, 1, 10);
    sem_init(&full, 1, 0);
    sem_init(&w_mutex, 1, 1);
    sem_init(&over, 1, 0);

    state = pthread_create(&t1, NULL, receiver, NULL);
    CHECK(state == 0);
    state = pthread_create(&t3, NULL, sender1, NULL);
    CHECK(state == 0);
    state = pthread_create(&t2, NULL, sender2, NULL);
    CHECK(state == 0);
 /*
pthread_join()函数,以阻塞的方式等待thread指定的线程结束。当函数返回时,被等待线程的资源被收回。
如果线程已经结束,那么该函数会立即返回。并且thread指定的线程必须是joinable的。
参数 :thread: 线程标识符,即线程ID,标识唯一线程。retval: 用户定义的指针,用来存储被等待线程的返回值。
返回值 : 0代表成功。 。
           */
    pthread_join(t3, NULL);
    pthread_join(t2, NULL);
    pthread_join(t1, NULL);
    return 0;
}

common.h


#ifndef EXP3_3_COMMON_H
#define EXP3_3_COMMON_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <sys/msg.h>
#include <pthread.h>
#include <semaphore.h>

#define QUEUE_ID    10086
#define MAX_SIZE    1024
#define MSG_STOP    "exit"
#define snd_to_rcv1 1
#define snd_to_rcv2 2
#define rcv_to_snd1 3
#define rcv_to_snd2 4
#define CHECK(x) \
    do { \
        if (!(x)) { \
            fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
            perror(#x); \
            exit(-1); \
        } \
    } while (0) \

#define P(x) sem_wait(&x)
#define V(x) sem_post(&x)
//消息队列需要自定义一个消息缓冲区,这里设计一个只包含两个成员变量的结构体作为消息缓冲区:
//其中 message_type 为消息种类,buffer 用来储存消息的数据段,最大可存储 MAX_SIZE 大小,+1 操作为了给结尾留出 \0。
struct msg_st {
    long int message_type;
    char buffer[MAX_SIZE + 1];
};

/* function */
void *sender1();
void *sender2();
void *receiver();

/* global variable */
/*
sender 和 receiver 之间的进程同步比较简单,临界资源为消息队列。是有:

receiver 接收消息,sender 发送消息,receiver 和 sender 存在同步关系,使用 full=0 和 empty=1 进行约束;因为full和empty设为一,send进程发一个receiver就接受一个
sender 之间存在互斥关系,两个发送线程不能同时工作 ,使用 w_mutex=1 进行约束;
receiver 等待发送进程结束后,返回应答,sender 收到应答后进行输出,receiver 和 sender 存在同步关系,使用 over=0 进行约束;
这里对于终端输出也进行了约束,使用 rcv_dp 和 snd_dp 进行约束,可以忽视这部分的处理,这一部分只是为了美观。
*/
sem_t w_mutex, empty, full, over, rcv_dp, snd_dp;

#endif //EXP3_3_COMMON_H

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>
#include<sys/msg.h>
#include<semaphore.h>

pthread_t s,r1,r2;
sem_t Write, Read1, Read2;
int flag1=0,flag2=0;
struct msgbuf{
long mtype ;
char mtext[100];
};

struct MyMsqids{
int id1;
int id2;
};
int mymsgrcv( int msqid, struct msgbuf *msgp, int msgsz, long mtype,int msgflg)
{
}
int mymsgsnd( int msqid, struct msgbuf *msgp, int msgsz, int msgflg )
{
}
void *receiver1(void *arg)
{
printf("receiver1 is running\n");
struct msgbuf buf;
int ret;
while(1)
{
sem_wait(&Read1);

    memset(&buf,0,sizeof(buf));
    sem_wait(&Write);
    ret=msgrcv(*(int *)arg,&buf,sizeof(buf.mtext),2,0);
    if (ret==-1)
    {
        perror("msgrcv error");
        sem_post(&Write);
        exit(1);
    }
            else if(!strncmp(buf.mtext,"exit",4))
    {
                    flag1++;
        printf("receiver 1 exit !!\n");
        sleep(1);
                   sem_post(&Write);
        pthread_cancel(r1);
        sleep(1);
    }
    else
    {
        printf("receiver 1 :%s\n",buf.mtext);
        sem_post(&Write);
        sleep(1);
    }
}
}

void *receiver2(void arg)
{
printf("receiver2 is running\n");
struct msgbuf buf;
int ret;
while(1)
{
sem_wait(&Read2);
memset(&buf,0,sizeof(buf));
sem_wait(&Write);
ret=msgrcv((int *)arg,&buf,sizeof(buf.mtext),2,0);
if (ret==-1)
{
printf("msgrcv error");
sem_post(&Write);
exit(1);
}
else if(!strncmp(buf.mtext,"exit",4))
{
flag2++;
printf("receiver 2 exit !!\n");
sleep(1);
sem_post(&Write);
pthread_cancel(r2);
sleep(1);
}
else
{
printf("receiver 2 :%s\n",buf.mtext);
sem_post(&Write);
sleep(1);
}
}
}

void *sender(void *arg)
{
printf("sender is running\n");
char t[100];
struct msgbuf buf;
int choose;
int ret, msqid1, msqid2;
int x=1;
struct MyMsqids *msgids;
msgids = (struct MyMsqids *)arg;
msqid1=(*msgids).id1;
msqid2=(*msgids).id2;
printf("sender-msqid1 : %d\n",msqid1);
printf("sender-msqid2 : %d\n",msqid2);

while(x==1)
{
            if(flag1+flag2==2)
    {
                    printf("both receiver exit, sender exit !!\n");
                    sem_post(&Write);
                    pthread_cancel(s);
                    x=0;
    }
            else if(flag1+flag2!=2)
            {
        memset(&buf,0,sizeof(buf));
        sem_wait(&Write);
        printf("please enter some words to send !!\n");
        scanf("%s",t);
        buf.mtype=2;
 
        strcat(buf.mtext,t);
 
                    choose=rand()%2;
 
                    if(flag2==1)choose=0;
                    else if(flag1==1)choose=1;
 
                    if(choose==0)
                    {
                           ret=msgsnd(msqid1,&buf,sizeof(buf.mtext),0);
                        printf("send to receiver 1 !!\n");
                        sem_post(&Read1);
                    }
                    else if(choose==1)
                    {
                        ret=msgsnd(msqid2,&buf,sizeof(buf.mtext),0);
                        printf("send to receiver 2 !!\n");
                        sem_post(&Read2);
                    }
        if(ret==-1)
        {
            perror("msgsnd error");
        }
        sem_post(&Write);
        sleep(1);
    }
    }
}
int main()
{
printf("main is running\n");
int ret,msqid1,msqid2;
struct MyMsqids msqids;
int x;

sem_init(&Write,0,1);
sem_getvalue(&Write,&x);
if(x==0) sem_post(&Write);
printf("---semaphore_Write: %d\n",x);
sem_init(&Read1,0,0);
sem_getvalue(&Read1,&x);
if(x==1) sem_wait(&Read1);
printf("---semaphore_Read1: %d\n",x);
sem_init(&Read2,0,0);
sem_getvalue(&Read2,&x);
if(x==1) sem_wait(&Read2);
printf("---semaphore_Read2: %d\n",x);
key_t key;
key=100;
msqid1=msgget(key,IPC_CREAT|0666);
key=101;
msqid2=msgget(key,IPC_CREAT|0666);
msqids.id1=msqid1;
msqids.id2=msqid2;
printf("main-msqid1 : %d\n",msqid1);
printf("main-msqid2 : %d\n",msqid2);
if(msqid1==-1)
{
    perror("msgid 1 error");
    printf("---msgid 1 error\n");
    exit(1);
}
if(msqid2==-1)
{
    perror("msgid 2 error");
    printf("---msgid 2 error\n");
    exit(1);
}
 
ret=pthread_create(&s,NULL,sender,(void *)&msqids);
if (ret!=0)
{
    perror("sender create error");
    printf("sender create error\n");
    exit(1);
}
 
ret=pthread_create(&r1,NULL,receiver1,(void *)&msqid1);
if (ret!=0)
{
    perror("receiver 1 create error");
    printf("---receiver 1 create error\n");
    exit(1);
}
 
ret=pthread_create(&r2,NULL,receiver2,(void *)&msqid2);
if (ret!=0)
{
    perror("receiver 2 create error");
            printf("receiver 2 create error\n");
    exit(1);
}
 
pthread_join(s,NULL);
pthread_join(r1,NULL);
pthread_join(r2,NULL);
 
printf("MsgQ Done !!\n");
msgctl(msqid1,IPC_RMID,NULL);
    msgctl(msqid2,IPC_RMID,NULL);
return 0;
}

如有帮助,麻烦采纳一下。
主要函数与说明:
sem_init;sem_getvalue;sem_post;sem_wait;对信号量的操作
pthread_create;pthread_join;pthread_cancel;对线程的操作
msgrcv;msgsnd;msgget;msgctl;对消息队列的操作
strcat;strncmp;对字符串的操作

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>
#include<sys/msg.h>
#include<semaphore.h>

pthread_t s,r1,r2;                //创建线程sender1,receiver1,receiver2
sem_t Write, Read1, Read2;            //创建信号量变量
int flag1=0,flag2=0;            //两个receiver结束的标志
struct msgbuf{                //消息缓冲区
    long mtype ;            //消息类型
    char mtext[100];            //消息内容
};

struct MyMsqids{
        int id1;
        int id2;
};

void *receiver1(void *arg)            //arg即传入的msgid,下同
{
       printf("receiver1 is running\n");
    struct msgbuf buf;
    int ret;
    while(1)
    {
                sem_wait(&Read1);
        //消息队列初始化,将buf的sizeof(buf)字节置为0
        memset(&buf,0,sizeof(buf));
        sem_wait(&Write);        //申请写的权限
        ret=msgrcv(*(int *)arg,&buf,sizeof(buf.mtext),2,0);
        if (ret==-1)
        {//返回值为-1表示接收失败
            perror("msgrcv error");
            sem_post(&Write);    //释放写的权限
            exit(1);        //异常退出程序
        }
                else if(!strncmp(buf.mtext,"exit",4))
        {
                        flag1++;
            printf("---receiver 1 exit !!\n");
            sleep(1);
                       sem_post(&Write);    //释放写的权限
            pthread_cancel(r1);
            sleep(1);
        }
        else            //正常传输
        {
            printf("receiver 1 :%s\n",buf.mtext);
            sem_post(&Write);    //释放写的权限
            sleep(1);
        }
    }
}

void *receiver2(void *arg)
{
       printf("receiver2 is running\n");
    struct msgbuf buf;
    int ret;
    while(1)
    {
                sem_wait(&Read2);
        //消息队列初始化,将buf的sizeof(buf)字节置为0
        memset(&buf,0,sizeof(buf));
        sem_wait(&Write);        //申请写的权限
        ret=msgrcv(*(int *)arg,&buf,sizeof(buf.mtext),2,0);
        if (ret==-1)
        {
            printf("msgrcv error");
            sem_post(&Write);    //释放写的权限
            exit(1);        //异常退出程序
        }
        else if(!strncmp(buf.mtext,"exit",4))
        {
                        flag2++;
            printf("---receiver 2 exit !!\n");
            sleep(1);
                        sem_post(&Write);    //释放写的权限
            pthread_cancel(r2);
            sleep(1);
        }
        else            //正常传输
        {
            printf("receiver 2 :%s\n",buf.mtext);
            sem_post(&Write);    //释放写的权限
            sleep(1);
        }
    }
}

void *sender(void *arg)
{
       printf("sender is running\n");
       char t[100];
    struct msgbuf buf;
    int choose;
    int ret, msqid1, msqid2;
    int x=1;
    struct MyMsqids *msgids;
    msgids = (struct MyMsqids *)arg;
    msqid1=(*msgids).id1;
    msqid2=(*msgids).id2;
    printf("sender-msqid1 : %d\n",msqid1);
    printf("sender-msqid2 : %d\n",msqid2);

    while(x==1)
    {
                if(flag1+flag2==2)
        {
                        printf("---both receiver exit, sender exit !!\n");
                        sem_post(&Write);        //释放写的权限
                        pthread_cancel(s);
                        x=0;
        }
                else if(flag1+flag2!=2)
                {
            memset(&buf,0,sizeof(buf));    //初始化buf
            sem_wait(&Write);        //申请写的权限
            printf("---please enter some words to send !!\n");
            scanf("%s",t);        //从键盘读入字符串
            buf.mtype=2;        //定义消息类型

            strcat(buf.mtext,t);

                        choose=rand()%2;

                        if(flag2==1)choose=0;           //如果r2结束,发送给r1
                        else if(flag1==1)choose=1;      //如果r1结束,发送给r2

                        if(choose==0)
                        {
                               ret=msgsnd(msqid1,&buf,sizeof(buf.mtext),0);
                            printf("---send to receiver 1 !!\n");
                            sem_post(&Read1);
                        }
                        else if(choose==1)
                        {
                            ret=msgsnd(msqid2,&buf,sizeof(buf.mtext),0);
                            printf("---send to receiver 2 !!\n");
                            sem_post(&Read2);
                        }
            if(ret==-1)        //如果两个接收者线程都结束了,则循环结束
            {
                perror("msgsnd error");
            }
            sem_post(&Write);        //释放写的权限
            sleep(1);
        }
        }
}
int main()
{
      printf("main is running\n");
    int ret,msqid1,msqid2;
    struct MyMsqids msqids;
        int x;

    sem_init(&Write,0,1);
    sem_getvalue(&Write,&x);
    if(x==0) sem_post(&Write);
        printf("---semaphore_Write: %d\n",x);

    sem_init(&Read1,0,0);
    sem_getvalue(&Read1,&x);
    if(x==1) sem_wait(&Read1);
        printf("---semaphore_Read1: %d\n",x);

    sem_init(&Read2,0,0);
        sem_getvalue(&Read2,&x);
    if(x==1) sem_wait(&Read2);
    printf("---semaphore_Read2: %d\n",x);

    //key必须不一样,消息队列与key相对应,所以我给两个消息队列取了不同的key
    key_t key;
    key=100;
    msqid1=msgget(key,IPC_CREAT|0666);
    key=101;
    msqid2=msgget(key,IPC_CREAT|0666);
    msqids.id1=msqid1;
    msqids.id2=msqid2;

    printf("main-msqid1 : %d\n",msqid1);
    printf("main-msqid2 : %d\n",msqid2);

    if(msqid1==-1)
    {
        perror("msgid 1 error");
        printf("---msgid 1 error\n");
        exit(1);
    }
    if(msqid2==-1)
    {
        perror("msgid 2 error");
        printf("---msgid 2 error\n");
        exit(1);
    }

    //创建线程时以结构体形式传值message queue id 12,成功传入。
    ret=pthread_create(&s,NULL,sender,(void *)&msqids);
    if (ret!=0)
    {
        perror("sender create error");
        printf("---sender create error\n");
        exit(1);
    }

    //创建线程时传值message queue id 1
    ret=pthread_create(&r1,NULL,receiver1,(void *)&msqid1);
    if (ret!=0)
    {
        perror("receiver 1 create error");
        printf("---receiver 1 create error\n");
        exit(1);
    }

    //创建线程时传值message queue id 2
    ret=pthread_create(&r2,NULL,receiver2,(void *)&msqid2);
    if (ret!=0)
    {
        perror("receiver 2 create error");
                printf("---receiver 2 create error\n");
        exit(1);
    }

    pthread_join(s,NULL);
    pthread_join(r1,NULL);
    pthread_join(r2,NULL);

    printf("MsgQ Done !!\n");
    msgctl(msqid1,IPC_RMID,NULL);        //ctl为control
        msgctl(msqid2,IPC_RMID,NULL);
    return 0;
}




#include <unistd.h>
#include <stdio.h>
#include <sys/shm.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>
#define TEXT_SZ 2048

struct shared_use_st
{
    int written;//作为一个标志,非0:表示可读,0表示可写
    char text[TEXT_SZ];//记录写入和读取的文本
};

int main()
{
    sem_t *semr = NULL;
    sem_t *semw = NULL;
    semr = sem_open("mysem_r", O_CREAT | O_RDWR , 0666, 0);
    semw = sem_open("mysem_w", O_CREAT | O_RDWR , 0666, 1);
    int running = 1;//程序是否继续运行的标志
    void *shm = NULL;//分配的共享内存的原始首地址
    struct shared_use_st *shared;//指向shm
    int shmid;//共享内存标识符
    //创建共享内存
    shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666|IPC_CREAT);
    if(shmid == -1)
    {
        fprintf(stderr, "shmget failed\n");
        exit(EXIT_FAILURE);
    }
    //将共享内存连接到当前进程的地址空间
    shm = shmat(shmid, 0, 0);
    if(shm == (void*)-1)
    {
        fprintf(stderr, "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    printf("\nMemory attached at %X\n", (int)shm);
    //设置共享内存
    shared = (struct shared_use_st*)shm;
    shared->written = 0;
    while(running)//读取共享内存中的数据
    {
        sem_wait(semr);
        printf("You wrote: %s", shared->text);
        //读取完数据,设置written使共享内存段可写
        shared->written = 0;
        //输入了over,退出循环(程序)
        if(strncmp(shared->text, "over", 4) == 0)
        running = 0;
        sem_post(semw);
    }
    sleep(1);
    exit(EXIT_SUCCESS);
}



可以参考一下https://blog.csdn.net/liangzhao_jay/article/details/46381149

【HDU_实验三(3):利用Linux的消息队列通信机制实现两个线程间的通信】https://minipro.baidu.com/ma/qrcode/parser?app_key=y1lpwNoOyVpW33XOPd72rzN4aUS43Y3O&launchid=bc0cf14e-e220-4896-b780-3999e4c230c5&path=%2Fpages%2Fblog%2Findex%3FblogId%3D106728412%26_swebFromHost%3Dbaiduboxapp

可以参考这个
https://wenku.baidu.com/view/783eea85b3717fd5360cba1aa8114431b90d8eb4.html