ThinkPHP6定时任务同步千万级流水数据
多数据源配置
自定义指令
<?php
declare (strict_types = 1);
namespace app\command\SyncDtaTask;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Exception;
use think\facade\Db;
/**
* 分批同步数据
*/
class DevLogSyncCmd extends Command
{
protected function configure()
{
// 指令配置
$this->setName('DevLogSyncCmd')
->setDescription('分批次同步流水数据');
}
protected function execute(Input $input, Output $output)
{
$tableList = [
'dev_log',
];
//$citys = Db::query('SELECT id FROM city');
//同步城市固定写死
$citys = [37];
foreach ($tableList as $item) {
foreach ($citys as $city) {
// 检查今年的流水表
$todayTable = date('Y');
$tblName = "{$item}_{$city}_{$todayTable}";
$res = Db::execute("show tables like '{$tblName}'");
if ($res === 0) {
$output->writeln('补建流水表'.$item.'_'.$city['id'].'_'.$todayTable);
Db::execute("create table {$tblName} like {$item}");
}
// 获取源数据库和目标数据库的连接
$sourceDb = Db::connect("source1");
$targetDb = Db::connect();
//获取待同步最大值,现从目的数据库中获取
$maxSyncedIdResult = $targetDb->query("SELECT MAX(id) as max_id FROM {$tblName}");
//如果没获取到,则取源数据库的最大ID
$maxSyncedId = $maxSyncedIdResult[0]['max_id'] ?? 0;
if (!$maxSyncedId){
$maxSyncedIdResult = $sourceDb->query("SELECT MAX(id) as max_id FROM {$tblName}");
$maxSyncedId = ($maxSyncedIdResult[0]['max_id'] ?? 0) - 10000;
}
// 获取源库中的数据
$sourceData = $sourceDb->query("SELECT * FROM {$tblName} WHERE id > {$maxSyncedId} ORDER BY id desc LIMIT 1000");
//方案一: 直接使用插入方法
//$targetDb->name($tblName)->insertAll($sourceData);
//方案二: 需要处理字段
if (!empty($sourceData)) {
// 准备插入语句
$insertQuery = "INSERT INTO {$tblName} (" . implode(", ", array_keys($sourceData[0])).",flag" . ") VALUES ";
// 准备参数绑定的占位符和值
$insertValues = [];
$params = [];
foreach ($sourceData as $index => $row) {
$rowValues = [];
foreach ($row as $key => $value) {
//$paramKey = ":{$key}_{$index}";
//$params[$paramKey] = $value;
$rowValues[] = !empty($value)?"'".$value."'":"null";
}
$insertValues[] = "(" . implode(", ", $rowValues) .",1" . ")";
}
//每一行的value值用逗号分割
$insertQuery .= implode(", ", $insertValues);
// 执行插入操作
$targetDb->execute($insertQuery);
$output->writeln("已同步 " . count($sourceData) . " 条记录到 {$tblName}。");
} else {
$output->writeln("没有需要同步的数据。");
}
}
}
}
}