首页 > 其他分享 >SpringBoot定时任务实现数据同步

SpringBoot定时任务实现数据同步

时间:2022-11-02 23:14:07浏览次数:58  
标签:同步 SpringBoot url param api org import 定时 String

业务的需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步。

方案一:通过轮询接口的方式执行 pullData() 方法实现数据同步

该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询数据时候,某一时刻,数据全部删除,还没及时插入的时候。数据可能有异常。

方案二:通过轮询接口的方式执行 pullDataNew() 方法实现数据同步

该方式的原理是先查询数据库,已有数据,然后和通过api调用获取的最新数据进行比对,找出数据中增量、减量和变量,进行同步更新。

该方法的优点,减少对数据库的频繁操作,提升性能。

 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 package com.hxtx.spacedata.task;    import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.google.api.client.util.Lists; import com.hxtx.spacedata.common.domain.ResponseDTO; import com.hxtx.spacedata.config.SpringContextUtil; import com.hxtx.spacedata.controller.file.FilesMinioController; import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity; import com.hxtx.spacedata.service.entityconfig.EntityPointService; import com.hxtx.spacedata.util.HttpProxyUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional;    import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors;       /**  * 中台设备数据 定时任务执行  *  * @author Tarzan Liu  * @version 1.0.0  * @description  * @date 2020/12/07  */ @Component @Slf4j public class EntityPointTask {        @Autowired     private EntityPointService entityPointService;        @Value("${middleGround.server.host}")     private String host;        @Value("${middleGround.server.port}")     private String port;        private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class);        /**      * 设备定义点数据拉取      *      * @author tarzan Liu      * @date 2020/12/2      */     @Scheduled(cron = "0/30 * * * * ?"// 30秒校验一次     public void pullDataTaskByCorn() {         String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list");         JSONObject jsonObject = JSON.parseObject(result);         if (Objects.nonNull(jsonObject)) {             JSONArray array = jsonObject.getJSONArray("data");             if (array != null && array.size() != 0) {                 for (int i = 0; i < array.size(); i++) {                     JSONObject obj = array.getJSONObject(i);                     String systemId = obj.getString("id");                     pullDataNew(systemId);                 }             }         }     }           @Transactional(rollbackFor = Throwable.class)     public ResponseDTO<String> pullData(String code) {         List<EntityPointEntity> list = Lists.newArrayList();         String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);         JSONObject jsonObject = JSON.parseObject(result);         if (Objects.nonNull(jsonObject)) {             JSONArray array = jsonObject.getJSONArray("data");             if (array != null && array.size() != 0) {                 for (int i = 0; i < array.size(); i++) {                     JSONObject obj = array.getJSONObject(i);                     String pointId = obj.getString("pointId");                     String name = obj.getString("name");                     list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build());                 }                 List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue));                 if (CollectionUtils.isNotEmpty(existList)) {                     Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue));                     list.forEach(e -> {                         String value = existMap.get(e.getPointId());                         if (value != null) {                             e.setValue(value);                         }                     });                 }                 entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));                 entityPointService.saveBatch(list);             }         }         return ResponseDTO.succ();     }           @Transactional(rollbackFor = Throwable.class)     public ResponseDTO<String> pullDataNew(String code) {         String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code);         JSONObject jsonObject = JSON.parseObject(result);         if (Objects.nonNull(jsonObject)) {             JSONArray data = jsonObject.getJSONArray("data");             List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class);             if (CollectionUtils.isNotEmpty(list)) {                 list.forEach(e -> e.setCode(code));                 List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code));                 if (CollectionUtils.isNotEmpty(existList)) {                     //存在map                     Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));                     //传输map                     Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName));                     //增量                     List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList());                     if (CollectionUtils.isNotEmpty(increment)) {                         entityPointService.saveBatch(increment);                     }                     //减量                     List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList());                     if (CollectionUtils.isNotEmpty(decrement)) {                         entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList()));                     }                     //变量                     List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList());                     if (CollectionUtils.isNotEmpty(variable)) {                         variable.forEach(e -> {                             e.setName(dataMap.get(e.getPointId()));                         });                         entityPointService.updateBatchById(variable);                     }                 else {                     entityPointService.saveBatch(list);                 }             }         }         return ResponseDTO.succ();     }       }

 

数据库对应实体类 

 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;    import java.io.Serializable; import java.util.Date;    @Builder @NoArgsConstructor @AllArgsConstructor @Data @TableName(value = "t_entity_point") public class EntityPointEntity implements Serializable {        private static final long serialVersionUID = 2181036545424452651L;        /**      * 定义点id      */     @TableId(value = "id", type = IdType.ASSIGN_ID)     private Long id;        /**      * 定义点id      */     private String pointId;        /**      * 名称      */     private String name;        /**      * 绘制数据      */     private String value;        /**      * 编码      */     private String code;        /**      * 创建时间      */     private Date createTime;    }

 

HTTP请求代理工具类 

 

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 import lombok.extern.slf4j.Slf4j; import org.apache.http.Consts; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils;    import javax.net.ssl.SSLContext; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.URL; import java.net.URLConnection; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; import java.util.Map;       /**  * HTTP请求代理类  *  * @author tarzan Liu  * @description 发送Get Post请求  */ @Slf4j public class HttpProxyUtil {        /**      * 使用URLConnection进行GET请求      *      * @param api_url      * @return      */     public static String sendGet(String api_url) {         return sendGet(api_url, """utf-8");     }        /**      * 使用URLConnection进行GET请求      *      * @param api_url      * @param param      * @return      */     public static String sendGet(String api_url, String param) {         return sendGet(api_url, param, "utf-8");     }        /**      * 使用URLConnection进行GET请求      *      * @param api_url 请求路径      * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空      * @param charset 字符集      * @return      */     public static String sendGet(String api_url, String param, String charset) {         StringBuffer buffer = new StringBuffer();         try {             // 判断有无参数,若是拼接好的url,就不必再拼接了             if (param != null && !"".equals(param)) {                 api_url = api_url + "?" + param;             }             log.info("请求的路径是:" + api_url);             URL realUrl = new URL(api_url);             // 打开联接             URLConnection conn = realUrl.openConnection();             // 设置通用的请求属性             conn.setRequestProperty("accept""*/*");             conn.setRequestProperty("connection""Keep-Alive");             conn.setRequestProperty("user-agent""Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");             conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)             conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)             conn.connect();    // 建立实际的联接                // 定义 BufferedReader输入流来读取URL的相应             try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {                 String line;                 while ((line = in.readLine()) != null) { //                    buffer.append("\n"+line);                     buffer.append(line);                 }             }         catch (Exception e) {             log.error("发送GET请求出现异常! " + e.getMessage());             return null;         }         //  log.info("响应返回数据:" + buffer.toString());         return buffer.toString();     }           /**      * 使用URLConnection进行POST请求      *      * @param api_url 请求路径      * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空      * @return      */     public static String sendPost(String api_url, String param) {         return sendPost(api_url, param, "utf-8");     }        /**      * 使用URLConnection进行POST请求      *      * @param api_url 请求路径      * @param param   请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空      * @param charset 字符集      * @return      */     public static String sendPost(String api_url, String param, String charset) {         StringBuffer buffer = new StringBuffer();         try {             log.info("请求的路径是:" + api_url + ",参数是:" + param);                URL realUrl = new URL(api_url);             // 打开联接             URLConnection conn = realUrl.openConnection();             // 设置通用的请求属性             conn.setRequestProperty("accept""*/*");             conn.setRequestProperty("connection""Keep-Alive");             conn.setRequestProperty("user-agent""Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)");             conn.setConnectTimeout(12000);    //设置连接主机超时(单位:毫秒)             conn.setReadTimeout(12000);    // 设置从主机读取数据超时(单位:毫秒)                // 发送POST请求必须设置如下两行             conn.setDoOutput(true);             conn.setDoInput(true);                // 获取URLConnection对象对应的输出流             try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {                 out.print(param); // 发送请求参数                 out.flush();// flush输出流的缓冲             }             // 定义 BufferedReader输入流来读取URL的相应,得指明使用UTF-8编码,否则到API<a href="http://www.zhuxianfei.com/server/" target="_blank" class="infotextkey">服务器</a>XML的中文不能被成功识别             try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) {                 String line;                 while ((line = in.readLine()) != null) { //                    buffer.append("\n"+line);                     buffer.append(line);                 }             }         catch (Exception e) {             log.error("发送POST请求出现异常! " + e.getMessage());             e.printStackTrace();         }         log.info("响应返回数据:" + buffer.toString());         return buffer.toString();     }        public static CloseableHttpClient createSSLClientDefault() throws Exception {         SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(nullnew AllTrustStrategy()).build();         SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext);         return HttpClients.custom().setSSLSocketFactory(sslSf).build();     }        // 加载证书     private static class AllTrustStrategy implements TrustStrategy {         public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {             return true;         }     }        /**      * 支持https请求      *      * @param url      * @param param      * @return      * @throws Exception      */     public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception {         CloseableHttpClient httpClient = createSSLClientDefault();         HttpPost httpPost = null;         CloseableHttpResponse response = null;         String result = "";         try {             // 发起HTTP的POST请求             httpPost = new HttpPost(url);             List<NameValuePair> paramList = new ArrayList<NameValuePair>();             for (String key : param.keySet()) {                 paramList.add(new BasicNameValuePair(key, param.get(key)));             }             log.info("http请求地址:" + url + ",参数:" + paramList.toString());             // UTF8+URL编码             httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));             httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build());             response = httpClient.execute(httpPost);             HttpEntity entity = response.getEntity();             int statusCode = response.getStatusLine().getStatusCode();             if (HttpStatus.SC_OK == statusCode) { // 如果响应码是200                }             result = EntityUtils.toString(entity);             log.info("状态码:" + statusCode + ",响应信息:" + result);         finally {             if (response != null) {                 response.close();             }             if (httpPost != null) {                 httpPost.releaseConnection();             }             httpClient.close();         }         return result;     } }

 

希望对大家的学习有所帮助,也希望大家多多支持

标签:同步,SpringBoot,url,param,api,org,import,定时,String
From: https://www.cnblogs.com/yizhiamumu/p/16852876.html

相关文章