首页 > 其他分享 > 围观技术大牛,互联网项目海量日志实时分析平台实战

围观技术大牛,互联网项目海量日志实时分析平台实战

时间:2022-10-06 22:33:07浏览次数:83  
标签:flume producer 海量 大牛 kafka storm 日志 public

 围观技术大牛,互联网项目海量日志实时分析平台实战

1 序

对ETL系统中数据转换和存储操作的相关日志进行记录以及实时分析有助于我们更好的观察和监控ETL系统的相关指标(如单位时间某些操作的处理时间),发现系统中出现的缺陷和性能瓶颈。

由于需要对日志进行实时分析,所以Storm是我们想到的首个框架。Storm是一个分布式实时计算系统,它可以很好的处理流式数据。利用storm我们几乎可以直接实现一个日志分析系统,但是将日志分析系统进行模块化设计可以收到更好的效果。模块化的设计至少有两方面的优点:

2 相关框架的介绍和安装

2.1.1 原理介绍

Flume是一个高可用、高可靠、分布式的海量日志采集、聚合和传输系统。Flume支持在日志系统中定制日志发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接收方的能力。它拥有一个简单的、可扩展的流式数据流架构,如下图所示:

日志收集系统就是由一个或者多个agent(代理)组成,每个agent由source、channel、sink三部分组成,source是数据的来源,channel是数据进行传输的通道,sink用于将数据传输到指定的地方。我们可以把agent看做一段水管,source是水管的入口,sink是水管的出口,数据流就是水流。Agent本质上是一个jvm进程,agent各个组件之间是通过event来进行触发和协调的。

2.1.2 flumeng的安装

  • 从官方网站下载apache-flume-1.4.0-bin.tar.gz压缩包

  • 解压缩,并在conf目录下面新建一个文件flume-conf.properties,内容如下:

  1.   a1.sources = r1
  2.   a1.sinks = k1
  3.   a1.channels = c1
  4.   #source配置信息
  5.   #r1的type为avro表示该source接收的数据协议为avro,且接收数据由avro客户端事件驱动
  6.   #(也就是说resource要通过avro-cliet向其发送数据)
  7.   a1.sources.r1.type = avro
  8.   a1.sources.r1.bind = localhost
  9.   a1.sources.r1.port = 44444
  10.   #sink配置信息
  11.   # type为logger意将数据输出至日志中(也就是打印在屏幕上)
  12.   a1.sinks.k1.type = logger
  13.   #channel配置信息
  14.   #type为memory意将数据存储至内存中
  15.   a1.channels.c1.type = memory
  16.   a1.channels.c1.capacity = 1000
  17.   a1.channels.c1.transactionCapacity = 100
  18.   #将source和sink绑定至该channel上
  19.   a1.sources.r1.channels = c1
  20.   a1.sinks.k1.channel = c1
  21.   该配置文件,配置了一个source为avro的服务器端用于日志的收集。具体的情况将在后面ETL系统与flume整合中介绍。
  • 启动代理。flume-ng agent –n a1 –f flume-conf.properties

2.2 kafka

2.2.1 原理介绍

Kafka是linkedin用于日志处理的分布式消息队列。Kafka的架构如下图所示:

Kafka的存储策略有一下几点:

  • kafka以topic来进行消息管理,每个topic包括多个partition,每个partition包括一个逻辑log,由多个segment组成。

  • 每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

  • 每个partition在内存中对应一个index,记录每个segment中的第一条消息的偏移。

  • 发布者发到某个topic的消息会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

2.2.2 kafka集群的搭建

Kafka集群的搭建需要依赖zookeeper来进行负载均衡,所以我们需要在安装kafka之前搭建zookeeper集群。

  • zookeeper集群的搭建,本系统用到了两台机器。具体搭建过程见http://blog.csdn.net/itleochen/article/details/17453881

  • 分别下载kafka_2.9.2-0.8.1的安装包到两台机器,并解压该安装包。

  • 打开conf/server.properties文件,修改配置项broker.id、zookeeper.connect、partitions以及host.name为相应的值。

  • 分别启动kafka即完成了集群的搭建。

2.3 storm

2.3.1 原理介绍

Storm是一个分布式的、高容错的实时计算系统。Storm对于实时计算的的意义相当于Hadoop对于批处理的意义。hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美。同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。

