问题描述

在通过Azure Function消费Event Hub中的消息时,我们从Function 的 Trigger Details 日志中,可以获得当前Funciton中处理的消息是哪一个分区(PartitionID), 偏移量Offset,序列号SequenceNumber 等信息。

 

但是在Event的属性中,只发现由PartitionKey存在(该值由消息生产时复制,可以变动,可以为空),根据PartitionKey,无法在应用自己的日志中准确判断分区号(Partition ID).

所以,在Function代码中,应该如何获取到Event的分区号信息呢?

 

问题解答

在Event的属性中,没有直接获取 PartitionId 的属性,但是有 PartitionContext 属性。这个属性为一个JSON格式对象,它的内容中包含了当前的PartitionID,所以可以通过 event.metadata["PartitionContext"]["PartitionId"]  来完成获取分区号的目标。

实例代码如下:

import logging
import azure.functions as func

def main(events: func.EventHubEvent):
    for event in events:
        logging.info(f'  Function triggered to process a message: {event.get_body().decode()}')
        logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
        logging.info(f'  SequenceNumber = {event.sequence_number}')
        logging.info(f'  Offset = {event.offset}')
        logging.info(f'  PartitionId = {event.metadata["PartitionContext"]["PartitionId"]}')
        #logging.info(f'  PartitionId = {event.metadata["PartitionContext"].PartitionId}')

        # Metadata
        for key in event.metadata:
            logging.info(f'Metadata: {key} = {event.metadata[key]}')

执行结果如图:

 

参考资料

适用于 Azure Functions 的 Azure 事件中心触发器:https://docs.azure.cn/zh-cn/azure-functions/functions-bindings-event-hubs-trigger?tabs=in-process%2Cfunctionsv2%2Cextensionv5&pivots=programming-language-python