首页 > 编程语言 >Java开发 - 消息队列之Kafka初体验

Java开发 - 消息队列之Kafka初体验

时间:2023-03-20 13:04:16浏览次数:55  
标签:初体验 Java 启动 cart kafka 消息 import Kafka


目录

​​前言​​

​​Kafka​​

​​什么是Kafka​​

​​Kafka软件结构​​

​​Kafka的特点​​

​​怎么启动Kafka​​

​​下载Kafka​​

​​配置Kafka ​​

​​Zookeeper​​

​​启动Kafka​​

​​Kafka案例​​

​​添加依赖​​

​​添加配置​​

​​配置启动类​​

​​创建生产者​​

​​创建消费者​​

​​测试​​

​​结语​​


前言

前几日总结了消息队列的一些知识,相信看完的同学们对消息队列的功能和作用都多少有了些基础的了解,为了让大家更快的上手消息队列,今天将给大家带来一篇大数据中常用的Kafka的介绍和实战教程,学完此篇,你将了解什么是Kafka,怎么使用Kafka和怎么将Kafka应用到项目中。

Kafka

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka软件结构

Producer:生产者,消息发送方,消息来源;

Consumer:消费者,消息接收方;

Topic:话题,消息收发方根据话题才能找到对的那个人,不会乱发;

Record:消息记录,是生产者和消费者传递的内容,保存在指定Topic内。

Kafka的特点

前文提到,Kafka是针对大数据设计的,官方号称并发1w/s,运行时堪比内存,可见其功能还是很强大的。

实际运行中,Kafka将消息队列中的信息保存在硬盘,它堪比内存的效率是因为对硬盘的读取规则做了优化,这点很强。主要是:顺序读写,零拷贝,日志压缩等技术。详情推荐这篇博客:​​整理了一周的Kafka规划和优化方案​​

Kafka在处理队列中的数据时,能够一直向服务器硬盘中保存队列信息,理论上没有大小限制,除非服务器硬盘满了。默认信息保存7天,时间可配置,数据量比较大的情况一天一处理,可以有效提高服务器性能,减少Kafka消耗。

怎么启动Kafka

下载Kafka

下载地址:​​Kafka下载地址​

下载后博主是放在一个叫Kafka的文件夹内,文件夹下放下载后解压的Kafka和一个用来存储Kafka运行过程中产生数据的data(空目录)文件:

Java开发 - 消息队列之Kafka初体验_kafka

配置Kafka 

进入 kafka_2.13-2.4.1/config文件,用记事本打开如下文件:

Java开发 - 消息队列之Kafka初体验_消息队列_02

 将dataDir后面的路径修改成你自己的data文件夹地址:

Java开发 - 消息队列之Kafka初体验_kafka_03

保存,关闭。再打开server.properties文件:

Java开发 - 消息队列之Kafka初体验_Producer_04

 修改log.dirs后面的路径为你自己电脑上data的地址,和上面的地址是一样的:

Java开发 - 消息队列之Kafka初体验_kafka_05

做完这些记得保存啊,不保存啥用也没有。

Zookeeper

说起Zookeeper,其实我们上面的配置是配置的Zookeeper,但却和Kafka息息相关,想知道他们之间的关系吗?我们继续往下看。

ZooKeeper是一个分布式的,​​开放源码​​​的​​分布式应用程序​​​协调服务,是​​Google​​​的Chubby一个开源的实现,是​​Hadoop​​​和​​Hbase​​​的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、​​域名服务​​、分布式同步、组服务等。

上面的解释很官方,不是很好理解。通俗地说就是:早期的服务器软件安装后都有各自的配置,但是要安装的软件很多,配置多了就不好管理了,Zookeeper可以修改服务器系统中所有软件的配置,久而久之,很多软件就删除了自己写配置文件的功能,改从Zookeeper中获取。ZooKeeper就像一个动物园的饲养员,为什么说是动物园饲养员呢,你看MySql的标志是不是海豚,Tomcat是只猫,等等,你去看,这些软件都是动物标志,像个动物园一样。

启动Kafka

启动kafka要先启动zookeeper,mac电脑终端进入Kafka的bin文件目录下:

# 进入Kafka文件夹
cd Desktop/JAVATOOL/kafka/kafka_2.13-2.4.1/bin
# 启动Zookeeper服务
./zookeeper-server-start.sh ../config/zookeeper.properties
# 启动Kafka服务
./kafka-server-start.sh ../config/server.properties

zookeeper启动后终端输出: 

Java开发 - 消息队列之Kafka初体验_Customer_06

kafka启动后终端输出:

Java开发 - 消息队列之Kafka初体验_Customer_07

主要是看到这个started字样。

关闭Kafka命令: 

# 关闭Kafka服务
./kafka-server-stop.sh
# 启动Zookeeper服务
./zookeeper-server-stop.sh

需要先关闭Kafka,再关闭zookeeper,否则Kafka会一直报断开连接的错。 

文件路径写你自己的,不要写博主的,两个服务启动用两个窗口来运行命令,运行期间窗口不要关闭。

Windows电脑需要进入到kafka/bin/windows目录下,接着输入以下命令:

启动zookeeper:

zookeeper-server-start.bat ..\..\config\zookeeper.properties

启动kafka: 

kafka-server-start.bat ..\..\config\server.properties

window关闭服务直接x掉窗口就可以。 

Kafka案例

我们在此前微服务项目的cart子项目中演示Kafka的消息收发,下面跟着博主一步步来做吧。

添加依赖

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

这里特别提醒下,没有看前面微服务项目的童鞋你需要添加的依赖会少一些,建议先去看看微服务内容,把微服务基础项目先搭建起来,后续的开发很多都会在此基础上进行。 传送门:​​Java开发 - 数风流人物,还看“微服务”​​

添加配置

spring:
kafka:
# 定义kafka的位置
bootstrap-servers: localhost:9092
# 话题的分组名称,是必须配置的
# 为了区分当前项目和其他项目使用的,防止不同项目相同话题的干扰
# 本质是在话题名称前添加项目名称为前缀来防止的
consumer:
group-id: cart

配置启动类

在启动类上添加两个注解:

// 启动kafka的功能
@EnableKafka
// 为了测试kafka,我们可以周期性的发送消息到消息队列
// 使用SpringBoot自带的调度工具即可
@EnableScheduling

Java开发 - 消息队列之Kafka初体验_微服务_08

  

创建生产者

在cart包下新建一个kafka包,包下建一个叫Producer的类:

package com.codingfire.cloud.cart.kafka;

import com.codingfire.cloud.commons.pojo.cart.entity.Cart;
import com.google.gson.Gson;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// 我们需要周期性的向Kafka发送消息
// 需要将具备SpringBoot调度功能的类保存到Spring容器才行
@Component
public class Producer {
// 能够实现将消息发送到Kafka的对象
// 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可
// KafkaTemplate<[话题名称的类型],[传递消息的类型]>
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

// 每隔10秒向Kafka发送信息
int i=1;
// fixedRate是周期运行,单位毫秒 10000ms就是10秒
@Scheduled(fixedRate = 10000)
// 这个方法只要启动SpringBoot项目就会按上面设置的时间运行
public void sendMessage(){
// 实例化一个Cart类型对象,用于发送消息
Cart cart=new Cart();
cart.setId(i++);
cart.setCommodityCode("PC100");
cart.setPrice(RandomUtils.nextInt(100)+200);
cart.setCount(RandomUtils.nextInt(5)+1);
cart.setUserId("UU100");
// 将cart对象转换为json格式字符串
Gson gson=new Gson();
// 执行转换
String json=gson.toJson(cart);
System.out.println("本次发送的消息为:"+json);
// 执行发送
// send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的泛型类型
kafkaTemplate.send("myCart",json);

}
}

创建消费者

在kafka包下创建Consumer类:

package com.codingfire.cloud.cart.kafka;

import com.codingfire.cloud.commons.pojo.cart.entity.Cart;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理0
@Component
public class Consumer {

// SpringKafka框架接收Kafka中的消息使用监听机制
// SpringKafka框架提供一个监听器,专门负责关注指定的话题名称
// 只要该话题名称中有消息,会自动获取该消息,并调用下面方法
@KafkaListener(topics = "myCart")
// 上面注解和下面方法关联,方法的参数就是接收到的消息
public void received(ConsumerRecord<String,String> record){
// 方法参数类型必须是ConsumerRecord
// ConsumerRecord<[话题名称类型],[消息类型]>
// 获取消息内容
String json=record.value();
// 要想在java中使用,需要转换为java对象
Gson gson=new Gson();
// 将json转换为java对象,需要提供转换目标类型的反射
Cart cart=gson.fromJson(json,Cart.class);
System.out.println("接收到对象为:"+cart);
}


}