Strom集群里面有两种节点,控制节点和工作节点,控制节点上面运行一个nimbus(类似于hadoop中的JobTracker)后台程序,Nimbus负责在集群里面分布代码,分配工作给机器, 并且监控状态。每一个工作节点上面运行一个叫做Supervisor(类似Hadoop中的TaskTracker)的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个Topology(类似hadoop中的Job)的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程 Worker(类似Hadoop中的Child)组成。结构如下图所示:

Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如:你可以把一个tweets流传输到热门话题的流。

storm提供的最基本的处理stream的原语是spout和bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。

Spout是流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。

Bolt可以接收任意多个输入stream。Bolt处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。

spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(类似 Job), 你可以把topology提交给storm的集群来运行。Topology的结构如下图所示:

2.3.2 storm集群的搭建

Storm集群的搭建也要依赖于zookeeper,本系统中storm与kafka共用同样一个zookeeper集群。

  • 下载安装包storm-0.9.0.1.tar.gz,并对该包进行解压。

  • 配置nimbus。修改storm的conf/storm.yaml文件如下:

  1.    
  2.   storm.zookeeper.servers: //zookeeper集群
  3.    
  4.    
  5.   -“10.200.187.71″
  6.    
  7.    
  8.   -“10.200.187.73″
  9.    
  10.    
  11.   storm.local.dir:“/usr/endy/fks/storm-workdir“
  12.    
  13.    
  14.   storm.messaging.transport:“backtype.storm.messaging.netty.Context”
  15.    
  16.    
  17.   storm.messaging.netty.server_worker_threads:1
  18.    
  19.    
  20.   storm.messaging.netty.client_worker_threads:1
  21.    
  22.    
  23.   storm.messaging.netty.buffer_size:5242880
  24.    
  25.    
  26.   storm.messaging.netty.max_retries:100
  27.    
  28.    
  29.   storm.messaging.netty.max_wait_ms:1000
  30.    
  31.    
  32.   storm.messaging.netty.min_wait_ms:100
  33.    

注意:在每个配置项前面必须留有空格,否则会无法识别。storm.messaging.* 部分是Netty的配置。如果没有该部分。那么Storm默认还是使用ZeroMQ。

  • 配置supervisor 修改storm的conf/storm.yaml文件如下:

  1.   storm.zookeeper.servers:
  2.   - “10.200.187.71″
  3.   - “10.200.187.73″
  4.   nimbus.host: “10.200.187.71″
  5.   supervisor.slots.ports:
  6.   - 6700
  7.   - 6701
  8.   - 6702
  9.   storm.local.dir: “/usr/endy/fks/storm-workdir”
  10.   storm.messaging.transport: “backtype.storm.messaging.netty.Context”
  11.   storm.messaging.netty.server_worker_threads: 1
  12.   storm.messaging.netty.client_worker_threads: 1
  13.   storm.messaging.netty.buffer_size: 5242880
  14.   storm.messaging.netty.max_retries: 100
  15.   storm.messaging.netty.max_wait_ms: 1000
  16.   storm.messaging.netty.min_wait_ms: 100

注意

  • nimbus.host是nimbus的IP或hostname

  • supervisor.slots.ports 是配置slot的ip地址。配了几个地址,就有几个slot,即几个worker。如果尝试提交的topology所声明的worker数超过当前可用的slot,该topology提交会失败。

  • storm.messaging 部分是Netty的配置。

2.4 drools

Drools是一个基于Java的、开源的规则引擎,可以将复杂多变的规则从硬编码中解放出来,以规则脚本的形式存放在文件中,使得规则的变更不需要修正代码重启机器就可以立即在线上环境生效。日志分析系统中,drools的作用是利用不同的规则对日志信息进行处理,以获得我们想要的数据。但是,Drools本身不是一个分布式框架,所以规则引擎对log的处理无法做到分布式。我们的策略是将drools整合到storm的bolt中去,这就就解决了drools无法分布式的问题。这是因为bolt可以作为task分发给多个worker来处理,这样drools中的规则也自然被多个worker处理了。

2.5 redis

Redis是key-value存储系统,它支持较为丰富的数据结构,有String,list,set,hash以及zset。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。Redis是内存数据库,所以有非常快速的存取效率。日志分析系统数据量并不是特别大,但是对存取的速度要求较高,所以选择redis有很大的优势。

