当前位置:网站首页 > 云计算与后端部署 > 正文

消息队列详解

前言

1:在没有os时中断与任务,任务与任务之间进行消息传递依靠全局变量,但是如果在使用了OS的应用中使用全局变量会涉及到“资源管理的问题”。所以,在FreeRTOS中使用其提供的消息队列进行消息的传递
2:当队列中的消息是空时,读取消息的任务将被阻塞,用户还可以指定阻塞的任务时间xTicksToWait,在这段时间中,如果队列为空,该任务将保持阻塞状态以等待队列数据有效。当队列中有新消息时,被阻塞的任务会被唤醒并处理新消息;当等待的时间超过了指定的阻塞时间,即使队列中尚无有效数据,任务也会自动从阻塞态转为就绪态。消息队列是一种异步的通信方式。
3:发送紧急消息的过程与发送消息几乎一样,唯一的不同是,当发送紧急消息时,发送的位置是消息队列队头而非队尾,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理
在这里插入图片描述
4:读然后发现队列没有数据,此时有三种选择:

  • 直接离开,不阻塞
  • 等一个设定的时间(设定多少个tick),这段时间处于阻塞的状态。如果成功获得了任务就变成了就绪态,此时优先级如果比当前运行的任务优先级还高就会马上运行。如果一直没获得,到了时间也会自动退出阻塞态,并且返回一个错误代码
  • 死等,阻塞到等到为止
  • 任务向消息队列发送消息:系统会根据用户指定的阻塞超时时间将任务阻塞,在指定的超时时间内如果还不能完成入队操作,发送消息的任务者中断服务程序会收到一个错误码 errQUEUE_FULL,然后解除阻塞状态;
  • 中断向消息队列发送消息:当然,只有在任务中发送消息才允许进行阻塞状态,而在中断中发送消息不允许带有阻塞机制的,需要调用在中断中发送消息的 API 函数接口,因为发送消息的上下文环境是在中断中,不允许有阻塞的情况

6:消息队列可以应用于发送不定长消息的场合,包括任务与任务间的消息交换,队列是FreeRTOS 主要的任务间通讯方式,可以在任务与任务间、中断和任务间传送信息,发送到队列的消息是通过拷贝方式实现的,这意味着队列存储的数据是原数据,而不是原数据的引用。
7:无论是发送或者是接收消息都是以拷贝的方式进行,如果消息过于庞大,可以将消息的地址作为消息进行发送、接收。队列创建的队列项字节长度可以是好10个字节什么的,不一定就是uint8_t个字节什么的
8:队列是具有自己独立权限的内核对象,并不属于任何任务。所有任务都可以向同一队列写入和读出。一个队列由多任务或中断写入是经常的事,但由多个任务读出倒是用的比较少。
9:入队出队函数功能介绍
在这里插入图片描述

xQUEUE结构体

