使用消息队列实现实时线程与非实时线程之间的通讯:以xenomai与linux为例进行说明
经过测试发现:
xenomai线程与linux线程通过rt_queue通讯时,linux线程无法从queue里面获取到信息;
rt_queue只适用于xenomai线程与xenomai线程之间的通讯。
解决方案:
采用std::queue加互斥锁(pthread_mutex_t)
msg_queue_manager.h
/* Discriminator stored in msg_data_related_::msgType; it tells the reader
 * which member of the payload union is valid. Numbering starts at 1. */
enum enuMsgType {
    MSGATYPE = 1,   /* payload is tmsgA */
    MSGBTYPE,       /* payload is tmsgB */
    MSGCTYPE        /* payload is tmsgC */
};
/* Control block of a producer: holds the Xenomai task descriptor that is
 * handed to rt_task_create() / rt_task_start(). */
typedef struct {
    RT_TASK producer_task;
} pro_info_t;
//typedef struct {
// RT_QUEUE q;
//}queue_info_t;
/* Payload of a type-A message (MSGATYPE). */
struct msgA_data_related_ {
    int vel;        /* velocity sample */
    int acc;        /* acceleration sample */
    int msg_size;   /* size value filled in by the producer */
};
/* Payload of a type-B message (MSGBTYPE). */
struct msgB_data_related_ {
    char  info[128];   /* text buffer; the consumer prints it with %s, so it must be NUL-terminated */
    float torque;
    float current;
    int   msg_size;
};
/* Payload of a type-C message (MSGCTYPE): a single text buffer that the
 * consumer prints with %s, so it must be NUL-terminated. */
struct msgC_data_related_ {
    char info[128];
};
/* Envelope passed through the message queue: a type tag followed by an
 * anonymous union holding the payload that matches the tag. */
struct msg_data_related_ {
    enuMsgType msgType;   /* selects which union member below is valid */
    union {
        struct msgA_data_related_ tmsgA;   /* valid when msgType == MSGATYPE */
        struct msgB_data_related_ tmsgB;   /* valid when msgType == MSGBTYPE */
        struct msgC_data_related_ tmsgC;   /* valid when msgType == MSGCTYPE */
    };
};
msg_queue_manager.cpp
// FIFO of heap-allocated messages: enqueue() pushes pointers obtained from
// new, dequeue() copies the front element out and deletes it.
static std::queue<msg_data_related_*> msg_to_socket_queue;
// Mutex held around every access to msg_to_socket_queue; it is initialised
// by queue_init().
pthread_mutex_t msgqueMutex;
/* Initialise the mutex that guards the message queue. A failure is only
 * reported on stdout; nothing is returned to the caller. */
void queue_init()
{
    const bool init_failed = (pthread_mutex_init(&msgqueMutex, NULL) != 0);
    if (init_failed)
    {
        printf("msgqueMutex init failed\n");
    }
}
/*
 * Append a copy of *ptr to the message queue.
 *
 * ptr          - message to copy; the caller keeps ownership of it.
 * wait_usecond - how long to sleep (microseconds) before the single retry
 *                that is made when the queue is full on the first attempt.
 *
 * Returns 1 on success, -ENOMEM if the queue is still full after the retry,
 * and -EINVAL if ptr is NULL.
 *
 * The capacity check and the push are done under one lock acquisition, so
 * concurrent producers cannot push the queue past QUEUE_MEMBER_SIZE.
 */
int enqueue(msg_data_related_ *ptr,uint32_t wait_usecond)
{
    if (ptr == NULL)
    {
        return -EINVAL;
    }

    // Copy before taking the lock so the critical section stays short.
    // This is a shallow copy: if a pointer member is ever added to the
    // message, the copy would share the pointed-to memory with *ptr.
    msg_data_related_ *copy = new msg_data_related_(*ptr);

    for (int attempt = 0; attempt < 2; ++attempt)
    {
        if (attempt > 0)
        {
            // First attempt found the queue full: give the consumer a chance.
            usleep(wait_usecond);
        }

        pthread_mutex_lock(&msgqueMutex);
        const bool has_room =
            static_cast<int>(msg_to_socket_queue.size()) < QUEUE_MEMBER_SIZE;
        if (has_room)
        {
            msg_to_socket_queue.push(copy);   // the queue now owns the copy
        }
        pthread_mutex_unlock(&msgqueMutex);

        if (has_room)
        {
            return 1;
        }
    }

    delete copy;
    return -ENOMEM;
}
/*
 * Remove the oldest message from the queue and copy it into
 * tmsg_data_related_.
 *
 * tmsg_data_related_ - receives the message; left untouched on failure.
 * wait_usecond       - how long to sleep (microseconds) before the single
 *                      retry that is made when the queue is empty on the
 *                      first attempt.
 *
 * Returns 1 when a message was delivered, -ENOMEM when the queue is still
 * empty after the retry (the error code is kept for existing callers).
 *
 * The emptiness check and the pop are done under one lock acquisition, so
 * front() is never called on a queue that another consumer just drained.
 */
