User Guide

Wormhole 系统中有三类用户角色 Admin,User,App。本章介绍 User 类型用户的使用规范。

普通用户登陆后可以访问 Admin 授权的Project,具有管理 Stream,Flow,Job 的权限,可以使用Project 下所属的 Namespace,UDF 等。

Stream 管理

Spark Stream管理

类型

理论上Stream 可以处理所有类型的数据,为提升性能,针对 Hdfs 数据备份和分流功能作了相应优化,所以将 Stream 分为三种类型。

资源配置

消费kafka中无key数据

如果绑定的topic中数据没有key,则可设定是否启用默认的kafka key,在specail config中设置{“useDefaultKey”:true},会将注册到该stream的第一个flow的source namespace作为这个topic中数据的key,该stream中同source namespace的flow就可以消费这个topic。如果绑定的topic中数据有key,则按照数据的key进行处理。

renameKeyConfig中originKey为原始的key,renameKey为要消费的namespace(0.6.3之后版本支持)

{
  "useDefaultKey":true/false      //使用flow的sourcenamespace作为key,这个配置项和下面的配置项选择一个就可以
  "renameKeyConfig": [
    {
      "topicName": "topicl",
      "originKey": "topicl_ums", //如果topic中没有原始key这个配置项可以省略
      "renameKey": "data_increment_data.kafka.kafka01022.topicl.ums.*.*.*"
    }
  ]
}

Topic 绑定

Stream 消费哪些 Topic 根据 Flow 的启停自动绑定和注销。

Flow 启动时检查其 Source Namespace 对应 Topic 是否已绑定在 Stream 上,若已绑定不会重复注册,根据 Stream 目前已消费到的 Offset 继续执行。否则将该 Topic 绑定到 Stream 上,初始 Offset 设置为该 Topic 的最新 Offset。

Flow 停止时检查其 Source Namespace 对应 Topic 是否对应该 Stream 上其他 Flow,若无则注销该 Stream 与 Topic 的绑定关系。

UDF 绑定

启动 Stream 时可以选择需要加载的 UDF,也可以取消已选择的 UDF。Stream 启动后可以点击生效按钮,选择需要增加的 UDF,Stream 不需要重启,可动态加载新 UDF。

启动

注意事项:

生效

Stream 运行过程中支持 UDF 热部署,支持动态调整 Topic 消费的 Offset 和 Rate。

注意事项:

若需要调整 Stream configs 项的配置,需要重启 Stream。

状态转换

Stream 状态转换图如下,其中 refresh 代表 Refresh 按钮,start 代表启动按钮,stop 代表停止按钮。

类型

Flink中支持的Stream类型只有default,支持异构sink,包括Kafka/RDBS/Elasticsearch/Hbase/Phoenix/Cassandra/MongoDB系统中,数据类型支持处理UMS数据类型和用户自定义UMS_Extension类型

资源配置

启动

Flow 管理

需指定 Stream,Source Namespace,Protocol,Sink Namespace,配置数据转换逻辑等。

选择 Stream

Spark Stream

Protocol

Source Namespace

Sink Namespace

Sink Namespace 对应的物理表需要提前创建,表的 Schema 中是否需要创建 UMS 系统字段 ums_id_(long 类型), ums_ts_(datetime 类型), ums_active_(int 类型),根据以下策略判断须增加的字段:

Table Keys

设置sink表的table keys,用于幂等的实现。如果有多个,用逗号隔开。

Result Fields

配置最后输出的字段名称,All 代表输出全部字段,点击 Selected 可配置需要输出的字段名称,多个用逗号隔开

Sink Config

Sink Config 项配置与所选系统类型相关,点击配置按钮后页面上方有对应系统的配置项例子

配置数据插入方式(只增加or增删改)

其中 “mutation_type” 的值有 “i” 和 “iud”,代表向 Sink 表中插数据时使用只增原则或增删改原则。如果为 “iud”,源数据中须有 ums_id_(long 类型), ums_ts_(datetime 类型), ums_op_(string 类型) 字段,Sink 表中都须有 ums_id_(long 类型), ums_ts_(datetime 类型), ums_active_(int 类型) 字段。若不配置此项,默认为 “iud”

注意事项:

分表幂等

针对关系型数据库,为了减小ums_id、ums_op与ums_ts字段对业务系统的侵入性,可单独将这三个字段和table keys单独建立一个表,原业务表保持不变。假设ums_id、ums_op、ums_ts和table key组成的表名为umsdb,那么分表幂等的配置为:

{"mutation_type":"split_table_idu","db.function_table":"umsdb"}

ums_uid_字段输出

默认配置中ums_uid_字段会被过滤掉,不会写入sink端,通过配置sink_uid可将ums_uid_字段写入目标库

{"sink_uid":true}

sink分批读/写

Sink时支持分批读和分批写,批次大小配置项为batch_size

{"batch_size":"10000"}

sink kudu表名带特殊字符处理

