Canal Configuration
MySQL 授权
授权 Canal 连接 MySQL 所用的账号具有作为 MySQL Slave 的权限；如果已有账户，可直接执行 GRANT。
-- Dedicated account canal logs in with to read the binlog as a replica.
-- IF NOT EXISTS keeps this re-runnable; 'canal'@'%' is what a bare `canal` user name means.
CREATE USER IF NOT EXISTS 'canal'@'%' IDENTIFIED BY 'canal';
-- SELECT: read table data; REPLICATION SLAVE: stream the binlog;
-- REPLICATION CLIENT: SHOW MASTER STATUS / SHOW BINARY LOGS.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Broader alternative (not needed by canal):
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
-- No FLUSH PRIVILEGES: CREATE USER / GRANT take effect immediately; FLUSH is only
-- required after editing the mysql.* grant tables directly.
复制
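Note：对于自建 MySQL，需要先开启 binlog 写入功能，并将 binlog-format 配置为 ROW 模式。例如在 my.cnf 的 [mysqld] 段中加入 log-bin=mysql-bin、binlog-format=ROW、server_id=1（server_id 不要与 canal 的 slaveId 重复），然后重启 MySQL。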
MySQL 中新建 DB、Table
-- Source database/table that canal watches and the adapter syncs into the ES index test_user.
-- IF NOT EXISTS makes the script safe to re-run.
CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET utf8mb4;
USE test;
-- Integer display widths (bigint(20), int(10)) are omitted: they never affected storage
-- or range and are deprecated as of MySQL 8.0.17.
CREATE TABLE IF NOT EXISTS `tb_user` (
  `id`          bigint unsigned NOT NULL AUTO_INCREMENT,
  `username`    varchar(30)     DEFAULT NULL,
  -- demo column; it is not selected by the ES mapping, so it is never synced
  `password`    varchar(30)     DEFAULT NULL,
  `age`         int unsigned    DEFAULT NULL,
  `create_by`   varchar(30)     NOT NULL,
  `update_by`   varchar(30)     NOT NULL,
  `create_time` datetime(3)     NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  -- refreshed on every UPDATE, so the synced ES document's update_time should change too
  `update_time` datetime(3)     NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
复制
ES 中新建 index
PUT /test_user
{
  "mappings": {
    "properties": {
      "username": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "age":         { "type": "long" },
      "create_time": { "type": "date" },
      "update_time": { "type": "date" }
    }
  }
}
复制
Canal Deployer Configuration
下载安装包
wget "https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz"
复制
新建目录
mkdir -p "$HOME/app/canal-deployer"
复制
解压安装包到目录
tar -xzvf canal.deployer-1.1.5.tar.gz -C "$HOME/app/canal-deployer"
复制
修改 instance.properties
vi "$HOME/app/canal-deployer/conf/example/instance.properties"
复制
修改以下 4 行
# slaveId: 0 lets canal generate the replica server id itself (v1.0.26+); an explicit
# value must not equal the MySQL server's server_id.
canal.instance.mysql.slaveId=0
# host:port of the MySQL server whose binlog canal reads.
canal.instance.master.address=127.0.0.1:3306
# Account canal connects with. It needs SELECT, REPLICATION SLAVE and REPLICATION CLIENT,
# i.e. the grants given to 'canal' at the top of this guide; that dedicated canal/canal
# account is preferable to root.
# Keep comments on their own lines: in a properties file a trailing "# ..." becomes part of the value.
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
复制
完整的 instance.properties 如下
#################################################
# conf/example/instance.properties -- settings for the canal instance (destination) "example".
# Properties-file rule: comments must be on their own line; a trailing "# ..." after a value
# becomes part of the value.
## mysql serverId , v1.0.26+ will autoGen
# (0 = auto-generate; an explicit value must differ from the MySQL server's server_id)
canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# master.address is the MySQL host:port whose binlog is read. journal.name / position /
# timestamp / gtid are left empty, so no explicit starting binlog position is pinned here.
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
# (RDS-only credentials, judging by the key names; left empty for self-hosted MySQL)
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
# (tsdb.enable=true keeps table-schema history; the commented-out tsdb.* keys below would
# store it in a MySQL database named canal_tsdb; standby.* would describe a fallback master)
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# The account needs SELECT, REPLICATION SLAVE and REPLICATION CLIENT -- the grants given to
# 'canal' at the top of this guide. Prefer that dedicated canal/canal account over root.
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
# (false = dbPassword above is not decrypted; pwdPublicKey only matters when this is true)
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
# (schema.table regex; "\\" is the properties escape for one "\", so .*\\..* matches every table)
canal.instance.filter.regex=.*\\..*
# table black regex
# (excludes the mysql.slave_* tables)
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
# (MQ routing only; the adapter in this guide consumes over TCP (mode: tcp), so these
# presumably have no effect here -- confirm against canal.serverMode in canal.properties)
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
复制
启动
# "&&" stops if the directory is missing instead of running whatever ./startup.sh is in the current directory.
cd ~/app/canal-deployer/bin && ./startup.sh
复制
Canal Adapter Configuration
下载安装包
wget "https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz"
复制
新建目录
mkdir -p "$HOME/app/canal-adapter"
复制
解压安装包到目录
tar -xzvf canal.adapter-1.1.5.tar.gz -C "$HOME/app/canal-adapter"
复制
修改 application.yml
vi "$HOME/app/canal-adapter/conf/application.yml"
复制
完整的 application.yml 如下
# canal-adapter conf/application.yml
# Nesting restored: consumerProperties, srcDataSources and canalAdapters all sit under canal.conf.
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer (the group used with mode: tcp; host:port presumably must match the
    # deployer's canal.port in conf/canal.properties -- confirm)
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  # Source database; the key defaultDS is what dataSourceKey in conf/es7/*.yml refers to.
  # The account only needs SELECT on the mapped tables.
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      # Quoted so YAML keeps it as literal text (an unquoted value like 0123 would be parsed as a number).
      password: '123456'
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      # - name: rdb
      #   key: mysql1
      #   properties:
      #     jdbc.driverClassName: com.mysql.jdbc.Driver
      #     jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
      #     jdbc.username: root
      #     jdbc.password: 121212
      # - name: rdb
      #   key: oracle1
      #   properties:
      #     jdbc.driverClassName: oracle.jdbc.OracleDriver
      #     jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
      #     jdbc.username: mytest
      #     jdbc.password: m121212
      # - name: rdb
      #   key: postgres1
      #   properties:
      #     jdbc.driverClassName: org.postgresql.Driver
      #     jdbc.url: jdbc:postgresql://localhost:5432/postgres
      #     jdbc.username: postgres
      #     jdbc.password: 121212
      #     threads: 1
      #     commitSize: 3000
      # - name: hbase
      #   properties:
      #     hbase.zookeeper.quorum: 127.0.0.1
      #     hbase.zookeeper.property.clientPort: 2181
      #     zookeeper.znode.parent: /hbase
      # key es7key is what outerAdapterKey in conf/es7/test_user.yml refers to.
      - name: es7
        key: es7key
        hosts: http://127.0.0.1:9200
        properties:
          mode: rest
          # security.auth: test:123456 # only used for rest mode
          # must match the target cluster's cluster.name (presumably an ES started from
          # the official Docker image, whose default is docker-cluster)
          cluster.name: docker-cluster
      # - name: kudu
      #   key: kudu
      #   properties:
      #     kudu.master.address: 127.0.0.1 # ',' split multi address
复制
在 ~/app/canal-adapter/conf/es7/ 目录下新建一个 test_user.yml
vi "$HOME/app/canal-adapter/conf/es7/test_user.yml"
复制
新增如下
# ES7 sync mapping: MySQL test.tb_user -> ES index test_user.
# Nesting restored: _index, _id, upsert, sql and commitBatch belong under esMapping.
dataSourceKey: defaultDS   # key under canal.conf.srcDataSources in application.yml
outerAdapterKey: es7key    # key of the es7 outer adapter in application.yml
destination: example       # canal instance name (conf/example on the deployer)
groupId: g1                # adapter group in application.yml
esMapping:
  _index: test_user
  _id: _id                 # document id comes from the SQL alias _id, i.e. tb_user.id
  upsert: true
  # Only the fields mapped in the test_user index are selected; password,
  # create_by and update_by are not synced.
  sql: "select a.id as _id, a.username, a.age, a.create_time, a.update_time from tb_user a"
  commitBatch: 3000
复制
启动
# "&&" stops if the directory is missing instead of running whatever ./startup.sh is in the current directory.
cd ~/app/canal-adapter/bin && ./startup.sh
复制
启动过程中会遇到一个大坑，报错如下：
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]
复制
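这是 druid 包冲突导致的：es7x 插件 jar 自带了一份 druid，同名的 com.alibaba.druid.pool.DruidDataSource 被两个不同的 ClassLoader 各加载了一次，因此无法互相强转。解决办法如下：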
下载源码包
wget "https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz"
复制
解压后，使用 IDEA 打开项目，定位到 client-adapter.escore 模块的 pom.xml，将其中 druid 依赖的 scope 改为 provided：
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <!-- provided: keeps druid out of the es7x jar-with-dependencies, so the plugin uses the
         DruidDataSource class already loaded by the adapter launcher instead of its own copy. -->
    <scope>provided</scope>
</dependency>
复制
更新后,在项目根目录下执行
# Tests are not needed to repackage the plugin jar; skipping them shortens the build.
mvn clean package -DskipTests
复制
然后进入 canal-canal-1.1.5/client-adapter/es7x/target 目录，用打包好的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 替换掉 ~/app/canal-adapter/plugin 下原来的同名 jar。
ES Test
解决完 jar 包冲突后，重启 canal adapter，日志不再报错。接下来测试 MySQL 增量数据同步是否成功。
未插入数据前
GET /test_user/_search
{
  "query": {
    "match_all": {}
  }
}
复制
插入数据
-- Seed one row; canal should push it to ES as document 1.
INSERT INTO `tb_user`
    (`id`, `username`, `password`, `age`,
     `create_by`, `update_by`, `create_time`, `update_time`)
VALUES
    (1, 'canal1', 'canal1', 28,
     'admin', 'admin', '2021-07-01 00:00:00.000', '2021-07-01 00:00:00.000');
复制
再次执行查询
也可以执行单个查询
GET test_user/_doc/1
复制
更新数据
-- The new age (38) should show up in ES document 1 after canal syncs the change.
UPDATE tb_user
SET age = 38
WHERE id = 1;
复制
再次查询
删除数据
-- ES document 1 should disappear after canal syncs the delete.
DELETE FROM tb_user
WHERE id = 1;
复制
再次查询
Reference
https://github.com/alibaba/canal/issues/3144
https://github.com/alibaba/canal/wiki/QuickStart
https://github.com/alibaba/canal/releases
泰克风格 只讲干货 不弄玄虚