int dequeue(msg_data_related_ &tmsg_data_related_,uint32_t wait_usecond)
{
    for (int attempt = 0; attempt < 2; ++attempt)
    {
        if (attempt > 0)
        {
            // First attempt found nothing: give the producers a chance.
            usleep(wait_usecond);
        }

        msg_data_related_ *head = NULL;
        pthread_mutex_lock(&msgqueMutex);
        if (!msg_to_socket_queue.empty())
        {
            head = msg_to_socket_queue.front();
            msg_to_socket_queue.pop();
        }
        pthread_mutex_unlock(&msgqueMutex);

        if (head != NULL)
        {
            // The node is ours now; copy and free it outside the lock.
            tmsg_data_related_ = *head;
            delete head;
            return 1;
        }
    }

    return -ENOMEM;
}
/* Return the number of messages currently queued, read under the queue
 * mutex. The value may be stale as soon as the lock is released. */
int get_msg_queue_size(void)
{
    pthread_mutex_lock(&msgqueMutex);
    const int pending = msg_to_socket_queue.size();
    pthread_mutex_unlock(&msgqueMutex);
    return pending;
}
xenomai实时线程生产者
msga_producer_task.cpp
#include "msga_producer_task.h"
// Control block of the producer A task; msga_producer_init() returns its
// address on success.
static pro_info_t tcb;
// Envelope that is refilled on every cycle and passed to enqueue().
static msg_data_related_ msgDataInterface;
// Type-A payload that is filled each cycle and copied into
// msgDataInterface.tmsgA.
static msgA_data_related_ msgAData;
// Body of the producer A task, started by msga_producer_init().
void msga_producer_task_proc(void *arg);
void * msga_producer_init()
{
int ret =0;
//ret = rt_queue_create(&tcb.q, "QUEUE", sizeof(struct msg_data_related_ *), QUEUE_MEMBER_SIZE, Q_FIFO);
rt_printf("step1 \n");
/* Creating cyclic xenomai task */
ret = rt_task_create(&tcb.producer_task,"producerA_task",0,90,T_FPU);
if (ret)
{
fprintf (stderr, "producerA_task create failed!!!!\n");
goto release_master;
}
/* Starting cyclic task */
fprintf(stdout, "starting producerA_task \n");
ret = rt_task_start(&tcb.producer_task, &msga_producer_task_proc, NULL);
if (ret)
{
fprintf (stderr, "producerA_task start failed!!!!\n");
goto release_master;
}
return &tcb;
release_master:
return NULL;
}
/*
 * Body of the producer A task: once per period it builds a type-A message
 * from a running counter (vel = counter, acc = 2 * counter) and enqueues it.
 *
 * arg is unused. The function returns only if the task mode or the periodic
 * timing cannot be set up; otherwise it loops forever.
 *
 * An earlier variant wrote to an RT_QUEUE with rt_queue_write(); it was
 * replaced by enqueue() because the Linux-side reader could not receive
 * messages from the RT_QUEUE.
 */
void msga_producer_task_proc(void *arg)
{
    (void)arg;
    int messg_index=0;

    /* Set Xenomai task execution mode */
    int ret = rt_task_set_mode(0, T_CONFORMING, NULL);
    if (ret)
    {
        printf("error while rt_task_set_mode, code %d\n",ret);
        return;
    }

    /* Without a period, rt_task_wait_period() would fail on every cycle. */
    ret = rt_task_set_periodic(NULL,TM_NOW,SM_FRAME_PERIOD_NS);
    if (ret)
    {
        printf("error while rt_task_set_periodic, code %d\n",ret);
        return;
    }

    for (;;)
    {
        if(rt_task_wait_period(NULL)!=0)
        {
            printf("sched_rms_period error_in_msga_producer_task_proc!\n");
        }

        /* Fill the payload, then wrap it in the tagged envelope. */
        msgAData.vel=messg_index;
        msgAData.acc=messg_index*2;
        msgAData.msg_size=msgAValiableSize+sizeof(int);
        msgDataInterface.msgType=MSGATYPE;
        msgDataInterface.tmsgA=msgAData;

        /* enqueue() copies the envelope, so the static one can be reused. */
        if(enqueue(&msgDataInterface,100) >0)
        {
            std::cout<<"a thread enqueue success"<<std::endl;
        }
        else
        {
            std::cout<<"a thread enqueue failed"<<std::endl;
        }

        messg_index++;
    }
}
linux 非实时线程生产者msgc_producer_task.cpp
#include "msgc_producer_task.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
// Handle of the non-real-time producer thread created by msgc_producer_init().
pthread_t test_rtqueue_nonRTthread_id;
// Thread body that periodically enqueues a type-C message.
void* msgc_producer_task_proc(void* );
// NOTE(review): declared here but no definition is visible in this file —
// confirm it is still needed.
void* test_rt_queue_with_nonRTthread_fun(void* );
/*
 * Start the plain Linux (non-real-time) thread that produces type-C
 * messages. A failure is reported on stderr; nothing is returned.
 */