impala建的kudu表中表名可能带”.”等特殊字符,如果在namespace中将”.”加入,就会影响wormhole对namespace分割处理,可以sink config中配置连接符解决(0.6.3及之后版本支持)。例如kudu的表名为impala::dbname.tablename,namespace中database可配置为impala::dbname,table可配置为tablename,sinkconfig中配置:{“table_connect_character”:”.”}即可

sink hbase设置版本字段进行幂等

Sink hbase可以设置列版本号字段,进行幂等:{“hbase.version.column”:”ums_id_”},如果不配置,则按照wormhole原来的方式进行幂等(0.6.3及之后版本支持)

sink es相关配置

index时间后缀配置,配置项为index_extend_config,例如{“index_extend_config”:”_yyyy-MM-dd”} 访问header配置,配置项header_config,例如{“header_config”:{“content-type”:”application/json”}}(0.7.0之后版本支持)

配置安全认证的sink kafka

在用户需要向启用了kerberos安全认证的kafka集群Sink数据时,需要在sink config里面做如下配置:{“kerberos”:true},默认情况下,是向未启用kerberos认证的kafka集群Sink数据(0.6.1及之后版本)

sink clickhouse

wormhole sink clickhouse支持分布式和本地写两种,如果instance是distributed节点,可以值sink config中配置{“ch.engine”:”distributed”}。如果是merge tree节点连接地址用逗号分隔即可,sink config中配置{“ch.engine”:”mergetree”}。按照merger tree 方式写入,现在wh支持的分发方式是xxHash64

sink http

{“method_type”:”put”,”url_params”:”${projectId}/streams/${streamId}/jobs/${jobId}/basic”,”transform_type”:”form/json”}

配置介绍:

${}会替换成流上对应的字段的值

{“method_type”:”put/post/get”}

{“transform_type”:”form/json”},请求格式form,其他的为json格式

header配置需要添加在instance config中

sink rocketMQ

{“producerGroup”:”test_producer”,”format”:”flattenJson”,”preserveSystemField”:true}

配置介绍:

producerGroup:group组

format:ums/flattenJson

preserveSystemField:是否保留系统字段

用户自定义sink

Wormhole 0.6.1及之后版本支持用户自定义sink

1、编写自定义sink class

(1)在wormhole项目中建立customer sink class流程

(2)在用户项目中建立customer sink class流程

edp.wormholewormhole-sinks0.7.0 edp.wormholewormhole-ums_1.3-flinkx_1.5.10.7.0

2、配置flow

配置flow在Sink Config中配置customer sink class的完整的名字

{“other_sinks_config”:{“customer_sink_class_fullname”:”customer sink full class name”}}

Transformation

Spark Flow Transformation

配置数据转换逻辑,支持 SQL 和自定义 Class 方式,可以配置多条转换逻辑,调整逻辑顺序

自定义 Class
SQL
Lookup SQL

Lookup SQL 可以关联流下其他系统数据,如 RDBS/Hbase/Redis/Elasticsearch 等,规则如下。

若 Source Namespace 为 kafka.edp_kafka.udftest.udftable,Lookup Table 为 RDBMS 系统,如 mysql.er_mysql.eurus_test 数据库下的 eurus_user 表,Left Join 关联字段是 id,name,且从 Lookup 表中选择的字段 id,name 与主流上kafka.edp_kafka.udftest.udftable 中的字段重名,0.6.0及以上版本支持两种类型的Lookup SQL语句如下:

(1)主流上的字段名用${}标注(0.6.0及以上版本支持),推荐使用该种方式,例如

select id as id1,name as name1,address,age from eurus_user where (id,name) in (${id},${name});

(2)主流上的字段名用namespace.filedName进行标注,例如

select id as id1, name as name1, address, age from eurus_user where (id, name) in (kafka.edp_kafka.udftest.udftable.id, kafka.edp_kafka.udftest.udftable.name);

(3)关系型数据库支持不关联流上字段进行join(0.7.0及之后版本支持),例如

select id as id1, name as name1, address, age from eurus_user where id = 1;

这种方式要慎用,如果流上数据为n条,从数据库里查出来是m条,那么join之后数据的总量就会是n*m条,可能会造成内存溢出。

若 Source Namespace为 kafka.edp_kafka.udftest.udftable,Union Table为 mysql.ermysql.eurustest 数据库下的 eurus_user 表,eurus_user 表中须含有与源数据相同的 UMS 系统字段,SQL 语句规则同上。

注意事项:

Spark SQL

Spark SQL 用于处理 Source Namespace 数据,from 后面直接接表名即可。Spark SQL 支持使用 UDF 方法,UDF 方法须包含在该 Flow 对应的 Stream 中。

Stream Join SQL

配置数据转换逻辑,支持 SQL ,可以配置多条转换逻辑,调整逻辑顺序。

支持两种事件模型Processing Time和Event Time。Processing Time为数据处理时的时间,即数据进入flink operator时获取时间戳;Event Time为事件产生的时间,即数据产生时自带时间戳,在Wormhole系统中对应ums_ts_字段。

CEP

Wormhole Flink版对传输的流数据除了提供Lookup SQL、Flink SQL两种Transformation操作之外,还提供了CEP(复杂事件处理)这种转换机制。

CEP里面的几个必填属性的含义和用途如下:

