首页 > 数据库 >MySQL MHA信息的收集【Filebeat+logstash+MySQL】

MySQL MHA信息的收集【Filebeat+logstash+MySQL】

时间:2023-04-15 23:45:55浏览次数:59  
标签:status Filebeat NULL varchar log mha host MySQL MHA

一.项目背景

随着集团MHA集群的日渐增长,MHA管理平台话越来越迫切。而MHA平台的建设第一步就是将这些成百上千套的MHA集群信息收集起来,便于查询和管理。

MHA主要信息如下:

(1)基础配置信息;

(2)运行状态信息;

(3)启动及FailOver的log信息。

集团目前数据库的管理平台是在Archery的基础上打造,所以,需要将此功能嵌入到既有平台上。通过Archery系统进行查询展示。

二.架构

 简单来说,通过 Filebeat + Logstash + MySQL 架构 来收集保存各个集群的配置信息、启动及FailOver的log信息 和运行状态信息。

运行状态信息是通过一个小程序获取的,这个小程序每分钟执行一次,会把执行结果输出到文件中。当然这个文件是被failbeat监控的。

三.实现

3.1 获取MHA状态的脚本

文件为mha_checkstatus.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import io
import re
import ConfigParser

Path='/etc/mha'
#fout=open('输出文件名','w')
for Name in os.listdir(Path) :
  Pathname= os.path.join(Path,Name)
 ## print(Pathname)
 ## print(Name)
  config =ConfigParser.ConfigParser()
  try:
    config.read(Pathname)
    server_item = config.sections()
    server1_host = ''  ##MHA cnf 配置文件中的节点1
    server2_host = ''  ##MHA cnf 配置文件中的节点2
    server3_host = ''  ##MHA cnf 配置文件中的节点3
    mha_cnf_remark = ''
    if 'server1' in server_item:
      server1_host = config.get('server1','hostname')
    else:
       mha_cnf_remark = mha_cnf_remark + 'Server1未配置;'
    if 'server2' in server_item:
      server2_host = config.get('server2','hostname')
    else:
      mha_cnf_remark = mha_cnf_remark + 'Server2未配置;'
    if 'server3' in server_item:
      server3_host = config.get('server3','hostname')

      ##print(mha_cnf_remark)
  except Exception as e:
    print(e)

  mha_status_result =''
  ###20190330
  Name = Name.replace(".cnf", "")

  ###集群一主一从
  if server1_host <> '' and server2_host <> '' and server3_host == '':
    cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
    with os.popen(cmd_mha_status) as mha_status:
      mha_status_result = mha_status.read()
      if 'running(0:PING_OK)' in mha_status_result:
        print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
      if 'stopped(2:NOT_RUNNING)' in mha_status_result:
        print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)

  ####集群一主两从
  if server1_host <> '' and server2_host <> '' and server3_host <> '':
    cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
    with os.popen(cmd_mha_status) as mha_status:
      mha_status_result = mha_status.read()
      if 'running(0:PING_OK)' in mha_status_result:
        print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::0:::'+server3_host+':::'+mha_status_result)
      if 'stopped(2:NOT_RUNNING)' in mha_status_result:
        print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)
        print(Name+':::'+Pathname+':::None:::'+server3_host+':::'+mha_status_result)

 概况说明,就是到存放MHA配置的文件夹,根据每个集群的配置文档,去逐一执行下masterha_check_status,把结果格式化,输出到指定的文件中。这个就是每个集群的状态数据。通过filebeat实时汇报上去。

触发的方式可以是crontab,每分钟执行一次。再本案中是输出到 /???/checkmhastatus/masterha_check_status.log 中。

形式类似如下:

*/1 * * * * python /???/????/mha_checkstatus.py >>   /???/????/masterha_check_status.log

3.2 表的设计及脚本

3.2.1 运行状态表 dbmha_status

CREATE TABLE `dbmha_status` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL,
  `logpath` varchar(500) NOT NULL,
  `confpath` varchar(500) NOT NULL,
  `mhstatus` varchar(100) NOT NULL,
  `serverip` varchar(100) NOT NULL,
  `info` varchar(2000) NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  PRIMARY KEY (`id`),
  KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2.2 mha log 信息表 dbmha_log

CREATE TABLE `dbmha_log` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL,
  `filename` varchar(200) NOT NULL,
  `logpath` varchar(500) NOT NULL,
  `message` longtext NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2.3 MHA 基础配置表 dbmha_conf_info

