首页 > 其他分享 >DataX-Web增量配置

DataX-Web增量配置

时间:2023-12-18 10:35:30浏览次数:40  
标签:Web 配置 job 任务 DataX 增量 lastTime id

一、根据日期进行增量数据抽取

1.页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中5个步骤进行配置

null

  • 1.任务类型选DataX任务

  • 2.辅助参数选择时间自增

  • 3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。

  • 4.增量时间字段,-DlastTime=’%s’ -DcurrentTime=’%s’ 先来解析下这段字符串

    1.-D是DataX参数的标识符,必配
    2.-D后面的lastTime和currentTime是DataX json中where条件的时间字段标识符,必须和json中的变量名称保持一致
    3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致
    4.注意-DlastTime='%s'和-DcurrentTime='%s'中间有一个空格,空格必须保留并且是一个空格

  • 5.时间格式,可以选择自己数据库中时间的格式,也可以通过json中配置sql时间转换函数来处理

注意,注意,注意: 配置一定要仔细看文档(后面我们也会对这块配置进行优化,避免大家犯错)

2.JSON配置

datax.json

{  
  "job": {  
    "setting": {  
      "speed": {  
        "channel": 16  
      }  
    },  
    "content": [  
      {  
        "reader": {  
          "name": "mysqlreader",  
          "parameter": {  
            "splitPk": "id",  
            "username": "root",  
            "password": "root",  
            "column": [  
              "*"  
  
            ],  
            "connection": [  
              {  
                  
                "jdbcUrl": [  
                  "jdbc:mysql://localhost:3306/test?characterEncoding=utf8"  
                ],  
                "querySql": [  
        "select * from test_list where operationDate >= FROM_UNIXTIME($ {lastTime}) and operationDate < FROM_UNIXTIME($ {currentTime})"  
                                ]  
              }  
            ]  
          }  
        },  
        "writer": {  
          "name": "mysqlwriter",  
          "parameter": {  
             
            "username": "root",  
            "password": "123456",  
            "column": [  
              "*"  
            ],  
            "batchSize": "4096",  
            "connection": [  
              {  
                "jdbcUrl": "jdbc:mysql://localhost:3307/test?characterEncoding=utf8",  
                "table": [  
                  "test_list"  
                ]  
              }  
            ]  
          }  
        }  
      }  
    ]  
  }  
}  

querySql解析

select * from test_list where operationDate >= $ {lastTime} and operationDate < $ {currentTime}  
  • 1.此处的关键点在{lastTime},{currentTime},${}是DataX动态参数的固定格式,lastTime,currentTime就是我们页面配置中 -DlastTime=’%s’ -DcurrentTime=’%s’中的lastTime,currentTime,注意字段一定要一致。

  • 2.如果任务配置页面,时间类型选择为时间戳但是数据库时间格式不是时间戳,例如是:2019-11-26 11:40:57 此时可以用FROM_UNIXTIME(${lastTime})进行转换。

    select * from test_list where operationDate >= FROM_UNIXTIME($ {lastTime}) and operationDate < FROM_UNIXTIME($ {currentTime})

二、根据自增Id进行增量数据抽取

1.页面任务配置

打开菜单任务管理页面,选择添加任务

按下图中4个步骤进行配置

  • 1.任务类型选DataX任务

  • 2.辅助参数选择主键自增

  • 3.增量主键开始ID选择,即sql中查询ID的开始ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该ID被更新为上一次的任务触发时最大的ID,任务失败不更新。

  • 4.增量时间字段,-DstartId=’%s’ -DendId=’%s’ 先来解析下这段字符串

    1.-D是DataX参数的标识符,必配
    2.-D后面的startId和endId是DataX json中where条件的id字段标识符,必须和json中的变量名称保持一致,endId是任务在每次执行时获取当前表maxId,也是下一次任务的startId
    3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致
    4.注意-DstartId='%s'和-DendId='%s' 中间有一个空格,空格必须保留并且是一个空格
    5.reader数据源,选择任务同步的读数据源
    6.配置reader数据源中需要同步数据的表名及该表的主键

注意,注意,注意: 一定要仔细看文档(后续会对这块配置进行优化,避免大家犯错)

2.JSON配置

datax.json

 
{  
   "job": {  
     "setting": {  
       "speed": {  
         "channel": 3,  
         "byte": 1048576  
       },  
       "errorLimit": {  
         "record": 0,  
         "percentage": 0.02  
       }  
     },  
     "content": [  
       {  
         "reader": {  
           "name": "mysqlreader",  
           "parameter": {  
             "username": "yRjwDFuoPKlqya9h9H2Amg==",  
             "password": "yRjwDFuoPKlqya9h9H2Amg==",  
             "splitPk": "",  
             "connection": [  
               {  
                 "querySql": [  
                   "select * from job_log where id>= $ {startId} and id< $ {endId}"  
                 ],  
                 "jdbcUrl": [  
                   "jdbc:mysql://localhost:3306/datax_web"  
                 ]  
               }  
             ]  
           }  
         },  
         "writer": {  
           "name": "mysqlwriter",  
           "parameter": {  
             "username": "mCFD+p1IMsa0rHicbQohcA==",  
             "password": "PhYxJmA/nuBJD1OxKTRzZH8sxuRddOv83hdqDOVR+i0=",  
             "column": [  
               "`id`",  
               "`job_group`",  
               "`job_id`",  
               "`job_desc`",  
               "`executor_address`",  
               "`executor_handler`",  
               "`executor_param`",  
               "`executor_sharding_param`",  
               "`executor_fail_retry_count`",  
               "`trigger_time`",  
               "`trigger_code`",  
               "`trigger_msg`",  
               "`handle_time`",  
               "`handle_code`",  
               "`handle_msg`",  
               "`alarm_status`",  
               "`process_id`",  
               "`max_id`"  
             ],  
             "connection": [  
               {  
                 "table": [  
                   "job_log"  
                 ],  
                 "jdbcUrl": "jdbc:mysql://47.98.125.243:3306/datax_web"  
               }  
             ]  
           }  
         }  
       }  
     ]  
   }  
 }  

