首页 > 数据库 >Spring Batch:将数据从Web服务处理到MongoDB

Spring Batch:将数据从Web服务处理到MongoDB

时间:2023-06-19 16:34:22浏览次数:48  
标签:返回 Web String Spring List Batch final public name


概观

在这篇文章中,我们将介绍如何创建一个使用Web服务数据并将其插入MongoDB数据库的Spring Batch应用程序。

要求

阅读本文的开发人员必须熟悉Spring Batch(示例)和MongoDB。

环境

注意:批处理也可以在本地运行。

脚本

全局场景步骤是:

  1. 从Web服务读取数据,在这种情况下:https://sunrise-sunset.org/api

获取城市列表的坐标,然后调用API以读取日出和日落日期时间。

2.处理数据并提取业务数据

收集数据的业务处理

3.在MongoDB中插入已处理的数据

将处理过的数据保存为mongo文档

编码

  • 输入:本地文件中JSON格式的城市数据列表,如下所示:
[{“名字”:“Danemark”,“城市”:[{“名字”:“Copenhague”,“lat”:55.676098,“lng”:12.568337,“timeZone”:“CET”},{“名字”:“奥胡斯”,“lat”:56.162939,“lng”:10.203921,“timeZone”:“CET”},{“名字”:“欧登塞”,“lat”:55.39594,“lng”:10.38831,“timeZone”:“CET”},{“名字”:“奥尔堡”,“lat”:57.046707,“lng”:9.935932,“timeZone”:“CET”}]}]我们的场景从本地json文件获取输入数据。映射bean如下:
国豆:
导入 java。io。可序列化 ;导入 java。util。清单 ; 
 
 
进口 com。fastxml。杰克逊。注释。JsonIgnoreProperties ; 
 
 
@JsonIgnoreProperties(ignoreUnknown  =  true)public  class  BCountry  实现 Serializable { 
 
 
private  static  final  long  serialVersionUID  =  1L ; 
 
 
私有 字符串 名称 ; 
 
 
私人 名单< BCity >  城市 ; 
 
 
public  BCountry(){super();} 
 
 
public  BCountry(String  name,List < BCity >  cities){super();这个。name  =  name ;这个。城市 =  城市 ;} 
 
 
public  BCountry(String  name){super();这个。name  =  name ;} 
 
 
public  String  getName(){返回 名称 ;} 
 
 
public  void  setName(String  name){这个。name  =  name ;} 
 
 
public  List < BCity >  getCities(){返回 城市 ;} 
 
 
public  void  setCities(List < BCity >  cities){这个。城市 =  城市 ;} 
 
 
 
 
 
 
@覆盖public  int  hashCode(){final  int  prime  =  31 ;int  result  =  1 ;结果 =  黄金 *  结果 +((城市 ==  空)? 0:城市。的hashCode());结果 =  黄金 *  结果 +((名称 ==  空)? 0:名称。的hashCode());返回 结果 ;} 
 
 
@覆盖public  boolean  equals(Object  obj){if(this  ==  obj)返回 true ;if(obj  ==  null)返回 虚假 ;如果(的getClass()!=  OBJ。的getClass())返回 虚假 ;BCountry  other  =(BCountry)obj ;if(cities  ==  null){如果(其他。城市 !=  空)返回 虚假 ;} 否则 如果(!城市。平等(等。城市))返回 虚假 ;if(name  ==  null){如果(其他。名字 !=  空)返回 虚假 ;} 否则 如果(!名字。平等(其它。名))返回 虚假 ;返回 true ;} 
 
 
@覆盖public  String  toString(){返回 “BCountry [name =”  +  name  +  “,cities =”  +  cities  +  “]” ;} 
 
 
 
 
}和城市豆:
导入 java。io。可序列化 ; 
 
 
进口 com。fastxml。杰克逊。注释。JsonIgnoreProperties ; 
 
 
@JsonIgnoreProperties(ignoreUnknown  =  true)公共 类 BCity  实现 Serializable { 
 
 
private  static  final  long  serialVersionUID  =  1L ; 
 
 
private  String  name,timeZone ;私人 双 拉特,lng ; 
 
 
public  BCity(){super();} 
 
 
public  BCity(String  name,String  timeZone,double  lat,double  lng){super();这个。name  =  name ;这个。timeZone  =  timeZone ;这个。lat  =  lat ;这个。lng  =  lng ;} 
 
 
public  String  getName(){返回 名称 ;} 
 
 
public  void  setName(String  name){这个。name  =  name ;} 
 
 
public  String  getTimeZone(){返回时 区 ;} 
 
 
public  void  setTimeZone(String  timeZone){这个。timeZone  =  timeZone ;} 
 
 
public  double  getLat(){返回 纬度 ;} 
 
 
public  void  setLat(double  lat){这个。lat  =  lat ;} 
 
 
public  double  getLng(){返回 lng ;}public  void  setLng(double  lng){这个。lng  =  lng ;} 
 
 
@覆盖public  String  toString(){返回 “BCity [name =”  +  name  +  “,timeZone =”  +  timeZone  +  “,lat =”  +  lat  +  “,lng =”  +  lng  +  “]” ;} 
 
 
@覆盖public  int  hashCode(){final  int  prime  =  31 ;int  result  =  1 ;长 温度 ;temp  =  Double。doubleToLongBits(lat);result  =  prime  *  result  +(int)(temp  ^(temp  >>>  32));temp  =  Double。doubleToLongBits(lng);result  =  prime  *  result  +(int)(temp  ^(temp  >>>  32));结果 =  黄金 *  结果 +((名称 ==  空)? 0:名称。的hashCode());结果 =  素 *  结果 +((的timeZone  ==  空)? 0:的timeZone。的hashCode());返回 结果 ;} 
 
 
@覆盖public  boolean  equals(Object  obj){if(this  ==  obj)返回 true ;if(obj  ==  null)返回 虚假 ;如果(的getClass()!=  OBJ。的getClass())返回 虚假 ;BCity  other  =(BCity)obj ;如果(双。doubleToLongBits的(LAT)!=  双。doubleToLongBits的(其他的。LAT))返回 虚假 ;如果(双。doubleToLongBits的(LNG)!=  双。doubleToLongBits的(其他的。LNG))返回 虚假 ;if(name  ==  null){如果(其他。名字 !=  空)返回 虚假 ;} 否则 如果(!名字。平等(其它。名))返回 虚假 ;if(timeZone  ==  null){如果(其他。的timeZone  !=  空)返回 虚假 ;} 否则 如果(!的timeZone。平等(其它。的timeZone))返回 虚假 ;返回 true ;} 
 
 
}批量阅读器实现@LineMapper。您可以使读者适应我们的数据源(示例):
导入 java。util。清单 ; 
 
 
进口 组织。弹簧框架。批次。项目。档案。LineMapper ; 
 
 
进口 com。ahajri。批次。豆子。BCountry ;进口 com。fastxml。杰克逊。数据绑定。ObjectMapper ;进口 com。fastxml。杰克逊。数据绑定。类型。CollectionType ; 
 
 
公共 类 BCountryJsonLineMapper  实现了 LineMapper < List < BCountry >> { 
 
 
private  final  ObjectMapper  mapper  =  new  ObjectMapper(); 
 
 
@覆盖public  List < BCountry >  mapLine(String  line,int  lineNumber)throws  Exception {CollectionType  collectionType  = mapper。getTypeFactory()。constructCollectionType(列表。类,BCountry。类);返回 映射器。readValue(line,collectionType);} 
 
 
}处理数据批处理时,检查同一天某个城市的业务处理数据是否已保存在数据库中。在MongoDB的搜索数据的方式就是在这个详细的岗位。
ItemProcessor将@BCountry对象转换为MongoDB Document对象。该过程详述如下:
public  class  BCountryPrayTimeEventItemProcessor  实现 ItemProcessor < List < BCountry >,List < Document >> { 
 
 
private  static  final  String  EVENTS_COLLECTION_NAME  =  “event” ; 
 
 
private  static  final  Logger  LOG  =  LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。类); 
 
 
@Autowiredprivate  PrayTimeService  prayTimeService ; 
 
 
@Autowiredprivate  CloudMongoService  cloudMongoService ; 
 
 
@覆盖public  List < Document >  进程(List < BCountry >  items)抛出 Exception {final  List < Document >  docs  =  new  ArrayList <>(); 
 
 
 
 
物品。stream()。forEach(item  - > { 
 
 
final  String  countryName  =  item。getName(); 
 
 
项目。getCities()。stream()。forEach(c  - > {final  Document  prayTimeCityEventDoc  =  new  Document();//循环城市并为今天提取祈祷时间final  String  cityName  =  c。getName();final  String  cityTimeZone  =  c。getTimeZone();final  double  lat  =  c。getLat();final  double  lng  =  c。getLng(); 
 
 
final  LocalDateTime  nowOfCity  =  LocalDateTime。现在(了zoneid。的(cityTimeZone)); 
 
 
 
 
final  QueryParam [] queryParams  =  new  QueryParam [ 5 ]; 
 
 
queryParams [ 0 ] =  新 QueryParam(“CITY_NAME” ,OperatorEnum。EQ。名称(),的cityName);queryParams [ 1 ] =  新 QueryParam(“EVENT_TYPE” ,OperatorEnum。EQ。名称(),事件类型。PRAY_TIME。名称());queryParams [ 2 ] =  新 QueryParam(“月”,OperatorEnum。EQ。名称(),nowOfCity。getMonthValue());queryParams [ 3 ] =  新 QueryParam(“DAY_OF_MONTH” ,OperatorEnum。EQ。名称(),nowOfCity。getDayOfMonth());queryParams [ 4 ] =  新 QueryParam(“COUNTRY_NAME” ,OperatorEnum。EQ。名称(),国家名称);List < Document >  foundEvents  =  null ;尝试 {foundEvents  =  cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);} catch(BusinessException  e1){记录。错误(“====>未找到城市祈祷时间”  +  的cityName  +  “对”  +  nowOfCity。getDayOfMonth()+  “/”+  nowOfCity。getMonthValue());} 
 
 
尝试 {如果(CollectionUtils。的isEmpty(foundEvents)){//祈祷时间尚未创建prayTimeCityEventDoc。put(“country_name”,countryName);prayTimeCityEventDoc。put(“city_name”,cityName);prayTimeCityEventDoc。把(“EVENT_TYPE” ,事件类型。PRAY_TIME。名称());prayTimeCityEventDoc。把(“复发”,RecurringEnum。YEARLY。名称());prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());prayTimeCityEventDoc。把(“DAY_OF_MONTH” ,nowOfCity。getDayOfMonth());prayTimeCityEventDoc。put(“lat”,lat);prayTimeCityEventDoc。put(“lng”,lng);prayTimeCityEventDoc。把(“CREATION_DATE” ,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity)); 
 
 
final  Map < String,Object >  prayInfos  =  prayTimeService。getPrayTimeByLatLngDate(lat,lng,日期。从(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone); 
 
 
prayTimeCityEventDoc。把(“pray_infos” ,文件。解析(新 GSON()的toJSON(prayInfos))); 
 
 
docs。add(prayTimeCityEventDoc); 
 
 
} else {记录。信息(字符串。格式(“====>祈祷的时间已经存在的城市:%S,月:%d,日:%d” ,cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));}} catch(BusinessException  e){记录。错误(“计算祈祷时间时出问题:”,e);抛出 新的 RuntimeException(e);}});});返回 文档 ;} 
 
 
}批量配置类:
@组态@EnableBatchProcessing@EnableScheduling公共 类 BatchConfiguration { 
 
 
private  static  final  String  SCANDINAVIAN_COUNTRIES_JSON_FILE  =  “scandinavian-countries.json” ;private  static  final  String  EVENT_COLLECTION_NAME  =  “event_collection” ; 
 
 
private  static  final  Logger  LOG  =  LoggerFactory。getLogger(BatchConfiguration。类); 
 
 
@Autowiredprivate  JobBuilderFactory  jobBuilderFactory ; 
 
 
@Autowiredprivate  StepBuilderFactory  stepBuilderFactory ; 
 
 
@Autowired私有的 MLabMongoService  mlabMongoService ; 
 
 
@豆public  ResourcelessTransactionManager  transactionManager(){返回 新的 ResourcelessTransactionManager();} 
 
 
@豆public  MapJobRepositoryFactoryBean  mapJobRepositoryFactory(ResourcelessTransactionManager  txManager)抛出 异常 {MapJobRepositoryFactoryBean  factory  =  new  MapJobRepositoryFactoryBean(txManager);工厂。afterPropertiesSet();返回 工厂 ;} 
 
 
@豆public  JobRepository  jobRepository(MapJobRepositoryFactoryBean  factory)抛出 异常 {return(JobRepository)工厂。getObject();} 
 
 
private  SimpleJobLauncher  jobLauncher ; 
 
 
@豆public  SimpleJobLauncher  jobLauncher(JobRepository  jobRepository){jobLauncher。setJobRepository(jobRepository);return  jobLauncher ;} 
 
 
@PostConstructprivate  void  initJobLauncher(){jobLauncher  =  new  SimpleJobLauncher();} 
 
 
@豆FlatFileItemReader < List < BCountry >>  reader(){FlatFileItemReader < List < BCountry >>  reader  =  new  FlatFileItemReader <>();读者。setName(“scandinaviandCountriesReader”);读者。setResource(new  ClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));读者。setLineMapper(new  BCountryJsonLineMapper());回报 读者 ;} 
 
 
@豆public  ItemWriter < List < Document >>  writer(){返回 新的 ItemWriter < List < Document >>(){@覆盖public  void  write(List <? extends  List < Document >>  items)抛出 Exception {尝试 {如果(!CollectionUtils。的isEmpty(项目)&&  项目。大小()>  0){List < Document >  flatDocs  =  items。stream()。flatMap(List :: stream)。收集(收藏家。toList());mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);} else {记录。警告(“没有事件可以救......”);}} catch(BusinessException  e){抛出 新的 RuntimeException(e);}}};} 
 
 
@豆public  BCountryTimeEventItemProcessor  processor(){返回 新的 BCountryTimeEventItemProcessor();} 
 
 
@豆public  Job  scandvTimeJob(){返回 jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(new  RunIdIncrementer())。流程(step1())。结束()。build();} 
 
 
@豆public  Step  step1(){返回 stepBuilderFactory。得到(“step1”)。< List < BCountry >,List < Document >> chunk(10)。读者(读者())。处理器(处理器())。作家(作家())。build();}// end :: jobstep [] 
 
 
//每天午夜15分钟@Scheduled(cron  =  “0 15 0 * * *”)public  void  startScandvEventTimeJob()throws  Exception {记录。info(“====>工作开始时间:”  +  新 日期());JobParameters  param  =  new  JobParametersBuilder()。addString(“作业ID” ,字符串。的valueOf(系统。的currentTimeMillis()))。toJobParameters();JobExecution  执行 =  jobLauncher。run(scandvPrayTimeJob(),param);记录。信息(“====>工作完成了状态:”  +  执行。的getStatus());}}部署de Batch到Heroku:
git add。git commit -m  “Deploy Batch”git push heroku master注意:要禁用默认批量启动,请将此添加到application.yml
s p r i n g:b a t c h:j o b:      Ë Ñ 一个b 升é d:˚F 一升小号ë

