Last week we finished the first step of the system implementation: the kafka-connect-mqtt Docker image was successfully pushed to the Oracle Registry.
kafka-connect-mqtt deployment (done)
Streaming Service deployment (this installment)
Oracle Kubernetes Engine deployment (this installment)
Deploying Docker in OKE (this installment)
HiveMQ deployment
Loading stream data into ADW
ADW data processing and machine learning model building
OAC front-end visualization development
Today we pick up with steps 2 through 4: first set up the Streaming Service (stream processing) environment, then build the Kubernetes (k8s) cluster, and finally deploy kafka-connect-mqtt onto that cluster and configure its connection settings.
On the architecture diagram, this corresponds to the part highlighted in the red box below.
Once today's work is done, the path from the MQTT Broker to the Streaming Service will be fully connected.
OK, let's get started!
Streaming Service Deployment
In this step we create a Kafka Connect Configuration and a Stream Pool in the Streaming Service, and record the connection settings that we will need later when wiring up MQTT.
1. Create the Kafka Connect Configuration
Kafka Connect in the Streaming Service needs its own connection configuration so that it can talk to MQTT.
Path: OCI Menu → Analytics → Streaming → Kafka Connect Configuration, then select Create Kafka Connect Configuration.
After it is created you should see a screen like the one below; record the values under KAFKA CONNECT STORAGE TOPICS (we will need them later when connecting to MQTT).
Example:
config.storage.topic=ocid1.connectharness.oc1.eu-frankfurt-1.amaaaaaax3ifkfyalsmn6o7psopm2i7wsglpdvx5apok74bzq7bkj4di2ftq-config
offset.storage.topic=ocid1.connectharness.oc1.eu-frankfurt-1.amaaaaaax3ifkfyalsmn6o7psopm2i7wsglpdvx5apok74bzq7bkj4di2ftq-offset
status.storage.topic=ocid1.connectharness.oc1.eu-frankfurt-1.amaaaaaax3ifkfyalsmn6o7psopm2i7wsglpdvx5apok74bzq7bkj4di2ftq-status
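By the way, the Kafka Connect Configuration (a "connect harness" in the API) can also be created from the OCI CLI instead of the console. The snippet below is only a sketch, assuming your CLI is already configured; <compartment-OCID> and the name are placeholders, and the exact options should be verified against your CLI version:
# Assumption: OCI CLI already configured; compartment OCID and name are placeholders
oci streaming admin connect-harness create \
    --compartment-id <compartment-OCID> \
    --name sesym2019-mqtt-connect-config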
2. Create the Stream Pool
Path: OCI Menu → Analytics → Streaming → Stream Pools, then select Create Stream Pool.
After the pool is created, confirm the screen below and record the values under View Kafka Connection Settings (we will need them later when connecting to MQTT).
Example:
# BOOTSTRAP SERVERS
api.cell-1.eu-frankfurt-1.streaming.oci.oraclecloud.com:9092
# SASL CONNECTION STRINGS
org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password="AUTH_TOKEN";
# SECURITY PROTOCOL
SASL_SSL
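Optionally, you can sanity-check the Stream Pool with the standard Kafka console tools before going any further. This is just a sketch: client.properties is a file name chosen here for illustration, the username comes from your own View Kafka Connection Settings, AUTH_TOKEN is your user's auth token, and <stream-name> is an existing stream in the pool:
# client.properties (values from View Kafka Connection Settings)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<streampool-OCID>" password="AUTH_TOKEN";

# Send a test message to an existing stream in the pool
kafka-console-producer.sh \
    --broker-list api.cell-1.eu-frankfurt-1.streaming.oci.oraclecloud.com:9092 \
    --producer.config client.properties \
    --topic <stream-name>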
Oracle Kubernetes Engine Deployment
Before creating the OKE cluster, the following policies need to be set up under Identity.
Admin user:
allow service OKE to manage all-resources in tenancy
Regular user:
allow group <group-name> to manage instance-family in <location>
allow group <group-name> to use subnets in <location>
allow group <group-name> to read virtual-network-family in <location>
allow group <group-name> to use vnics in <location>
allow group <group-name> to inspect compartments in <location>
allow group <group-name> to manage cluster-family in <location>
# Example:
allow group acme-dev-team to manage cluster-family in tenancy
1. Create the Oracle Kubernetes Engine Cluster
Path: OCI Menu → Developer Services → Container Clusters (OKE), then select Create Cluster.
Select Quick Create.
Confirm the basic settings.
After the cluster is created you should see a screen like the one below.
2. OKE Basic Environment Setup
On the newly created OKE Cluster page, click Access Kubeconfig and follow the prompts to install and configure the OCI CLI and the kubeconfig file. (The OCI CLI is installed so that we can connect to the OKE cluster remotely.)
The Access Kubeconfig screen looks like this:
2.1 Install and Configure the OCI CLI
Install it with the following command:
bash -c "$(curl -L https://raw.githubusercontent.com/oracle/oci-cli/master/scripts/install/install.sh)"
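Once the installer finishes, confirm the CLI is available on your PATH:
oci --version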
Set up the OCI config file:
oci setup config
As shown below, fill in the user OCID, tenancy OCID, and the other prompts (see the values in the highlighted boxes) to create the OCI config file.
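For reference, after oci setup config completes, ~/.oci/config should look roughly like this (every value below is a placeholder for your own OCIDs, fingerprint, and key path):
[DEFAULT]
user=ocid1.user.oc1..<your-user-OCID>
fingerprint=<your-API-key-fingerprint>
tenancy=ocid1.tenancy.oc1..<your-tenancy-OCID>
region=eu-frankfurt-1
key_file=~/.oci/oci_api_key.pem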
Then open the public key file in the .oci directory and paste the public key at the following path: OCI Menu → Identity → Users → User Details → API Keys.
For details, see the official documentation:
https://docs.cloud.oracle.com/en-us/iaas/Content/API/SDKDocs/cliinstall.htm?tocpath=Developer%20Tools%20%7CCommand%20Line%20Interface%20(CLI)%20%7C_____1
2.2 Configure the Kubeconfig File
mkdir -p $HOME/.kube
oci ce cluster create-kubeconfig --cluster-id ocid1.cluster.oc1.eu-frankfurt-1.aaaaaaaaaezdgzjuhbstgyrxgi3dinjrmy3timjxgntdcyzqgc2tsolbmi4g --file $HOME/.kube/config --region eu-frankfurt-1 --token-version 2.0.0
export KUBECONFIG=$HOME/.kube/config
2.3 Install kubectl
Download kubectl:
curl -LO https://storage.googleapis.com/kubernetes-release/release/`curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt`/bin/linux/amd64/kubectl
Make kubectl executable:
chmod +x ./kubectl
Move it onto the PATH:
sudo mv ./kubectl /usr/local/bin/kubectl
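With the OCI CLI, the kubeconfig file, and kubectl all in place, a quick check confirms we can actually reach the OKE cluster:
kubectl cluster-info
kubectl get nodes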
Deploying Docker in OKE
In this step we pull the kafka-connect-mqtt Docker image we previously pushed to the Oracle Registry into OKE, write the ConfigMap and Deployment/Service yaml files, and finally create the kafka-connect-mqtt containers in OKE.
1. Create k8s-configmap.yaml
Create a k8s-configmap.yaml file. Use the code below as a reference; the connection details in it need to be replaced with your own.
The parts that need to be changed:
bootstrap.servers: replace with the value from View Kafka Connection Settings
sasl.jaas.config, producer.sasl.jaas.config, consumer.sasl.jaas.config: replace with the values from View Kafka Connection Settings (the password must also be replaced with your auth token)
config.storage.topic, offset.storage.topic, status.storage.topic: replace with the KAFKA CONNECT STORAGE TOPICS values
Reference configuration:
apiVersion: v1
kind: ConfigMap
metadata:
  name: oss-kafka-connect-configmap
  labels:
    app: oss-kafka-connect
data:
  connect-distributed.properties: |+
    group.id=sesym2019-mqtt-connect
    bootstrap.servers=api.cell-1.eu-frankfurt-1.streaming.oci.oraclecloud.com:9092
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password=")Md:UnOpM..rY8rxg-#V";
    producer.sasl.mechanism=PLAIN
    producer.security.protocol=SASL_SSL
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password=")Md:UnOpM..rY8rxg-#V";
    consumer.sasl.mechanism=PLAIN
    consumer.security.protocol=SASL_SSL
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password=")Md:UnOpM..rY8rxg-#V";
    config.storage.topic=ocid1.connectharness.oc1.eu-frankfurt-1.amaaaaaax3ifkfyalsmn6o7psopm2i7wsglpdvx5apok74bzq7bkj4di2ftq-config
    offset.storage.topic=ocid1.connectharness.oc1.eu-frankfurt-1.amaaaaaax3ifkfyalsmn6o7psopm2i7wsglpdvx5apok74bzq7bkj4di2ftq-offset
    status.storage.topic=ocid1.connectharness.oc1.eu-frankfurt-1.amaaaaaax3ifkfyalsmn6o7psopm2i7wsglpdvx5apok74bzq7bkj4di2ftq-status
    config.storage.replication.factor=1
    config.storage.partitions=1
    status.storage.replication.factor=1
    status.storage.partitions=1
    offset.storage.replication.factor=1
    offset.storage.partitions=1
    offset.flush.interval.ms=10000
    offset.flush.timeout.ms=5000
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    plugin.path=/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka/plugins/lib
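As a side note, if you keep connect-distributed.properties as a standalone file, an equivalent ConfigMap manifest can also be generated instead of written by hand. This is a sketch, assuming the properties file is in the current directory (older kubectl versions use plain --dry-run instead of --dry-run=client):
# Generate k8s-configmap.yaml from the properties file
kubectl create configmap oss-kafka-connect-configmap \
    --from-file=connect-distributed.properties \
    --dry-run=client -o yaml > k8s-configmap.yaml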
2. Create k8s-deployment-service.yaml
Create a k8s-deployment-service.yaml file. Use the code below as a reference; the connection details in it need to be replaced with your own.
The parts that need to be changed:
image: replace with your own OCI Registry image address
The three CONNECT_*_SASL_JAAS_CONFIG values: replace with the values from View Kafka Connection Settings
Reference configuration:
apiVersion: v1
kind: Service
metadata:
  name: oss-kafka-connect-service
  labels:
    app: oss-kafka-connect
spec:
  type: LoadBalancer
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8083
  selector:
    app: oss-kafka-connect
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: oss-kafka-connect-deployment
spec:
  replicas: 3
  template:
    metadata:
      labels:
        app: oss-kafka-connect
    spec:
      containers:
      - name: oss-kafka-connect
        image: fra.ocir.io/fry97l7sa3hs/kafka-connect-mqtt/kafka-connect:0.1
        command: ["connect-distributed"]
        args: ["/etc/kafka/connect-distributed.properties"]
        volumeMounts:
        - name: connect-distributed
          mountPath: /etc/kafka/connect-distributed.properties
          subPath: connect-distributed.properties
        env:
        - name: CONNECT_REST_ADVERTISED_HOST_NAME
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: CONNECT_BOOTSTRAP_SERVERS
          value: api.cell-1.eu-frankfurt-1.streaming.oci.oraclecloud.com:9092
        - name: CONNECT_REST_PORT
          value: "8083"
        - name: CONNECT_SASL_MECHANISM
          value: PLAIN
        - name: CONNECT_SECURITY_PROTOCOL
          value: SASL_SSL
        - name: CONNECT_SASL_JAAS_CONFIG
          value: org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password=")Md:UnOpM..rY8rxg-#V";
        - name: CONNECT_GROUP_ID
          value: sesym2019-mqtt-connect
        - name: CONNECT_PRODUCER_SASL_MECHANISM
          value: PLAIN
        - name: CONNECT_PRODUCER_SECURITY_PROTOCOL
          value: SASL_SSL
        - name: CONNECT_PRODUCER_SASL_JAAS_CONFIG
          value: org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password=")Md:UnOpM..rY8rxg-#V";
        - name: CONNECT_CONSUMER_SASL_MECHANISM
          value: PLAIN
        - name: CONNECT_CONSUMER_SECURITY_PROTOCOL
          value: SASL_SSL
        - name: CONNECT_CONSUMER_SASL_JAAS_CONFIG
          value: org.apache.kafka.common.security.plain.PlainLoginModule required username="hol200207jhb/oracleidentitycloudservice/chulnam.kim@oracle.com/ocid1.streampool.oc1.eu-frankfurt-1.amaaaaaax3ifkfyankxpqvb7lljzzklyf7aaokmhg2y6wipjcfkfzfv3rrda" password=")Md:UnOpM..rY8rxg-#V";
        - name: CONNECT_CONFIG_STORAGE_TOPIC
          value: <connect-harness-OCID>-config
        - name: CONNECT_OFFSET_STORAGE_TOPIC
          value: <connect-harness-OCID>-offset
        - name: CONNECT_STATUS_STORAGE_TOPIC
          value: <connect-harness-OCID>-status
        - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
          value: "1"
        - name: CONNECT_KEY_CONVERTER
          value: org.apache.kafka.connect.storage.StringConverter
        - name: CONNECT_VALUE_CONVERTER
          value: org.apache.kafka.connect.json.JsonConverter
        - name: CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE
          value: "true"
        - name: CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE
          value: "true"
        ports:
        - name: kafka-connect
          containerPort: 8083
          protocol: TCP
      volumes:
      - name: connect-distributed
        configMap:
          name: oss-kafka-connect-configmap
As shown below, change the OCI Registry repository to Public so that OKE can pull the image.
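If you would rather not make the registry Public, a common alternative is an image pull secret referenced from the Deployment. A sketch, where the secret name ocirsecret and the credentials are placeholders:
kubectl create secret docker-registry ocirsecret \
    --docker-server=fra.ocir.io \
    --docker-username='<tenancy-namespace>/<oci-username>' \
    --docker-password='<auth-token>'
Then add the following under the Pod spec in k8s-deployment-service.yaml:
      imagePullSecrets:
      - name: ocirsecret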
3. Deploy kafka-connect-mqtt
With the Docker image and configuration files ready, we can now create the containers.
kubectl apply -f .
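Optionally, wait for the Deployment to finish rolling out before checking the pods:
kubectl rollout status deployment/oss-kafka-connect-deployment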
List all pods:
kubectl get pods
Check the Load Balancer IP:
kubectl get svc
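Once the Load Balancer shows an external IP, the Kafka Connect REST API (exposed on port 80 and forwarded to 8083 by the Service) should respond; an empty connector list means the workers are up:
curl http://<EXTERNAL-IP>/connectors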
# If a container fails to start, check its logs to find out why
kubectl logs <pod NAME>
# If you need to delete the pods, run
kubectl delete -f .
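kubectl describe is also handy when a pod is stuck in Pending or ImagePullBackOff, since its Events section shows scheduling and image-pull problems:
kubectl describe pod <pod NAME>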
That was a lot of ground for one day, so let's wrap up!
First we set up the Streaming Service (stream processing) environment, then built the k8s cluster, and finally created the kafka-connect-mqtt containers in k8s from the kafka-connect-mqtt Docker image in the Oracle Registry together with the k8s-configmap.yaml and k8s-deployment-service.yaml files.
With that, the path from the MQTT Broker to the Streaming Service is connected, and the data-ingestion part of our IoT real-time analytics project is well over half done!
Next time we will look at how to load the streaming data into ADW (Autonomous Data Warehouse) and run forecasts based on exponential smoothing!