须设定以下参数:

1)WindowTime:它是指在触发了符合Begin Pattern的数据记录后的窗口时间,如果watermark的time超过了触发时间+窗口时间,本次pattern结束

2)KeyBy:依据数据中的哪个字段来做分区,举个例子,比如,现在有一条数据,它的schema包括ums_id_,ums_op_,ums_ts_,value1,value2这几个字段,这里选定value1来做分区的依赖字段,那么,value1字段相同的数据将被分配到同一个分组上。CEP操作将分别针对每一分组的数据进行处理

3)Strategy:策略分为NO_SKIP和SKIP_PAST_LAST_EVENT两种,前者对应数据滑动策略,后者对应数据滚动策略,具体区别可以借鉴下面的例子:(假设一次处理4条)

4)Output:输出结果的形式,大致分为三类:Agg、Detail、FilteredRow

5)Pattern:筛选数据的规则。每个CEP由若干个pattern组成。

每个Pattern包括以下三个必填信息:

SQL

Lookup SQL具体可参考Spark Flow Transformation的Lookup SQL章节

Flink SQL 用于处理 Source Namespace 数据,from 后面直接接表名即可。Wormhole 0.6.0-beata及之后版本的Flinkx支持window,UDF和UDAF操作。0.6.0版本Flink SQL支持key by操作,key by字段在Transformation Config中进行配置,设置格式为json,其中json中key为key_by_fields,value为key by的字段,如果有多个字段,则用逗号分隔,例如:{“key_by_fields”:”name,city”}

Window

process time处理方式中window中相应的字段名称为processing_time。例:SELECT name, SUM(key) as keysum from ums GROUP BY TUMBLE(processing_time, INTERVAL ‘1’ HOUR), name;

event time处理方式中window中相应的字段名称为ums_ts_。例:SELECT name, SUM(key) as keysum from ums GROUP BY TUMBLE(ums_ts_, INTERVAL ‘1’ HOUR), name;

相关配置包括:

UDF

Wormhole Flink UDF支持普通的java程序,而不需要按照Flink官方文档的格式实现UDF。UDF名称大小写敏感。UDF相应的字段需要使用as指定新字段的名称。例如:

Java程序:

public class addint {
  public int fInt(int i) {
      return i + 1;
  }
} 使用UDF的Flink SQL:

select intvalue, fInt(intvalue) as fint from mytable; 
UDAF

(1)使用UDAF需要进行以下操作

(2)UDAF例程:计算带权重的值的平均值

(3)使用UDAF的Flink SQL:

SELECT name, udafAvg(score,weight) as udafAvg from ums GROUP BY name;
异常处理设置

Flink中通过Transformation Config可选择对流处理中异常信息的处理方式。现在能捕获读取kafka后数据预处理、lookup操作、写sink时的异常。处理方式有三种:

注意:当在配置文件中设置checkpoint为true,则异常处理不能设置为interrupt,否则flow会一直重启。

修改 Flow

修改 Flow 时,不能修改所选 Stream,SourceNamespace 和 SinkNamespace,可以修改 Protocol 类型,Result Fields,Sink Config 和 Transformation 转换逻辑

启动 Flow

启动 Spark Flow

生效 Flow

生效 Spark Flow

停止 Flow

停止 Spark Flow

停止 Flow 时向 Stream 发送取消指令,并检查 Stream 其他 Flow 对应的 Topic 是否包含该 Flow 对应的 Topic,如果不包含则取消 Stream 与该 Topic 的绑定关系

点击停止按钮提交取消对应Flink Task请求

可通过error列表查看失败数据的offset,并针对失败数据提交backfill作业

Flow 状态转换

Flow 漂移

Flow漂移规则

Flow状态 新Stream状态 新Flow状态
new/stopped/failed _ new/stopped
starting/updating/stopping 不可迁移 不可迁移
suspending _ stopped
running new/stopping/stopped/failed stopped
running starting/waiting/running starting

running flow topic offset 确定规则:

Job

借助 Job 可轻松实现 Lambda 架构和 Kappa 架构。

首先使用 hdfslog Stream 将源数据备份到 Hdfs,Flow 出错或需要重算时,可配置 Job 重算。具体配置可参考Stream 和 Flow。Job中source端可选择数据的版本信息,将该版本的数据重算。

Job配置中version为namespace的第五层,表示数据的版本。使用 hdfslog Stream 将源数据备份到 Hdfs时,改层需要为数字,配置job时,可根据不同的版本进行数据重算。

Job中Spark SQL表名为“increment”。例如:

select key, value from increment;

监控预警

Stream运行过程中会将每批处理的错误信息,offset信息,数据量信息和延时等信息发送至wormhole_feedback topic中。Wormhole Web应用负责消费这些信息,其中错误信息保存在MySQL数据库中,数据量信息和延时统计信息保存在Elasticsearch或者es中。

吞吐和延时信息从Stream/Flow两个维度展示,监控项说明如下。

Latency

Records

每批次处理的数据条数,对于UMS类型数据,指每批处理的ums消息payload中tuple总条数。

Throughput

Records/WormholeDelay