3.消息队列:msgget/msgctl/msgsnd/msgrcv
1.消息队列
[1]消息队列是有类型的
与管道不同,消息队列中的数据是有[类型编号]的===>因此进程在通信的
时,接收方可以接收指定[类型编号]的数据。
管道 | 消息队列 |
---|---|
流管道 | 有边界 |
流管道 | 可以后进/先出 |
[2]消息队列与命名管道的比较
[相同点]:与命名管道一样,消息队列进行通信的进程可以是不相关的进程,同时它们都是通
过发送和接收的方式来传递数据的。而且它们对每个数据都有一个最大长度的限制。
[不同点]:与命名管道相比,消息队列的优势在于
1.消息队列可以独立于发送和接收进程而存在,从而消除了在同步命名管道的打开和关闭时可能产生的困难
2.同时通过发送消息还可以避免命名管道的同步和阻塞问题,不需要由进程自己来提供同步方法
3.接收程序可以通过消息类型有选择地接收数据,而不是像命名管道中那样,只能默认地接收
2.消息大小的3大限制
限制 | 查看命令 |
---|---|
最大消息长度限制 | cat /proc/sys/kernel/msgmax |
消息队列总的字节数 | cat /proc/sys/kernel/msgmnb |
消息条目数 | cat /proc/sys/kernel/msgmni |
注:Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。
3.消息队列API
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg); //创建 打开
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); //向消息队列中增添一条消息
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); //从消息队列中取走一条消息
[1] int msgget(key_t key, int msgflg);
参数
key:0x1110,IPC_PRIVATE(宏定义--0)
msgflg:权限标识(如,0666),可以 | 上IPC_CREAT / IPC_EXCL
返回值:
成功:返回非0值,即msg标识号
失败:返回-1,errno被设置为合适的值
-----------------------
IPC_PRIVATE特殊情况:
当key==IPC_PRIVATE时,msgget创建的msg的标识符是0x00000000,
该消息队列,只能在自己家族中使用;不在没有血缘关系的进程间使用。
//key_t最后是0,使用IPC_PRIVATE以后,IPC_CREAT|IPC_EXCL不会检查到已存在的消息队列,没有实质性的作用
//每次重新调用以后都会创建新的消息队列,msg_id都不一样
//意味着即使msg_id传送给其他进程,其他进程也不能用(可以通过fork--血缘关系使用)
--------- 消息队列 -----------
键 msqid 拥有者 权限 已用字节数 消息
0x00000000 98304 gjw 666 0 0
0x00000000 131073 gjw 666 0 0
0x00000000 163842 gjw 666 0 0
0x00000000 196611 gjw 666 0 0
-----------------------
[1]msgget(0x1100,0666);
if msg存在,则打开
if msg不存在,则errno=ENOENT
[2]msgget(0x1100,0666|IPC_CREAT);
if msg存在,则打开
if msg不存在,则创建
[3]msgget(0x1100,0666|IPC_CREAT|IPC_EXCL);
if msg存在,则errno=EEXIST
if msg不存在,则创建
[2] int msgctl(int msqid, int cmd, struct msqid_ds *buf);
返回值:成功返回0,失败返回-1
参数:
msqid: 由msgget函数返回的消息队列标识码
cmd:是将要采取的动作,(有三个可取值)
IPC_STAT:获取消息队列的状态属性-->msgctl(msg_id,IPC_STAT,&buf);
IPC_SET:设置消息队列的状态属性-->msgctl(msg_id,IPC_SET,&buf);
IPC_RMID:ipcrm msg <msgID> 删除消息队列-->msgctl(msg_id,IPC_RMID,&buf);
---------------------------------
int main()
{
int msg_id;
int ret = 0;
msg_id = msgget(0x1236,0666);
if(-1 == msg_id)
{
perror("msgget");
return -1;
}
struct msqid_ds buf;
memset(&buf,0,sizeof(struct msqid_ds));
//[1] IPC_STAT [获取]消息队列的状态
ret = msgctl(msg_id,IPC_STAT,&buf);
if(-1 == ret)
{
perror("msgctl");
return -1;
}
printf("perm:%o\n",buf.msg_perm.mode);//权限
printf("bytes:%d\n",(int)buf.__msg_cbytes);//消息队列中的字节数
printf("number of msg:%d\n",(int)buf.msg_qnum);//消息条目数
printf("Input any key to delete the msg...\n");getchar();
//[2] IPC_SET [修改]消息队列
buf.msg_perm.mode = 0644;
ret = msgctl(msg_id,IPC_SET,&buf);
if(-1 == ret)
{
perror("msgctl");
return -1;
}
printf("Input any key to delete the msg...\n");getchar();
//[3] IPC_RMID从内核删除消息队列
ret = msgctl(msg_id,IPC_RMID,&buf);
if(-1 == ret)
{
perror("msgctl");
return -1;
}
printf("delete successfully\n");
return 0;
}
[重点] 消息结构体
[1]消息结构体一般都是自定义的
[2]消息结构体中的第一个属性必须是long mtype(用于标识消息类型)
[3]后面的内容可以自定义
struct msgbuf {
long mtype; //消息类型的标识 %ld
//........
//........
//........
}
例如:
typedef struct mesgbuf {
long mtype; /* message type, must be > 0 */
int age;
char name[1024]; /* message data */
int score;
}msgbuf_t;
使用方法:
struct mesgbuf msg_buf;
msgsnd(msg_id,&msg_buf ,sizeof(msg_buf ),IPC_NOWAIT);
msgrcv(msg_id,&msg_buf ,sizeof(msg_buf ),type,IPC_NOWAIT);
printf("recv message: type=%ld, age=%d, score=%d, name=%s\n",buf.mtype,buf.age,buf.score,buf.name);
[3] int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
返回值:成功返回0;失败返回-1
参数:
msqid:消息队列标识码
msgp:指向准备发送的数据
msgsz:是msgp指向的消息长度(这个长度不含保存消息类型的那个long int长整型),
仅仅是char mtext的字符串的长度。
msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情
msgflg=IPC_NOWAIT表示队列满不等待,返回EAGAIN错误
[4] ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数:
msgp:接收消息,存放的位置
msgsz:接收消息的长度
msgtyp:消息类型
msgflg
C/S模型的多进程之间消息队列通信
一.程序设计思想:
[1]客户端发给服务器消息类型总是1;服务器接收的消息类型总是1
[2]服务器端回给客户端的type是对方进程号
[3]客户端只接收(消息类型=自己进程号)的消息
二.程序隐藏的问题分析:
因为客户端和服务器既向消息队列中写数据,又从消息队列中读数据。说明
操作该消息队列是无序的,无序的操作很容易让消息队列“空了”或“满了”。
举例:假设客户端和服务器都想向已经满的缓冲区中发数据:
1.客户端不能向消息队列中发数据
2.服务器不能向消息队列中发数据
===>产生死锁现象。
总结:产生死锁的根本原因是C/S都[双向]操控消息队列
解决方案:服务器端可以用[多进程]的方式实现消息队列[单一方向的回流]
<服务器代码>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#define MSG_BUF 256
typedef struct mesgbuf
{
long mtype;
long pid; //message type
char mtext[MSG_BUF];
}msgbuf_t;
#define MSGTOSRV 1
int main()
{
int ret = 0;
int msg_id = 0;
msgbuf_t my_msg;
key_t key;
long type;
key = ftok("./",'a');
msg_id = msgget(key,IPC_CREAT|IPC_EXCL|0666);//创建
if(msg_id == -1)
{
if(errno == EEXIST)
{
//printf("EEXIST\n");
key = ftok("./",'a');
msg_id = msgget(key,IPC_CREAT|0666);
}
else
{
perror("msget");
exit(-1);
}
}
while(1)
{
memset(&my_msg,0,sizeof(my_msg));
//取消息的时候要指定type
ret = msgrcv(msg_id,&my_msg,sizeof(my_msg),MSGTOSRV,0);//接收( type=MSGTOSRV )消息
if(ret == -1)
{
perror("msgsnd");
exit(-1);
}
printf("recv message:type=%ld, cli_pid=%ld, recvmsg=%s\n",my_msg.mtype,my_msg.pid,my_msg.mtext);
//填充类型
my_msg.mtype = my_msg.pid;
//发消息之前要封装type
ret = msgsnd(msg_id,&my_msg,sizeof(my_msg),IPC_NOWAIT);//发送
if(ret == -1)
{
perror("msgsnd");
exit(-1);
}
printf("send message:type=%ld, cli_pid=%ld, sendmsg=%s\n",my_msg.mtype,my_msg.pid,my_msg.mtext);
}
return 0;
}
<客户端代码>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#define MSG_BUF 256
typedef struct mesgbuf
{
long mtype;
long pid; //message type
char mtext[MSG_BUF];
}msgbuf_t;
#define MSGTOSRV 1
int main(int argc,char* argv[])
{
if(argc<2)
{
fprintf(stderr,"Usage:%s <input string> \n",argv[0]);
exit(EXIT_FAILURE);
}
int ret = 0;
int msg_id = 0;
msgbuf_t my_msg;
key_t key;
long type = getpid();
key = ftok("./",'a');
msg_id = msgget(key,IPC_CREAT|IPC_EXCL|0666);//创建
if(msg_id == -1)
{
if(errno == EEXIST)
{
printf("EEXIST\n");
key = ftok("./",'a');
msg_id = msgget(key,IPC_CREAT|0666);
}
else
{
perror("msget");
exit(-1);
}
}
memset(&my_msg,0,sizeof(my_msg));
//while(1)
{
my_msg.mtype=MSGTOSRV;
my_msg.pid=getpid();
strcpy(my_msg.mtext,argv[1]);
ret = msgsnd(msg_id,&my_msg,sizeof(my_msg),IPC_NOWAIT);//发送
if(ret == -1)
{
perror("msgsnd");
exit(-1);
}
printf("please touch any to recvmessage...\n");
getchar();
memset(&my_msg,0,sizeof(my_msg));
ret = msgrcv(msg_id,&my_msg,sizeof(my_msg),type,0);//接收/获取消息
if(ret == -1)
{
perror("msgsnd");
exit(-1);
}
printf("recv message:type=%ld, pid=%ld, recvmsg=%s\n",my_msg.mtype,getpid(),my_msg.mtext);
}
return 0;
}
**解决方案:服务器端可以用[多进程]的方式实现消息队列[单一方向]的回流 **
按照一个方向去读,按照一个方向去写,就可以避免这种情况
1.每次有新的客户端,都向同一个[总的]消息队列中发数据
2.服务器检查消息类型是1,再检查消息中的cli_pid,如果pid不存在,说明新的客户端来了,
服务器就fork一个子进程,让子进程单独建立一个PRIVATE的消息队列。
该PRIVATE的消息队列干啥呢?服务器的子进程发报文,客户端收报文。
客户端怎么收报文呢?客户端根据自己的pid收报文。
====>这样你会发现,整个消息队列将会变得十分"有序"