
作者简介
马听,多年 DBA 实战经验,对 MySQL、 Redis、ClickHouse 等数据库有一定了解,专栏《一线数据库工程师带你深入理解 MySQL》、《Redis 运维实战》作者。
从这一节开始,将通过几期跟各位分享我的一些 ClickHouse 实战笔记。
这一期首先聊聊 Kafka 数据同步到 ClickHouse 的其中一个方案:通过 Kafka 引擎方式同步,下面进入实际操作过程(环境:CentOS7.4):
1 Kafka 基础环境搭建
因为主要是为了测试数据同步,因此 Kafka 只简单安装了单机版本。
1.1 安装 JDK
cd usr/src
在这里[https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html]选择合适的 JDK 版本,并下载。
tar zxvf jdk-8u261-linux-x64.tar.gz
mv jdk1.8.0_261/ java
编辑 etc/profile
vim etc/profile
加入以下内容:
JAVA_HOME=/usr/src/java
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME PATH
执行
source etc/profile
1.2 安装 kafka
cd usr/src
在这里[http://archive.apache.org/dist/kafka/2.0.0/]选择合适的 kafka 版本,并下载。
tar zxvf kafka_2.11-2.0.0.tgz
mv kafka_2.11-2.0.0 kafka
1.3 启动 zk
cd usr/src/kafka
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
1.4 启动 kafka
nohup ./bin/kafka-server-start.sh config/server.properties &
1.5 创建 topics
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1.6 查看 topics
./bin/kafka-topics.sh --list --zookeeper localhost:2181

1.7 产生消息


1.8 消费消息
cd usr/src/kafka/ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以看到在 1.7 步骤生成的消息。
2 安装 ClickHouse
3 创建消费表
create database kafka_data; use kafka_data;
create table kafka_queue(id UInt32,code String,name String)engine =Kafka() settings kafka_broker_list = 'localhost:9092',kafka_topic_list='test',kafka_group_name='group1',kafka_format='JSONEachRow',kafka_skip_broken_messages=100;
kafka_broker_list:kafka 的连接地址和端口。 kafka_topic_list:kafka 的 topic 名。 kafka_group_name:kafka 的组名。 kafka_format:表示用于解析消息的数据格式,消息发送端必须按此格式发送消息。 kafka_skip_broken_messages:当解析数据出现错误时,运行跳过失败的数据行数。
4 创建存储表
5 创建数据同步视图
create materialized view consumer to kafka_table as select id,code,name from kafka_queue
6 测试数据同步
/usr/src/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"id":2,"code":"two","name":"aa"}
select * from kafka_table
7 其他维护操作
drop table consumer
detach table consumer
attach materialized view consumer to kafka_table(id UInt32,code String,name String)as select id,code,name from kafka_queue