测试

下面,我们启动项目,由于我们的项目配置了nacos和seata,所以这两个服务要保证是开启的。如果你是新起的项目可以不用。

项目运行后,根据我们的代码,我们应该是每10s能到一条消息输入和输出的,我们在控制台看一下:

Java开发 - 消息队列之Kafka初体验_kafka_09

说明我们的测试是成功的,我们已经完成了一个完整的消息发送和接收的过程。 

但是,这也只是Kafka一个最简单的使用案例,关于Kafka肯定不止是做这么一点点事情,关于Kafka更多的功能,博主也还在学习中,推荐一个网址给大家学习:​​kafka中文教程​

结语

最后了,说点什么吧。只能说,Kafka博大精深,微服务深不可测,真不是三言两语,三篇两篇就能说得清楚的,这篇博客就如标题一样,只能算是初体验了,相较于es和redis来说,这篇博客只能算是皮毛,要学的东西还有很多很多,最重要的是实战。

最后要说的是专注,学习不要好高骛远,专注于一个点去学习,光是微服务中任何一个辅助软件都足够研究很久了,先学会用,其次再深入学习,关于更多Kafka的实战应用,后期博主吃透后会给大家详细剖析,一起努力吧。

标签:初体验,Java,启动,cart,kafka,消息,import,Kafka
From: https://blog.51cto.com/u_15993027/6132592

相关文章

  • JavaScript解析JSON
    一个对象以“{”开始,“}”结束。每个“key”后跟一“:”,“‘key/value’对”之间运用“,”分隔。遍历JSON对象中的数据,可通过for-in循环实现。数据{"主题":{......
  • JavaScript加密/解密与OpenAI的对接:生成加密对话的ChatGPT 4.0应用
    首先,我们来看一个简单的JavaScript加密算法的示例,该算法将输入的字符串每个字符的ASCII值加上1,并返回一个新的字符串。以下是加密函数的代码:javascriptCopycodefunctionen......
  • 一款开源的 Kafka 管理平台
    项目地址:https://github.com/provectus/kafka-ui 我是通过docker-compose安装的 操作过程可以会遇到地址解析问题,可参考文章:https://www.cnblogs.com/ReturnOfTheKi......
  • java-线程互斥同步
    写在前边,/***线程互斥同步:通过两种锁机制来控制多个线程对共享资源的互斥访问,synchronized、ReentrantLock*/一、JVM实现的synchronized。使用synchronized不用担心没......
  • java-线程的使用方式
    前言,三种使用线程的方法:实现Runnable接口、实现Callable接口、继承Thread类。实现接口会更好,因为继承了Thread类就无法继承其他类,但可以实现多个接口/***线程状态:new、r......
  • Java 中如何优化大量的 if...else...
    策略模式(StrategyPattern)将每个条件分支的实现作为一个独立的策略类,然后使用一个上下文对象来选择要执行的策略。这种方法可以将大量的ifelse语句转换为对象之间的交互......
  • Java访问权限修饰符(public , private , protected)
    访问权限修饰符包括:public、protected、private和默认修饰符(friendly/包访问权限)。可以修饰在类、字段、方法前面。public:公开权限,所有类都可以访问。protected:继承访......
  • java 项目使用 本地的gradle wrapper 或者 mvn gradle
    使用ganradle的好处是当前项目对版本idea自动下载mvnwrapper目录结构.mvnwrappermaven-wrapper.jarmaven-wrapper.propertiesMaven......
  • java-基础线程机制
     前言,基础线程机制:Executor管理多个异步任务的执行、Daemon守护线程、sleep()、yield() 一、Executor:1.newCachedThreadPool(),一个任务创建一个线程ExecutorServic......
  • Java基础语法-数组
    第一部分:数组1.数组1.1数组介绍数组就是存储数据长度固定的容器,存储多个数据的数据类型要一致。1.2数组的定义格式1.2.1第一种格式数据类型[]数组名示例:int[]arr......