3 各个框架的整合

3.1 ETL系统整合flumeng

Flume如何收集ETL系统中的日志是我需要考虑的第一个问题。log4j2提供了专门的Appender-FlumeAppender用于将log信息发送到flume系统,并不需要我们来实现。我们在log4j2的配置文件中配置了ETL系统将log信息发送到的目的地,即avro服务器端。该服务器端我们在flume的配置文件中进行了配置。配置信息如下所示:

  1.   producer.sources=s
  2.    
  3.   producer.channels=c
  4.    
  5.   producer.sinks=r
  6.    
  7.   producer.sources.s.type=avro
  8.    
  9.   producer.sources.s.channels=c
  10.    
  11.   producer.sources.s.bind=10.200.187.71
  12.    
  13.   producer.sources.s.port=4141

3.2 flumeng与kafka的整合

我们从ETL系统中获得了日志信息,将该信息不作任何处理传递到sink端,sink端发送数据到kafka。这个发送过程需要我们编写代码来实现,我们的实现代码为KafkaSink类。主要代码如下所示:

  1.   public class KafkaSink extends AbstractSink implements Configurable {
  2.   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
  3.   private Properties parameters;
  4.   private Producer<String, String> producer;
  5.   private Context context;
  6.   @Override
  7.   public void configure(Context context) {
  8.   this.context = context;
  9.   ImmutableMap<String, String> props = context.getParameters();
  10.    
  11.   parameters = new Properties();
  12.   for (String key : props.keySet()) {
  13.   String value = props.get(key);
  14.   this.parameters.put(key, value);
  15.   }
  16.   }
  17.   @Override
  18.   public synchronized void start() {
  19.   super.start();
  20.   ProducerConfig config = new ProducerConfig(this.parameters);
  21.   this.producer = new Producer<String, String>(config);
  22.   }
  23.   @Override
  24.   public Status process() throws EventDeliveryException {
  25.   Status status = null;
  26.   // Start transaction
  27.   Channel ch = getChannel();
  28.   Transaction txn = ch.getTransaction();
  29.   txn.begin();
  30.   try {
  31.   // This try clause includes whatever Channel operations you want to do
  32.   Event event = ch.take();
  33.   String partitionKey = (String) parameters.get(KafkaFlumeConstans.PARTITION_KEY_NAME);
  34.   String encoding = StringUtils.defaultIfEmpty(
  35.   (String) this.parameters.get(KafkaFlumeConstans.ENCODING_KEY_NAME),
  36.   KafkaFlumeConstans.DEFAULT_ENCODING);
  37.   String topic = Preconditions.checkNotNull(
  38.   (String) this.parameters.get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
  39.   “custom.topic.name is required”);
  40.   String eventData = new String(event.getBody(), encoding);
  41.   KeyedMessage<String, String> data;
  42.   // if partition key does’nt exist
  43.   if (StringUtils.isEmpty(partitionKey)) {
  44.   data = new KeyedMessage<String, String>(topic, eventData);
  45.   } else {
  46.   data = new KeyedMessage<String, String>(topic, String.valueOf(new Random().nextInt(Integer.parseInt(partitionKey))), eventData);
  47.   }
  48.   if (LOGGER.isInfoEnabled()) {
  49.   LOGGER.info(“Send Message to Kafka : [" + eventData + "] — [" + EventHelper.dumpEvent(event) + "]“);
  50.   }
  51.   producer.send(data);
  52.   txn.commit();
  53.   status = Status.READY;
  54.   } catch (Throwable t) {
  55.   txn.rollback();
  56.   status = Status.BACKOFF;
  57.   // re-throw all Errors
  58.   if (t instanceof Error) {
  59.   throw (Error) t;
  60.   }
  61.   } finally {
  62.   txn.close();
  63.   }
  64.   return status;
  65.   }
  66.   @Override
  67.   public void stop() {
  68.   producer.close();
  69.   }
  70.   }

