0.拓扑图
官网: http://kafka.apache.org/documentation.html#introduction
kafka原理
1.logstash的配置
[root@VM_0_4_centos config]# cat wxqyh.yml|egrep -v '^$|^#'
input {
  # Source 1: the personal_20001 log4j file. The type set here is what the
  # output section's conditionals compare against.
  file {
    path => "/mnt/data/logs/personal_20001/log4j.log"
    type => "4personal20001"
    # NOTE(review): with sincedb_path at /dev/null the read offset is presumably
    # never persisted, so a restart re-reads the file from the beginning
    # (start_position => "beginning") - confirm this is intended.
    sincedb_path => "/dev/null"
    start_position => "beginning"
    # A line that does NOT start with an ISO8601 timestamp is appended to the
    # previous event (presumably multi-line entries such as stack traces).
    codec => multiline {
      what => "previous"
      negate => true
      pattern => "^%{TIMESTAMP_ISO8601}"
    }
  }
  # Source 2: same settings as above, for the personal_20002 log4j file.
  file {
    path => "/mnt/data/logs/personal_20002/log4j.log"
    type => "4personal20002"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => multiline {
      what => "previous"
      negate => true
      pattern => "^%{TIMESTAMP_ISO8601}"
    }
  }
}
filter {
  # Parse each log entry into the fields level, ajpcon and data.
  grok {
    # The two accepted layouts differ only in an optional space after "[".
    # They are listed as an array under a single "message" key (the documented
    # form for multiple patterns) instead of declaring `match` twice, which
    # depended on how Logstash merges duplicate settings.
    match => {
      "message" => [
        "^%{TIMESTAMP_ISO8601}\[%{WORD:level} %{GREEDYDATA:ajpcon}\| %{GREEDYDATA:data}",
        "^%{TIMESTAMP_ISO8601}\[ %{WORD:level} %{GREEDYDATA:ajpcon}\| %{GREEDYDATA:data}"
      ]
    }
    # Drop the raw line once it has been parsed into fields.
    # NOTE(review): the leading timestamp is matched but not captured into a
    # field - confirm that is intended.
    remove_field => "message"
  }
}
output {
  # Route each source file to its own Kafka topic, keyed on the type assigned
  # by the file inputs.
  if [type] == "4personal20001" {
    kafka {
      bootstrap_servers => "10.0.0.134:9092"
      topic_id => "topic4personal1"
      # Serialize the whole event as JSON. The default plain codec emits only
      # timestamp, host and message, and the filter removes `message` after a
      # successful grok, so the parsed fields (level, ajpcon, data) would
      # otherwise never reach Kafka.
      codec => json
      compression_type => "snappy"
      # NOTE(review): 10485761 is 10 MiB + 1 byte; the broker must accept
      # messages of at least this size - confirm against its message.max.bytes.
      max_request_size => 10485761
    }
  }
  if [type] == "4personal20002" {
    kafka {
      bootstrap_servers => "10.0.0.134:9092"
      topic_id => "topic4personal2"
      # Same reasoning as above: keep the grok-extracted fields in the payload.
      codec => json
      compression_type => "snappy"
      max_request_size => 10485761
    }
  }
}
2.kafka的配置
[root@VM_0_134_centos config]# cat server.properties|egrep -v '^$|^#'
# Broker identity, and the address it listens on and advertises to clients.
broker.id=0
listeners=PLAINTEXT://10.0.0.134:9092
advertised.listeners=PLAINTEXT://10.0.0.134:9092
# Thread counts and socket buffer / request size limits.
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Message size limits: message.max.bytes (20000000) is kept below
# replica.fetch.max.bytes (20485760).
message.max.bytes=20000000
replica.fetch.max.bytes=20485760
# Log storage location and the default partition count.
log.dirs=/mnt/data/monitor01/kafka_2.11-1.1.0/data
num.partitions=1
num.recovery.threads.per.data.dir=1
# Replication factors and min ISR are all 1.
# NOTE(review): presumably a single-broker setup - confirm.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Retention: 168 hours = 7 days; segment size 1073741824 bytes = 1 GiB;
# retention check interval 300000 ms = 5 minutes.
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ZooKeeper is reached on localhost.
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
3.logstash2的配置
[root@VM_0_134_centos config]# cat haode.yml|egrep -v '^$|^#'
input {
  # One kafka consumer per topic. Each has its own group_id / client_id
  # (es1..es6) and sets the event type to the topic name, which the output
  # section matches on.
  kafka {
    bootstrap_servers => "10.0.0.134:9092"
    topics => ["topic12wxqyh8"]
    group_id => "es1"
    client_id => "es1"
    codec => "plain"
    type => "topic12wxqyh8"
  }
  kafka {
    bootstrap_servers => "10.0.0.134:9092"
    topics => ["topic12wxqyh9"]
    group_id => "es2"
    client_id => "es2"
    codec => "plain"
    type => "topic12wxqyh9"
  }
  kafka {
    bootstrap_servers => "10.0.0.134:9092"
    topics => ["topic24wxqyh6"]
    group_id => "es3"
    client_id => "es3"
    codec => "plain"
    type => "topic24wxqyh6"
  }
  kafka {
    bootstrap_servers => "10.0.0.134:9092"
    topics => ["topic24wxqyh7"]
    group_id => "es4"
    client_id => "es4"
    codec => "plain"
    type => "topic24wxqyh7"
  }
  kafka {
    bootstrap_servers => "10.0.0.134:9092"
    topics => ["topic4personal1"]
    group_id => "es5"
    client_id => "es5"
    codec => "plain"
    type => "topic4personal1"
  }
  kafka {
    bootstrap_servers => "10.0.0.134:9092"
    topics => ["topic4personal2"]
    group_id => "es6"
    client_id => "es6"
    codec => "plain"
    type => "topic4personal2"
  }
}
output {
  # Every handled type is written to an Elasticsearch index of the same name,
  # so a single output with a sprintf'd index replaces six copy-pasted
  # conditionals. Events whose type is not in the list are still not indexed,
  # exactly as before. To onboard a new topic, add its type to this list.
  if [type] in ["topic12wxqyh8", "topic12wxqyh9", "topic24wxqyh6", "topic24wxqyh7", "topic4personal1", "topic4personal2"] {
    elasticsearch {
      hosts => ["10.0.0.7:9200"]
      # Index name is taken from the event's type field.
      index => "%{type}"
    }
  }
}
4.kafka的问题:
4.1 zookeeper--选主,提供访问入口
4.2 Kafka的原理
生产者将数据发送到Broker代理,Broker代理上有多个主题(topic),消费者从Broker获取数据。
参考: https://zhuanlan.zhihu.com/p/97030680
5.partition--相当于es的shard
用一个例子来演示会更加清晰