首页 > 其他分享 >Metamorphosis分布式消息中间件

Metamorphosis分布式消息中间件

时间:2023-09-15 14:03:16浏览次数:47  
标签:存储 顺序 MetaQ meta Metamorphosis 消息 消息中间件 服务器 分布式


一 简介

1.1定义

    Metamorphosis是淘宝开源的一个Java消息中间件。关于消息中间件,你应该听说过JMS(1)规范,以及一些开源实现,如ActiveMQ和HornetQ等。Metamorphosis也是其中之一。

    Metamorphosis是一款完全的队列模型消息中间件,服务器使用Java语言编写,可在多种软硬件平台上部署。客户端支持Java、C++编程语言。单台服务器可支持1万以上个消息队列,通过扩容服务器,队列数几乎可任意横向扩展。每个队列都是持久化、长度无限(取决于磁盘空间大小)、并且可从队列任意位置开始消费。

    Metamorphosis (MetaQ) 是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在淘宝和支付宝有着广泛的应用,现已开源。

1.2起源

       Metamorphosis的起源是linkedin的开源MQ,这是一个设计很独特的MQ系统,它采用pull机制,而不是一般MQ的push模型,它大量利用了zookeeper做服务发现和offset存储,总体上说Metamorphosis的设计跟它是完全一致的。但是为什么还需要meta呢?

       简单概括下重新写出meta的原因:

·       Kafka是scala写,作者对scala不熟悉,并且kafka整个社区的发展太缓慢了。

·       有一些功能是kakfa没有实现,但是我们却需要:事务、多种offset存储、高可用方案(HA)等。

       Meta相对于kafka特有的一些功能:

·       文本协议设计,非常透明,支持类似memcached stats的协议来监控broker

·       纯Java实现,从通讯到存储,从client到server都是重新实现。

·       提供事务支持,包括本地事务和XA(2)分布式事务。

·       支持HA(High Availability)复制,包括异步复制和同步复制,保证消息的可靠性。

·      

·      

·       多种offset存储支持,数据库、磁盘、zookeeper,可自定义实现。

·       支持group commit,提升数据可靠性和吞吐量。

·      

·       一系列配套项目:python客户端、twitter storm的spout、tail4j等。

 

1.3内部机制与总体架构

       从实现角度看,MetaQ充分利用zookeeper这个优秀的服务中心,作为服务注册和查找中心、客户端负载均衡和数据偏移量的分布式存储使用。Zookeeper在MetaQ整个集群中扮演非常关键的角色。

       MetaQ的存储实现与kafka是一致的,充分利用传统磁盘顺序读写非常高效的特点,并且利用group commit、sendfile系统调用等技术来充分提高IO效率。

       MetaQ的事务实现跟ActiveMQ是类似的,采用redo日志的方式,并遵循JTA(4)协议规范来实现分布式事务。

       MetaQ的网络协议跟memcached文本协议类似,但MetaQ的协议引入了opaque的映射字段,提高请求的并行度。正因为采用文本协议,为MetaQ编写其他语言客户端是相对容易,并且管理MetaQ服务器也变的很容易,比如MetaQ提供了stats协议,通过telent就可以获取服务器的运行状况。

      

Metamorphosis分布式消息中间件_服务器

1.4应用

    因此meta相比于kafka的提升是巨大的。meta在淘宝和支付宝都得到了广泛应用,现在每天支付宝每天经由meta路由的消息达到120亿,淘宝也有每天也有上亿的消息量。总计METAQ在阿里巴巴各个子公司被广泛应用,每天会转发250亿+条消息。下面是使用METAQ的各个公司或项目名:淘宝网,支付宝,阿里巴巴金融,阿里云,Alibaba.com,Alipay.com.

 

 

       据说,腾讯和京东也在尝试使用。

       Meta适合的应用:

·       日志传输,高吞吐量的日志传输本来就是kafka的强项

·      

·       数据的顺序同步功能,如mysql binlog复制

·       分布式环境下(broker,producer,consumer都为集群)的消息路由,对顺序和可靠性有极高要求的场景。

·       作为一般MQ来使用的其他功能