void msgc_producer_init()
{
    int ret = pthread_create(&test_rtqueue_nonRTthread_id, NULL, msgc_producer_task_proc, NULL);
    if (ret != 0)
    {
        /* pthread_create() returns the error number and does not set errno,
         * so perror() would print an unrelated message here. */
        fprintf(stderr, "test_rt_queue_thread_fun_thread error!: %s\n", strerror(ret));
    }
}
void* msgc_producer_task_proc(void* )
{
static msgC_data_related_ tmsgC_data_related_;
static msg_data_related_ tmsg_data_related_interface;
while(1)
{
usleep(10000);
strcpy(tmsgC_data_related_.info," MSG FROM C! \n");
tmsg_data_related_interface.msgType=MSGCTYPE;
tmsg_data_related_interface.tmsgC=tmsgC_data_related_;
struct msg_data_related_ *ptr = &tmsg_data_related_interface;
int ret=enqueue(ptr,100);
if(ret >0)
{
std::cout<<"c thread enqueue success"<<std::endl;
}
else
{
// std::cout<<"c thread enqueue failed"<<std::endl;
}
}
}
消息队列消费者consumer_task.cpp:
typedef struct {
int exit;
RT_TASK consumer_task;
}con_info_t;
void consumer_task_proc(void *arg);
void *consumer_init()
{
int ret = 0;
con_info_t *tcb = (con_info_t *)calloc(1,sizeof(con_info_t));//calloc在动态分配完内存后,自动初始化该内存空间为零,而malloc不初始化,里边数据是随机的垃圾数据。
//tcb->pRcvQ = (RT_QUEUE*)get_producer_queue(arg);
rt_printf("step1 \n");
/* Creating cyclic xenomai task */
ret = rt_task_create(&tcb->consumer_task,"consumer_task",0,70,T_FPU);
if (ret)
{
fprintf (stderr, "producer_task create failed!!!!\n");
goto release_master;
}
/* Starting cyclic task */
fprintf(stdout, "starting consumer_task \n");
ret = rt_task_start(&tcb->consumer_task, &consumer_task_proc, tcb);
if (ret)
{
fprintf (stderr, "consumer_task start failed!!!!\n");
goto release_master;
}
return tcb;
release_master:
return NULL;
}
/*
 * Stop the consumer task and release its control block.
 *
 * arg is the pointer returned by consumer_init(); NULL is ignored.
 *
 * Shutdown is a handshake on tcb->exit: this function sets the flag to 1,
 * the consumer loop sees it, leaves the loop and sets the flag back to 0,
 * which ends the polling loop below.
 */
void consumer_deinit(void *arg)
{
con_info_t *tcb = (con_info_t *)arg;
if(tcb == NULL) return ;
// Request termination.
tcb->exit = 1;
// Poll every 1 ms until the consumer acknowledges by clearing the flag.
// NOTE(review): if consumer_task_proc() returned early (its
// rt_task_set_mode() error path) nobody clears the flag and this loop
// never ends.
// NOTE(review): exit is a plain int shared between two threads without
// atomics or locking — confirm this is acceptable on the target.
while(tcb->exit == 1)
{
usleep(1000);
}
// NOTE(review): the task is created without T_JOINABLE in consumer_init(),
// so this join presumably fails and the handshake above is the real
// synchronisation — verify against the Xenomai documentation.
rt_task_join(&tcb->consumer_task);
rt_task_delete(&tcb->consumer_task);
free(tcb);
}
void consumer_task_proc(void *arg)
{
int count=0;
con_info_t *tcb = (con_info_t *)arg;
tcb->exit = 0;
/* Set Xenomai task execution mode */
int ret;
ret = rt_task_set_mode(0, T_CONFORMING, NULL);
if (ret)
{
rt_printf("error while rt_task_set_mode, code %d\n",ret);
return;
}
rt_task_set_periodic(NULL,TM_NOW,SM_FRAME_PERIOD_NS);
/* Start pdo exchange loop until user stop */
while (tcb->exit == 0)
{
if(rt_task_wait_period(NULL)!=0)
{
rt_printf("sched_rms_period error_in_consumer_task_proc!\n");
}
struct msg_data_related_ rmsg;
ret =dequeue(rmsg,100);
// ret = rt_queue_read(tcb->pRcvQ, &msg, sizeof(struct msg_data_related_ *), NULL);
if(ret>0)
{
switch (rmsg.msgType)
{
case MSGATYPE:
{
printf("consumer get msg->vel is:%d\n",rmsg.tmsgA.vel);
printf("consumer get msg->acc is:%d\n",rmsg.tmsgA.acc);
break;
}
case MSGBTYPE:
{
printf("consumer get msg info is:%s\n",rmsg.tmsgB.info);
break;
}
case MSGCTYPE:
{
printf("consumer get msg info is:%s\n",rmsg.tmsgC.info);
break;
}
}
}
if(count%100==0)
{
rt_printf("consumer_task_proc exectuting \n");
}
count++;
}
tcb->exit = 0;
}
测试结果: