The official architecture diagram explains Alertmanager's overall architecture quite clearly; this article walks through each module, and the interactions between modules, from the source-code perspective.
The code analyzed here is Alertmanager v0.24.0.
1. The API receives alerts
The API endpoint for receiving alerts is:
- POST /api/v2/alerts
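Before diving into the handler, here is a minimal client-side sketch of what calling this endpoint looks like. It is not part of the Alertmanager source; the target address and the label/annotation values are assumptions for illustration.

```go
// Minimal sketch: POST one alert to /api/v2/alerts.
// The Alertmanager address and the label values below are assumptions.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// postableAlert mirrors the fields the v2 API expects per alert.
type postableAlert struct {
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations,omitempty"`
	StartsAt    time.Time         `json:"startsAt,omitempty"`
	EndsAt      time.Time         `json:"endsAt,omitempty"`
}

func main() {
	alerts := []postableAlert{{
		Labels:      map[string]string{"alertname": "HighCPU", "instance": "node1"},
		Annotations: map[string]string{"summary": "CPU usage above 90%"},
		StartsAt:    time.Now(),
	}}
	body, _ := json.Marshal(alerts)

	resp, err := http.Post("http://localhost:9093/api/v2/alerts", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status) // 200 OK when the alerts were accepted
}
```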
The handler for this API is shown below:
- the handler first converts the incoming data, then validates it, and finally puts it into the alerts data structure;
```go
// api/v2/api.go
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
	...
	alerts := OpenAPIAlertsToAlerts(params.Alerts)
	...
	for _, a := range alerts {
		removeEmptyLabels(a.Labels)

		if err := a.Validate(); err != nil {
			validationErrs.Add(err)
			api.m.Invalid().Inc()
			continue
		}
		validAlerts = append(validAlerts, a)
	}
	if err := api.alerts.Put(validAlerts...); err != nil {
		level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
		return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
	}
	...
}
```
The alerts object is initialized in main:
- its concrete type is mem.Alerts;
```go
// cmd/alertmanager/main.go
func run() int {
	...
	alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger)

	api, err := api.New(api.Options{
		Alerts: alerts,
		...
	})
	...
}
```
mem.Alerts stores the alert data in its inner alerts field (of type store.Alerts):
- store.Alerts keeps the alert objects in an in-memory map;
```go
// provider/mem/mem.go
func (a *Alerts) Put(alerts ...*types.Alert) error {
	for _, alert := range alerts {
		fp := alert.Fingerprint()
		...
		if err := a.alerts.Set(alert); err != nil {
			level.Error(a.logger).Log("msg", "error on set alert", "err", err)
			continue
		}
		...
	}
	return nil
}
```
```go
// store/store.go
type Alerts struct {
	sync.Mutex
	c  map[model.Fingerprint]*types.Alert
	cb func([]*types.Alert)
}

func (a *Alerts) Set(alert *types.Alert) error {
	a.Lock()
	defer a.Unlock()
	a.c[alert.Fingerprint()] = alert
	return nil
}
```
In the end, the alerts received by the API end up in the store.Alerts object inside mem.Alerts, kept entirely in memory.
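The map is keyed by the alert's label-set fingerprint, so resending an alert with the same labels updates the existing entry instead of adding a new one. A minimal sketch of that behavior using prometheus/common/model directly (the real store adds locking and garbage collection of resolved alerts on top of this idea):

```go
// Sketch: two alerts with identical label sets share a fingerprint,
// so the second write simply overwrites the first map entry.
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	c := map[model.Fingerprint]model.LabelSet{}

	first := model.LabelSet{"alertname": "HighCPU", "instance": "node1"}
	second := model.LabelSet{"alertname": "HighCPU", "instance": "node1"} // same labels, resent later

	c[first.Fingerprint()] = first
	c[second.Fingerprint()] = second // overwrites the entry written for "first"

	fmt.Println(len(c)) // 1
}
```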
2. The Dispatcher subscribes to alerts
The Dispatcher subscribes through mem.Alerts' Subscribe() interface and processes the subscribed alerts internally in its run() function.
```go
// dispatch/dispatch.go
func (d *Dispatcher) Run() {
	...
	d.run(d.alerts.Subscribe())
	...
}
```
First, look at mem.Alerts' Subscribe() method:
- it first fetches all alerts currently held in store.Alerts;
- it then sends those alerts into a channel;
- finally, it wraps the channel in an Iterator and returns it;
```go
// provider/mem/mem.go
func (a *Alerts) Subscribe() provider.AlertIterator {
	...
	var (
		done   = make(chan struct{})
		alerts = a.alerts.List()
		ch     = make(chan *types.Alert, max(len(alerts), alertChannelLength))
	)
	for _, a := range alerts {
		ch <- a
	}
	...
	return provider.NewAlertIterator(ch, done, nil)
}
```
Next, look at how the Dispatcher reads and processes the alerts:
- reading is done through Iterator.Next(), which is simply receiving from the channel;
- processing is done through d.processAlert();
```go
// dispatch/dispatch.go
func (d *Dispatcher) run(it provider.AlertIterator) {
	...
	for {
		select {
		case alert, ok := <-it.Next(): // read
			...
			now := time.Now()
			for _, r := range d.route.Match(alert.Labels) {
				d.processAlert(alert, r) // process
			}
			...
		}
	}
}
```
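d.route.Match(alert.Labels) walks the configured routing tree and returns every route the alert's labels fall into. A much simplified sketch of that matching idea, assuming equality matchers only; this is an illustration, not the actual dispatch.Route implementation:

```go
// Simplified routing-tree sketch: a node matches when all its matchers match,
// matching recurses into children, and stops after the first matching child
// unless that child has "continue: true". If no child matches, the node itself
// handles the alert.
package main

import "fmt"

type route struct {
	matchers map[string]string // simplified equality matchers
	cont     bool              // config "continue: true"
	routes   []*route          // child routes
	receiver string
}

func (r *route) match(labels map[string]string) []*route {
	for k, v := range r.matchers {
		if labels[k] != v {
			return nil // this subtree does not match
		}
	}
	var all []*route
	for _, child := range r.routes {
		m := child.match(labels)
		all = append(all, m...)
		if len(m) > 0 && !child.cont {
			break
		}
	}
	if len(all) == 0 {
		all = append(all, r) // no child matched: this node handles the alert
	}
	return all
}

func main() {
	root := &route{receiver: "default", routes: []*route{
		{matchers: map[string]string{"team": "db"}, receiver: "db-pager"},
	}}
	for _, r := range root.match(map[string]string{"team": "db"}) {
		fmt.Println(r.receiver) // db-pager
	}
}
```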
d.processAlert() is more involved; it relies on the aggrGroup object:
```go
// dispatch/dispatch.go
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
	...
	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
	ag.insert(alert)

	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
		...
		return err == nil
	})
}
```
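The groupLabels passed to newAggrGroup are the subset of the alert's labels named in the route's group_by configuration, so alerts that agree on those labels end up in the same aggregation group and are flushed together. A rough sketch of that grouping idea (not the dispatcher's actual code):

```go
// Sketch: derive the group labels from group_by; alerts with equal group
// labels share one aggrGroup.
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func groupLabels(alert model.LabelSet, groupBy []model.LabelName) model.LabelSet {
	g := model.LabelSet{}
	for _, name := range groupBy {
		if v, ok := alert[name]; ok {
			g[name] = v
		}
	}
	return g
}

func main() {
	groupBy := []model.LabelName{"alertname", "cluster"}
	a := model.LabelSet{"alertname": "HighCPU", "cluster": "eu1", "instance": "node1"}
	b := model.LabelSet{"alertname": "HighCPU", "cluster": "eu1", "instance": "node2"}

	// Same group key => both alerts are batched and flushed by one aggrGroup.
	fmt.Println(groupLabels(a, groupBy).Fingerprint() == groupLabels(b, groupBy).Fingerprint()) // true
}
```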
The aggrGroup object hands the grouped alerts to the notify module via d.stage.Exec():
- d.stage is in fact the notify pipeline, initialized in main;
```go
// cmd/alertmanager/main.go
func run() int {
	...
	pipeline := pipelineBuilder.New(
		receivers,
		waitFunc,
		inhibitor,
		silencer,
		timeIntervals,
		notificationLog,
		pipelinePeer,
	)
	disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
	...
}
```
3. The notify pipeline
In the notify pipeline:
- first, the GossipSettle, inhibit, and silence stages are executed;
  - if any one of these stages fails, the error is returned immediately;
- then, for each receiver, the Wait, Dedup, Retry, and SetNotifies stages are executed;
  - again, if any one of these stages fails, the error is returned immediately;
```go
// notify/notify.go
func (pb *PipelineBuilder) New(
	receivers map[string][]Integration,
	wait func() time.Duration,
	inhibitor *inhibit.Inhibitor,
	silencer *silence.Silencer,
	times map[string][]timeinterval.TimeInterval,
	notificationLog NotificationLog,
	peer Peer,
) RoutingStage {
	rs := make(RoutingStage, len(receivers))

	ms := NewGossipSettleStage(peer)
	is := NewMuteStage(inhibitor)
	ss := NewMuteStage(silencer)
	tms := NewTimeMuteStage(times)
	tas := NewTimeActiveStage(times)

	for name := range receivers {
		st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
		rs[name] = MultiStage{ms, is, tas, tms, ss, st}
	}
	return rs
}

func createReceiverStage(
	name string,
	integrations []Integration,
	wait func() time.Duration,
	notificationLog NotificationLog,
	metrics *Metrics,
) Stage {
	var fs FanoutStage
	for i := range integrations {
		recv := &nflogpb.Receiver{
			GroupName:   name,
			Integration: integrations[i].Name(),
			Idx:         uint32(integrations[i].Index()),
		}
		var s MultiStage
		s = append(s, NewWaitStage(wait))
		s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
		s = append(s, NewRetryStage(integrations[i], name, metrics))
		s = append(s, NewSetNotifiesStage(notificationLog, recv))

		fs = append(fs, s)
	}
	return fs
}
```
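Every element in these chains is a Stage, and MultiStage simply runs its stages in order, feeding each stage the alerts the previous one let through and aborting on the first error. A stripped-down sketch of that pattern (types simplified, not the real notify package):

```go
// Stripped-down Stage/MultiStage sketch: stages run in sequence, each sees
// only the alerts the previous stage let through, and any error aborts the chain.
package main

import (
	"context"
	"fmt"
)

type alert struct{ name string }

type stage interface {
	Exec(ctx context.Context, alerts ...*alert) ([]*alert, error)
}

type multiStage []stage

func (ms multiStage) Exec(ctx context.Context, alerts ...*alert) ([]*alert, error) {
	var err error
	for _, s := range ms {
		if len(alerts) == 0 {
			return nil, nil // everything muted/deduped: nothing left to send
		}
		alerts, err = s.Exec(ctx, alerts...)
		if err != nil {
			return nil, err // a failing stage aborts the whole chain
		}
	}
	return alerts, nil
}

// dropMuted stands in for the inhibit/silence mute stages.
type dropMuted struct{ muted map[string]bool }

func (d dropMuted) Exec(_ context.Context, alerts ...*alert) ([]*alert, error) {
	var out []*alert
	for _, a := range alerts {
		if !d.muted[a.name] {
			out = append(out, a)
		}
	}
	return out, nil
}

func main() {
	pipe := multiStage{dropMuted{muted: map[string]bool{"Silenced": true}}}
	out, _ := pipe.Exec(context.Background(), &alert{"HighCPU"}, &alert{"Silenced"})
	fmt.Println(len(out)) // 1: the silenced alert was dropped before sending
}
```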
The actual sending is done in the RetryStage:
- every delivery method, webhook for example, is an Integration, and all of them implement the Notify() method;
```go
// notify/notify.go
func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	...
	b := backoff.NewExponentialBackOff()
	tick := backoff.NewTicker(b)
	for {
		select {
		case <-tick.C:
			retry, err := r.integration.Notify(ctx, sent...) // send
			if err != nil {
				...
				iErr = err
			} else {
				...
				return ctx, alerts, nil
			}
		case <-ctx.Done():
			if iErr == nil {
				iErr = ctx.Err()
			}
			return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
		}
	}
}
```
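Notify() returns a bool telling the RetryStage whether the failure is worth retrying, plus the error itself. As a hedged sketch (not code from the Alertmanager repository), a webhook-style sender with that signature could look like the following; the myWebhook type and its endpoint are assumptions for illustration:

```go
// Sketch of a webhook-style sender with a Notify(ctx, alerts...) (bool, error)
// signature: the bool tells the caller whether the failure is retryable.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/prometheus/alertmanager/types"
)

type myWebhook struct {
	url    string
	client *http.Client
}

func (w *myWebhook) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
	body, err := json.Marshal(alerts)
	if err != nil {
		return false, err // bad payload: retrying will not help
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, w.url, bytes.NewReader(body))
	if err != nil {
		return false, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := w.client.Do(req)
	if err != nil {
		return true, err // network error: let the retry loop back off and try again
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 == 5 {
		return true, fmt.Errorf("server error: %s", resp.Status) // retryable
	}
	if resp.StatusCode/100 != 2 {
		return false, fmt.Errorf("unexpected status: %s", resp.Status) // not retryable
	}
	return false, nil
}

func main() {
	// Local test server standing in for a real webhook receiver.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	n := &myWebhook{url: srv.URL, client: srv.Client()}
	retry, err := n.Notify(context.Background(), &types.Alert{})
	fmt.Println(retry, err) // false <nil>
}
```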
4. How Gossip is used
The first step of the notify pipeline is Gossip settling, so how does that work?
As the code shows, the stage calls WaitReady() and only then continues:
```go
// notify/notify.go
func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if n.peer != nil {
		if err := n.peer.WaitReady(ctx); err != nil {
			return ctx, nil, err
		}
	}
	return ctx, alerts, nil
}
```
A clustered Alertmanager calls Peer.Settle() at startup to check whether the cluster is ready:
```go
// cmd/alertmanager/main.go
func run() int {
	...
	if peer != nil {
		err = peer.Join(
			*reconnectInterval,
			*peerReconnectTimeout,
		)
		...
		go peer.Settle(ctx, *gossipInterval*10)
	}
	...
}
```
Peer.Settle() uses hashicorp/memberlist to watch the cluster state:
- within Settle(), if the number of peers seen in the current check equals the number seen in the previous check, that check counts as OK;
- if 3 consecutive checks are OK, the cluster is considered settled and ready to work;
- unlike the strongly consistent Raft protocol, a Gossip cluster that has settled can work even with just a single node;
```go
// cluster/cluster.go
func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
	const NumOkayRequired = 3
	nPeers := 0
	nOkay := 0
	...
	for {
		select {
		case <-ctx.Done():
			close(p.readyc)
			return
		case <-time.After(interval):
		}
		...
		n := len(p.Peers()) // peer count reported via hashicorp/memberlist
		if nOkay >= NumOkayRequired {
			break
		}
		if n == nPeers {
			nOkay++
		} else {
			nOkay = 0
		}
		nPeers = n
	}
	close(p.readyc) // settled: the cluster is ready
}
```
```go
// cluster/cluster.go
func (p *Peer) WaitReady(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-p.readyc:
		return nil
	}
}
```
Gossip is a distributed, eventually consistent protocol; Alertmanager's implementation is built on HashiCorp's memberlist library.
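For reference, here is a minimal sketch of the memberlist usage that sits underneath cluster.Peer. This is not the actual cluster package, and the peer address is an assumption for illustration:

```go
// Minimal hashicorp/memberlist sketch: create a local member, try to join
// known peers, then count members the way Settle() does via p.Peers().
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/memberlist"
)

func main() {
	cfg := memberlist.DefaultLANConfig()
	cfg.Name = "am-0"

	ml, err := memberlist.Create(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer ml.Shutdown()

	// Join the other Alertmanager peers; with no reachable peers, the single
	// node still settles once its own member count stays stable.
	if _, err := ml.Join([]string{"10.0.0.2:9094"}); err != nil {
		log.Println("join failed, continuing as a single-node cluster:", err)
	}

	fmt.Println("members:", ml.NumMembers())
	for _, m := range ml.Members() {
		fmt.Println(m.Name, m.Addr)
	}
}
```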