首页 > 编程语言 >alertmanager源码:整体架构和流程分析

alertmanager源码:整体架构和流程分析

时间:2024-11-01 22:59:49浏览次数:5  
标签:... alertmanager 架构 err Alerts alerts 源码 func go

image.png

alertmanager整体的架构,官方的这张图说的很清楚,本文从源码的角度,分析其各个模块,以及模块间的交互流程。

alertmanager的代码使用v0.24.0版本。

一.API接收alerts

接口alerts的API为:

  • POST /api/v2/alerts

该API的handler如下:

  • 该handler先进行数据转换后,再进行数据校验,最后放入alerts数据结构;
// api/v2/api.go
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
    ...
    alerts := OpenAPIAlertsToAlerts(params.Alerts)
    ...
    for _, a := range alerts {
        removeEmptyLabels(a.Labels)

        if err := a.Validate(); err != nil {
            validationErrs.Add(err)
            api.m.Invalid().Inc()
            continue
        }
        validAlerts = append(validAlerts, a)
    }
    if err := api.alerts.Put(validAlerts...); err != nil {
        level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
        return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
    }
    ...
}

alerts对象在main中初始化:

  • 该对象类型=mem.Alerts类型;
// cmd/alertmanager/main.go

func run() int {
    ...
    alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger)
    api, err := api.New(api.Options{
        Alerts:      alerts,
        ...
    })
    ...
}

mem.Alerts将alerts数据,放入其中的alerts对象(store.Alerts类型)中:

  • store.Alerts中以map结构保存告警对象,保存在内存中;
// provider/mem/mem.go

func (a *Alerts) Put(alerts ...*types.Alert) error {
    for _, alert := range alerts {
        fp := alert.Fingerprint()
        ...
        if err := a.alerts.Set(alert); err != nil {
            level.Error(a.logger).Log("msg", "error on set alert", "err", err)
            continue
        }
        ...
    }
    return nil
}
// store/store.go

type Alerts struct {
    sync.Mutex
    c  map[model.Fingerprint]*types.Alert
    cb func([]*types.Alert)
}

func (a *Alerts) Set(alert *types.Alert) error {
    a.Lock()
    defer a.Unlock()

    a.c[alert.Fingerprint()] = alert
    return nil
}

最终,API收到的alerts数据,被放入了mem.Alerts对象中的store.Alerts对象内,保存在内存中。

二.Dispatcher订阅alerts

Dispatcher订阅了mem.Alerts的Subscribe()接口,并在内部通过run()函数处理Subscribe的aLert。

// dispatch/dispatch.go

func (d *Dispatcher) Run() {
    ...
    d.run(d.alerts.Subscribe())
    ...
}

先看一下mem.Alerts的Subscribe()接口:

  • 首先,得到store.Alerts中的所有告警;
  • 然后,将告警发送到chan中;
  • 最后,包装chan到Iterator中返回;
// provider/mem/mem.go

func (a *Alerts) Subscribe() provider.AlertIterator {
    ...
    var (
        done   = make(chan struct{})
        alerts = a.alerts.List()
        ch     = make(chan *types.Alert, max(len(alerts), alertChannelLength))
    )
    for _, a := range alerts {
        ch <- a
    }
    ...
    return provider.NewAlertIterator(ch, done, nil)
}

再看下Dispatcher如何读取并处理alerts:

  • 读取alerts是通过Iterator.Next()进行的,实际就是接收chan中的内容;
  • 处理alerts是通过d.processAlert()进行的;
// dispatch/dispatch.go

func (d *Dispatcher) run(it provider.AlertIterator) {
    ...
    for {
        select {
        case alert, ok := <-it.Next():    // 读取
            ....
            now := time.Now()
            for _, r := range d.route.Match(alert.Labels) {
                d.processAlert(alert, r)        // 处理
            }
        ...
        }
    }
}

d.processAlert()的过程比较复杂,它使用了aggrGroup对象:

// dispatch/dispatch.go

func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
    ...
    ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
    ag.insert(alert)


    go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
        _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
        ...
        return err == nil
    })    
}

