@

Pulsar Functions(轻量级计算框架)

基础定义

Function instance(函数实例)是函数执行框架的核心元素,由以下元素组成:

  • 消费来自不同输入主题的消息的消费者的集合。
  • 调用函数的执行程序。
  • 将函数的结果发送到输出主题的生产者。

函数实例的内部工作流

  • 一个函数可以有多个实例,每个实例执行一个函数的副本。可以在配置文件中指定实例数。
  • 函数实例中的使用者使用FQFN作为订阅者名,以基于订阅类型在多个实例之间实现负载平衡。订阅类型可以在函数级别指定。
  • 每个函数都有一个单独的FQFN状态存储。可指定一个状态接口,以便在BookKeeper中持久化中间结果。其他用户可以查询函数的状态并提取这些结果。

工作流程

Function worker 是一个逻辑组件,用于在Pulsar Functions的集群模式部署中监视、编排和执行单个函数。每个函数实例都可以作为线程或进程执行,具体取决于所选的配置。如果Kubernetes集群可用,则可以在Kubernetes中以StatefulSets的形式生成函数。

Function worker的内部架构和工作流如下

image-20230303180946526

  • 用户向REST服务器发送请求以执行函数实例。
  • REST服务器响应请求并将请求传递给功能元数据管理器。
  • 函数元数据管理器将请求更新写入函数元数据主题。并跟踪所有与元数据相关的消息,并使用函数元数据主题持久化函数的状态更新。
  • 函数元数据管理器从函数元数据主题读取更新,并触发调度管理器计算分配。
  • 日程管理器将作业更新写入作业主题。
  • 函数运行时管理器侦听分配主题,读取分配更新,并更新其内部状态,该状态包含所有工作人员的所有分配的全局视图;如果更新更改了工作对象上的赋值,函数运行时管理器将通过启动或停止函数实例的执行来具体化新的赋值。
  • 会员管理要求协调主题选举一个领导工作者,所有工作人员都订阅故障转移订阅中的协调主题,但活动的工作人员成为领导者并执行分配,从而保证该主题只有一个活动消费者。
  • 成员管理器从协调主题读取更新。

函数运行时

函数实例是在运行时内调用的,许多实例可以并行运行。Pulsar支持三种不同成本和隔离保证的函数运行时类型,以最大限度地提高部署灵活性。可以根据需要使用其中之一来运行函数。

  • 线程运行时:每个实例都作为一个线程运行。由于线程模式的代码是用Java编写的,所以它只适用于Java实例。当函数以线程模式运行时,它与函数工作者运行在同一个Java虚拟机(JVM)上。
  • 进程运行时:每个实例都作为一个进程运行。当函数以进程模式运行时,它运行在函数工作者运行的同一台机器上。
  • Kubernetes运行时:函数由worker以Kubernetes StatefulSet的形式提交,每个函数实例作为pod运行。Pulsar支持在启动函数时向Kubernetes StatefulSets和服务添加标签,这有助于选择目标Kubernetes对象。

处理保证和订阅类型

Pulsar提供了三种不同的消息传递语义,可以将它们应用于一个函数。根据ack时间节点确定不同的传递语义实现。

  • At-most-once:最多一次,发送到函数的每个消息都将尽最大努力处理。不能保证消息是否会被处理。当选择此语义时,autoAck配置必须设置为true,否则启动将失败(autoAck配置将在将来的版本中弃用)。Ack时间节点:函数处理之前。
  • At-least-once:最少一次,发送到函数的每个消息都可以被处理多次(以防处理失败或重新交付)。如果创建函数时没有指定——processing- guaranteed标志,则该函数提供至少一次交付保证。Ack时间节点:发送消息到输出后。
  • Effectively-once:精确一次,送到函数的每条消息都可以被处理多次,但它只有一个输出。重复的消息将被忽略。有效地在至少一次处理和有保证的服务器端重复数据删除的基础上实现一次。这意味着一个状态更新可以发生两次,但是相同的状态更新只应用一次,另一个重复的状态更新在服务器端被丢弃。Ack时间节点:发送消息到输出后。
  • Manual:当选择这个语义时,框架不会执行任何ack操作,需要在函数中调用context.getCurrentRecord().ack()方法来手动执行ack操作。Ack时间节点:在函数方法中自定义。

