配置调度任务告警机器人,如果使用wechat插件,需要企业在微信中生成的秘钥,秘钥只有公司管理员才会生成,鉴于执行太过麻烦,因此直接通过webhook的方式配置机器人。
0X00 开通群组机器
在企业微信的群组中,在右上角的三个小点点中,点击下拉找到“添加群机器人”,生成一条webhook url。下图中左侧是告警的实际效果。



0X01 编写脚本
参考企微SDK接口,配合大数据调度平台的script alter的能力,编写符合SDK接口数据的格式。下面是一些常见的告警内容,dolphinAlterServer通过-c参数传入脚本,特别说明,DAS传入三个参数,-t 是告警标题,-c 是告警内容,-p 是用户自定义参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| [ { "projectCode": 9714355320480, "projectName": "煤炭交易", "owner": "admin", "processId": 458, "processDefinitionCode": 10046305882272, "processName": "【CCTD】所有指标调度流-5-20230630023000986", "processType": "RECOVER_TOLERANCE_FAULT_PROCESS", "processState": "SUCCESS", "recovery": "YES", "runTimes": 2, "processStartTime": "2023-06-30 02:30:01", "processEndTime": "2023-06-30 21:12:49", "processHost": "10.10.1.4:5678" } ]
[ { "projectCode": 9714355320480, "projectName": "煤炭交易", "owner": "admin", "processId": 11310, "processDefinitionCode": 10046305882272, "processName": "【CCTD】所有指标调度流-5-20230703023000443", "processType": "RECOVER_SERIAL_WAIT", "processState": "FAILURE", "runTimes": 1, "processStartTime": "2023-07-03 02:30:00", "processHost": "10.10.1.4:5678", "event": "TIME_OUT", "warnLevel": "MIDDLE" } ]
[ { "type": "WORKER", "host": "/nodes/worker/default/10.10.1.4:1234", "event": "SERVER_DOWN", "warningLevel": "SERIOUS" } ]
[ { "type": "MASTER", "host": "/nodes/master/10.10.1.5:5678", "event": "SERVER_DOWN", "warningLevel": "SERIOUS" } ]
[ { "projectCode": 9804816236064, "projectName": "煤炭工商", "owner": "admin", "processId": 4065, "processDefinitionCode": 9804833177504, "processName": "煤企工商信息-15-20230630092835811", "taskCode": 9804952245280, "taskName": "煤企同步到ADS", "taskType": "DATAX", "taskState": "FAILURE", "taskStartTime": "2023-06-30 22:45:21", "taskEndTime": "2023-06-30 22:47:16", "taskHost": "10.10.1.5:1234", "logPath": "/var/log/udp/2.0.0.0/dolphinscheduler/worker-server/logs/20230630/9804833177504_15-4065-11605.log" } ]
|
企业支持简单的markdown语法,可以将任务执行的结果、状态、耗时等信息组装为数据包传入钩子。

