azure队列存储 - 将消息放回队列
Im构建队列处理webjob以从队列中读取消息并使用数据从blob存储中检索报告uri,然后将其作为电子邮件中的链接发送。我的流程工作得很好,但我需要在特定的时间窗口内发送电子邮件。azure队列存储 - 将消息放回队列
我有另一个进程(webjob)从sql后端检索这些数据并将其放入电子邮件队列中。
此webjob每30分钟运行一次,并且只获取当前时间和当前时间的2小时窗口内的数据。所以我知道排队中的任何事情都是在今天和现在的2小时之内。我如何进一步缩小这个范围以便从队列中读取数据,并且如果“发送电子邮件”时间设置为19:00,当前时间为18:00,则可以将该消息放回队列中读取再后来,下一次应该更接近19:00,然后我可以处理它并通过电子邮件发送出去。
时间不一定要被发现,所以即使它在19点30分钟内(或者它的任何发送时间)也可以被处理。所以我有效地从队列中取出一个对象,检查它的时间,并且如果它不在其分配的“电子邮件输出”时间的30分钟内,我将它放回队列中并且再次处理
**在我的webjob ,我有一个'Functions'类,它包含一个方法'ProcessQueueMessage',每当一个消息被放入队列时它就会被触发。
// This function will get triggered/executed when a new message is written
// on an Azure Queue called queue.
public async Task ProcessQueueMessage([QueueTrigger("%reportgenerator%")] Data.Dto.Schedule.ScheduleDto schedule)
{
var reports = await this._scheduledReportGenerationService.GenerateScheduledEmails(schedule.ID);
}
的ScheduleDto类将有一代人的时间属性,我可以读这一点,它比较当前时间和仅在处理它的我指定的“时间窗口”内。我将如何停止队列消息在这里被删除,以便我可以重新处理它?
当您将消息添加到队列中时,只需设置initialVisibilityDelay,以便消息在最小处理时间之前才能看到。
CloudQueue queue = queueClient.GetQueueReference(queueName);
var msg = new CloudQueueMessage("Hello World!");
TimeSpan timeSpanDelay = GetEarliestProcessTime();
await queue.AddMessageAsync(msg, null, timeSpanDelay, null, null);
因此,有是与Azure存储队列几件事情,是要帮助你与这样的场景:
关于把消息发回队列中,你没有做什么特别的。这是由存储队列提供的功能。当您将邮件出队(Azure存储术语中的GET Message
)时,邮件在一段时间内变得不可见,如果未被出队邮件的进程删除,邮件将再次变为可见,并可被另一个进程拾取。
因此,当您将邮件出列时,请检查时间,如果时间不正确,您什么都不做。但是,请确保一旦处理完消息,您将删除该消息,否则将会再次提取消息。
您可以做的另一件事是当您将邮件出列并且发现它不是处理该邮件的正确时间时,您更新该邮件并将其可见性超时属性设置为使该邮件再次可见的值接近处理时间。例如,您在18:00出列消息,发现此消息需要在19:00处理。在这种情况下,您将更新邮件并将其可见性超时设置为50分钟(或30分钟以上的值为Web作业的计划)。这将确保当你的webjob在18:30运行时,这个消息不会被web作业拾取,因为这个消息只会在18:50显示。
您可以在这里阅读有关更新消息的更多信息:https://docs.microsoft.com/en-us/rest/api/storageservices/update-message以及关于在此处列出消息的信息:https://docs.microsoft.com/en-us/rest/api/storageservices/get-messages。
更新
我完全忘了这是在WebJob所以什么都不做实际上将删除该邮件。我猜你有两种选择(重复评论中提到的那种):
- 抛出异常而不是无所事事。这将确保WebJob处理器不会删除该消息。我没有自己尝试过,但是您也可以更新消息并将其可见性超时设置为更接近WebJob本身所需时间的值(然后引发异常)。不过这是一种反模式。
- 您在队列中添加一条新消息,并将其初始可见性超时值设置为接近所需时间的值(这也在另一个答案中涵盖),并且删除此消息。
我还没有实际使用GetMessage读取队列消息,在我的WebJob中,我有host.RunAndBlock,然后在我的函数类中我有一个方法(ProcessQueueMessage)这就是无论何时将新消息写入到队列中,都会触发,这是'默认'webjob行为。这不是最佳做法吗? – proteus
阅读和删除(以及放入毒队)所有似乎都是自动发生的,没有任何我的干预,我只是在处理消息时才检测到它 – proteus
@proteus:你说得对,WebJobs下的消息自动删除时函数正常完成。 Gaurav表示相反,也许他熟悉较低级别的Queue Storage API。 – camelCase
当EN-队列Azure存储队列中的项目,你可以添加额外的细节,这将导致该项目被隐藏了配置的持续时间的项目。如果您的批处理作业只能每两小时运行一次,但您希望延迟发送更精细时间控制的电子邮件,那么我建议两小时连续批处理作业可以使用此“initialVisibilityDelay”功能。
这是另一个SO问题,描述了API。
您使用的是Azure存储队列还是服务总线队列? –
另外,从您的问题来看,如果您使用2个webjobs或者只有1个webjob,则不清楚。一个小问题:请花一点时间,格式化你的问题,并在段落中分解。阅读一个大的大段落是一个痛苦:)。 –
我有2个webjobs,一个从后台获取'今天2小时内'的数据,创建指定的报告并将其放入blob存储中。另一个读取这些数据,并通过电子邮件发送blob(pdf)的URI。我使用Azure存储队列 – proteus