注释:

1 JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务

2  XA 就是 X/Open DTP 定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束以及提交、回滚等。 XA 接口函数由数据库厂商提供。 

3 Diamond是淘宝网Java中间件团队的核心产品之一,服务于集团线上很多核心应用。目前已经开源,Diamond主要针对的是持久数据,可以和ZookKeeper类比。

4 JTA,即Java Transaction API,译为Java事务API。

二 可靠性、顺序和重复

2.1可靠性

       Metamorphosis的可靠性保证贯穿客户端和服务器。

2.1.1生产者的可靠性保证

消息生产者发送消息后返回SendResult,如果isSuccess返回为true,则表示消息已经确认发送到服务器并被服务器接收存储。整个发送过程是一个同步的过程。保证消息送达服务器并返回结果。

2.1.2服务器的可靠性保证

消息生产者发送的消息,meta服务器收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成功之后返回应答给生产者。因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。

写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层os,os对写有缓冲。Meta有两个特性来保证数据落到磁盘上

1:每1000条(可配置),即强制调用一次force来写入磁盘设备。

2:每隔10秒(可配置),强制调用一次force来写入磁盘设备。

因此,Meta通过配置可保证在异常情况下(如磁盘掉电)10秒内最多丢失1000条消息。当然通过参数调整你甚至可以在掉电情况下不丢失任何消息。

服务器通常组织为一个集群,一条从生产者过来的消息可能按照路由规则存储到集群中的某台机器。Meta已经实现高可用的HA方案,类似mysql的同步和异步复制,将一台meta服务器的数据完整复制到另一台slave服务器,并且slave服务器还提供消费功能(同步复制不提供消费)。

2.1.3消费者的可靠性保证

消息的消费者是一条接着一条地消费消息,只有在成功消费一条消息后才会接着消费下一条。如果在消费某条消息失败(如异常),则会尝试重试消费这条消息(默认最大5次),超过最大次数后仍然无法消费,则将消息存储在消费者的本地磁盘,由后台线程继续做重试。而主线程继续往后走,消费后续的消息。因此,只有在MessageListener确认成功消费一条消息后,meta的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。

消费者的另一个可靠性的关键点是offset的存储,也就是拉取数据的偏移量。我们目前提供了以下几种存储方案:

1 zookeeper,默认存储在zoopkeeper上,zookeeper通过集群来保证数据的安全性。

2 mysql,可以连接到您使用的mysql数据库,只要建立一张特定的表来存储。完全由数据库来保证数据的可靠性。

3 file,文件存储,将offset信息存储在消费者的本地文件中。

Offset会定期保存,并且在每次重新负载均衡前都会强制保存一次。

2.2顺序

很多人关心的消息顺序,希望消费者消费消息的顺序跟消息的发送顺序是一致的。比如,我发送消息的顺序是A、B、C,那么消费者消费的顺序也应该是A、B、C。乱序对某些应用可能是无法接受的。

Metamorphosis对消息顺序性的保证是有限制的,默认情况下,消息的顺序以谁先达到服务器并写入磁盘,则谁就在先的原则处理。并且,发往同一个分区的消息保证按照写入磁盘的顺序让消费者消费,这是因为消费者针对每个分区都是按照从前到后递增offset的顺序拉取消息。

Meta可以保证,在单线程内使用该producer发送的消息按照发送的顺序达到服务器并存储,并按照相同顺序被消费者消费,前提是这些消息发往同一台服务器的同一个分区。为了实现这一点,你还需要实现自己的PartitionSelector用于固定选择分区


public interface PartitionSelector {
    public Partition getPartition(String topic, Listpartitions, Message message) throws MetaClientException;
}



选择分区可以按照一定的业务逻辑来选择,如根据业务id来取模。或者如果是传输文件,可以固定选择第n个分区使用。当然,如果传输文件,通常建议只配置一个分区,那也就无需选择了。

2.3消息重复

消息的重复包含两个方面,生产者重复发送消息以及消费者重复消费消息。