querySql解析

select * from job_log where id>= $ {startId} and id< $ {endId}  
  • 1.此处的关键点在{startId},{endId},${}是DataX动态参数的固定格式,startId,endId就是我们页面配置中 -DstartId=’%s’ -DendId=’%s’中的startId,endId,注意字段一定要一致

更多详情:http://www.ai2news.com/blog/2844737/

标签:Web,配置,job,任务,DataX,增量,lastTime,id
From: https://www.cnblogs.com/zhipeng-wang/p/17910473.html

相关文章

  • Etcd Web UI
    etcdkeeper支持v3的api1.安装dockerrun-it-d--nameetcdkeeper\-p8080:8080\deltaprojects/etcdkeeper2.访问web本机ip:80803.登陆etcd数据库......
  • 【Loading】Web_ctfshow_WriteUp | _新手必刷_菜狗杯
    1-web签到题目分析读代码:<?php//注释信息/*#-*-coding:utf-8-*-#@Author:h1xa#@Date:2022-11-1017:20:38#@LastModifiedby:h1xa#@LastModifiedtime:2022-11-1109:38:59#@email:h1xa@ctfer.com#@link:https://ctfer.com*/error_r......
  • 阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
    湖仓一体(LakeHouse)是大数据领域的重要发展方向,提供了流批一体和湖仓结合的新场景。阿里云AnalyticDB for MySQL基于 Apache Hudi 构建了新一代的湖仓平台,提供日志、CDC等多种数据源一键入湖,在离线计算引擎融合分析等能力。本文将主要介绍AnalyticDB for MySQL基于Apache ......
  • 使用Selenium进行Web自动化操作
    我们可使用Selenium进行Web自动化操作。一、环境搭建:1.下载安装Pycharm2.在Pycharm中新建项目3.在Pycharm中安装Selenium(FIle->Settings->Project:项目名->PythonInterpreter->加号->搜索添加Selenium)4.下载符合浏览器版本的chromedriver.exe114之前chromedriver驱动版本:淘......
  • 使用JS和C#完成websocket双向通讯
    写在前面:微软官方对websocket的直接支持很差,教程也写得不用心。还要用户自己去转字节数组和字符串,太过分了!毕竟主推SignalR。本文是在官方教程的基础上,对其进行了一些简单的讲解,和方法提取、封装,以期降低学习难度。步骤描述:1、随便建了个普通的mvc项目(任意带控制器的.net项......
  • [前端][Vue] 利用webstorage API存储数据
    关于webstorageAPI官方文档https://developer.mozilla.org/en-US/docs/Web/API/Web_Storage_API省流说明1️⃣两种--localStorage&sessionStorage2️⃣存多久--local一直存到除非用户主动删,session等浏览器关掉就删3️⃣API--一样的,存setItem,读getItem,删一个remove,删全部......
  • 赣CTF-WEB-WP
    赣CTF-WEB-WP第一次出题,✌们轻喷~~~水果忍者~出的JS小游戏签到题,看前端中的all.js文件,即可找到flag。Ez_RCe反引号命令执行preg_match("/flag|cat|tac|system|\\$|\\.|\\\\|exec|pass/i")这边过滤了许多关键字我们知道在linux中反引号``也可以执行系统命令,只不过不会回......
  • asp.net基于WEB层面的云LIS系统平台源码
    随着计算机技术在检验管理方面的广泛应用,以及各种先进的检验仪器在检验医学领域的使用,检验科室对信息化管理提出了更高的要求。正是在这样的背景下开发出了实验室信息管理系统(简称LIS)结合当今各检验科管理及实验室规模的不同状况,充分吸收当今IT科技的最新成就,开发出以高度产品化......
  • 使用DNS查询Web服务器IP地址
    浏览器并不具备访问网络的功能,其最终是通过操作系统实现的,委托操作系统访问服务器时提供的并不是浏览器里面输入的域名而是ip地址,因此第一步需要将域名转换为对应的ip地址域名:www.baidu.comip地址是一串数字tcp/ip的网络结构:计算机通过集线器连接在一起构成一个个子网,子网......
  • 使用js和nodejs完成websocket双向通讯
    如题。感谢AI。先用js完成一个最简单的例子。web端:html:<!DOCTYPEhtml><html><head><metacharset="utf-8"><title></title><scriptsrc="j1.js"></script></head>......