提示

  • 默认情况下,Pulsar函数提供至少一次交付保证。如果创建函数时没有为——processing guaranteed标志提供值,则该函数提供至少一次保证。
  • 排他订阅类型在Pulsar函数中不可用,原因:
    • 如果只有一个实例,exclusive等于故障转移。
    • 如果有多个实例,exclusive可能会在函数重新启动时崩溃并重新启动。在这种情况下,排他不等于故障转移。因为当主消费者断开连接时,所有未确认的和后续的消息都被传递到下一个。
  • 要将订阅类型从shared更改为key_shared,可以在pulse -admin中使用- retain-key- ordered选项。

可以在创建函数时设置函数的处理保证。如下面的命令创建了一个应用了“精确一次”保证的函数。

bin/pulsar-admin functions create \
  --name my-effectively-once-function \
  --processing-guarantees EFFECTIVELY_ONCE \

可以使用update命令更改应用于函数的处理保证。

bin/pulsar-admin functions update \
  --processing-guarantees ATMOST_ONCE \

窗口函数

定义

目前,窗口函数仅在Java中可用,并且不支持MANUAL和effective -once delivery语义。窗口函数是跨数据窗口(即事件流的有限子集)执行计算的函数。如下图所示,流被划分为“桶”,其中可以应用函数。

image-20230306161555132

函数的数据窗口定义包含两个策略:

  • 清除策略:控制在窗口中收集的数据量。
  • 触发策略:控制何时触发一个函数并执行该函数以根据清除策略处理窗口中收集的所有数据。

触发策略和驱逐策略都由时间或计数驱动。

提示

  • 同时支持处理时间和事件时间。
  • 处理时间是根据函数实例构建和处理窗口时的壁时间定义的。窗口完整性的判断很简单,您不必担心数据到达混乱。

窗口类型

滚动窗口

滚动窗口将元素分配给具有指定时间长度或计数的窗口。滚动窗口的驱逐策略总是基于窗口已满。因此只需要指定触发器策略,基于计数或基于时间。在具有基于计数的触发策略的滚动窗口中,如以下示例所示,触发策略被设置为2。当窗口中有两个项目时,无论时间如何,都会触发并执行每个函数。

image-20230306162713067

相反,如下面的示例所示,滚动窗口的窗口长度为10秒,这意味着当10秒的时间间隔过去时,函数将被触发,而不管窗口中有多少事件。

image-20230306162723916

滑动窗口

滑动窗口方法通过设置清除策略来限制保留用于处理的数据量,并使用滑动间隔设置触发器策略来定义固定的窗口长度。如果滑动间隔小于窗口长度,则存在数据重叠,这意味着同时落入相邻窗口的数据将被多次用于计算。如下面的示例所示,窗口长度为2秒,这意味着任何超过2秒的数据都将被清除,不会在计算中使用。滑动间隔被配置为1秒,这意味着该函数每秒执行一次,以处理整个窗口长度内的数据。

image-20230306162804742

函数配置

在独立的Pulsar中创建和验证函数(包括有状态函数和窗口函数)的分步说明和示例

  • 在conf/broker.conf文件(对于Pulsar standalone, conf/standalone.conf)中,将functionsWorkerEnabled设置为true。vim conf/broker.conf
functionsWorkerEnabled=true

如果是standalone Pulsar 在conf/standalone.conf文件中增加上面的字段。

  • 重启broker
bin/pulsar-daemon stop broker
bin/pulsar-daemon start broker
  • 检查Pulsar Function集群
bin/pulsar-admin functions-worker get-cluster

