首页 > 其他分享 >使用dataX进行大数据推送

使用dataX进行大数据推送

时间:2023-12-12 11:35:14浏览次数:32  
标签:echo target month source dataX table 推送 数据 datax

  针对大数据量推送,常规的推送工具推送效率很慢,比如kettle ,上千万的数据用时太长,因此,我使用了datax进行推送,1600万用时10分钟,2800万用时20分钟。用datax推送的效率很高

  在datax安装好了之后,推送的配置文件主要是配置 json 文件,全量推送可以放置在 /datax/job  中

  但是如果是增量推送,可以放置在 /datax/bin 主要用三个文件:json文件、sh文件、最大时间记录文件

       json 文件

{
    "job": {
        "content": [
            {
                "reader": {     #数据库读取信息
                  "name": "mysqlreader",
                  "parameter": {
                    "username": "账户",
                    "password": "密码",
                    "column": [],
                    "splitPk": "",
                    "connection": [
                      {
                        "querySql": [
                          "select id, name from table_name"
                        ],
                        "jdbcUrl": [
                          "jdbc:mysql://IP:端口号/数据库名?useUnicode=true&characterEncoding=UTF8"
                        ]
                      }
                    ]
                  }
                },
                "writer": {
                    "name": "mysqlwriter",  #数据库写入信息
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://IP:端口号/数据库名?useUnicode=true&characterEncoding=UTF8",
                                "table": [
                                    "table_name"
                                ]
                            }
                        ],
                        "username": "账户",
                        "password": "密码",
                        "session": [],
                        "writeMode": "insert" #写入方式,可以插入、更新等等方式
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

文件配置好了之后,执行以下命令进行单次执行任务

python /root/datax/bin/datax.py /root/datax/bin/XXX.json   

为了实现增量方式,那么在tale_name 可以使用 变量来代替 :{source_table}  ,同时可以用 {source_db_where}进行条件判断

 增量 sh文件 内容如下

#!/bin/sh
function exitWhenError(){
  if [ $1 -eq 1 ]
  then
      echo 'datax执行出错'$2
      exit 1
  fi
}

n=0
#circle_count:循环次数,1为执行1次循环,利用循环的意思是,每次执行一部分数据,循环之前多少次跑完,其目的是防止一次datax执行失败之后,能够继续按照现在的进度推送数据,相对来说,这种所花的时间更长
circle_count=31
#begintime='2023-10-29 18:00:00'
#endtime='2023-10-29 20:00:00'
source_table='table_name'
target_table='table_name'
column_name='extime'
echo '同步的表:'$source_table

#以下是如果是将一张表的数据按照月份分成多张表推送,可以使用一下命令进行数据表的切换,这样就可以不用每个月都需要重新修改表名,如果不需要分表就可以不用这个命令 # this_month_long=$(date -d "$day" "+%Y%m") # this_month=${this_month_long:0:6} # last_month_long=$(date -d "$day - 1 month" "+%Y%m") # last_month=${last_month_long:0:6} # next_month_long=$(date -d "$day + 1 month" "+%Y%m") # next_month=${next_month_long:0:6} # source_table_this_month=$source_table$this_month # target_table_this_month=$target_table"_"$this_month # source_table_last_month=$source_table$last_month # target_table_last_month=$target_table"_"$last_month # source_table_next_month=$source_table$next_month # target_table_next_month=$target_table"_"$next_month # echo "this_month:"$this_month",last_month:"$last_month,"next_month:"$next_month files_path=$(cd $(dirname $0);pwd) echo 'files_path:'$files_path #取上次同步时间 RESULT_FILE=`ls $files_path/${target_table}_data_max_time` begintime=`cat $RESULT_FILE` endtime=$(date -d "$day" "+%Y-%m-%d %H:%M:%S") # 计算时间差,如果业务中时间存在时间差,比如业务在入库之后,会有一个小时或者是一段时间的时间差,可以使用以下命令进行推送 delta=$(($(date +%s -d "${endtime}")-$(date +%s -d "${begintime}"))); # 计算打印结果 diffSeconds=$delta #秒 diffMinutes=$(expr $delta / 60) #分钟 diffHours=$(expr $delta / 3600) #小时 echo “Time difference between $begintime and $endtime is $diffHours hours $diffMinutes minutes $diffSeconds seconds delta $delta” let time_interval=$delta/$circle_count+1 echo 'time_interval:'$time_interval #如果时间差大于1小时,则停止抽取 # if [ ${diffSeconds} -gt 36000 ]; then # echo echo '本次执行时间差为diffSeconds:'$diffSeconds',停止抽取数据' # exit 1 # fi #如果每次循环时间间隔小于60秒,则不进行拆分 if [ ${time_interval} -lt 60 ]; then echo '循环时间间隔小于60秒' time_interval=$delta circle_count=1 echo echo 'time_interval:'$time_interval',circle_count:'$circle_count fi while (( $n <= $circle_count-1 )) do echo '第'$n'循环' let m=$n+1 echo 'm:'$m #当前循环起始秒数、终止秒数 let circle_seconds=$n*$time_interval circle_timestamp=$(date +%s -d "${begintime}") let new_begin_time=$circle_timestamp+$circle_seconds let new_end_time=$circle_timestamp+$circle_seconds+$time_interval echo '第'$n'循环,''new_begin_time:'$new_begin_time',new_end_time:'$new_end_time sql_begin_time=$(date "+%Y-%m-%d %H:%M:%S" -d "@${new_begin_time}") sql_end_time=$(date "+%Y-%m-%d %H:%M:%S" -d "@${new_end_time}") echo 'sql_begin_time:'$sql_begin_time',sql_end_time:'$sql_end_time source_db_where=" where $column_name > '$sql_begin_time' and $column_name <= '$sql_end_time' and $column_name <= '$endtime' " target_db_where=" where $column_name > '$sql_begin_time' and $column_name <= '$sql_end_time' and $column_name <= '$endtime' " echo 'source_db_where:'$source_db_where echo 'target_db_where:'$target_db_where cur_time=$(date -d "$day - 0 minute" "+%Y-%m-%d %H:%M:%S") echo "cur_time:" $cur_time sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_this_month/g" \ -e "s/{target_table}/$target_table_this_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 1 echo "==============保存当次循环时间记录==============" echo $sql_end_time>$files_path/${target_table}_data_max_time sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_last_month/g" \ -e "s/{target_table}/$target_table_last_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 2 sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_next_month/g" \ -e "s/{target_table}/$target_table_next_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 3 (( n++ )) done echo "==============保存当前时间记录==============" echo $endtime>$files_path/${target_table}_data_max_time

最大时间记录文件

记录数据表中的最大时间

增量文件配置好了之后,使用以下命令执行

bash /root/datax/bin/XXXX.sh

 而定时调度可以通过Linux服务器的crontab定时器 进行配置

crontab -e #编辑定时调度
crontab -l #查看定时调度
*/30 * * * * su - root -c /root/datax/bin/hive/XXX.sh #定时执行一条任务 */30 * * * * su - root -c /root/datax/bin/hive/XXX.sh > /root/datax/bin/hive/XXX.log #定时执行一条任务 ,并打印日志
*/30 * * * *  bash /root/datax/bin/XXXX.sh #多种方式都可以尝试一下
*/30 * * * * python /root/datax/bin/datax.py /root/datax/bin/XXX.json#多种方式都可以尝试一下

 

标签:echo,target,month,source,dataX,table,推送,数据,datax
From: https://www.cnblogs.com/zhu-qi/p/17896311.html

相关文章

  • 数据库学习
    前言IDEA集成了众多插件,方便开发者使用,使用其自带的Database模块就可以很方便的配置、连接到数据库。查看MySQL运行状态先启动MySQL,快捷键win+r然后输入services.msc,查找MySQL然后启动MySQLServer。IDEA开启数据库连接模块点击View->ToolWindows->Database开启数据库模块......
  • 【教程】制作 iOS 推送证书
    ​ 【教程】制作iOS推送证书如需向iOS设备推送数据,您首先需要在消息推送控制台上配置iOS推送证书。iOS推送证书用于推送通知,本文将介绍消息推送服务支持的证书类型,并引导您制作iOS推送证书。证书类型消息推送服务仅支持ApplePushService类型的证书。有关苹果证......
  • 达梦数据库重新初始化
    1.删除原数据库实例服务进入数据库安装目录./script/root/dm_service_uninstaller.sh-nDmServicedmdbDmServicedmdb是数据库实例的默认名称忘记名称可以在系统服务下查询Dm相关service2.删除数据库目录rm-rf/data/dm-data/DAMENG/*3.创建数据库实例切换到达梦用......
  • 【HarmonyOS】Web组件使用setResponseIsReady+setResponseData实现异步自定义响应数据
    【问题描述】在web组件的自定义响应数据方法如下:Web().onInterceptRequest((event)=>{…})如果需要在callbak中如果使用Promise等获取异步信息,并读取该如何操作 【解决方案】通过setResponseIsReady+setResponseData的方式控制数据返回,先设置setResponseIsReady为fal......
  • Oracle-修改数据库密码
    当Oracle数据库用户的密码过期时,你可以采取以下步骤来处理:1、连接到数据库:使用具有管理员权限的账户(比如SYS或SYSTEM用户)连接到Oracle数据库。查看过期用户:运行以下SQL查询语句查看已过期的用户列表:SELECTusernameFROMdba_usersWHEREaccount_status='EXPIRED......
  • 【开源项目推荐】-支持GPT的智能数据库客户端与报表工具——Chat2DB
    2023年是人工智能爆火的一年,ChatGPT为首的一系列的大模型的出现,让生成式人工智能彻底火了一把。但有人会说,GPT对于我们数据开发来说并没有什么作用啊?今天为大家推荐的开源项目,就是GPT在数据领域的一个优秀实践项目。让我们一起来看看吧~Chat2DB是一个集成了ChatGPT功能的数据库S......
  • 数据库在线修改表结构
    在线修改表结构在业务系统运行的过程中随意删改字段,会造成重大事故。常规的做法是业务停机,维护表结构,但是不影响正常业务的表结构是允许在线修改的。【ALTERTABLE修改表结构的弊病】由于修改表结构是表级锁,因此在修改表结构时,影响表写入操作如果修改表结构失败,必须还原表结......
  • GridView的复制粘贴和Excel数据处理
    首先开启GridView的属性:view.OptionsClipboard.AllowCopy=DefaultBoolean.True;//允许复制view.OptionsClipboard.CopyColumnHeaders=DefaultBoolean.False;//是否复制表头view.OptionsClipboard.PasteMode=DevExpress.Export.PasteMode.Append;//粘贴模式view.Optio......
  • docker部署PostgreSQL数据库(带有postgis插件)
    1、拉PostgreSQL(带有postgis插件)镜像dockerpullmdillon/postgis2、启动数据库容器dockerrun--namepostgis-ePOSTGRES_PASSWORD=postgis-p5432:5432-dmdillon/postgis:latest参数解释:--namepostgis为容器指定一个名称;-p5432:5432:指定端口映射,格式为:......
  • ETL-txt数据转换为Excel数据
    前言:  将txt文件中的数据抽取出来,然后装载到Excel中。 具体操作步骤:  数据准备id,name,age,gender,province,city,region,phone,birthday,hobby,register_date392456197008193000,张三,20,0,北京市,昌平区,回龙观,18589407692,1970-8-19,美食;篮球;足球,2018-8-69442......