消息队列实现实时线程与非实时线程通讯:xenomai与linux为例进行说明

经过测试发现
xenomai线程与linux线程通过rt_queue通讯,发现linux从queue里面获取不了信息
rt_queue只适用于xenomai与xenomai线程之间的通讯

解决方案:

采用std::queue加读写锁

msg_queue_manager.h

enum enuMsgType{MSGATYPE=1,MSGBTYPE,MSGCTYPE};
typedef struct {
    RT_TASK producer_task;
}pro_info_t;


//typedef struct {
//    RT_QUEUE q;
//}queue_info_t;

struct msgA_data_related_{
    int vel;
    int acc;
    int msg_size;

};

struct msgB_data_related_{
    char info[128];
    float torque;
    float current;
    int msg_size;
};

struct msgC_data_related_{
    char info[128];
};


struct msg_data_related_{
    enuMsgType msgType;
    union
    {
        struct msgA_data_related_ tmsgA;
        struct msgB_data_related_ tmsgB;
        struct msgC_data_related_ tmsgC;
    } ;

};

msg_queue_manager.cpp

static std::queue<msg_data_related_*>  msg_to_socket_queue;
pthread_mutex_t msgqueMutex;

void  queue_init()
{
    int ret =pthread_mutex_init(&msgqueMutex,NULL);
    if(ret != 0)
    {
        printf("msgqueMutex init failed\n");
    }
}

int enqueue(msg_data_related_ *ptr,uint32_t wait_usecond)
{
    msg_data_related_* _pData = new msg_data_related_;
    memcpy(_pData,ptr,sizeof(msg_data_related_));//但是对于结构体中含有指针的方式来说,此命令还是把msg_data_related_中的指针与ptr中指针指向同一块地址,所以尽量不这么干;
    //printf("will enqueue \n");
    if(get_msg_queue_size()>=QUEUE_MEMBER_SIZE)
    {
        usleep(wait_usecond);
        if(get_msg_queue_size()>=QUEUE_MEMBER_SIZE)
        {
            delete _pData;
            return -ENOMEM;
        }
        else
        {
            pthread_mutex_lock(&msgqueMutex);
            msg_to_socket_queue.push(_pData);
            pthread_mutex_unlock(&msgqueMutex);
            return 1;
        }
    }
    else
    {
        pthread_mutex_lock(&msgqueMutex);
        msg_to_socket_queue.push(_pData);
        pthread_mutex_unlock(&msgqueMutex);
        return 1;
    }



}

int dequeue(msg_data_related_ &tmsg_data_related_,uint32_t wait_usecond)
{

    int queue_size=get_msg_queue_size();
    if(queue_size>0)
    {
        pthread_mutex_lock(&msgqueMutex);
        tmsg_data_related_=*(msg_to_socket_queue.front());
        delete  msg_to_socket_queue.front();
        msg_to_socket_queue.pop();
        pthread_mutex_unlock(&msgqueMutex);
        return 1;
    }
    else
    {
        usleep(wait_usecond);
        if(get_msg_queue_size()<= 0)
        {
            return -ENOMEM;
        }
        else
        {
            pthread_mutex_lock(&msgqueMutex);
            tmsg_data_related_=*(msg_to_socket_queue.front());
            delete  msg_to_socket_queue.front();
            msg_to_socket_queue.pop();
            pthread_mutex_unlock(&msgqueMutex);
            return 1;
        }
    }
}

int get_msg_queue_size(void)
{
    int queue_size=0;
    pthread_mutex_lock(&msgqueMutex);
    queue_size=msg_to_socket_queue.size();
    pthread_mutex_unlock(&msgqueMutex);
    return queue_size;
}

 

xenomai实时线程生产者

msga_producer_task.cpp

#include "msga_producer_task.h"


static pro_info_t tcb;

static msg_data_related_ msgDataInterface;

static msgA_data_related_ msgAData;


void msga_producer_task_proc(void *arg);



void * msga_producer_init()
{
    int ret =0;

    //ret = rt_queue_create(&tcb.q, "QUEUE", sizeof(struct msg_data_related_ *), QUEUE_MEMBER_SIZE, Q_FIFO);

    rt_printf("step1 \n");
    /* Creating cyclic xenomai task */
    ret = rt_task_create(&tcb.producer_task,"producerA_task",0,90,T_FPU);
    if (ret)
    {
        fprintf (stderr, "producerA_task create failed!!!!\n");
        goto release_master;
    }

    /* Starting cyclic task */
    fprintf(stdout, "starting producerA_task \n");
    ret = rt_task_start(&tcb.producer_task, &msga_producer_task_proc, NULL);
    if (ret)
    {
        fprintf (stderr, "producerA_task start failed!!!!\n");
        goto release_master;
    }
    return &tcb;
release_master:
    return NULL;
}

