针对大数据量推送,常规的推送工具推送效率很慢,比如kettle ,上千万的数据用时太长,因此,我使用了datax进行推送,1600万用时10分钟,2800万用时20分钟。用datax推送的效率很高
在datax安装好了之后,推送的配置文件主要是配置 json 文件,全量推送可以放置在 /datax/job 中
但是如果是增量推送,可以放置在 /datax/bin 主要用三个文件:json文件、sh文件、最大时间记录文件
json 文件
{ "job": { "content": [ { "reader": { #数据库读取信息 "name": "mysqlreader", "parameter": { "username": "账户", "password": "密码", "column": [], "splitPk": "", "connection": [ { "querySql": [ "select id, name from table_name" ], "jdbcUrl": [ "jdbc:mysql://IP:端口号/数据库名?useUnicode=true&characterEncoding=UTF8" ] } ] } }, "writer": { "name": "mysqlwriter", #数据库写入信息 "parameter": { "column": [ "id", "name" ], "connection": [ { "jdbcUrl": "jdbc:mysql://IP:端口号/数据库名?useUnicode=true&characterEncoding=UTF8", "table": [ "table_name" ] } ], "username": "账户", "password": "密码", "session": [], "writeMode": "insert" #写入方式,可以插入、更新等等方式 } } } ], "setting": { "speed": { "channel": "5" } } } }
文件配置好了之后,执行以下命令进行单次执行任务
python /root/datax/bin/datax.py /root/datax/bin/XXX.json
为了实现增量方式,那么在tale_name 可以使用 变量来代替 :{source_table} ,同时可以用 {source_db_where}进行条件判断
增量 sh文件 内容如下
#!/bin/sh function exitWhenError(){ if [ $1 -eq 1 ] then echo 'datax执行出错'$2 exit 1 fi } n=0 #circle_count:循环次数,1为执行1次循环,利用循环的意思是,每次执行一部分数据,循环之前多少次跑完,其目的是防止一次datax执行失败之后,能够继续按照现在的进度推送数据,相对来说,这种所花的时间更长 circle_count=31 #begintime='2023-10-29 18:00:00' #endtime='2023-10-29 20:00:00' source_table='table_name' target_table='table_name' column_name='extime' echo '同步的表:'$source_table
#以下是如果是将一张表的数据按照月份分成多张表推送,可以使用一下命令进行数据表的切换,这样就可以不用每个月都需要重新修改表名,如果不需要分表就可以不用这个命令 # this_month_long=$(date -d "$day" "+%Y%m") # this_month=${this_month_long:0:6} # last_month_long=$(date -d "$day - 1 month" "+%Y%m") # last_month=${last_month_long:0:6} # next_month_long=$(date -d "$day + 1 month" "+%Y%m") # next_month=${next_month_long:0:6} # source_table_this_month=$source_table$this_month # target_table_this_month=$target_table"_"$this_month # source_table_last_month=$source_table$last_month # target_table_last_month=$target_table"_"$last_month # source_table_next_month=$source_table$next_month # target_table_next_month=$target_table"_"$next_month # echo "this_month:"$this_month",last_month:"$last_month,"next_month:"$next_month files_path=$(cd $(dirname $0);pwd) echo 'files_path:'$files_path #取上次同步时间 RESULT_FILE=`ls $files_path/${target_table}_data_max_time` begintime=`cat $RESULT_FILE` endtime=$(date -d "$day" "+%Y-%m-%d %H:%M:%S") # 计算时间差,如果业务中时间存在时间差,比如业务在入库之后,会有一个小时或者是一段时间的时间差,可以使用以下命令进行推送 delta=$(($(date +%s -d "${endtime}")-$(date +%s -d "${begintime}"))); # 计算打印结果 diffSeconds=$delta #秒 diffMinutes=$(expr $delta / 60) #分钟 diffHours=$(expr $delta / 3600) #小时 echo “Time difference between $begintime and $endtime is $diffHours hours $diffMinutes minutes $diffSeconds seconds delta $delta” let time_interval=$delta/$circle_count+1 echo 'time_interval:'$time_interval #如果时间差大于1小时,则停止抽取 # if [ ${diffSeconds} -gt 36000 ]; then # echo echo '本次执行时间差为diffSeconds:'$diffSeconds',停止抽取数据' # exit 1 # fi #如果每次循环时间间隔小于60秒,则不进行拆分 if [ ${time_interval} -lt 60 ]; then echo '循环时间间隔小于60秒' time_interval=$delta circle_count=1 echo echo 'time_interval:'$time_interval',circle_count:'$circle_count fi while (( $n <= $circle_count-1 )) do echo '第'$n'循环' let m=$n+1 echo 'm:'$m #当前循环起始秒数、终止秒数 let circle_seconds=$n*$time_interval circle_timestamp=$(date +%s -d "${begintime}") let new_begin_time=$circle_timestamp+$circle_seconds let new_end_time=$circle_timestamp+$circle_seconds+$time_interval echo '第'$n'循环,''new_begin_time:'$new_begin_time',new_end_time:'$new_end_time sql_begin_time=$(date "+%Y-%m-%d %H:%M:%S" -d "@${new_begin_time}") sql_end_time=$(date "+%Y-%m-%d %H:%M:%S" -d "@${new_end_time}") echo 'sql_begin_time:'$sql_begin_time',sql_end_time:'$sql_end_time source_db_where=" where $column_name > '$sql_begin_time' and $column_name <= '$sql_end_time' and $column_name <= '$endtime' " target_db_where=" where $column_name > '$sql_begin_time' and $column_name <= '$sql_end_time' and $column_name <= '$endtime' " echo 'source_db_where:'$source_db_where echo 'target_db_where:'$target_db_where cur_time=$(date -d "$day - 0 minute" "+%Y-%m-%d %H:%M:%S") echo "cur_time:" $cur_time sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_this_month/g" \ -e "s/{target_table}/$target_table_this_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 1 echo "==============保存当次循环时间记录==============" echo $sql_end_time>$files_path/${target_table}_data_max_time sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_last_month/g" \ -e "s/{target_table}/$target_table_last_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 2 sed -e "s/{source_db_where}/$source_db_where/g" \ -e "s/{target_db_where}/$target_db_where/g" \ -e "s/{source_table}/$source_table_next_month/g" \ -e "s/{target_table}/$target_table_next_month/g" \ $files_path/$target_table.json > $files_path/${target_table}_tmp.json python /root/datax/bin/datax.py $files_path/${target_table}_tmp.json exitWhenError $? 3 (( n++ )) done echo "==============保存当前时间记录==============" echo $endtime>$files_path/${target_table}_data_max_time
最大时间记录文件
记录数据表中的最大时间
增量文件配置好了之后,使用以下命令执行
bash /root/datax/bin/XXXX.sh
而定时调度可以通过Linux服务器的crontab定时器 进行配置
crontab -e #编辑定时调度
crontab -l #查看定时调度
*/30 * * * * su - root -c /root/datax/bin/hive/XXX.sh #定时执行一条任务 */30 * * * * su - root -c /root/datax/bin/hive/XXX.sh > /root/datax/bin/hive/XXX.log #定时执行一条任务 ,并打印日志
*/30 * * * * bash /root/datax/bin/XXXX.sh #多种方式都可以尝试一下
*/30 * * * * python /root/datax/bin/datax.py /root/datax/bin/XXX.json#多种方式都可以尝试一下
标签:echo,target,month,source,dataX,table,推送,数据,datax From: https://www.cnblogs.com/zhu-qi/p/17896311.html