| #!/bin/bash while getopts "t:c:p:" opts; do case $opts in t) t_value=$OPTARG ;; c) c_value=$OPTARG ;; p) p_value=$OPTARG ;; ?) ;; esac done echo $c_value" \n" >> /srv/udp/2.0.0.0/dolphinscheduler/alert-server/script/output.log
dataCount=$(echo "$c_value" | jq length) if [[ $dataCount -gt 0 ]]; then if echo "$c_value" | jq 'map(has("processType")) | any' | grep -q "true"; then projectCode=$(echo "$c_value" | jq '.[0].projectCode' | tr -d '"') projectName=$(echo "$c_value" | jq '.[0].projectName' | tr -d '"') owner=$(echo "$c_value" | jq '.[0].owner' | tr -d '"') runTimes=$(echo "$c_value" | jq '.[0].runTimes' | tr -d '"') processId=$(echo "$c_value" | jq '.[0].processId' | tr -d '"') processDefinitionCode=$(echo "$c_value" | jq '.[0].processDefinitionCode' | tr -d '"') processName=$(echo "$c_value" | jq '.[0].processName' | tr -d '"') processHost=$(echo "$c_value" | jq '.[0].processHost' | tr -d '"') processState=$(echo "$c_value" | jq '.[0].processState' | tr -d '"') processStartTime=$(echo "$c_value" | jq '.[0].processStartTime' | tr -d '"') if echo "$c_value" | jq '. | map(has("'"$key"'")) | contains([false])'; then endTimeNotExists=true processEndTime=$(date +"%Y-%m-%d %H:%M:%S") else processEndTime=$(echo "$c_value" | jq '.[0].processEndTime' | tr -d '"') fi startTimestamp=$(date -d "$processStartTime" +%s) endTimestamp=$(date -d "$processEndTime" +%s)
duration=$((endTimestamp - startTimestamp)) seconds=$((duration % 60)) minutes=$((duration / 60 % 60)) hours=$((duration / 3600)) if [[ $hours -eq 0 && $minutes -eq 0 ]]; then runTime="$seconds 秒" elif [[ $hours -eq 0 ]]; then runTime="$minutes 分钟 $seconds 秒" else runTime="$hours 小时 $minutes 分钟 $seconds 秒" fi
if [ "$endTimeNotExists" = "true" ]; then title="<font color=\\\"warning\\\">工作流执行预警</font>" else if [ "${processState}" = "SUCCESS" ]; then title="<font color=\\\"info\\\">工作流执行预警</font>" else title="<font color=\\\"warning\\\">工作流执行预警</font>" fi fi content="**${title}** \\n >**项目名称**:${projectName} >**负责人**:${owner} >**工作流实例**:${processName} >**工作流Code**:${processDefinitionCode} >**执行结果**:<font color=\\\"info\\\">${processState}</font> >**执行次数**:${runTimes} >**执行节点**:${processHost} >**开始时间**:${processStartTime}"
if [ -z "$endTimeNotExists" ]; then content="${content} >**结束时间**:${processEndTime}" fi if echo "$c_value" | jq 'has("event")' | grep -q "true"; then event=$(echo "$c_value" | jq '.[0].event' | tr -d '"') content=${content}" >**事件类型**: ${event}" fi
if [ "$endTimeNotExists" = "true" ]; then content=${content}" >**任务耗时**:<font color=\\\"warning\\\">**${runTime}**</font>" else if [ "${processState}" = "SUCCESS" ]; then content=${content}" >**任务耗时**:<font color=\\\"warning\\\">**${runTime}**</font>" else content=${content}" >**任务耗时**:<font color=\\\"warning\\\">**${runTime}**</font>" fi fi elif echo "$c_value" | jq 'map(has("type")) | any' | grep -q "true"; then type=$(echo "$c_value" | jq '.[0].type' | tr -d '"') host=$(echo "$c_value" | jq '.[0].host' | tr -d '"') event=$(echo "$c_value" | jq '.[0].event' | tr -d '"') title="服务节点异常告警" content="**<font color=\\\"warning\\\">${title}</font>** >**服务类型**: ${type} >**主机节点**:${host} >**告警事件**:${event}" elif echo "$c_value" | jq 'map(has("taskType")) | any' | grep -q "true"; then projectCode=$(echo "$c_value" | jq '.[0].projectCode' | tr -d '"') projectName=$(echo "$c_value" | jq '.[0].projectName' | tr -d '"') owner=$(echo "$c_value" | jq '.[0].owner' | tr -d '"') processId=$(echo "$c_value" | jq '.[0].processId' | tr -d '"') processDefinitionCode=$(echo "$c_value" | jq '.[0].processDefinitionCode' | tr -d '"') processName=$(echo "$c_value" | jq '.[0].processName' | tr -d '"')
taskCode=$(echo "$c_value" | jq '.[0].taskCode' | tr -d '"') taskName=$(echo "$c_value" | jq '.[0].taskName' | tr -d '"') taskType=$(echo "$c_value" | jq '.[0].taskType' | tr -d '"') taskState=$(echo "$c_value" | jq '.[0].taskState' | tr -d '"')
taskStartTime=$(echo "$c_value" | jq '.[0].taskStartTime' | tr -d '"') taskEndTime=$(echo "$c_value" | jq '.[0].taskEndTime' | tr -d '"') taskHost=$(echo "$c_value" | jq '.[0].taskHost' | tr -d '"') startTimestamp=$(date -d "$taskStartTime" +%s) endTimestamp=$(date -d "$taskEndTime" +%s)
duration=$((endTimestamp - startTimestamp)) seconds=$((duration % 60)) minutes=$((duration / 60 % 60)) hours=$((duration / 3600))
if [[ $hours -eq 0 && $minutes -eq 0 ]]; then runTime="$seconds 秒" elif [[ $hours -eq 0 ]]; then runTime="$minutes 分钟 $seconds 秒" else runTime="$hours 小时 $minutes 分钟 $seconds 秒" fi
if [ "${taskState}" = "FAILURE" ]; then title="<font color=\\\"warning\\\">任务执行失败告警</font>" else title="<font color=\\\"info\\\">任务执行预警</font>" fi content="**${title}** \\n >**项目名称**:${projectName} >**负责人**:${owner} >**工作流实例**:${processName} >**工作流Code**:${processDefinitionCode} >**任务编码**:${taskCode} >**任务名称**:${taskName} >**任务类型**:${taskType} >**执行结果**:<font color=\\\"info\\\">${taskState}</font> >**执行节点**:${taskHost} >**开始时间**:${taskStartTime} >**结束时间**:${taskEndTime} >**任务耗时**:${runTime}"
fi
json='{"msgtype": "markdown","markdown": {"content": "'"${content}"'"}}'
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97d2d7e6-6e2c-439a-a830-82886008f76c' \ -H 'Content-Type: application/json' \ -d "$json" >> /srv/udp/2.0.0.0/dolphinscheduler/alert-server/script/output.log
else echo "数据对象不存在" fi
exit 0
|
0x03 配置告警
将上面写好的脚本保存在/srv/udp/2.0.0.0/dolphinscheduler/alert-server/script/wechat.sh
中,文件名随意取。
打开大数据调度平台的安全中心菜单,找到告警实例管理,添加告警组件配置。

选择shell脚本,实例名称自定义。
插件选择:script
告警类型:sucess 只有成功才发送,failure 只有失败才发送 all 成功失败均发送消息
自定义参数:无实际意义,一般用于三方接口的鉴权等,可指定参数
脚本路径:AlertServer部署服务器上文件存储的路径,也就是上面保存脚本的路径。切记,脚本备份,如果遇到故障,AlterServer可能会迁移到其他节点。
接下来配置告警组管理:

保存后,在项目管理的任务流以及任务流调度中,可以配置告警组。

一切就绪,上面第一张截图就是执行效果了!