本文共 13554 字,大约阅读时间需要 45 分钟。
centos环境下,借助docker安装kafka,安装kafka之前我们需要安装Zookeeper;
一个典型的Kafka集群中包含以下:
- 若干Produce;
- 若干broker(broker数量越多,集群吞吐率越高);
- 若干Consumer Group,以及一个Zookeeper集群。
Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
下载kafka、zookeeper镜像:
#下载kafka镜像[root@jason jiahongfei]# docker pull wurstmeister/kafkaUsing default tag: latestlatest: Pulling from wurstmeister/kafkae7c96db7181b: Pull complete f910a506b6cb: Pull complete b6abafe80f63: Pull complete c7db1651fcd4: Pull complete 3a06d69a6954: Pull complete 18616ed64100: Pull complete Digest: sha256:a9980b591efe62a68de0acf5f5ce2f6fa7112ab07ec1099c976cdadc740c7ea4Status: Downloaded newer image for wurstmeister/kafka:latest#下载zookeeper镜像[root@jason jiahongfei]# docker pull wurstmeister/zookeeperUsing default tag: latestlatest: Pulling from wurstmeister/zookeepera3ed95caeb02: Pull complete ef38b711a50f: Pull complete e057c74597c7: Pull complete 666c214f6385: Pull complete c3d6a96f1ffc: Pull complete 3fe26a83e0ca: Pull complete 3d3a7dd3a3b1: Pull complete f8cc938abe5f: Pull complete 9978b75f7a58: Pull complete 4d4dbcc8f8cc: Pull complete 8b130a9baa49: Pull complete 6b9611650a73: Pull complete 5df5aac51927: Pull complete 76eea4448d9b: Pull complete 8b66990876c6: Pull complete f0dd38204b6f: Pull complete Digest: sha256:7a7fd44a72104bfbd24a77844bad5fabc86485b036f988ea927d1780782a6680Status: Downloaded newer image for wurstmeister/zookeeper:latest
启动kafka、zookeeper镜像:
#启动zookeepter镜像:docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper#启动kafka镜像docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.105:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.105:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafkaKAFKA_BROKER_ID=0KAFKA_ZOOKEEPER_CONNECT=192.168.0.105:2181KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.105:9092KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092#KAFKA_ZOOKEEPER_CONNECT 配置的是zookeeper的地址,可以单节点配置,也可以配置zookeeper集群多节点,用逗号隔开
注:容器操作辅助指令:
查看目前有哪些容器:docker container ls
暂停容器:docker container stop XXXXXXXXXXX #容器编号
删除容器:docker container rm XXXXXXXXXXX
org.springframework.cloud spring-cloud-starter-bus-kafka
spring: application: name: spring-cloud-config-server-9091 cloud: config: server: git: uri: XXXXXX.git username: XXXXXX password: XXXXXX bus: enabled: true trace: enabled: true refresh: enabled: true kafka: bootstrap-servers: 192.168.0.105:9092 consumer: group-id: config-serverserver: port: 9091eureka: client: service-url: defaultZone: http://localhost:9090/eurekamanagement: endpoints: web: exposure: exclude: bus-refresh
服务的消费者,同样需要引入上面的kafka配置
启动日志如下:
2020-07-17 23:47:42.669 INFO 6392 --- [ main] j.s.s.SpringCloudConfigServerApplication : No active profile set, falling back to default profiles: default2020-07-17 23:47:43.801 WARN 6392 --- [ main] o.s.boot.actuate.endpoint.EndpointId : Endpoint ID 'bus-env' contains invalid characters, please migrate to a valid format.2020-07-17 23:47:44.114 WARN 6392 --- [ main] o.s.boot.actuate.endpoint.EndpointId : Endpoint ID 'bus-refresh' contains invalid characters, please migrate to a valid format.2020-07-17 23:47:44.204 WARN 6392 --- [ main] o.s.boot.actuate.endpoint.EndpointId : Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.2020-07-17 23:47:44.484 INFO 6392 --- [ main] o.s.cloud.context.scope.GenericScope : BeanFactory id=61cdd14d-e26e-3416-a945-6a6c14a92ba92020-07-17 23:47:44.604 INFO 6392 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.2020-07-17 23:47:44.611 INFO 6392 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.2020-07-17 23:47:44.616 INFO 6392 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.2020-07-17 23:47:44.739 INFO 6392 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2020-07-17 23:47:44.741 INFO 6392 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2020-07-17 23:47:44.761 INFO 6392 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2020-07-17 23:47:44.766 INFO 6392 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2020-07-17 23:47:44.781 INFO 6392 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2020-07-17 23:47:44.786 INFO 6392 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2020-07-17 23:47:45.729 INFO 6392 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9091 (http)2020-07-17 23:47:45.869 INFO 6392 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]2020-07-17 23:47:45.871 INFO 6392 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.35]2020-07-17 23:47:46.051 INFO 6392 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext2020-07-17 23:47:46.051 INFO 6392 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 3360 ms2020-07-17 23:47:46.181 WARN 6392 --- [ main] c.n.c.sources.URLConfigurationSource : No URLs will be polled as dynamic configuration sources.2020-07-17 23:47:46.181 INFO 6392 --- [ main] c.n.c.sources.URLConfigurationSource : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.2020-07-17 23:47:46.196 INFO 6392 --- [ main] c.netflix.config.DynamicPropertyFactory : DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@37854b342020-07-17 23:47:46.374 WARN 6392 --- [ main] c.n.c.sources.URLConfigurationSource : No URLs will be polled as dynamic configuration sources.2020-07-17 23:47:46.374 INFO 6392 --- [ main] c.n.c.sources.URLConfigurationSource : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.2020-07-17 23:47:46.617 INFO 6392 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'2020-07-17 23:47:47.839 INFO 6392 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'2020-07-17 23:47:47.867 INFO 6392 --- [ main] onConfiguration$FunctionBindingRegistrar : Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration2020-07-17 23:47:49.584 WARN 6392 --- [ main] ockingLoadBalancerClientRibbonWarnLogger : You already have RibbonLoadBalancerClient on your classpath. It will be used by default. As Spring Cloud Ribbon is in maintenance mode. We recommend switching to BlockingLoadBalancerClient instead. In order to use it, set the value of `spring.cloud.loadbalancer.ribbon.enabled` to `false` or remove spring-cloud-starter-netflix-ribbon from your project.2020-07-17 23:47:50.214 INFO 6392 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'2020-07-17 23:47:50.938 INFO 6392 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.springCloudBusInput' has 1 subscriber(s).2020-07-17 23:47:51.123 INFO 6392 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel2020-07-17 23:47:51.253 INFO 6392 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel springCloudBusInput2020-07-17 23:47:51.295 INFO 6392 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel2020-07-17 23:47:51.318 INFO 6392 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel springCloudBusOutput2020-07-17 23:47:51.335 INFO 6392 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler org.springframework.cloud.stream.binding.StreamListenerMessageHandler@3e9beef22020-07-17 23:47:51.418 INFO 6392 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler errorLogger2020-07-17 23:47:51.456 INFO 6392 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel2020-07-17 23:47:51.460 INFO 6392 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).2020-07-17 23:47:51.461 INFO 6392 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'2020-07-17 23:47:51.891 WARN 6392 --- [ main] b.k.p.KafkaBinderConfigurationProperties : Ignoring provided value(s) for 'group.id'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding..group to specify the group instead of group.id2020-07-17 23:47:51.983 INFO 6392 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: springCloudBus2020-07-17 23:47:51.988 INFO 6392 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values: bootstrap.servers = [192.168.0.105:9092] client.dns.lookup = default client.id = connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS2020-07-17 23:47:52.073 INFO 6392 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.02020-07-17 23:47:52.076 INFO 6392 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f842020-07-17 23:47:52.076 INFO 6392 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1595000872073
转载地址:http://ltsn.baihongyu.com/