针对生产者来说,有可能发生这种情况,生产者发送消息,等待服务器应答,这个时候发生网络故障,服务器实际已经将消息写入成功,但是由于网络故障没有返回应答。那么生产者会认为发送失败,则再次发送同一条消息,如果发送成功,则服务器实际存储两条相同的消息。这种由故障引起的重复,meta是无法避免的,因为meta不判断消息的data是否一致,因为它并不理解data的语义,而仅仅是作为载荷来传输。

针对消费者来说也有这个问题,消费者成功消费一条消息,但是此时断电,没有及时将前进后的offset存储起来,则下次启动的时候或者其他同个分组的消费者owner到这个分区的时候,会重复消费该条消息。这种情况meta也无法完全避免。

Meta对消息重复的保证只能说在正常情况下保证不重复,异常情况无法保证,这些限制是由远程调用的语义引起的,要做到完全不重复的代价很高,meta暂时不会考虑。

三 如何开始



下载服务器



下载页面选择最新版本的服务器(目前是1.4.4)并下载到本地,假设下载后的文件在folder目录,执行下列命令解压缩文件:

cd foldertar zxvf   taobao-metamorphosis-server-wrapper-1.4.4.tar.gz


解压缩文件,解压后目录结构大概为:



taobao
    metamorphosis-server-wrapper
         bin
            env.bat               
            env.sh                
            log4j.properties      
            metaServer.bat        
            metaServer.sh         
            tools_log4j.properties
         logs
         conf
            server.ini
            ......
         lib
            ......

启动脚本放在bin目录,主要的脚本是metaServer.sh,日志在logs目录,而配置文件主要是conf目录下server.ini,lib存放所有的依赖jar包。



配置Broker



默认server.ini提供了一个topic——test用于测试,你可以添加自己定义的topic:


[topic=mytopic]                                                                                                                                                                              
;
是否启用统计
                                                                                                                                                                            
stat=true                                                                                                                                                                                   
;
这个
topic
指定分区数目,如果没有设置,则使用系统设置
                                                                                                                                         
numPartitions=10                                                                                                                                                                                                                                                                                                                                                    
;
删除策略的执行时间,
cron
表达式
                                                                                                                                                               
deleteWhen=0 0 6,18 * * ?


大多数system参数都可以topic参数所覆盖。

启动和关闭服务器

确保你的机器上安装了JDK并正确设置JAVA_HOME和PATH变量,启动服务器:

bin/metaServer.sh start local


关闭服务器:


bin/metaServer.sh stop


在windows上,双击执行"bin/metaServer.bat"即可(windows不支持local模式启动,需配置zookeeper,见下文的zookeeper配置一节)。

修改JAVA_HOME,JMX等变量,请修改bin/env.sh(for linux)或者bin/env.bat(for windows)。

更多metaServer.sh支持命令请使用help:


 bin/metaServer.sh help
=>
  Usage: metaServer.sh {start|status|stop|restart|reload|stats|open-partitions|close-partitions|move-partitions|delete-partitions|query}
  ......


验证服务器正常运行

除了通过观察日志logs/metaServer.log外,你还可以通过stats命令来观察服务器运行状况。



 bin/metaServer.sh stats
=>
   STATS
   pid 7244
   broker_id 0
   port 8123
   uptime 2057
   version 1.4.2
   curr_connections 1
   threads 35
   cmd_put 0
   cmd_get 0
   cmd_offset 0
   tx_begin 0
   tx_xa_begin 0
   tx_commit 0
   tx_rollback 0
   get_miss 0
   put_failed 0
   total_messages 0
   topics 2
   config_checksum 718659887
   END

你也可以telnet到默认的8123端口执行stats命令查看。

集群模式配置



Local模式



上文提到的启动方式是以本地模式也就是单机模式启动,它将启动一个内置的zookeeper,并将broker注册到该zookeeper。这对于单机应用或者测试开发是最便捷方式的。

但是MetaQ是作为分布式软件设计的,更通常作为一个集群提供服务。MetaQ的集群管理是利用zookeeper实现的,因此首先需要配置zookeeper。

标签:存储,顺序,MetaQ,meta,Metamorphosis,消息,消息中间件,服务器,分布式
From: https://blog.51cto.com/u_2650279/7480482

