controller:
package batch;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * REST endpoint that drains pending rows with a fixed number of worker threads.
 *
 * <p>Each worker repeatedly calls {@link BatchServiceImpl#updataSelect(long, int)} with its own
 * thread id and a per-thread round counter, and stops at the first round that returns no rows.
 */
@RestController
public class BatchController {

    /** Number of worker threads, and therefore the initial latch count. */
    private static final int WORKER_COUNT = 10;

    @Autowired
    User userMapper;

    @Autowired
    BatchServiceImpl batchService;

    /**
     * Row count of the most recently completed run.
     *
     * <p>Kept for backward compatibility. Counting itself happens in a per-request counter so that
     * concurrent requests cannot corrupt each other's totals and the value no longer accumulates
     * across requests.
     */
    volatile int total = 0;

    /**
     * Runs one batch pass with {@value #WORKER_COUNT} workers and blocks until all have finished.
     *
     * @return the fixed string {@code "ThreadBatch success"}
     * @throws RuntimeException if the calling thread is interrupted while waiting for the workers;
     *     the interrupt flag is restored before throwing
     */
    @RequestMapping("/ThreadBatch")
    public String ThreadBatch() {
        ExecutorService pool = Executors.newFixedThreadPool(WORKER_COUNT);
        // The latch starts at the worker count; the request thread waits on it until every worker is done.
        CountDownLatch downLatch = new CountDownLatch(WORKER_COUNT);
        // Per-request counter: used to check that the rows seen add up to the number of rows processed.
        AtomicInteger runTotal = new AtomicInteger();
        try {
            for (int i = 0; i < WORKER_COUNT; i++) {
                pool.submit(() -> {
                    try {
                        processBatches(runTotal);
                    } catch (RuntimeException e) {
                        // The Future returned by submit() is never inspected, so report the failure
                        // here instead of letting it disappear silently.
                        System.err.println("worker " + Thread.currentThread().getId() + " failed: " + e);
                    } finally {
                        downLatch.countDown();
                    }
                });
            }
            downLatch.await();
            int finished = runTotal.get();
            total = finished;
            System.out.println("所有子线程执行完毕,total=" + finished);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            // A new pool is created per request; without shutdown its threads would never exit.
            pool.shutdown();
        }
        return "ThreadBatch success";
    }

    /**
     * Worker loop: update-then-select one batch per round until a round returns no rows.
     *
     * @param runTotal counter that receives the number of rows returned by each round
     */
    private void processBatches(AtomicInteger runTotal) {
        long threadId = Thread.currentThread().getId();
        int j = 0;
        while (true) {
            // Each round first marks a batch of rows in the database, then reads those rows back.
            List<JSONObject> datas = batchService.updataSelect(threadId, j);
            int fetched = (datas == null) ? 0 : datas.size();
            runTotal.addAndGet(fetched);
            System.out.println("线程" + threadId + "查询到" + fetched + "条记录");
            // An empty result means there was nothing left to process for this worker.
            if (CollectionUtils.isEmpty(datas)) {
                break;
            }
            j++;
        }
    }
}
service:
package batch;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * Service that performs one update-then-select round for a worker thread.
 */
@Service
public class BatchServiceImpl {

    @Autowired
    User userMapper;

    /**
     * Runs {@code updateBatch} followed by {@code selectBatch} on the mapper with the same
     * arguments, and returns what the select produced.
     *
     * <p>Declared transactional with {@code REQUIRED} propagation and rollback on any
     * {@link Exception}.
     *
     * @param id identifier of the calling worker thread
     * @param j  zero-based round counter of the calling worker
     * @return the rows returned by {@code selectBatch} for this {@code id}/{@code j} pair
     */
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public List<JSONObject> updataSelect(long id, int j) {
        // Update first so that the following select can find the rows tagged in this round.
        userMapper.updateBatch(id, j);
        return userMapper.selectBatch(id, j);
    }
}
Mapper:
mark 字段表示处理次数,默认值为 0,每更新一次值就加 1;如果结束后存在大于 1 的值,说明该条数据被重复处理了。
deal 表示当前线程 while 循环的次数,用于查出当前线程本次循环更新的数据。
<!--
  Claims up to 1000 unprocessed rows (mark = 0) for one worker round: stamps them with the
  worker's thread id and round number and increments mark.
  Values are bound with #{...} rather than spliced into the SQL text with ${...}.
  NOTE(review): parameter names id / j must match how the mapper interface exposes them
  (e.g. via @Param) - confirm against the mapper interface, which is not shown here.
-->
<update id="updateBatch">
    update user
    set threadNum = #{id},
        mark = mark + 1,
        deal = #{j}
    where mark = 0
    limit 1000
</update>

<!-- Reads back the ids of the rows stamped with the given thread id and round number. -->
<select id="selectBatch" resultType="com.alibaba.fastjson.JSONObject">
    select id
    from user
    where threadNum = #{id}
      and deal = #{j}
</select>
标签:java,重复,org,springframework,util,查询数据库,import,多线程,datas From: https://www.cnblogs.com/1--2/p/17097290.html