基于KafkaConnect的流数据同

文章来源:一氧化碳中毒   发布时间:2021-7-17 13:34:01   点击数:
  

文章作者:姜二委

贝壳找房

内容来源:贝壳产品技术

1.KafkaConnect简介

Kafka0.9+增加了一个新的特性KafkaConnect,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过Connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。

KafkaConnect可以将完整的存储系统中的数据注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从KafkaTopic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如MySQL、MongoDB、Elasticsearch、Cassandra、Ignite等。

KafkaConsumer和Producer都是Develop的工具,如果想用来做DataPipeline或ETL工作并且同步大量的数据,同时还要对DataPipeline做管理和监控,可以用KafkaConnect来实现。

KafkaConnect特性包括:

KafkaConnector通用框架,提供统一的集成API

同时支持分布式模式和单机模式

RESTAPI,用来查看和管理KafkaConnectors

自动化的offset管理,开发人员不必担心错误处理的影响

分布式、可扩展

流处理和批处理集成

Kafka背后的商业公司Confluent鼓励社区创建更多的开源的Connector,将Kafka生态圈壮大起来,促进KafkaConnnect的应用。

KafkaConnnect有两个核心概念:Source和Sink。Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。

当前KafkaConnect支持两种分发担保:atleastonce(至少一次)和atmostonce(至多一次),exactlyonce将在未来支持。

Connectors的发布和开发可以参照官方文档。如果以前你通过producerAPI/consumerAPI写了一些导入导出的功能,不妨尝试一下换成KafkaConnect,看看是否简化了你的代码,提高了应用可扩展和容错的能力。

2.KafkaConnect概念

KafkaConnect的几个重要的概念包括:Connectors、Tasks、Workers、Converters、Transforms。

Connectors:通过管理任务来细条数据流的高级抽象

Tasks:数据写入kafka和数据从kafka读出的实现

Workers:运行connectors和tasks的进程

Converters:KafkaConnect和其他存储系统直接发送或者接受数据之间转换数据

Transforms:更改Connector产生的或发送到Connector的每个消息的简单处理逻辑

DeadLetterQueue:KafkaConnect如何处理Connector错误

2.1Connectors

在kafkaconnect中,connector决定了数据应该从哪里复制过来以及数据应该写入到哪里去,一个connector实例是一个需要负责在kafka和其他系统之间复制数据的逻辑作业,connectorplugin是jar文件,实现了kafka定义的一些接口来完成特定的任务。

2.2Tasks

task是kafkaconnect数据模型的主角,每一个connector都会协调一系列的task去执行任务,connector可以把一项工作分割成许多的task,然后再把task分发到各个worker中去执行(分布式模式下),task不自己保存自己的状态信息,而是交给特定的kafkatopic去保存(config.storage.topic和status.storage.topic)。

在分布式模式下有一个概念叫做任务再平衡(TaskRebalancing),当一个connector第一次提交到集群时,所有的worker都会做一个taskrebalancing从而保证每一个worker都运行了差不多数量的工作,而不是所有的工作压力都集中在某个worker进程中,而当某个进程挂了之后也会执行taskrebalance。

当Connector首次提交到集群时,Worker将重新平衡集群中的全部Connectors及其Tasks,以便每个Worker具有大致相同的工作量。当Connector增加或减少它们需要的任务数量,或者Connector的配置发生更改时,也会使用相同的重新平衡过程。当一个Worker失败时,task将在活动的worker之间重新进行平衡。当task失败时,不会触发再平衡,因为task失败被视为异常情况。因此,失败的task不会由框架自动重新启动,而应该通过RESTAPI重新启动。

2.3Workers

connectors和tasks都是逻辑工作单位,必须安排在进程中执行,而在kafkaconnect中,这些进程就是workers,分别有两种worker:standalone和distributed。这里不对standalone进行介绍,具体的可以查看官方文档。我个人觉得distributedworker很棒,因为它提供了可扩展性以及自动容错的功能,你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。

2.4Converters

converter会把bytes数据转换成kafkaconnect内部的格式,也可以把kafkaconnect内部存储格式的数据转变成bytes,converter对connector来说是解耦的,所以其他的connector都可以重用,例如,使用了avroconverter,那么jdbcconnector可以写avro格式的数据到kafka,当然,hdfsconnector也可以从kafka中读出avro格式的数据。

ConfluentPlatform提供了以下Converters:

`AvroConverter`:io.confluent.connect.avro.AvroConverter:usewith`SchemaRegistry`

`ProtobufConverter`:io.confluent.connect.protobuf.ProtobufConverter:usewith`SchemaRegistry`

`JsonSchemaConverter`:io.confluent.connect.json.JsonSchemaConverter:usewith`SchemaRegistry`

`JsonConverter`:org.apache.kafka.connect.json.JsonConverter(without`SchemaRegistry`):usewithstructureddata

`StringConverter`:org.apache.kafka.connect.storage.StringConverter:simplestringformat

`ByteArrayConverter`:org.apache.kafka.connect.converters.ByteArrayConverter:providesa“pass-through”optionthatdoesnoconversion

2.5Transforms

Connector可以配置Transforms来对单个消息进行简单而轻量级的修改。这对于较小的数据调整和事件路由来说非常方便,并且多个Transforms可以在Connectors配置中链接在一起。但是,应用于多个消息的更复杂的转换和操作最好使用ksqlDB和Kafka流实现。

