首页 > 其他分享 >实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警

实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警

时间:2023-08-29 15:35:10浏览次数:49  
标签:varchar name DolphinScheduler json 实操 Apache message ds user

file

作者 | sqlboy-yuzhenc

背景介绍

在实际应用中,我们经常需要将特定的任务通知给特定的人,虽然 Apache DolphinScheduler 在安全中心提供了告警组和告警实例,但是配置起来相对复杂,并且还需要在定时调度时指定告警组。通过这篇文章,你将学到一个简单的方法,无需任何配置,只需要在用户表(t_ds_user)表中增加字段钉钉名称(dignding_name),创建用户时指定用户的手机号码和维护对应的钉钉名称,就能轻松实现 Apache DolphinScheduler 任务失败时钉钉告警到指定的人。

安装插件plpython3u

psql etl -U postgres
create extension plpython3u

pip安装requests

cd /opt && wget https://bootstrap.pypa.io/get-pip.py
python get-pip.py
pip install requests

创建发送钉钉的存储过程

  • plpython3u为不受信语言,所以只能被超级用户使用
sql
create or replace function tool.sp_send(
      message json
     ,webhook varchar 
     ,secret varchar 
)
    returns text
    language plpython3u
    security definer 
as $function$
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
"""
/*
 * 作者 : v-yuzhenc
 * 功能 : 给钉钉发送一条消息
 * message : 需要发送的消息,json格式,详情参考https://open.dingtalk.com/document/robots/custom-robot-access
 * webhook : 钉钉机器人的webhook
 * secret : 钉钉机器人的secret
 * */
"""
v_timestamp = str(round(time.time() * 1000))
p_secret = secret
secret_enc = p_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(v_timestamp, p_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
v_sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

# 钉钉自定义机器人的webhook地址
p_webhook = webhook
webhook_url = p_webhook+"&timestamp="+v_timestamp+"&sign="+v_sign
# 要发送的消息内容
p_message = json.loads(message)
# 发送POST请求
response = requests.post(webhook_url, data=json.dumps(p_message), headers={"Content-Type": "application/json"})

# 打印响应结果
return response.text
$function$;

alter function tool.sp_send(json,varchar,varchar) owner to tool;
grant execute on function tool.sp_send(json,varchar,varchar) to public;

测试发送钉钉的存储过程

select sp_send('{
    "msgtype": "actionCard",
    "actionCard": {
        "title": "我 20 年前想打造一间苹果咖啡厅,而它正是 Apple Store 的前身", 
        "text": "![screenshot](/i/l/?n=23&i=other/2685289/202308/2685289-20230829152524866-1747807117.png) \n\n #### 乔布斯 20 年前想打造的苹果咖啡厅 \n\n Apple Store 的设计正从原来满满的科技感走向生活化,而其生活化的走向其实可以追溯到 20 年前苹果一个建立咖啡馆的计划", 
        "btnOrientation": "0", 
        "btns": [
            {
                "title": "内容不错", 
                "actionURL": "https://www.dingtalk.com/"
            }, 
            {
                "title": "不感兴趣", 
                "actionURL": "https://www.dingtalk.com/"
            }
        ]
    }
}'::json);

file

参考

自定义机器人安全设置 - 钉钉开放平台

自定义机器人接入 - 钉钉开放平台

t_ds_user增加字段

alter table t_ds_user add column dingding_name varchar(100);
--人为将海豚账号对应的钉钉用户名更新上去

编写触发器

CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_ding()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
/*
 * 作者:v-yuzhenc
 * 功能:海豚调度工作流失败自动告警
 * */
declare
    i record;
    v_user varchar;
    v_mobile varchar;
    v_content text;
    v_message varchar;
begin
    if new.state in (4,5,6) then 
        for i in (
            select 
			     d.user_name
			    ,d.phone 
			    ,d.dingding_name
			    ,g.name project_name
			    ,e.name process_name
			    ,string_agg(distinct b.name||' '||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss'),'\r\n') task_name
			from t_ds_process_instance a 
			inner join t_ds_task_instance b 
			on (a.id = b.process_instance_id)
			inner join t_ds_task_definition c 
			on (b.task_code = c.code and b.task_definition_version = c."version")
			inner join t_ds_user d 
			on (c.user_id = d.id)
			inner join t_ds_process_definition e 
			on (a.process_definition_code = e.code and a.process_definition_version = e."version")
			inner join t_ds_project g 
            on (e.project_code = g.code)
			where c.task_type <> 'SUB_PROCESS'
			    and a.state = 6
			    and b.state = 6
			    and a.id = new.id
			group by d.user_name
				,d.phone 
				,d.dingding_name
				,g.name
				,e.name
        ) loop 
            v_mobile := i.phone;
            v_user := i.dingding_name;
            v_content := '海豚工作流执行失败,请尽快处理!\r\n项目名称:\r\n'||i.project_name||'\r\n工作流名称:\r\n'||i.process_name||'\r\n任务名称:\r\n'||i.task_name;
            v_message := $v_message${
    "at": {
        "atMobiles":[
            "$v_message$||v_mobile||$v_message$"
        ],
        "atUserIds":[
            "$v_message$||v_user||$v_message$"
        ],
        "isAtAll": false
    },
    "text": {
        "content":"$v_message$||v_content||$v_message$"
    },
    "msgtype":"text"
}$v_message$;
            --告警
            perform tool.sp_send(v_message::json);
        end loop;
    end if;
    return new;
end;
$function$
;

create trigger tg_state_ds_process_instance after update on t_ds_process_instance for each row execute procedure tg_ds_udef_alert_ding();

测试

file

本文转载自CSDN博主sqlboy-yuzhenc文章:https://blog.csdn.net/qq_33445829/article/details/131073349

本文由 白鲸开源 提供发布支持!

标签:varchar,name,DolphinScheduler,json,实操,Apache,message,ds,user
From: https://www.cnblogs.com/DolphinScheduler/p/17664927.html

相关文章

  • Apache RocketMQ 5.0 消息进阶:如何支撑复杂的业务消息场景?
    作者:隆基一致性首先来看RocketMQ的第一个特性-事务消息,事务消息是RocketMQ与一致性相关的特性,也是RocketMQ有别于其他消息队列的最具区分度的特性。以大规模电商系统为例,付款成功后会在交易系统中订单数据库将订单状态更新为已付款。然后交易系统再发送一条消息给Rocke......
  • centos7.9+php+apache 配置阿里云ssl
     1、前往阿里云官网申请免费证书选择 2、证书存放在/etc/cert 3、httpd.cof配置ssl<VirtualHost_default_:443> ServerName域名 DocumentRoot网站根目录 SSLEngineon SSLProtocolall-SSLv2-SSLv3 SSLCipherSuiteHIGH:!RC4:!MD5:!aNULL:!eNULL:!NULL:......
  • 使用Apache IoTDB进行IoT相关开发的架构设计与功能实现(11)
    账户管理报表IoTDB可以为用户提供账号权限管理操作,保障数据安全。接下来我将通过以下具体示例向朋友们展示基本的用户权限管理操作,介绍详细的SQL语法和用法详细信息。基本概念用户用户是数据库的合法用户。用户对应于唯一的用户名,并具有密码作为身份验证方式。在使用数据库之前,一......
  • Apache SeaTunnel 2.3.3 版本发布,CDC 支持 Schema Evolution!
    时隔两个月,ApacheSeaTunnel终于迎来大版本更新。此次发布的2.3.3版本在功能和性能上均有较大优化改进,其中大家期待已久的CDCSchemaevolution(DDL变更同步)、主键Split拆分、JDBCSink自动建表功能、SeaTunnelZeta引擎支持作业配置支持变量替换和传参等都是更新的亮......
  • 【专题】洞察营销的战略优势与实操报告PDF合集分享(附原数据表)
    全文链接:https://tecdat.cn/?p=33511根据报告合集显示,在消费者的亲友分享、社交平台、订单评价等环节,00后表现出活跃的参与度,而90后和95后在部分环节也较为活跃。相比之下,70后和80后在分享中的参与度最低,主要以亲友分享为主。阅读原文,获取专题报告合集全文,解锁文末335份品牌营销......
  • 实用指令_实操作_RPM包管理
    RPM和YUMRPM包的管理一种用于互联网下载包的打包及安装工具,它包含在某些Linux分发版中,它生成具有.rpm扩展名的文件。PRM是RedHatPackageManager(RedHat软件包管理工具)的缩写,类似window的setup.exe。这一文件格式名称虽然打上了RedHat的标志但理念是通用的。Linux的分布版本......
  • 实用指令_实操作_yum
    yumYum是一个shell前端软件包管理器,基于RPM包管理器,能够从指定的服务器自动下载RPM包并且安装,可以自动处理依赖关系,并且一次安装所有依赖的软件包yum的基本命令查询yum服务器是否有需要安装的软件yumlist|grepXX软件列表安装指定的yum包yuminstallxxx下载安装yu......
  • 实用指令_实操作_进程服务管理
    服务(service)管理服务本质就是进程,但是是运行在后台的,通常都会监听某个端口,等待其他程序的请求,比如(mysql,sshd防火墙等),因此我们又称为守护进程,是linux中非常重要的知识点service管理指令service服务名[start|stop|restart|reload|status]systemctlCento7以后基础......
  • 实用指令_实操作_进程管理_进程监控网络监控
    动态监控进程top与ps命令很相似,它们都用来显示正在执行的进程。类似于任务管理器。top与ps最大的不同之处,在于top在执行一段时间可以更新正在运行的进程。基本语法top[选项]选项说明选项功能-d秒数指定top命令每隔几秒更新。默认是3秒在top命令的交互......
  • 实用指令_实操作_进程管理
    进程管理基本介绍在linux中,每个执行的程序(代码)都称为一个进程。每一个进程都分配一个id号每一个进程,都会对应一个父进程,而这个父进程可以复制多个子进程,例如www服务器每个进程都可能以两种方式存在的。前台与后台,所谓前台进程就是用户目前的屏幕上可操作的。后台进程则是实......