相关文章

  • 分布式协议与算法 概要
    最近系统性的学习了分布式协议与算法,在此做个小小笔记。理论拜占庭将军问题拜占庭将军问题(ByzantineGeneralsProblem)是一个著名的分布式系统中的问题,用于探讨在存在故障节点或恶意行为的情况下如何进行可靠的信息传递和共识达成。问题描述如下:假设有一组拜占庭将军围绕一座......
  • 生产环境实战spark (7)分布式集群 5台设备 Hadoop集群安装
    生产环境实战spark(7)分布式集群5台设备Hadoop集群安装1,Hadoop下载。下载地址:http://hadoop.apache.org/releases.html下载版本:hadoop2.6.5版本  hadoop2.6.x版本比较稳定2,使用winscp工具上传到master节点。检查:[root@masterrhzf_spark_setupTools]#lshadoop-2......
  • java分布式特点_java分布式架构是什么?分布式架构的优缺点有哪些?
    作为目前互联网最流行的技术之一,分布式是当仁不让的,小伙伴们都了解什么是分布式架构吗?它的优缺点又有哪些呢?快听小编为你介绍介绍吧。一、什么是分布式架构?分布式概念要想理解什么是分布式,我们一定要知道以下这些性质:1)、首先,一个分布式系统它一定是由多个节点组成的系......
  • 京东一面:分布式 ID 生成方案怎么选?写得太好了!
    背景在分布式系统中,经常需要用到全局唯一ID发生器,标识需要存储的数据。我们需要什么样的ID生成器?ID生成器除了是数据的唯一标识以外,一般需要在系统中承担更多的责任,概括起来有以下几点:唯一性:“全局唯一”vs“业务唯一”?分布式系统使用唯一的ID生成器,会有非常严重的申请互斥......
  • HBASE完全分布式安装
    介绍HBase–HadoopDatabase,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HadoopHDFS为HBase提供了高可靠性的底层存储支持,HadoopMapReduce为HBase提供了高性能的计算能力,Zookeeper为HBase提供了稳定服务和failover机制。下载HBasehttps://hbase.apache.org/down......
  • 开源消息中间件ActiveMQ回顾:Java客户端实现
    前一段时间工作中经常使用到ApacheActiveMQ用作消息传输。今天在公司不是很忙,于是又深入研究了一下,总结一下分享出来。基于ActiveMQ的Java客户端实现例子。接口定义:publicinterfaceMQService{publicvoidstart();publicvoidsendQueueMessage(Stringtext)throws......
  • 分布式ActiveMQ集群
    回顾总结前一段时间学习的ActiveMQ分布式集群相关的知识,分享出来希望对看到的人有所帮助。一、分布式ActiveMQ集群的部署配置细节:官方资料:http://activemq.apache.org/clustering.html基本上看这个就足够了,本文就不具体分析配置文件了。1、Queueconsumerclusters:同一个queue,如果......
  • 公司某产品MySql分布式架构总结
    这个是目前公司某产品Server端MySql分布式架构总结(内容总结自wiki),该产品同时使用了Mysql和MongoDB。本篇Blog只做Mysql分布式架构的介绍。----------------------------------------------------一、共4台Linux服务器A\B\C\DAmasterwithslaveBC......
  • dubbo分布式项目开发____配置经验和心得
    //1.zookeeper配置文件中不要出现中文会报错 //2.分布式开发对象一定要序列化//摘抄如下在面向对象程序语言中做分布式计算的时候,经常需要将对象在不同的主机之间传输,我这次在实现分布式计算的时候,需要将一个计算对象从中央服务器Server分发给所有的客户端client。通过......
  • dubbo分布式项目开发____dubbo控制台管理
    //1.将dubbo-admin放到tomcat下自行解压修改dubbo.properties文件结构如下为了避免冲突如用到多个tomcat时修改端口号即可8080  其他8081...2..3..4等//从命名下war包的名称不带版本号放到tomcat下解压出来的就不带版本号了//启动tomcat访问这个项目我个人用的是8081单独......