CREATE TABLE `dbmha_conf_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL DEFAULT '',
  `confpath` varchar(500) NOT NULL DEFAULT '',
  `manager_log` varchar(500) NOT NULL DEFAULT '',
  `manager_workdir` varchar(500) NOT NULL DEFAULT '',
  `master_binlog_dir` varchar(500) NOT NULL DEFAULT '',
  `failover_script` varchar(500) NOT NULL DEFAULT '',
  `online_change_script` varchar(500) NOT NULL DEFAULT '',
  `password` varchar(128) NOT NULL DEFAULT '',
  `ping_interval` varchar(100) NOT NULL DEFAULT '',
  `remote_workdir` varchar(100) NOT NULL DEFAULT '',
  `repl_password` varchar(128) NOT NULL DEFAULT '',
  `repl_user` varchar(20) NOT NULL DEFAULT '',
  `ssh_user` varchar(20) NOT NULL DEFAULT '',
  `user` varchar(20) NOT NULL DEFAULT '',
  `serverip1` varchar(100) NOT NULL DEFAULT '',
  `port1` varchar(10) NOT NULL DEFAULT '',
  `candidate_master1` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay1` varchar(20) NOT NULL DEFAULT '',
  `serverip2` varchar(100) NOT NULL DEFAULT '',
  `port2` varchar(10) NOT NULL DEFAULT '',
  `candidate_master2` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay2` varchar(20) NOT NULL DEFAULT '',
  `serverip3` varchar(100) NOT NULL DEFAULT '',
  `port3` varchar(10) NOT NULL DEFAULT '',
  `candidate_master3` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay3` varchar(20) NOT NULL DEFAULT '',
  `info` longtext NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  PRIMARY KEY (`id`),
  KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.3 filbeat 中关于读取文件的配置

..............
- type: log
  paths:
    - /???/????/masterha_check_status.log
  fields:
    log_type: mha-status
    db_host: 111.111.XXX.1XX    ###这个IP为mha Mnaager所在serverip

- type: log
  paths:
    - /???/mhaconf/*.cnf
  fields:
    log_type: mha-cnf
    db_host: 111.111.XXX.XXX
  multiline.type: pattern
  multiline.pattern: '^\[server [[:space:]] default'
  multiline.negate: true
  multiline.match: after


- type: log
  paths:
    - /???/????/mha/*/*.log
  fields:
    log_type: mysql-mha
    db_host: 111.111.XXX.XXX
................

3.4 Logstash 的配置文件

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

filter {

    if [fields][log_type] == "mysql-mha" {
        grok {
            match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"]
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
        copy => { "[log][file][path]" => "logpath"
                 "[fields][db_host]" => "manager_ip" }
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }


    if [fields][log_type] == "mha-cnf" {
        mutate {
        split => ["message","server"]
        add_field => {"message1" => "%{[message][1]}"}
        add_field => {"messages1" => "%{[message][2]}"}
        add_field => {"messages2" => "%{[message][3]}"}
        add_field => {"messages3" => "%{[message][4]}"}
        add_field => {"dft_password" => "*********"}
        add_field => {"dft_repl_password" => "*********"}
        }
        kv {
             source => "message1" 
             field_split => "\n"
             include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ]
             prefix => "dft_"
             remove_char_value => "<>\[\]," 
        }
        kv {
             source => "messages1"
             field_split => "\n"
             include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
             prefix => "s1_"
        }
        kv {
             source => "messages2"
             field_split => "\n"
             default_keys => [ "s2_candidate_master", "",
                         "s2_check_repl_delay", "",
                         "s2_hostname","",
                          "s2_port",""
                          ]
             include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
             prefix => "s2_"
        }
        kv {
             source => "messages3"
             field_split => "\n"
             default_keys => [ "s3_candidate_master", "",
                         "s3_check_repl_delay", "",
                         "s3_hostname","",
                          "s3_port","" 
                          ]
             include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
             prefix => "s3_"
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        mutate {
             copy => { "[fields][db_host]" => "manager_ip" }
             copy => { "[log][file][path]" => "conf_path" }
             gsub => [
                      "message", "需要加密的***密***码", "*********",
                      "message", "需要加密的其他字符", "*********"
                      ]
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }

    if [fields][log_type] == "mha-status" {
       mutate {
        split => ["message",":::"]
        add_field => {"cluster_name" => "%{[message][0]}"}
        add_field => {"conf_path" => "%{[message][1]}"}
        add_field => {"masterha_check_status" => "%{[message][2]}"}
        add_field => {"server" => "%{[message][3]}"}
        add_field => {"info" => "%{[message][4]}"}
         }

        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        mutate {
             copy => { "[fields][db_host]" => "manager_ip" }
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }

}


output {
    if [fields][log_type] == "mysql-mha" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{filename}","%{logpath}","%{message}"]
       }
    }

    if [fields][log_type] == "mha-status" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_status (host,clustername,logpath,confpath,mhstatus,serverip,info) VALUES(?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{cluster_name}","%{filename}","%{conf_path}","%{masterha_check_status}","%{server}","%{info}"]
       }
   }
    if [fields][log_type] == "mha-cnf" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_conf_info (host,clustername,confpath,manager_log,manager_workdir,master_binlog_dir,failover_script,online_change_script,password,ping_interval,remote_workdir,repl_password,repl_user,ssh_user,user,serverip1,port1,candidate_master1,check_repl_delay1,serverip2,port2,candidate_master2,check_repl_delay2,serverip3,port3,candidate_master3,check_repl_delay3,info) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{conf_path}","%{dft_manager_log}","%{dft_manager_workdir}","%{dft_master_binlog_dir}","%{dft_master_ip_failover_script}","%{dft_master_ip_online_change_script}","%{dft_password}","%{dft_ping_interval}","%{dft_remote_workdir}","%{dft_repl_password}","%{dft_repl_user}","%{dft_ssh_user}","%{dft_user}","%{s1_hostname}","%{s1_port}","%{s1_candidate_master}","%{s1_check_repl_delay}","%{s2_hostname}","%{s2_port}","%{s2_candidate_master}","%{s2_check_repl_delay}","%{s3_hostname}","%{s3_port}","%{s3_candidate_master}","%{s3_check_repl_delay}","%{message}"]
       }
   }

}

 这个配置还是相对复杂难懂的。这个文件配置了对三种文件的读取,我们就看读取mha配置文件的部分【[fields][log_type] == "mha-cnf"】,我们挑其中的几个点说下,更多的内容可参照logstash官网--https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

