文章作者:姜二委
贝壳找房内容来源:贝壳产品技术
1.KafkaConnect简介
Kafka0.9+增加了一个新的特性KafkaConnect,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过Connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。
KafkaConnect可以将完整的存储系统中的数据注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从KafkaTopic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如MySQL、MongoDB、Elasticsearch、Cassandra、Ignite等。
KafkaConsumer和Producer都是Develop的工具,如果想用来做DataPipeline或ETL工作并且同步大量的数据,同时还要对DataPipeline做管理和监控,可以用KafkaConnect来实现。
KafkaConnect特性包括:
KafkaConnector通用框架,提供统一的集成API
同时支持分布式模式和单机模式
RESTAPI,用来查看和管理KafkaConnectors
自动化的offset管理,开发人员不必担心错误处理的影响
分布式、可扩展
流处理和批处理集成
Kafka背后的商业公司Confluent鼓励社区创建更多的开源的Connector,将Kafka生态圈壮大起来,促进KafkaConnnect的应用。
KafkaConnnect有两个核心概念:Source和Sink。Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。
当前KafkaConnect支持两种分发担保:atleastonce(至少一次)和atmostonce(至多一次),exactlyonce将在未来支持。
Connectors的发布和开发可以参照官方文档。如果以前你通过producerAPI/consumerAPI写了一些导入导出的功能,不妨尝试一下换成KafkaConnect,看看是否简化了你的代码,提高了应用可扩展和容错的能力。
2.KafkaConnect概念
KafkaConnect的几个重要的概念包括:Connectors、Tasks、Workers、Converters、Transforms。
Connectors:通过管理任务来细条数据流的高级抽象
Tasks:数据写入kafka和数据从kafka读出的实现
Workers:运行connectors和tasks的进程
Converters:KafkaConnect和其他存储系统直接发送或者接受数据之间转换数据
Transforms:更改Connector产生的或发送到Connector的每个消息的简单处理逻辑
DeadLetterQueue:KafkaConnect如何处理Connector错误
2.1Connectors
在kafkaconnect中,connector决定了数据应该从哪里复制过来以及数据应该写入到哪里去,一个connector实例是一个需要负责在kafka和其他系统之间复制数据的逻辑作业,connectorplugin是jar文件,实现了kafka定义的一些接口来完成特定的任务。
2.2Tasks
task是kafkaconnect数据模型的主角,每一个connector都会协调一系列的task去执行任务,connector可以把一项工作分割成许多的task,然后再把task分发到各个worker中去执行(分布式模式下),task不自己保存自己的状态信息,而是交给特定的kafkatopic去保存(config.storage.topic和status.storage.topic)。
在分布式模式下有一个概念叫做任务再平衡(TaskRebalancing),当一个connector第一次提交到集群时,所有的worker都会做一个taskrebalancing从而保证每一个worker都运行了差不多数量的工作,而不是所有的工作压力都集中在某个worker进程中,而当某个进程挂了之后也会执行taskrebalance。
当Connector首次提交到集群时,Worker将重新平衡集群中的全部Connectors及其Tasks,以便每个Worker具有大致相同的工作量。当Connector增加或减少它们需要的任务数量,或者Connector的配置发生更改时,也会使用相同的重新平衡过程。当一个Worker失败时,task将在活动的worker之间重新进行平衡。当task失败时,不会触发再平衡,因为task失败被视为异常情况。因此,失败的task不会由框架自动重新启动,而应该通过RESTAPI重新启动。
2.3Workers
connectors和tasks都是逻辑工作单位,必须安排在进程中执行,而在kafkaconnect中,这些进程就是workers,分别有两种worker:standalone和distributed。这里不对standalone进行介绍,具体的可以查看官方文档。我个人觉得distributedworker很棒,因为它提供了可扩展性以及自动容错的功能,你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。
2.4Converters
converter会把bytes数据转换成kafkaconnect内部的格式,也可以把kafkaconnect内部存储格式的数据转变成bytes,converter对connector来说是解耦的,所以其他的connector都可以重用,例如,使用了avroconverter,那么jdbcconnector可以写avro格式的数据到kafka,当然,hdfsconnector也可以从kafka中读出avro格式的数据。
ConfluentPlatform提供了以下Converters:
`AvroConverter`:io.confluent.connect.avro.AvroConverter:usewith`SchemaRegistry`
`ProtobufConverter`:io.confluent.connect.protobuf.ProtobufConverter:usewith`SchemaRegistry`
`JsonSchemaConverter`:io.confluent.connect.json.JsonSchemaConverter:usewith`SchemaRegistry`
`JsonConverter`:org.apache.kafka.connect.json.JsonConverter(without`SchemaRegistry`):usewithstructureddata
`StringConverter`:org.apache.kafka.connect.storage.StringConverter:simplestringformat
`ByteArrayConverter`:org.apache.kafka.connect.converters.ByteArrayConverter:providesa“pass-through”optionthatdoesnoconversion
2.5Transforms
Connector可以配置Transforms来对单个消息进行简单而轻量级的修改。这对于较小的数据调整和事件路由来说非常方便,并且多个Transforms可以在Connectors配置中链接在一起。但是,应用于多个消息的更复杂的转换和操作最好使用ksqlDB和Kafka流实现。
Transforms是一个简单的函数,它接受一条记录作为输入并输出修改后的记录。KafkaConnect提供的所有转换都执行简单但通常有用的修改。注意,您可以使用自己的自定义逻辑实现转换接口,将它们打包为KafkaConnect插件,并与任何Connectors一起使用它们。
KafkaConnectTransformations:
Transform
Description
Cast
Castfieldsortheentirekeyorvaluetoaspecifictype(forexample,toforceanintegerfieldtoasmallerwidth).
Drop
Dropeitherakeyoravaluefromarecordandsetittonull.
ExtractField
ExtractthespecifiedfieldfromaStructwhenschemapresent,oraMapinthecaseofschemalessdata.Anynullvaluesarepassedthroughunmodified.
ExtractTopic
Replacetherecordtopicwithanewtopicderivedfromitskeyorvalue.
Filter
Includeordroprecordsthatmatchthefilter.conditionpredicate.
Flatten
Flattenanesteddatastructure.Thisgeneratesnamesforeachfieldbyconcatenatingthefieldnamesateachlevelwithaconfigurabledelimitercharacter.
HoistField
WrapdatausingthespecifiedfieldnameinaStructwhenschemapresent,oraMapinthecaseofschemalessdata.
InsertField
Insertfieldusingattributesfromtherecordmetadataoraconfiguredstaticvalue.
MaskField
Maskspecifiedfieldswithavalidnullvalueforthefieldtype.
MessageTimeStampRouter
Updatetherecord’stopicfieldasafunctionoftheoriginaltopicvalueandtherecord’stimestampfield.
RegexRouter
Updatetherecordtopicusingtheconfiguredregularexpressionandreplacementstring.
ReplaceField
Filterorrenamefields.
SetSchemaMetadata
Settheschemaname,version,orbothontherecord’skeyorvalueschema.
TimestampConverter
ConverttimestampsbetweendifferentformatssuchasUnixepoch,strings,andConnectDateandTimestamptypes.
TimestampRouter
Updatetherecord’stopicfieldasafunctionoftheoriginaltopicvalueandtherecordtimestamp.
TombstoneHandler
Managetombstonerecords.Atombstonerecordisdefinedasarecordwiththeentirevaluefieldbeingnull,whetherornotithasValueSchema.
ValueToKey
Replacetherecordkeywithanewkeyformedfromasubsetoffieldsintherecordvalue.
2.6DeadLetterQueue
由于多种原因,可能会出现无效记录。举个栗子:一条记录到达以JSON格式序列化的SinkConnector,但SinkConnector配置期望的是Avro格式。当SinkConnector无法处理无效记录时,将根据Connector配置属性errors.tolerance处理该错误。
当errors.tolerance:none无效的记录会导致Connectortask立即失败,Connector进入失败状态。要解决这个问题,您需要检查KafkaConnectWorker日志,找出导致故障的原因,纠正它,并重新启动Connector。
当errors.tolerance:all,忽略所有错误或无效记录,继续处理。ConnectWorker日志中没有写入错误。要确定记录是否失败,您必须使用internalmetrics或计算错误记录数量,并将其与已处理的记录数量进行比较。
可用的错误处理特性是将所有无效记录路由到一个特殊KafkaTopic。此Topic包含SinkConnector无法处理的死信记录队列。
即使死信主题包含失败的记录,它也不会显示原因。你可以添加以下附加配置属性来包括失败的记录头信息:
errors.deadletterqueue.context.headers.enable=true
当此参数设置为true(默认为false)时,记录头被添加到死信队列中。然后可以使用kafkacatUtility查看记录头,并确定记录失败的原因。
为了避免与原始记录头冲突,死信队列上下文头键开始_connect.errors。
3.KafkaConnectElasticsearch实践
当你需要Connector的时候,你可以选择去Kafka和Confluent社区选择优秀的开源Connector,你也可以选择自己开发Connector为社区做贡献。
我的需求是使用KafkaConnect做DataPipeline,把Kafka集群中的消息高并行低延迟的同步到Elasticsearch集群中,ConfluentHub社区官方提供了很优秀的Connector:kafka-connect-elasticsearch,我就不重复造轮子了,拿来即用。
接下来梳理下kafka-connect-elasticsearch过程中的一些使用经验,如果是自己玩玩你可以使用Confluent全家桶,大家都知道Confluent是当初Linkin的几位kafka核心开发者创业成立的公司,致力于kafka的商业化,该团队基于kafka给社区贡献了几个优质的开源项目SchemaRegistry、KafkaRest、KSQL,还有很多kafkaconnectors组件。Confluent包含了从kafka集群搭建到connector组件部署,再到connect监控的一站式集成,使用非常方便,但是核心的ConfluentControlCenter及周边支持是企业版才有的特性,免费版只能试用一段时间,而且功能特性还有限制,社区版功能更是甚少,所以准备自己搭建监控平台。
你可以实现自己所需要的Connector,先从ConfluentHub下载kafka-connect-elasticsearch组件,为了方便管理,建议跟kafkaconnect在相同目录下,我使用的kafka_2.12-2.3.1版本,目录结构如下:
3.1Standalone模式部署KafkaConnect
standalone模式部署时,connect-standalone.properties配置文件中配置了kafkabroker地址、消息key和valueconverter格式、offset存储位置、connectors加载目录等基本信息,更详细参数参考