Transforms是一个简单的函数,它接受一条记录作为输入并输出修改后的记录。KafkaConnect提供的所有转换都执行简单但通常有用的修改。注意,您可以使用自己的自定义逻辑实现转换接口,将它们打包为KafkaConnect插件,并与任何Connectors一起使用它们。

KafkaConnectTransformations:

Transform

Description

Cast

Castfieldsortheentirekeyorvaluetoaspecifictype(forexample,toforceanintegerfieldtoasmallerwidth).

Drop

Dropeitherakeyoravaluefromarecordandsetittonull.

ExtractField

ExtractthespecifiedfieldfromaStructwhenschemapresent,oraMapinthecaseofschemalessdata.Anynullvaluesarepassedthroughunmodified.

ExtractTopic

Replacetherecordtopicwithanewtopicderivedfromitskeyorvalue.

Filter

Includeordroprecordsthatmatchthefilter.conditionpredicate.

Flatten

Flattenanesteddatastructure.Thisgeneratesnamesforeachfieldbyconcatenatingthefieldnamesateachlevelwithaconfigurabledelimitercharacter.

HoistField

WrapdatausingthespecifiedfieldnameinaStructwhenschemapresent,oraMapinthecaseofschemalessdata.

InsertField

Insertfieldusingattributesfromtherecordmetadataoraconfiguredstaticvalue.

MaskField

Maskspecifiedfieldswithavalidnullvalueforthefieldtype.

MessageTimeStampRouter

Updatetherecord’stopicfieldasafunctionoftheoriginaltopicvalueandtherecord’stimestampfield.

RegexRouter

Updatetherecordtopicusingtheconfiguredregularexpressionandreplacementstring.

ReplaceField

Filterorrenamefields.

SetSchemaMetadata

Settheschemaname,version,orbothontherecord’skeyorvalueschema.

TimestampConverter

ConverttimestampsbetweendifferentformatssuchasUnixepoch,strings,andConnectDateandTimestamptypes.

TimestampRouter

Updatetherecord’stopicfieldasafunctionoftheoriginaltopicvalueandtherecordtimestamp.

TombstoneHandler

Managetombstonerecords.Atombstonerecordisdefinedasarecordwiththeentirevaluefieldbeingnull,whetherornotithasValueSchema.

ValueToKey

Replacetherecordkeywithanewkeyformedfromasubsetoffieldsintherecordvalue.

2.6DeadLetterQueue

由于多种原因,可能会出现无效记录。举个栗子:一条记录到达以JSON格式序列化的SinkConnector,但SinkConnector配置期望的是Avro格式。当SinkConnector无法处理无效记录时,将根据Connector配置属性errors.tolerance处理该错误。

当errors.tolerance:none无效的记录会导致Connectortask立即失败,Connector进入失败状态。要解决这个问题,您需要检查KafkaConnectWorker日志,找出导致故障的原因,纠正它,并重新启动Connector。

当errors.tolerance:all,忽略所有错误或无效记录,继续处理。ConnectWorker日志中没有写入错误。要确定记录是否失败,您必须使用internalmetrics或计算错误记录数量,并将其与已处理的记录数量进行比较。

可用的错误处理特性是将所有无效记录路由到一个特殊KafkaTopic。此Topic包含SinkConnector无法处理的死信记录队列。

即使死信主题包含失败的记录,它也不会显示原因。你可以添加以下附加配置属性来包括失败的记录头信息:

errors.deadletterqueue.context.headers.enable=true

当此参数设置为true(默认为false)时,记录头被添加到死信队列中。然后可以使用kafkacatUtility查看记录头,并确定记录失败的原因。

为了避免与原始记录头冲突,死信队列上下文头键开始_connect.errors。

3.KafkaConnectElasticsearch实践

当你需要Connector的时候,你可以选择去Kafka和Confluent社区选择优秀的开源Connector,你也可以选择自己开发Connector为社区做贡献。

我的需求是使用KafkaConnect做DataPipeline,把Kafka集群中的消息高并行低延迟的同步到Elasticsearch集群中,ConfluentHub社区官方提供了很优秀的Connector:kafka-connect-elasticsearch,我就不重复造轮子了,拿来即用。

接下来梳理下kafka-connect-elasticsearch过程中的一些使用经验,如果是自己玩玩你可以使用Confluent全家桶,大家都知道Confluent是当初Linkin的几位kafka核心开发者创业成立的公司,致力于kafka的商业化,该团队基于kafka给社区贡献了几个优质的开源项目SchemaRegistry、KafkaRest、KSQL,还有很多kafkaconnectors组件。Confluent包含了从kafka集群搭建到connector组件部署,再到connect监控的一站式集成,使用非常方便,但是核心的ConfluentControlCenter及周边支持是企业版才有的特性,免费版只能试用一段时间,而且功能特性还有限制,社区版功能更是甚少,所以准备自己搭建监控平台。

你可以实现自己所需要的Connector,先从ConfluentHub下载kafka-connect-elasticsearch组件,为了方便管理,建议跟kafkaconnect在相同目录下,我使用的kafka_2.12-2.3.1版本,目录结构如下:

3.1Standalone模式部署KafkaConnect

standalone模式部署时,connect-standalone.properties配置文件中配置了kafkabroker地址、消息key和valueconverter格式、offset存储位置、connectors加载目录等基本信息,更详细参数参考

转载请注明:http://www.lwblm.com/bzbk/12154.html