aggrGroup对象,使用d.stage.Exec()将告警分发给notify模块:

  • d.stage对象,实际就是notify的pipeline,在main中初始化;
// cmd/alertmanager/main.go

func run() int {
    ...
    pipeline := pipelineBuilder.New(
        receivers,
        waitFunc,
        inhibitor,
        silencer,
        timeIntervals,
        notificationLog,
        pipelinePeer,
    )
    disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
    ...
}

三.Notify的pipeline

在Notify的pipeline中:

  • 首先,先进行GossipSettle、inhibitor、silencer等操作;

    • 这里面的每个Stage,若其中一个失败,则直接返回err;
  • 然后,针对每个receiver,再进行Wait、Dedupe、Retry、SetNotifies操作;

    • 这里面的每个Stage,若其中一个失败,则直接返回err;
// notify/notify.go

func (pb *PipelineBuilder) New(
    receivers map[string][]Integration,
    wait func() time.Duration,
    inhibitor *inhibit.Inhibitor,
    silencer *silence.Silencer,
    times map[string][]timeinterval.TimeInterval,
    notificationLog NotificationLog,
    peer Peer,
) RoutingStage {
    rs := make(RoutingStage, len(receivers))

    ms := NewGossipSettleStage(peer)
    is := NewMuteStage(inhibitor)
    ss := NewMuteStage(silencer)
    tms := NewTimeMuteStage(times)
    tas := NewTimeActiveStage(times)

    for name := range receivers {
        st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
        rs[name] = MultiStage{ms, is, tas, tms, ss, st}
    }
    return rs
}
func createReceiverStage(
    name string,
    integrations []Integration,
    wait func() time.Duration,
    notificationLog NotificationLog,
    metrics *Metrics,
) Stage {
    var fs FanoutStage
    for i := range integrations {
        recv := &nflogpb.Receiver{
            GroupName:   name,
            Integration: integrations[i].Name(),
            Idx:         uint32(integrations[i].Index()),
        }
        var s MultiStage
        s = append(s, NewWaitStage(wait))
        s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
        s = append(s, NewRetryStage(integrations[i], name, metrics))
        s = append(s, NewSetNotifiesStage(notificationLog, recv))

        fs = append(fs, s)
    }
    return fs
}

具体的发送动作,是在RetryStage中进行的:

  • 各种发送方式,比如webhook,都是一种Integration,它们都实现了Notify()方法;
// notify/notify.go

func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
    ...
    b := backoff.NewExponentialBackOff()
    tick := backoff.NewTicker(b)
    for {
        select {
        case <-tick.C:
            retry, err := r.integration.Notify(ctx, sent...)      // 发送
            if err != nil {
                ...
                iErr = err
            } else {
                ...
                return ctx, alerts, nil
            }
        case <-ctx.Done():
            if iErr == nil {
                iErr = ctx.Err()
            }
            return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
        }
    }
}

四.Gossip的使用

Notify pipeline模块中的第一步,就是进行Gossip Settle,那么它是如何进行的呢?

可以看出,它进行了waitReady(),然后才继续:

// notify/notify.go

func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
    if n.peer != nil {
        if err := n.peer.WaitReady(ctx); err != nil {
            return ctx, nil, err
        }
    }
    return ctx, alerts, nil
}

集群版的alertmanager,在启动时,使用Peer.Settle(),检查集群是否ready:

// cmd/alertmanager/main.go

func run() {
    ...
    if peer != nil {
        err = peer.Join(
            *reconnectInterval,
            *peerReconnectTimeout,
        )
        ....
        go peer.Settle(ctx, *gossipInterval*10)
    }
    ...
}

Peer.Settle()使用hashcorp/memlist监测集群状态:

  • Settle()函数中,本次检测到的节点数量与上次检测到的节点数量相同,则认为本次检测ok;
  • Settle()函数中,连续检测3次都ok的话,认为集群处于稳定状态,可以工作了;
  • 与强一致的raft协议不同,Gossip协议中在集群处于稳定状态下,即使只有一个节点也可以工作;
// cluster/cluster.go

