《 如果快速构建kafka 实时etl管道 》著作原文:
https://www.confluent.io/blog/how-to-build-a-scalable-etl-pipeline-with-kafka-connect/
在进入正题之前,我们首先来看几个问题:
Q:什么是kafka connect ?
A:简单来说,是一个数据接入接出工具。
Q:kafka connect 有什么特点?
A:流式的,实时的。所以称作 etl pipeline , 也就是数据管道。同时,使用kafka connect , 可以实现数据无需编程,无需代码,就能实现数据从各种的数据源同步到其他各种不同的数据源。
Q:kafka connect 支持哪些数据源的 source 和 sink ?
A:答案是很多很多:https://www.confluent.io/product/connectors/
言归正传。
Q:这篇文章讲了什么?
A: 主要讲述如何通过jdbc把mysql 数据源通过 kafka connect 实时同步到hive。
Q: 我也想试一试,但是从头开始搭建环境,想想就觉得累,怎么办?
A:作者就知道你懒,所以已经帮你弄了个虚拟机,然后采用vagrant来托管。虚拟机环境的安装操作绝大部分都可以通过vagrant来完成。
Q:vagrant 怎么玩?
A:就这么简单:
$ git clone https://github.com/confluentinc/kafka-connect-blog$ cd kafka-connect-blog $ vagrant up复制
Q:就怎么简单?
A:当然不是。我在实际应用的时候,还是遇到了不少的坑。比如:
vagrant在配置虚拟机的时候需要下载安装:
/Users/poon/.vagrant.d/boxes/ubuntu-VAGRANTSLASH-trusty64/20180510.0.5/virtualbox/box-disk1.vmdk
然而国内的网络是很慢的,而且windows和mac还有区别,我是在windows上下载了一遍,然后部署到mac上的。
2. vagrant provision 的时候还会碰到很多诸如安装包安装不上的问题,都要逐个解决。最常见的是问题是:网络不通;版本过期。比如:
3. java-7-oracle 相关安装包已经没有了官方支持,导致进行不下去。我的解决版本是,修改kafka-connect-log 下的配置文件, 把java7相关配置改成java8. 这样虽然安装成了,但是还有一些写死的配置文件,等安装完之后,在虚拟机里还要进一步调整。这要在/mnt/etc/或者 /vagrant/etc 目录下查找并修改。
vagrant安装问题解决,虚拟机环境没问题之后,就可以按照文中步骤做:
./setup.sh 和 ./start.sh 了。 注意这两个shell的启动要在vagrant下进行,不能在root下进行。否成容易导致权限错乱。
接下来的步骤就是:准备mysql数据,插入数据,变更数据,删除列(schema.capatibility )的分步验证了。看看数据是否能够在hive里面实时变更。
记住,在查看数据变化时,别忘了执行:
$ connect-standalone /mnt/etc/connect-avro-standalone.properties \ /mnt/etc/mysql.properties /mnt/etc/hdfs.properties &复制
其中:
/mnt/etc/mysql.properties
/mnt/etc/hdfs.properties复制
是已经默认配置好了的mysql连接信息,以及 sink到hdfs的配置信息。
这两个文件就是 从mysql抽到 hive 唯一需要我们之际customize的地方。
案例中之针对 mysql的demo.users表做了抽取同步。那么如果我想多抽几张表,应该怎么配置呢?我们发现hdfs里面有topics这个参数,经过试验,得出配置如下:
vagrant@vagrant-ubuntu-trusty-64:~$ cat /mnt/etc/mysql.properties
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
name=test-mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=2
connection.url=jdbc:mysql://localhost:3306/demo?user=root&password=mypassword
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=modified
topic.prefix=test_jdbc_
vagrant@vagrant-ubuntu-trusty-64:~$ cat /mnt/etc/hdfs.properties
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
topics=test_jdbc_users,test_jdbc_mytable
hdfs.url=hdfs://localhost:9000
flush.size=2
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=department
vagrant@vagrant-ubuntu-trusty-64:~$
需要注意的是:
mysql表一定要有数字类型的非空主键的。
tasks.max 需要设置大于1
topics有多个时,需要逗号分割。一定是真实存在的topic。
parttition.field.name 如果配的话,只能让每个topic的partiton字段都一样(貌似不能不配)。现实中,配也没关系,因为通常比较常用的partition字段是年月日等。
本次主要介绍使用 vagrant和 kafka connect 快速验证 mysql到hive的实时抽取。下次将尝试不同数据源之间的实时同步。比如 oracle到hive等。敬请期待。