该类中,我们读取了一些配置信息,这些配置信息我们在flumeng的flume-conf.properties文件中进行了定义,定义内容如下:

  1.   producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
  2.    
  3.   producer.sinks.r.metadata.broker.list=10.200.187.71:9092
  4.    
  5.   producer.sinks.r.partition.key=0
  6.    
  7.   producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
  8.    
  9.   producer.sinks.r.request.required.acks=0
  10.    
  11.   producer.sinks.r.max.message.size=1000000
  12.    
  13.   producer.sinks.r.producer.type=sync
  14.    
  15.   producer.sinks.r.custom.encoding=UTF-8
  16.    
  17.   producer.sinks.r.custom.topic.name=fks1
  18.    
  19.   producer.sinks.r.channel=c
  20.    
  21.   producer.channels.c.type=memory
  22.    
  23.   producer.channels.c.capacity=1000

将上面的KafkaSink类打包成flumeng-kafka.jar,并将该jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目录下,启动flume,我们就可以将ETL系统中产生的日志信息发送到kafka中的fks1这个topic中去了。

3.3 kafka与storm的整合

Storm中的spout如何主动消费kafka中的消息需要我们编写代码来实现,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus实现了一个kafka与storm整合的插件,下载该插件,将插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目录下。利用插件中的StormSpout类,我们就可以消费kafka中的消息了。主要代码如下所示:

  1.   public class KafkaSpout extends BaseRichSpout {
  2.   public static class MessageAndRealOffset {
  3.   public Message msg;
  4.   public long offset;
  5.    
  6.   public MessageAndRealOffset(Message msg, long offset) {
  7.   this.msg = msg;
  8.   this.offset = offset;
  9.   }
  10.   }
  11.   static enum EmitState {
  12.   EMITTED_MORE_LEFT,
  13.   EMITTED_END,
  14.   NO_EMITTED
  15.   }
  16.   public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
  17.   String _uuid = UUID.randomUUID().toString();
  18.   SpoutConfig _spoutConfig;
  19.   SpoutOutputCollector _collector;
  20.   PartitionCoordinator _coordinator;
  21.   DynamicPartitionConnections _connections;
  22.   ZkState _state;
  23.   long _lastUpdateMs = 0;
  24.   int _currPartitionIndex = 0;
  25.   public KafkaSpout(SpoutConfig spoutConf) {
  26.   _spoutConfig = spoutConf;
  27.   }
  28.   @Override
  29.   public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
  30.   _collector = collector;
  31.    
  32.   Map stateConf = new HashMap(conf);
  33.   List zkServers = _spoutConfig.zkServers;
  34.   if (zkServers == null) {
  35.   zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
  36.   }
  37.   Integer zkPort = _spoutConfig.zkPort;
  38.   if (zkPort == null) {
  39.   zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
  40.   }
  41.   stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
  42.   stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
  43.   stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
  44.   _state = new ZkState(stateConf);
  45.   _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
  46.   // using TransactionalState like this is a hack
  47.   int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
  48.   if (_spoutConfig.hosts instanceof StaticHosts) {
  49.   _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
  50.   } else {
  51.   _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
  52.   }
  53.   context.registerMetric(“kafkaOffset”, new IMetric() {
  54.   KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
  55.   @Override
  56.   public Object getValueAndReset() {
  57.   List pms = _coordinator.getMyManagedPartitions();
  58.   Set latestPartitions = new HashSet();
  59.   for (PartitionManager pm : pms) {
  60.   latestPartitions.add(pm.getPartition());
  61.   }
  62.   _kafkaOffsetMetric.refreshPartitions(latestPartitions);
  63.   for (PartitionManager pm : pms) {
  64.   _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
  65.   }
  66.   return _kafkaOffsetMetric.getValueAndReset();
  67.   }
  68.   }, 60);
  69.   context.registerMetric(“kafkaPartition”, new IMetric() {
  70.   @Override
  71.   public Object getValueAndReset() {
  72.   List pms = _coordinator.getMyManagedPartitions();
  73.   Map concatMetricsDataMaps = new HashMap();
  74.   for (PartitionManager pm : pms) {
  75.   concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
  76.   }
  77.   return concatMetricsDataMaps;
  78.   }
  79.   }, 60);
  80.   }
  81.   @Override
  82.   public void close() {
  83.   _state.close();
  84.   }
  85.   @Override
  86.   public void nextTuple() {
  87.   List managers = _coordinator.getMyManagedPartitions();
  88.   for (int i = 0; i < managers.size(); i++) { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); EmitState state = managers.get(_currPartitionIndex).next(_collector); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now – _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
  89.   commit();
  90.   }
  91.   }
  92.   @Override
  93.   public void ack(Object msgId) {
  94.   KafkaMessageId id = (KafkaMessageId) msgId;
  95.   PartitionManager m = _coordinator.getManager(id.partition);
  96.   if (m != null) {
  97.   m.ack(id.offset);
  98.   }
  99.   }
  100.   @Override
  101.   public void fail(Object msgId) {
  102.   KafkaMessageId id = (KafkaMessageId) msgId;
  103.   PartitionManager m = _coordinator.getManager(id.partition);
  104.   if (m != null) {
  105.   m.fail(id.offset);
  106.   }
  107.   }
  108.   @Override
  109.   public void deactivate() {
  110.   commit();
  111.   }
  112.   @Override
  113.   public void declareOutputFields(OutputFieldsDeclarer declarer) {
  114.   declarer.declare(_spoutConfig.scheme.getOutputFields());
  115.   }
  116.    
  117.   private void commit() {
  118.   _lastUpdateMs = System.currentTimeMillis();
  119.   for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
  120.   manager.commit();
  121.   }
  122.   }
  123.    
  124.   }
  125.    
  126.    