func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
    const NumOkayRequired = 3
    nPeers := 0
    nOkay := 0
    ...
    for {
        select {
        case <-ctx.Done():
            close(p.readyc)
            return
        case <-time.After(interval):
        }
        ...
        n := len(p.Peers())        // 使用hashicorp/memlist进行监测
        if nOkay >= NumOkayRequired {
            break
        }
        if n == nPeers {
            nOkay++
        } else {
            nOkay = 0
        }
        nPeers = n
    }
    close(p.readyc)    // 监测OK
}
// cluster/cluster.go

func (p *Peer) WaitReady(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-p.readyc:
        return nil
    }
}

Gossip是分布式的最终一致性协议,alertmanager使用的是hashicorp的memlist库实现。

标签:...,alertmanager,架构,err,Alerts,alerts,源码,func,go
From: https://www.cnblogs.com/cheyunhua/p/18521436

相关文章

  • 基于Python爬虫与文本挖掘的网络舆情监控系统【附源码】
    基于Python爬虫与文本挖掘的网络舆情监控系统效果如下:系统登录界面注册页面界面管理员主界面用户界面网络舆情管理界面看板详细页面系统简介界面用户主界面网络舆情界面研究背景随着网络空间舆论的日益活跃,其对社会事件的影响愈发显著。企业和组织需要......
  • ssm026校园美食交流系统+vue(论文+源码)_kaic
     毕业论文题目  校园美食交流系统院   系:                   专   业:                    学   号:                    姓   名:                    指导老师......
  • prometheus源码分析:discovery自动发现
    discovery支持文件、http、consul等自动发现targets,targets会被发送到scrape模块进行拉取。一.整体框架discovery组件通过Manager对象管理所有的逻辑,当有数据变化时,通过syncChannel将数据发送给scrape组件。discovery组件会为每个Job_name创建一个provider对象,它包含Discover对......
  • 基于java中的springboot框架实现旅游管理系统项目演示【内附项目源码+论文说明】
    基于java中的springboot框架实现旅游管理系统项目演示【内附项目源码+LW说明】摘要现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本旅游管理系统就是在这样的大环境下诞生,其可以帮助使用者在短时......
  • 基于java中的springboot框架实现经方药食两用服务平台项目演示【内附项目源码+论文说
    基于java中的springboot框架实现经方药食两用服务平台项目演示【内附项目源码+LW说明】摘要近年来,信息化管理行业的不断兴起,使得人们的日常生活越来越离不开计算机和互联网技术。首先,根据收集到的用户需求分析,对设计系统有一个初步的认识与了解,确定经方药食两用服务平台......
  • 基于java中的springboot框架实现旅游管理系统项目演示【内附项目源码+论文说明】
    基于java中的springboot框架实现旅游管理系统项目演示【内附项目源码+LW说明】摘要现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本旅游管理系统就是在这样的大环境下诞生,其可以帮助使用者在短时......
  • 基于Java中的SSM框架实现企业办公自动化系统项目【项目源码+论文说明】计算机毕业设计
    基于java中的SSM框架实现企业办公自动化系统演示【内附项目源码+LW说明】摘要互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为人们提供服务。针对小型企业办公信息管理混乱,出错......
  • 基于Java中的SSM框架实现菜匣子优选系统项目【项目源码+论文说明】计算机毕业设计
    基于java中的SSM框架实现菜匣子优选系统演示【内附项目源码+LW说明】摘要随着科学技术的飞速发展,各行各业都在努力与现代先进技术接轨,通过科技手段提高自身的优势;对于菜匣子优选生鲜电商系统当然也不能排除在外,随着网络技术的不断成熟,带动了菜匣子优选生鲜电商系统,它彻底......
  • 基于Java中的SSM框架实现营业厅宽带系统项目【项目源码+论文说明】
    基于java中的SSM框架实现营业厅宽带系统演示【内附项目源码+LW说明】摘要现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本营业厅宽带系统就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理......
  • 基于Java中的SSM框架实现网上花店管理平台项目【项目源码+论文说明】
    基于java中的SSM框架实现网上花店管理平台演示【内附项目源码+LW说明】摘要网络技术和计算机技术发展至今,已经拥有了深厚的理论基础,并在现实中进行了充分运用,尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代,所以对于信息的宣传和管理就很关......