暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka安装及使用---Kafka从入门到精通(二)

1112

前面说了kafka的topic有分区的概念,每个分区又有leader 和 follower,kafka听过ack机制保证消息的可靠性。

初识kafka---Kafka从入门到精通(一)

1、下载安装zookeeper

下载地址:http://zookeeper.apache.org/releases.html#download

1、进入解压地址,进入D:\apache-zookeeper-3.8.0\conf

2、 将“zoo_sample.cfg”重命名为“zoo.cfg”

3、 打开“zoo.cfg”找到并编辑dataDir=D:\apache-zookeeper-3.8.0

4、添加系统变量:ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.8.0

5、 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin

6、 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)

7、 打开新的cmd,输入“zkServer“,运行Zookeeper

当出现下图,代表启动成功:

2、下载安装kafka

下载地址:https://kafka.apache.org/downloads

1、解压进入:D:\kafka_2.13-3.1.0\config

2、找到server.properties并打开,log.dirs=D:\kafka_2.13-3.1.0\kafka-logs

3、并编辑zookeeper.connect=localhost:2181

4、Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181

5、输入下面命令启动kafka,出现下图代表启动成功。

    .\bin\windows\kafka-server-start.bat .\config\server.properties
    复制

    3、集成springBoot

    1、先pom文件加入kafka二方包

       <!--引入kafak和spring整合的jar-->
      <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      </dependency>
      复制

        #------------------------------------Kafka-------------------#
        # 指定kafka 代理地址,可以多个
        spring.kafka.bootstrap-servers=localhost:9092
        ##########################producer about config##############################
        spring.kafka.producer.acks=1
        spring.kafka.producer.batch-size=16384
        spring.kafka.producer.retries=0
        spring.kafka.producer.buffer-memory=33554432
        #spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer.class
        #spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer.class
        ##########################consumer about config##############################
        spring.kafka.consumer.enable-auto-commit=true
        spring.kafka.consumer.group-id=kafka_group_2
        spring.kafka.consumer.auto-commit-interval=100
        #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer.class
        #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer.class
        复制

          /**
          * @author keying
          */
          @RestController
          public class TestKafkaController {

          @Resource
          private KafkaTemplate<String, String> kafkaTemplate;

          /**
          * 同步发送
          *
          * @return
          */
          @RequestMapping("/sendKafka")
          public String syncSendMessage() {
          for (int i = 0; i < 10; i++) {
          try {
          kafkaTemplate.send("kafka-boot", "0", "kafkaMessage" + i).get();
          }catch (Exception e){
          e.printStackTrace();
          }
          }
          return "success";
          }


          }



          /**
          * @author keying
          */
          @Component
          public class ConsumerKafka {

          @KafkaListener(id = "foo", topics = "kafka-boot")
          public void listen1(String foo) {
          System.out.println("消费" + foo);
          }
          }
          复制

          文章转载自后端从入门到精通,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论