博客
关于我
Spring Cloud构建微服务架构--基于kafka构建消息总线BUS
阅读量:187 次
发布时间:2019-02-28

本文共 2899 字,大约阅读时间需要 9 分钟。

CentOS环境下Kafka集群部署指南

一、环境准备

在CentOS环境下部署Kafka集群之前,需先安装Zookeeper。Kafka集群通常包括以下组件:

  • Produce:负责发布消息到broker。
  • Broker:Kafka的消息中间件,集群中 Broker 的数量会影响吞吐率。
  • Consumer Group:用于消费消息的组件。
  • Zookeeper:用于管理Kafka集群的配置、选举leader以及处理Consumer Group的rebalance。

Kafka通过Zookeeper实现集群管理,Producer采用push模式发布消息,Consumer采用pull模式消费消息。

下载Kafka和Zookeeper镜像

# 下载kafka镜像docker pull wurstmeister/kafka:latest# 下载zookeeper镜像docker pull wurstmeister/zookeeper:latest

启动Kafka和Zookeeper镜像

# 启动zookeeper镜像docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper# 启动kafka镜像docker run -d --name kafka -p 9092:9092 \-e KAFKA_BROKER_ID=0 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.0.105:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.105:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-t wurstmeister/kafka

注意事项

  • KAFKA_ZOOKEEPER_CONNECT 配置的是Zookeeper的地址,支持单节点或多节点(多节点时用逗号分隔)。
  • 容器操作辅助指令可参考以下命令:
    • 查看容器:docker container ls
    • 停止容器:docker container stop XXXXXXXX
    • 删除容器:docker container rm XXXXXXXX

二、配置中心项目POM.xml引入Kafka自动装配jar包

在项目的POM.xml中添加Kafka相关的依赖:

org.springframework.cloud
spring-cloud-starter-bus-kafka

三、修改application.yml引入Kafka地址配置

在application.yml中添加Kafka配置:

spring:    application:        name: spring-cloud-config-server-9091    cloud:        config:            server:                git:                    uri: XXXXXX.git                    username: XXXXXX                    password: XXXXXX    bus:        enabled: true        trace: enabled: true        refresh: enabled: true    kafka:        bootstrap-servers: 192.168.0.105:9092        consumer:            group-id: config-servers    server:        port: 9091eureka:    client:        service-url:            defaultZone: http://localhost:9090/eurekamanagement:    endpoints:        web:            exposure:                exclude: bus-refresh

注意事项

  • bootstrap-servers 配置Kafka的地址,需替换为实际地址。
  • group-id 配置Consumer组的ID,需与消费者组件保持一致。

四、启动Spring-Config-Server

启动服务时会输出以下日志信息:

2020-07-17 23:47:42.669 INFO 6392 --- [           main] j.s.s.SpringCloudConfigServerApplication : No active profile set, falling back to default profiles: default2020-07-17 23:47:43.801 WARN 6392 --- [           main] o.s.boot.actuate.endpoint.EndpointId     : Endpoint ID 'bus-env' contains invalid characters, please migrate to a valid format.2020-07-17 23:47:44.204 INFO 6392 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.2020-07-17 23:47:44.739 INFO 6392 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)

注意事项

  • 关注启动日志中的配置警告,确保服务正常运行。
  • 可参考Spring Cloud文档获取更多Kafka集群配置信息。

转载地址:http://ltsn.baihongyu.com/

你可能感兴趣的文章
pandas :检测一个DF和另一个DF之间缺失的列
查看>>
Pandas-从具有嵌套列表列表的现有列创建动态列时出错
查看>>
Pandas-通过对列和索引的值求和来合并两个数据框
查看>>
pandas.columns、get_dummies等用法
查看>>
pandas.DataFrame.copy(deep=True) 实际上并不创建深拷贝
查看>>
pandas.read_csv()的详解-ChatGPT4o作答
查看>>
PANDAS.READ_EXCEL()输出‘;溢出错误:日期值超出范围‘;而不存在日期列
查看>>
pandas100个骚操作:再见 for 循环!速度提升315倍!
查看>>
Pandas:如何根据其他列值的条件对列进行求和?
查看>>
Pandas:对给定列求和 DataFrame 行
查看>>
Pandas、Matplotlib、Pyecharts数据分析实践
查看>>
Pandas中文官档~基础用法2
查看>>
Pandas中文官档~基础用法5
查看>>
Pandas中文官档~基础用法6
查看>>
Pandas中的GROUP BY AND SUM不丢失列
查看>>
pandas交换两列
查看>>
pandas介绍-ChatGPT4o作答
查看>>
pandas去除Nan值
查看>>
pandas实战:电商平台用户分析
查看>>
Pandas库常用方法、函数集合
查看>>