image-20230306171929532

函数示例

使用官方的函数示例演示,查看根目录下examples文件夹

image-20230306172608701

  • 创建租户和命名空间
bin/pulsar-admin tenants create my-test
bin/pulsar-admin namespaces create my-test/my-namespace
bin/pulsar-admin namespaces list my-test
  • 修改 vim examples/example-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://my-test/my-namespace/test_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/test_result"

output: "persistent://my-test/my-namespace/test_result"
autoAck: true
parallelism: 1
  • 创建函数
bin/pulsar-admin functions create \
   --function-config-file examples/example-function-config.yaml \
   --jar examples/api-examples.jar

image-20230306173307386

  • 查看函数的配置
bin/pulsar-admin functions get \
   --tenant my-test \
   --namespace my-namespace \
   --name example

image-20230307094316237

  • 查看状态
bin/pulsar-admin functions status \
   --tenant my-test \
   --namespace my-namespace \
   --name example

image-20230307094334789

  • 消费消息
bin/pulsar-client consume persistent://my-test/my-namespace/test_result -s 'my-subscription' -p Earliest -n 0
  • 生产消息
bin/pulsar-client produce persistent://my-test/my-namespace/test_src --messages "test-messages-`date`" -n 10
  • 查看消费者的输出

image-20230307094031460

有状态函数示例

  • 在BookKeeper中启用streamStorage服务。目前服务使用的是NAR包,需要在conf/bookkeeper.conf文件中进行配置。vim conf/bookkeeper.conf