标签:返回,Web,String,Spring,List,Batch,final,public,name
From: https://blog.51cto.com/u_16145034/6515631

相关文章

  • BUUCTF:[CFI-CTF 2018]webLogon capture
    https://buuoj.cn/challenges#[CFI-CTF%202018]webLogon%20capturelogon.pcapng包的数量很少,随便抓个包跟踪一下流即可发现passwordPSC:\Users\Administrator>php-r"var_dump(urldecode('%20%43%46%49%7b%31%6e%73%33%63%75%72%33%5f%6c%30%67%30%6e%7d%20'));"Co......
  • window wsl 无法访问flink webui
    https://blog.csdn.net/weixin_38988171/article/details/126012785修改flink配置文件rest.bind-address:localhost为rest.bind-address:0.0.0.0......
  • java WebUploader 分块上传
    ​ 我们平时经常做的是上传文件,上传文件夹与上传文件类似,但也有一些不同之处,这次做了上传文件夹就记录下以备后用。首先我们需要了解的是上传文件三要素:1.表单提交方式:post(get方式提交有大小限制,post没有)2.表单的enctype属性:必须设置为multipart/form-data.3.表单必须......
  • php WebUploader 分块上传
    ​ PHP用超级全局变量数组$_FILES来记录文件上传相关信息的。1.file_uploads=on/off 是否允许通过http方式上传文件2.max_execution_time=30 允许脚本最大执行时间,超过这个时间就会报错3.memory_limit=50M 设置脚本可以分配的最大内存量,防止失控脚本占用过多内存,此......
  • IntelliJ IDEA 中设置 Spring Boot 测试单元的调试环境为test
    在IntelliJIDEA中设置SpringBoot测试单元的调试环境为test,你可以按照以下步骤进行操作:打开IntelliJIDEA,并导航到你的项目。打开你的测试类,或者创建一个新的测试类。在测试类中找到你要调试的测试方法。在测试方法的左侧,你会看到一个灰色的调试按钮(一个带有虫......
  • Spring高手之路5——彻底掌握Bean的生命周期
    1.理解Bean的生命周期1.1生命周期的各个阶段在SpringIOC容器中,Bean的生命周期大致如下:实例化:当启动Spring应用时,IOC容器就会为在配置文件中声明的每个<bean>创建一个实例。属性赋值:实例化后,Spring就通过反射机制给Bean的属性赋值。调用初始化方法:如果Bean配置了初始化方法,Spring......
  • 小鹿线Web前端好不好?
    现在web前端开发开发技术在不断地迭代更新,有很多从事前端开发的程序员在技术上会遇到瓶颈,这个时候小伙伴就应该通过不断的学习开发技术知识,来提升自身的开发技术水平,那小伙伴应该怎么来学习呢?1.梳理清楚知识体系框架学习前端开发技术,不管是入门还是进阶,一定都要有知识体系建设......
  • spring-boot 自动切换环境
    spring.profiles.active:@[email protected]<profiles><profile><!--开发环境--><id>dev</id><properties><profiles.active>dev</profiles.active......
  • 4. SpringMVC获取请求参数
    1.通过ServletAPI获取‍将HttpServletRequest作为控制器方法的形参,此时HttpServletRequest类型的参数表示封装了当前请求的请求报文的对象‍​​‍2.通过控制器方法的形参获取请求参数‍在控制器方法的形参位置,设置和请求参数同名的形参,当浏览器发送请求,匹配到......
  • SprintBoot JavaWeb访问提示 Full authentication is required to access this resour
    SprintBoot部署好网站之后访问没有异常,但是配置域名地址至Nginx上时登录请求报错了,经查询是因为项目是前后端分离,请求的路由会加上工程的主路径,所以需要在Nginx多配置一个地址,如Location/{http://localhost:8080/project}location/project/{http://loc......