3.4 Storm中bolt与drSools的整合

storm中bolt与drools的整合 Drools可以将storm中处理数据的规则提取到一个drl文件中,该文件就成了唯一处理规则的文件。任何时候规则出现变化,我们只需要修改该drl文件,而不会改变其它的代码。Bolt与drools的整合代码如下所示:

  1.   publicclassLogRulesBoltimplementsIBasicBolt{
  2.    
  3.   Loggerlogger=LoggerFactory.getLogger(LogRulesBolt.class);
  4.    
  5.   privatestaticfinallongserialVersionUID=1L;
  6.    
  7.   publicstaticfinalStringLOG_ENTRY=“str”;
  8.    
  9.   privateStatelessKnowledgeSessionksession;
  10.    
  11.   privateStringdrlFile;
  12.    
  13.   publicLogRulesBolt()
  14.   {}
  15.    
  16.   publicLogRulesBolt(StringdrlFile)
  17.   {
  18.   this.drlFile=drlFile;
  19.   }
  20.    
  21.    
  22.   @Override
  23.   publicvoidprepare(MapstormConf,TopologyContextcontext){
  24.   KnowledgeBuilderkbuilder=KnowledgeBuilderFactory.newKnowledgeBuilder();
  25.   try{
  26.   kbuilder.add(ResourceFactory.newInputStreamResource(newFileInputStream(newFile(drlFile))),ResourceType.DRL);
  27.   }catch(FileNotFoundExceptione){
  28.    
  29.    
  30.   logger.error(e.getMessage());
  31.    
  32.    
  33.   }
  34.    
  35.    
  36.   KnowledgeBasekbase=KnowledgeBaseFactory.newKnowledgeBase();
  37.    
  38.    
  39.   kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
  40.    
  41.    
  42.   ksession=kbase.newStatelessKnowledgeSession();
  43.    
  44.    
  45.   }
  46.    
  47.    
  48.   @Override
  49.    
  50.    
  51.   publicvoidexecute(Tupleinput,BasicOutputCollectorcollector){
  52.    
  53.    
  54.   StringlogContent=(String)input.getValueByField(LOG_ENTRY);
  55.    
  56.    
  57.   logContent=logContent.trim();
  58.    
  59.    
  60.   if(!””.equals(logContent)&&logContent!=null)
  61.    
  62.    
  63.   {
  64.    
  65.    
  66.   LogEntryentry=newLogEntry(logContent);
  67.    
  68.    
  69.   try{
  70.    
  71.    
  72.   ksession.execute(entry);
  73.    
  74.    
  75.   }catch(Exceptione)
  76.    
  77.    
  78.   {
  79.    
  80.    
  81.   logger.error(“droolstohandlelog["+logContent+"]isfailure!”);
  82.    
  83.    
  84.   logger.error(e.getMessage());
  85.    
  86.    
  87.   }
  88.    
  89.   collector.emit(newValues(entry));
  90.   }
  91.   else
  92.   {
  93.   logger.error(“logcontentisempty!”);
  94.   }
  95.   }
  96.    
  97.    
  98.   @Override
  99.   publicvoidcleanup(){
  100.   }
  101.    
  102.    
  103.   @Override
  104.   publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
  105.    
  106.   declarer.declare(newFields(LOG_ENTRY));
  107.    
  108.   }
  109.    
  110.    
  111.   @Override
  112.    
  113.    
  114.   publicMap<String,Object>getComponentConfiguration(){
  115.   returnnull;
  116.    
  117.   }
  118.    
  119.   }
  120.    