### Grpc Server ###
#
## the grpc server port to listen on. default is 4181
storageserver.grpc.port=4181
#
#### Dlog Settings for table service ###
#
##### Replication Settings
dlog.bkcEnsembleSize=3
dlog.bkcWriteQuorumSize=2
dlog.bkcAckQuorumSize=2
#
#### Storage ###
#
## local storage directories for storing table ranges data (e.g. rocksdb sst files)
storage.range.store.dirs=data/bookkeeper/ranges
#
## whether the storage server capable of serving readonly tables. default is false.
storage.serve.readonly.tables=false
#
## the cluster controller schedule interval, in milliseconds. default is 30 seconds.
storage.cluster.controller.schedule.interval.ms=30000
  • 创建vim examples/example-stateful-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "word_count"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["persistent://my-test/my-namespace/wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1
  • 创建函数
bin/pulsar-admin functions create \
   --function-config-file examples/example-stateful-function-config.yaml \
   --jar examples/api-examples.jar
  • 查询带有itxs键的函数的状态表。该操作监视与itxs相关的更改。
bin/pulsar-admin functions querystate \
   --tenant my-test \
   --namespace my-namespace \
   --name word_count -k itxs -w
  • 消费消息
bin/pulsar-client consume persistent://my-test/my-namespace/wordcount_result -s 'my-subscription' -p Earliest -n 0

bin/pulsar-client consume test_wordcount_dest -s 'my-subscription' -p Earliest -n 0
  • 生产消息
bin/pulsar-client produce persistent://my-test/my-namespace/wordcount_src --messages "itxs" -n 10

窗口函数示例

  • 创建vim examples/example-stateful-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "window-example"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["persistent://my-test/my-namespace/window_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/window_result"

output: "persistent://my-test/my-namespace/window_result"
autoAck: true
parallelism: 1
windowConfig:
  windowLengthCount: 10
  slidingIntervalCount: 5
  • 创建函数
bin/pulsar-admin functions create \
   --function-config-file examples/example-window-function-config.yaml \
   --jar examples/api-examples.jar
  • 消费消息
bin/pulsar-client consume -s test-sub -n 0 persistent://my-test/my-namespace/window_result
  • 生产消息
bin/pulsar-client produce -m "3" -n 10 persistent://my-test/my-namespace/window_src
  • 查看消费窗口输出

image-20230307143014857

自定义函数开发

定义

Pulsar 函数支持Java、Python和Go等语言,如果是Java语言则支持下面三类接口:

  • 使用原生语言接口:不需要特定于Pulsar的库或特殊依赖(只需要JDK核心库);适合于不需要访问上下文的函数。
  • 使用Pulsar函数SDK:特定于脉冲星的库,提供了语言本机接口中无法提供的一系列功能,例如状态管理或用户配置;适用于需要访问上下文的函数。
  • 扩展Pulsar函数SDK:对特定于pulse的库的扩展,在Java中提供初始化和关闭接口。适用于需要初始化和释放外部资源的函数。

原生语言接口示例

  • 新建Maven工程,Pom文件内容如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itxs</groupId>
    <artifactId>pulsar-demo</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>sn.itxs.pulsar.function.NativeFunctionDemo</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
            </plugin>
        </plugins>
    </build>
</project>
  • 创建NativeFunctionDemo.java
package sn.itxs.pulsar.function;

import java.util.function.Function;

public class NativeFunctionDemo implements Function<String, String> {
    @Override
    public String apply(String s) {
        return String.format("hahaha,native implement %s!", s);
    }
}
  • 打包生成pulsar-demo-1.0.jar,上传到安装Pulsar服务器上的,这里就放在pulsar根目录下的examples文件夹,后续的操作就和前面函数示例一样

  • 创建函数描述文件,vim examples/native-example-function-config.yaml

tenant: "my-test"
namespace: "my-namespace"
name: "native-example"
className: "sn.itxs.pulsar.function.NativeFunctionDemo"
inputs: ["persistent://my-test/my-namespace/native_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/native_result"

output: "persistent://my-test/my-namespace/native_result"
autoAck: true
parallelism: 1
  • 创建函数
bin/pulsar-admin functions create \
   --function-config-file examples/native-example-function-config.yaml \
   --jar examples/pulsar-demo-1.0.jar
  • 消费消息
bin/pulsar-client consume persistent://my-test/my-namespace/native_result -s 'my-subscription' -p Earliest -n 0
  • 生产消息
bin/pulsar-client produce persistent://my-test/my-namespace/native_src --messages "actual pulsar" -n 10
  • 查看消费者的输出

image-20230307164653697

Pulsar函数SDK示例

  • 由于依赖Pulsar函数SDK,因此JDK需要选择17,在前面的工程添加Pom依赖
    <properties>
        <pulsar.version>2.11.0</pulsar.version>
    </properties>
        
    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-functions-api</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
    </dependencies>
  • 打包指定sn.itxs.pulsar.function.SdkFunctionDemo

  • 创建SdkFunctionDemo.java

package sn.itxs.pulsar.function;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class SdkFunctionDemo implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        return String.format("hahaha,pulsar sdk implement %s!", input);
    }
}
  • 打包生成pulsar-demo-1.0.jar,上传到安装Pulsar服务器上的,这里还是覆盖pulsar根目录下的examples文件夹文件,其他和前面一样

  • 创建函数描述文件,vim examples/sdk-example-function-config.yaml

tenant: "my-test"
namespace: "my-namespace"
name: "sdk-example"
className: "sn.itxs.pulsar.function.SdkFunctionDemo"
inputs: ["persistent://my-test/my-namespace/sdk_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/sdk_result"

output: "persistent://my-test/my-namespace/sdk_result"
autoAck: true
parallelism: 1
  • 创建函数
bin/pulsar-admin functions create \
   --function-config-file examples/sdk-example-function-config.yaml \
   --jar examples/pulsar-demo-1.0.jar
  • 消费消息
bin/pulsar-client consume persistent://my-test/my-namespace/sdk_result -s 'my-subscription' -p Earliest -n 0
  • 生产消息
bin/pulsar-client produce persistent://my-test/my-namespace/sdk_src --messages "actual pulsar" -n 10
  • 查看消费者的输出

image-20230307170134637

  • 本人博客网站IT小神 www.itxiaoshen.com