typedef unsigned long UBaseType_t; 
typedef struct QueueDefinition { 
    int8_t *pcHead; /*< 指向队列存储区开始地址 */ int8_t *pcTail; /*< 指向队列存储区最后一个字节 */ int8_t *pcWriteTo; /*< 指向存储区中下一个空闲区域 */ union /* Use of a union is an exception to the coding standard to ensure two mutually exclusive structure members don't appear simultaneously (wasting RAM). */ { 
    int8_t *pcReadFrom; /*< 当用作队列的时候指向最后一个出队的队列项首地址 */ UBaseType_t uxRecursiveCallCount;/*< 当用作递归互斥量的时候用来记录递归互斥量被调用的次数 */ } u; List_t xTasksWaitingToSend; /*< 等待发送任务列表,那些因为队列满导致入队失败而进入阻塞态的任务就会挂到此列表上。 */ List_t xTasksWaitingToReceive; /*< 等待接收任务列表,那些因为队列空导致出队失败而进入阻塞态的任务就会挂到此列表上 */ volatile UBaseType_t uxMessagesWaiting;/*< 队列中当前队列项数量,也就是消息数 */ UBaseType_t uxLength; /*< 创建队列时指定的队列长度,也就是队列中最大允许的队列项(消息)数量 */ UBaseType_t uxItemSize; /*< 创建队列时指定的每个队列项(消息)最大长度,单位字节 */ volatile int8_t cRxLock; /*< 当队列上锁以后用来统计发送到队列中的队列项数量,也就是入队的队列项数量,当队列没有上锁的话此字段为 queueUNLOCKED */ volatile int8_t cTxLock; /*< Stores the number of items transmitted to the queue (added to the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */ #if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) ) uint8_t ucStaticallyAllocated; /*< 如果使用静态存储的话此字段设置为 pdTURE*/ #endif #if ( configUSE_QUEUE_SETS == 1 ) //队列集相关宏 struct QueueDefinition *pxQueueSetContainer; #endif #if ( configUSE_TRACE_FACILITY == 1 )//跟踪调试相关宏 UBaseType_t uxQueueNumber; uint8_t ucQueueType; #endif } xQUEUE; /* The old xQUEUE name is maintained above then typedefed to the new Queue_t name below to enable the use of older kernel aware debuggers. */ typedef xQUEUE Queue_t; 

xQueueCreate() 消息队列创建函数

1:xQueueCreate()用于创建一个新的队列并返回可用于访问这个队列的队列句柄。队列句柄其实就是一个指向队列数据结构类型的指针
2:这是动态创建,静态的一般不用
3:参数:

  • uxQueueLength:队列能够存储的最大消息单元数目,即队列长度,最大可包含多少个消息
  • uxItemSize:队列中每个消息的大小,以字节为单位

4、 返回值:如果创建成功则返回一个队列句柄,用于访问创建的队列。如果创建不成功则返回
NULL,可能原因是创建队列需要的 RAM 无法分配成功
使用实例

#define QUEUE_LEN 4 /* 队列的长度,最大可包含多少个消息 */ #define QUEUE_SIZE 4 /* 队列中每个消息大小(字节) */ BaseType_t xReturn = pdPASS;/* 定义一个创建信息返回值,默认为pdPASS */ /* 创建Test_Queue */ Test_Queue = xQueueCreate((UBaseType_t ) QUEUE_LEN,/* 消息队列的长度 */ (UBaseType_t ) QUEUE_SIZE);/* 消息的大小 */ if(NULL != Test_Queue) printf("创建Test_Queue消息队列成功!\r\n"); 
#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )//因为是动态创建,所以需要把这个宏置1 #define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) ) #endif 

正式创建讲解创建实际上用的都是这个通用的
xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType )
1:判断队列需要多大内存xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
2:为队列申请内存pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );消息队列控制块大小+消息存储空间大小
3:然后调用prvInitialiseNewQueue正式的创建

#define queueQUEUE_TYPE_BASE ( ( uint8_t ) 0U ) #if( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType ) { 
    Queue_t *pxNewQueue;//指向新申请的队列的地址 size_t xQueueSizeInBytes;//记录队列需要多大的内存 uint8_t *pucQueueStorage;//存储空间的地址,就是队列跳过队列结构体的地址 configASSERT( uxQueueLength > ( UBaseType_t ) 0 ); if( uxItemSize == ( UBaseType_t ) 0 ) { 
    /* 如果单个消息空间大小为0,xQueueSizeInBytes就设置为0(这样就不需要申请内存了)这个对应的就是信号量 */ xQueueSizeInBytes = ( size_t ) 0; } else { 
    /* 如果单个消息空间大小不为0,分配存储消息的空间,空间的大小为队列长度*单个消息大小 */ xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /*lint !e961 MISRA exception as the casts are only redundant for some ports. */ } /* 向系统申请内存,内存大小为消息队列控制块大小+消息存储空间大小,这段内存空间是保持连续的 */ pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes ); if( pxNewQueue != NULL ) { 
    /* 计算出消息存储空间的起始地址 */ pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t ); #if( configSUPPORT_STATIC_ALLOCATION == 1 ) { 
    /* Queues can be created either statically or dynamically, so note this task was created dynamically in case it is later deleted. */ pxNewQueue->ucStaticallyAllocated = pdFALSE; } #endif /* configSUPPORT_STATIC_ALLOCATION */ prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );//调用prvInitialiseNewQueue将消息队列进行初始化 } return pxNewQueue; } #endif /* configSUPPORT_STATIC_ALLOCATION */ 