通过规则处理数据之后,我们就可以将处理过的数据发送到下一个bolt中,然后将数据存储到redis中。

4 相关思考

4.1 系统的优点

  • 模块化的设计,使功能分散到各个模块中,对各个功能进行了解耦,使系统的容错性更高。

  • kafka作为中间缓冲,解决了flume和storm速度不匹配的问题。

  • 利用drools将规则和数据进行了解耦。把规则写到一个配置文件中,避免了每次修改规则就要修改代码的缺点。

  • storm和drools整合解决了drools的规则引擎无法并行化的问题。

  • redis是内存数据库,可以很快速的写数据到数据库中,加快了整个系统的处理速度,避免了数据库的瓶颈。

4.2 待考虑的问题

  • 整个系统还没有用大量数据进行测试,稳定性以及性能瓶颈需要进一步的考虑、发现和改进。

  • 在现有的系统中,flume只能发送数据到kafka的单个broker的单个partition中,后期需要修改代码以适应多个broker多个partition。这点是可以实现的,我已经实现了一部分。可以将数据发送到单个broker的多个partition中。

  • 现有的系统,修改规则文件之后,需要重新启动topology,无法进行热加载。这点是需要进一步考虑的。

  • drools是一个优异的规则引擎。但是它的速度仍然让我有点担心。这个问题可能在以后数据变大之后会体现出来。我们思考了esper这个开源的规则引擎,它的速度更快,但是它类sql语言的规则处理语言不是太适合我们的日志分析系统。以后是不是能够作进一步的开发,用esper代替drools是我们要考虑的一个问题。

  • 思考现有的架构,flume并不是缺一不可的模块,我们可以在ETL系统中直接将log信息发送到kafka中,然后利用storm进行处理。但是为了整个系统的可扩展性(例如我们还想要将log信息发送到HDFS中,利用flume可以直接配置)和易配置性,利用flume会更好。是否要用flume,flume是否会影响整个系统的速度,需要以后进一步的论证。

  • flume、kafka、storm、redis的各个参数的取值对系统的影响也较大。所以这些参数需要在以后的应用中选定合适的值。

4.3 框架层面的思考

  • flume是纯java实现的框架,比较有趣的是各种source接口(如avro source、thrift source)以及sink(HDFS sink、Logger sink)接口的实现。以后有兴趣可以进一步阅读源代码。

  • kafka的思路很好,充分利用了磁盘顺序写入和顺序读取的路子,存储的性能很好,只要几个节点就能处理大量的消息了;另外,它突破了常规的一些消息中间件由服务端来记录消息消费状态的传统,彻底由客户端自己来记录究竟处理到哪里了,失败也罢成功也罢,客户端本来是最清楚的了,由它来记录消费状态是最适合不过了。Kafka中这种处理思路是我们值得学习的地方,我们也可以看代码来体会这种设计。Kafka是由scala实现的,没有scala基础的可以先看看scala编程。

  • storm主要是用clojure、java来实现的,还包括部分的Python代码。代码量25000行左右。在它的源代码中,用java实现框架结构,clojure实现功能细节。storm中的模拟本地集群的实现,保证消息只处理一次的功能的实现,都很巧妙,值得我们去看代码,不管是现在用得到还是用不到。

  • redis是c实现的,速度很快,代码量不大。

推荐阅读 ↓↓↓

1.不认命,从10年流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事

2.如何才能成为优秀的架构师?

3.从零开始搭建创业公司后台技术栈

4.“37岁,985毕业,年薪50万,被裁掉只用了10分钟”

5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...

6.副业&接私活必备的 10 个开源项目!

7.你知道哪10大算法统治着全球吗?

8.15张图看懂瞎忙和高效的区别!

 

标签:flume,producer,海量,大牛,kafka,storm,日志,public
From: https://www.cnblogs.com/go1166/p/16758715.html

相关文章