void msga_producer_task_proc(void *arg)
{
    int messg_index=0;
    int run=1;

    /* Set Xenomai task execution mode */
    int   ret = rt_task_set_mode(0, T_CONFORMING, NULL);
    if (ret)
    {
        printf("error while rt_task_set_mode, code %d\n",ret);
        return;
    }

    rt_task_set_periodic(NULL,TM_NOW,SM_FRAME_PERIOD_NS);
    /* Start pdo exchange loop until user stop */
    while (run)
    {
        if(rt_task_wait_period(NULL)!=0)
        {
            printf("sched_rms_period error_in_consumer_task_proc!\n");

        }

        msgAData.vel=messg_index;
        msgAData.acc=messg_index*2;
        msgAData.msg_size=msgAValiableSize+sizeof(int);

        msgDataInterface.msgType=MSGATYPE;

        msgDataInterface.tmsgA=msgAData;

        struct msg_data_related_ *ptr = &msgDataInterface;
        int ret =enqueue(ptr,100);
        if(ret >0)
        {
            std::cout<<"a thread enqueue success"<<std::endl;
        }
        else
        {
             std::cout<<"a thread enqueue failed"<<std::endl;
        }

        //        ret = rt_queue_write((RT_QUEUE *)&arg, (void *)&ptr, sizeof(struct msg_data_related_ *), Q_NORMAL);

        //        if(-ENOMEM==ret )
        //        {
        //            printf("queue no space \n");
        //            int ret_of_write=1;
        //            while(ret_of_write)
        //            {
        //                usleep(10);
        //                ret = rt_queue_write((RT_QUEUE *)&arg,&ptr , sizeof(struct msg_data_related_ *), Q_NORMAL);
        //                if(ret>=0)
        //                {
        //                    ret_of_write=0;
        //                }
        //            }
        //        }
        //        //rt_printf("producer_task_proc executing \n");

        messg_index++;
    }

    //rt_queue_delete(&tcb.q);
}

linux 非实时线程生产者msgc_producer_task.cpp

#include "msgc_producer_task.h"
#include <unistd.h>
#include <iostream>
pthread_t test_rtqueue_nonRTthread_id;
void* msgc_producer_task_proc(void* );
void* test_rt_queue_with_nonRTthread_fun(void* );

void  msgc_producer_init()
{
    int ret = pthread_create(&test_rtqueue_nonRTthread_id, NULL, msgc_producer_task_proc, NULL);//开始S形速度规划

    if (ret)
    {
        perror("test_rt_queue_thread_fun_thread error!");
        return ;
    }
    return;

}



void* msgc_producer_task_proc(void* )
{
    static msgC_data_related_ tmsgC_data_related_;
    static msg_data_related_ tmsg_data_related_interface;
    while(1)
    {
        usleep(10000);
        strcpy(tmsgC_data_related_.info," MSG FROM C! \n");
        tmsg_data_related_interface.msgType=MSGCTYPE;
        tmsg_data_related_interface.tmsgC=tmsgC_data_related_;
        struct msg_data_related_ *ptr = &tmsg_data_related_interface;
        int ret=enqueue(ptr,100);
        if(ret >0)
        {
            std::cout<<"c thread enqueue success"<<std::endl;
        }
        else
        {
//             std::cout<<"c thread enqueue failed"<<std::endl;
        }
    }

}

 

 

消息队列消费者consumer_task.cpp:t

ypedef struct {
    int       exit;

    RT_TASK   consumer_task;
}con_info_t;

void consumer_task_proc(void *arg);

void *consumer_init()
{
    int ret = 0;
    con_info_t *tcb = (con_info_t *)calloc(1,sizeof(con_info_t));//calloc在动态分配完内存后,自动初始化该内存空间为零,而malloc不初始化,里边数据是随机的垃圾数据。

    //tcb->pRcvQ = (RT_QUEUE*)get_producer_queue(arg);
    rt_printf("step1 \n");
    /* Creating cyclic xenomai task */
    ret = rt_task_create(&tcb->consumer_task,"consumer_task",0,70,T_FPU);
    if (ret)
    {
        fprintf (stderr, "producer_task create failed!!!!\n");
        goto release_master;
    }

    /* Starting cyclic task */
    fprintf(stdout, "starting consumer_task \n");
    ret = rt_task_start(&tcb->consumer_task, &consumer_task_proc, tcb);
    if (ret)
    {
        fprintf (stderr, "consumer_task start failed!!!!\n");
        goto release_master;
    }
    return tcb;
release_master:
    return NULL;
}

void consumer_deinit(void *arg)
{
    con_info_t *tcb = (con_info_t *)arg;
    if(tcb == NULL) return ;
    tcb->exit = 1;
    while(tcb->exit == 1)
    {
        usleep(1000);
    }
    rt_task_join(&tcb->consumer_task);
    rt_task_delete(&tcb->consumer_task);
    free(tcb);
}

void consumer_task_proc(void *arg)
{
    int count=0;
    con_info_t *tcb = (con_info_t *)arg;

    tcb->exit = 0;

    /* Set Xenomai task execution mode */
    int ret;
    ret = rt_task_set_mode(0, T_CONFORMING, NULL);
    if (ret)
    {
        rt_printf("error while rt_task_set_mode, code %d\n",ret);
        return;
    }

    rt_task_set_periodic(NULL,TM_NOW,SM_FRAME_PERIOD_NS);
    /* Start pdo exchange loop until user stop */
    while (tcb->exit == 0)
    {
        if(rt_task_wait_period(NULL)!=0)
        {
            rt_printf("sched_rms_period error_in_consumer_task_proc!\n");

        }
        struct msg_data_related_ rmsg;

        ret =dequeue(rmsg,100);

//        ret = rt_queue_read(tcb->pRcvQ, &msg, sizeof(struct msg_data_related_ *), NULL);
        if(ret>0)
        {

            switch (rmsg.msgType)
            {
            case  MSGATYPE:
            {
                printf("consumer get msg->vel is:%d\n",rmsg.tmsgA.vel);
                printf("consumer get msg->acc is:%d\n",rmsg.tmsgA.acc);
                break;
            }
            case MSGBTYPE:
            {
                printf("consumer get msg info is:%s\n",rmsg.tmsgB.info);
                break;

            }
            case MSGCTYPE:
            {
                printf("consumer get msg info is:%s\n",rmsg.tmsgC.info);
                break;

            }

            }

        }
        if(count%100==0)
        {
            rt_printf("consumer_task_proc exectuting \n");

        }
        count++;
    }
    tcb->exit = 0;
}

测试结果:

消息队列实现实时线程与非实时线程通讯:xenomai与linux为例进行说明