首先,我们是 “server” 关键字,把文件中的配置信息,分割成不同的部分。

接着,因为配置文件的格式是 key=value的样式,所以需要借助 kv{},其中的参数说下:field_split---定义字段间的分隔符;include_keys--定义只读去规定的特定key;prefix---格式化字段名字,加个前缀名字,主要是用来区分server 1 部分和 server2、、、之间的分别。

 通过【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}】,获取product字段,我们是通过mha的配置文件的名字来定义集群的名字,即规范了mha配置文件的名字的命名来自于集群的名字,反推得知了配置文件的名字,就知道了集群的名字。【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}】这个地方的filename包含了文件的后缀。

四.平台前端

我们是把此项目嵌入到既有的Archery平台中,增加了3个查询界面,界面的实现,在此就不具体展开了。

需要注意的是,界面需要支持模糊查询,例如支持MHA Manager Server IP查询(方便查询各个Manager节点上有多少集群);支持集群名字的模糊查询;支持节点serverIP的模糊查询。

五.补充说明

Q.1 为什么用MySQL存储信息,ELK是更成熟的架构啊?

是的,用elasticsearch来存储这种文本信息更常见。我们用MySQL替代elasticsearch基于以下考虑:(1)我们既有的管理平台使用的是MySQL,把他们保存到MySQL 便于集成;(2)这些数据,不仅仅是Log,还有些是基础数据,放到MySQL便于相互管理、聚合展示(3)这是数据量并不大,例如mha.log,只有在启动或者failover时才有变化,conf信息也是很少的,所以,从数据量也一点考虑,也不需要保存到MySQL。

Q.2 Logstash 可以把数据写入到MySql中吗?

是可以的。主要是logstash-output-jdbc、logstash-codec-plain插件的安装。

如果是离线的环境下安装,可以参考 《logstash 离线安装logstash-output-jdbc》

https://blog.csdn.net/sxw1065430201/article/details/123663108

Q.3 MHA log 文件夹中 原有一个 .health ,里面是MHA每分钟的健康性报告,那为什么还要自己写Python程序获取呢?