调用 prvInitialiseNewQueue()函数将消息队列进行初始化。其实xQueueGenericCreate()主要是用于分配消息队列内存的
消息队列长度 uxQueueLength
单个消息大小 uxItemSize
存储消息起始地址 pucQueueStorage
消息队列类型 ucQueueType
消息队列控制块 pxNewQueue
一:prvInitialiseNewQueue正式创建,初始化新队列pxNewQueue
1:初始化队列结构体相关成员变量
2:调用xQueueGenericReset( pxNewQueue, pdTRUE );//重置消息队列,在消息队列初始化的时候,需要重置一下相关参数

static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue ) { 
    /* Remove compiler warnings about unused parameters should configUSE_TRACE_FACILITY not be set to 1. */ ( void ) ucQueueType; if( uxItemSize == ( UBaseType_t ) 0 ) { 
    /* 没有为消息存储分配内存,但是 pcHead 指针不能设置为 NULL,因为队列用作互斥量时,pcHead 要设置成 NULL。这里只是将 pcHead 指向一个已知的区域 */ pxNewQueue->pcHead = ( int8_t * ) pxNewQueue; } else { 
    /* 设置 pcHead 指向存储消息的起始地址 */ pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage; } /* 初始化消息队列控制块的其他成员,消息队列的长度与消息的大小。*/ pxNewQueue->uxLength = uxQueueLength; pxNewQueue->uxItemSize = uxItemSize; ( void ) xQueueGenericReset( pxNewQueue, pdTRUE );//重置消息队列,在消息队列初始化的时候,需要重置一下相关参数 #if ( configUSE_TRACE_FACILITY == 1 ) { 
    pxNewQueue->ucQueueType = ucQueueType; } #endif /* configUSE_TRACE_FACILITY */ #if( configUSE_QUEUE_SETS == 1 ) { 
    pxNewQueue->pxQueueSetContainer = NULL; } #endif /* configUSE_QUEUE_SETS */ traceQUEUE_CREATE( pxNewQueue ); } 

xQueueGenericReset队列复位函数
1:第二个参数表示,你要复位的队列是新创建的队列还是老的队列
2:判断新复位的队列是否为新创建的队列,如果不是的话要处理相应的列表xTasksWaitingToSend
3:如果要复位的是新创建的队列,初始化列表xTasksWaitingToSend 和xTasksWaitingToReceive 这两个列表

BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue ) { 
    Queue_t * const pxQueue = ( Queue_t * ) xQueue; configASSERT( pxQueue ); taskENTER_CRITICAL();//进入临界区 { 
    pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );//pcTail 指向存储消息内存空间的结束地址 pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;//当前消息队列中的消息个数uxMessagesWaiting 为0 pxQueue->pcWriteTo = pxQueue->pcHead;//pcWriteTo 指向队列消息存储区下一个可用消息空间,因为是重置消息队列,就指向消息队列的第一个消息空间,也就是 pcHead 指向的空间 pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue
到此这篇消息队列详解的文章就介绍到这了,更多相关内容请继续浏览下面的相关推荐文章,希望大家都能在编程的领域有一番成就!

版权声明


相关文章:

  • CAS 服务端部署_cas-server2024-11-25 13:18:06
  • 原神私人服务器部署教程(Win)_原神pc服务器2024-11-25 13:18:06
  • 服务端部署PyTorch模型-方法(二):NVIDIA‘s Triton_服务器配置pytorch环境2024-11-25 13:18:06
  • nuxtjs服务端部署流程_nuxt部署服务器2024-11-25 13:18:06
  • PaddleOCR-API服务端部署-windows版本_paddleocr部署使用教程2024-11-25 13:18:06
  • SVN(subversion )服务端和客户端的下载安装使用2024-11-25 13:18:06
  • macbook的 safari浏览器退出后又自动启动,强制退出以后也重启,重启电脑也重启2024-11-25 13:18:06
  • JKS后缀结尾tomcat的证书转换成key和crt结尾的nginx证书2024-11-25 13:18:06
  • Centos 查看服务器磁盘,内存,端口等命令2024-11-25 13:18:06
  • http之浏览器同源政策——端口、域名、协议,三者同及不跨域;解决跨域的方案有哪些2024-11-25 13:18:06
  • 全屏图片