首页 > 系统相关 >云服务器(Linux)安装部署Kafka

云服务器(Linux)安装部署Kafka

时间:2022-11-14 23:25:15浏览次数:42  
标签:iptables -- ip 防火墙 kafka topic Linux 服务器 Kafka

云服务器(Linux)安装部署Kafka

前期准备

kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。

下载安装包

官网地址:

较新的版本已自带Zookeeper,无需额外下载。这里使用3.2.0做演示。

注意要下载Binary downloads标签下的tgz包,Source download标签下的包为源码。无法直接运行,需要编译。

上载安装包到云服务器

使用ssh连接工具将kafka_2.12-3.2.0.tgz这个包上传到云服务器上的一个目录。

打开命令行,进入到放有压缩包的目录,执行

tar -zxvf kafka_2.12-3.2.0.tgz

配置kafka

然后使用cd命令进入到/kafka_2.12-3.2.0/config/下,使用

vi server.properties

编辑配置文件。

删除listeners和advertised前方的#号,改成如下配置:

listeners=PLAINTEXT://云服务器内网ip:9092(本地访问用本地ip)
# 如果要提供外网访问则必须配置此项
advertised.listeners=PLAINTEXT://云服务器公网ip:9092(若要远程访问需配置此项为云服务器的公网ip)
# zookeeper连接地址,集群配置格式为ip:port,ip:port,ip:port
zookeeper.connect=云服务器公网ip:2181

开放云服务器端口

在云服务器控制台内进入安全组页面,添加两条新的入站规则,tcp/9092和tcp/2181

开放linux防火墙端口

先查看使用的防火墙类型iptables/firewalld

iptables操作命令

1.打开/关闭/重启防火墙

开启防火墙(重启后永久生效):chkconfig iptables on

关闭防火墙(重启后永久生效):chkconfig iptables off

开启防火墙(即时生效,重启后失效):service iptables start

关闭防火墙(即时生效,重启后失效):service iptables stop

重启防火墙:service iptables restartd

2.查看打开的端口

/etc/init.d/iptables status
3.开启端口

iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存并重启防火墙
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

Centos7默认安装了firewalld,如果没有安装的话,可以使用 yum install firewalld firewalld-config进行安装。

操作指令如下:

1.启动防火墙

systemctl start firewalld
2.禁用防火墙

systemctl stop firewalld
3.设置开机启动

systemctl enable firewalld
4.停止并禁用开机启动

sytemctl disable firewalld
5.重启防火墙

firewall-cmd --reload

6.查看状态

systemctl status firewalld或者 firewall-cmd --state
7.在指定区域打开端口(记得重启防火墙)

firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

打开tcp/9092和tcp/2181这两个端口后,重启防火墙,并查看开放的端口确实生效。

启动kafka服务

cd命令进入kafka_2.12-3.2.0目录下,执行

bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper,不加-daemon方便排除启动错误,新建一个shell窗口,进入该目录再执行

bin/kafka-server-start.sh config/server.properties

启动kafka,若打印日志未报错,若未出现error日志,说明启动成功。

测试单机连通性

查询kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因为kafka使用zookeeper作为配置中心,一些topic信息需要查询该kafka对应的zookeeper
创建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
开启生产者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
开启消费者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test


Springboot连接kafak

在pom.xml文件中引入kafka依赖

		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version>
        </dependency>

在application.yml配置文件中配置kafka

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 云服务器外网ip地址:9092
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生产者

@RestController
public class KafkaController {
    private final static String TOPIC_NAME = "test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
        return String.format("消息 %s 发送成功!", msg);
    }
}

消费者

@Component
public class DemoConsumer {
    /**
     * @param record record
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     * @TopicPartition(topic = "topic2", partitions = "0",
     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * },concurrency = "6")
     * //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     */
    @KafkaListener(topics = "test-topic", groupId = "testGroup1")
    public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup1 message: " + value);
        System.out.println("testGroup1 record: " + record);
        //手动提交offset,一般是提交一个banch,幂等性防止重复消息
        // === 每条消费完确认性能不好!
        ack.acknowledge();
    }

    //配置多个消费组
    @KafkaListener(topics = "test--topic", groupId = "testGroup2")
    public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup2 message: " + value);
        System.out.println("testGroup2 record: " + record);
        //手动提交offset
        ack.acknowledge();
    }
}

使用swagger测试发送消息

控制台打印消息

标签:iptables,--,ip,防火墙,kafka,topic,Linux,服务器,Kafka
From: https://www.cnblogs.com/ndchao/p/chaos1.html

相关文章

  • 【Linux环境部署】Python版本更新及替换
    默认python版本替换1.测试当前默认python版本user@user-jwq#pythonPython2.7.18(default,Jul12022,12:27:04)[GCC9.4.0]onlinux2Type"help","copyright......
  • Linux内存管理 (2)页表的映射过程【转】
    转自:https://www.cnblogs.com/arnoldlu/p/8087022.html 关键词:swapper_pd_dir、ARMPGD/PTE、LinuxPGD/PTE、pgd_offset_k。 Linux下的页表映射分为两种,一是Linux自......
  • 嵌入式-Linux常用初级指令和介绍
    ls:列出当前文件夹下有哪些文件 ls-a:显示所有文件,包括隐藏的文件和文件夹pwd:显示在当前哪个文件夹下面  mkdir+新建文件夹名称:新建文件夹  进入文件夹:cd+......
  • 嵌入式-Linux基础操作
    Crtl+Alt+T:调出命令窗口xrandr:列出分辨率列表 设置窗口的分辨率大小为1280x960:xrandr-s1280x960   通过命令窗口来执行一段C语言程序:VI工具的使用:(1)vifir......
  • linux环境与IDE
    1 Linux上练习和开发 1.1 安装centos环境1.1.1 windows机器上安装虚拟机vmwarehttps://www.likecs.com/show-145995.html 1.1.2 下载centos7操作系统光盘http......
  • linux服务器每次登陆都会把IP写入hosts.deny文件,导致登陆之后下次登陆不了
    解决办法:第一种:将自己的IP加入hsots.allow文件中  sshd:IP第二种解决办法:只要一登陆,自己的IP就被加到/etc/hosts.deny里,自己都不能登陆了原因:你之前这IP肯定登......
  • linux清空日志文件内容的几种常用方法
    1.使用cp/cat/dd命令及dev/null设备来清空文件内容(1)通过cp复制/dev/null内容到指定文件cp/dev/nullfilename.out(2)通过cat命令显示/dev/null内容,重定向到指定文件cat/......
  • Linux shell脚本全面学习
    1.Linux脚本编写基础1.1语法基本介绍1.1.1开头程序必须以下面的行开始(必须方在文件的第一行):#!/bin/sh符号#!用来告诉系统它后面的参数是用来执行该文件的程序。在......
  • Linux 下使用 NMON 分析系统性能
    nmon是一个分析OS性能屡试不爽的工具,Nmon的使用需要nmon工具和nmonanalyser分析程序两者配合使用。nmon工具生成性能数据文件,然后monanalyser以nmon生成的数据文件作为输入,......
  • 高并发服务器设计之多路复用模型
     多路复用的方式是真正实用的服务器程序,非多路复用的网络程序只能作为学习或着陪测的角色。本文说下个人接触过的多路复用函数:select/poll/epoll/port。kqueue的*nix系统没......