因为.healthy 的内容不是换行符结尾,而filebeat是以换行符来判断的(https://www.elastic.co/guide/en/beats/filebeat/7.4/newline-character-required-eof.html 有详细说明)。

简单来说,filebeat读取不了。

Q.4 MHA 健康性检查的原理

具体的原理可以参考此文章的分析说明--《mha检测mysql状况方式》

https://blog.csdn.net/weixin_35411438/article/details/113455263

Q.5 历史数据的删除

mha log 信息表 dbmha_log

MHA 基础配置表 dbmha_conf_info

以上两张表基本上很少变化,量不大,其数据无需定期删除。

运行状态表 dbmha_status,此表每个集群每分钟(具体crontab定义)都会有新数据插入,数据量增长较大,应设置定时任务,定期删除历史数据,例如删除7天前的数据。

 

标签:status,Filebeat,NULL,varchar,log,mha,host,MySQL,MHA
From: https://www.cnblogs.com/xuliuzai/p/17320183.html

相关文章

  • 面试官的灵魂一击: MySQL 事务日志是什么?
    SQL(StructuredQueryLanguage)和NoSQL(NotOnlySQL)是两种不同类型的数据库系统。SQL数据库系统采用了关系模型来存储数据,通过使用SQL语言进行数据管理和查询。SQL数据库系统适用于大规模、复杂的数据和事务处理,并且具有数据一致性和完整性的特点。常见的SQL数据库系统包括MySQL、Or......
  • MySQL
    1.回表的原因mysql回表:是指在查询过程中,mysql使用了索引来查找数据的行位置,但因为索引不包含所有需要查询的列,mysql还需要从主键索引或者聚簇索引中进一步读取数据来获取完整记录的过程。回表的原因:因为非聚集索引(即普通的索引)只包含索引列及主键的值,而不是整个数据行。假设一个......
  • MySql安装与连接
    MySql简介MySQL是一种关系型数据库管理系统,它是开源软件,在广泛使用的LAMP(Linux、Apache、MySQL、PHP/Python/Perl)技术栈中扮演重要角色。MySQL支持多种操作系统,包括Linux、Windows、macOS等,可以处理大量数据并提供高效的性能。MySQL使用SQL(StructuredQueryLanguage)语言......
  • mysql 8.0.25
    ######################     mysql-8.0.25/boost/boost_1_73_0/boost/utility.hppmysql-8.0.25/boost/boost_1_73_0/boost/variant.hppmysql-8.0.25/boost/boost_1_73_0/boost/version.hppmysql-8.0.25/boost/boost_1_73_0/boost/visit_each.hppmysql-8.0.25/boos......
  • Mysql索引
    索引优化速度首先创建了一个数据库,并创建了一个表,里面有800w条记录对其中的一条记录进行查询,使用了4.5s此时存储这个表的文件已经有500M的大小了添加索引后发现,刚刚存储表的文件变大了,变成了655m索引需要占用磁盘空间索引创建使用索引查询我们创建索引只对创建......
  • 存储引擎-mysql体系结构
    mysql体系结构:连接层:最上层是一些客户端和链接服务,主要完成一些连接处理,授权认证,以及相关的安全方案,服务器也会为安全接入的每一个客户端验证它所具有的操作权限服务层:第二层架构主要用于完成大多数的核心服务功能,如sql接口,并完成缓存的查询,sql的分析和优化,部分内置函数的执行......
  • mysql left join 查询时主表为null统计count为0的解决方法(join后面加group by)
     如果没有加groupby则会出UserCount为0外其它都是nullselecta.*,count(b.ID)asUserCountfromerp_roleasaleftjoinerp_userasbona.ID=b.RoleIdwhere1=1anda.TenantID=2anda.RoleName='string' 加上groupby一切正常selecta.*,count(b.ID)as......
  • mysql如何查询所有表和字段信息
    1MySQL中information_schema是什么information_schema数据库是MySQL自带的,它提供了访问数据库元数据的方式。元数据:元数据是关于数据的数据,如数据库名或表名,列的数据类型,或访问权限等。有些时候用于表述该信息的其他术语包括“数据字典”和“系统目录”。在MySQL中,把informat......
  • Linux中如何通过yum或者apt下载安装MySQL
    一、 yummysql5.7以下mysql5.7以上Centos8可以,但是需要重新配置文件可以,但是需要重新配置文件可以,但是需要重新配置文件Centos7可以直接yum,但是是安装mariadb-server。如果是mysql-server需要配置文件直接yum后启动就好yum后需要修改密码才可以进入Ubuntu......
  • Mysql 中,为什么 WHERE 使用别名会报错,而 ORDER BY 不会报错?
       Mysql中,为什么WHERE使用别名会报错,而ORDERBY不会报错? 我们先对salary*12命名一个别名annual_salSELECTemployee_id,salary,salary*12annual_salFROMemployeesORDERBYannual_sal; 这段代码以annual_sal升序输出且正常执行没有报错。说明orderby......