概观
在这篇文章中,我们将介绍如何创建一个使用Web服务数据并将其插入MongoDB数据库的Spring Batch应用程序。
要求
阅读本文的开发人员必须熟悉Spring Batch(示例)和MongoDB。
环境
- Mongo数据库部署在MLab中。请按照本快速入门中的步骤操作。
- 批处理应用程序部署在Heroku PaaS中。详情 请看这里。
- IDE STS或IntelliJ或Eclipse。
- Java 8 JDK。
注意:批处理也可以在本地运行。
脚本
全局场景步骤是:
- 从Web服务读取数据,在这种情况下:https://sunrise-sunset.org/api
获取城市列表的坐标,然后调用API以读取日出和日落日期时间。
2.处理数据并提取业务数据
收集数据的业务处理
3.在MongoDB中插入已处理的数据
将处理过的数据保存为mongo文档
编码
- 输入:本地文件中JSON格式的城市数据列表,如下所示:
[{“名字”:“Danemark”,“城市”:[{“名字”:“Copenhague”,“lat”:55.676098,“lng”:12.568337,“timeZone”:“CET”},{“名字”:“奥胡斯”,“lat”:56.162939,“lng”:10.203921,“timeZone”:“CET”},{“名字”:“欧登塞”,“lat”:55.39594,“lng”:10.38831,“timeZone”:“CET”},{“名字”:“奥尔堡”,“lat”:57.046707,“lng”:9.935932,“timeZone”:“CET”}]}]我们的场景从本地json文件获取输入数据。映射bean如下:
国豆:
导入 java。io。可序列化 ;导入 java。util。清单 ;
进口 com。fastxml。杰克逊。注释。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown = true)public class BCountry 实现 Serializable {
private static final long serialVersionUID = 1L ;
私有 字符串 名称 ;
私人 名单< BCity > 城市 ;
public BCountry(){super();}
public BCountry(String name,List < BCity > cities){super();这个。name = name ;这个。城市 = 城市 ;}
public BCountry(String name){super();这个。name = name ;}
public String getName(){返回 名称 ;}
public void setName(String name){这个。name = name ;}
public List < BCity > getCities(){返回 城市 ;}
public void setCities(List < BCity > cities){这个。城市 = 城市 ;}
@覆盖public int hashCode(){final int prime = 31 ;int result = 1 ;结果 = 黄金 * 结果 +((城市 == 空)? 0:城市。的hashCode());结果 = 黄金 * 结果 +((名称 == 空)? 0:名称。的hashCode());返回 结果 ;}
@覆盖public boolean equals(Object obj){if(this == obj)返回 true ;if(obj == null)返回 虚假 ;如果(的getClass()!= OBJ。的getClass())返回 虚假 ;BCountry other =(BCountry)obj ;if(cities == null){如果(其他。城市 != 空)返回 虚假 ;} 否则 如果(!城市。平等(等。城市))返回 虚假 ;if(name == null){如果(其他。名字 != 空)返回 虚假 ;} 否则 如果(!名字。平等(其它。名))返回 虚假 ;返回 true ;}
@覆盖public String toString(){返回 “BCountry [name =” + name + “,cities =” + cities + “]” ;}
}和城市豆:
导入 java。io。可序列化 ;
进口 com。fastxml。杰克逊。注释。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown = true)公共 类 BCity 实现 Serializable {
private static final long serialVersionUID = 1L ;
private String name,timeZone ;私人 双 拉特,lng ;
public BCity(){super();}
public BCity(String name,String timeZone,double lat,double lng){super();这个。name = name ;这个。timeZone = timeZone ;这个。lat = lat ;这个。lng = lng ;}
public String getName(){返回 名称 ;}
public void setName(String name){这个。name = name ;}
public String getTimeZone(){返回时 区 ;}
public void setTimeZone(String timeZone){这个。timeZone = timeZone ;}
public double getLat(){返回 纬度 ;}
public void setLat(double lat){这个。lat = lat ;}
public double getLng(){返回 lng ;}public void setLng(double lng){这个。lng = lng ;}
@覆盖public String toString(){返回 “BCity [name =” + name + “,timeZone =” + timeZone + “,lat =” + lat + “,lng =” + lng + “]” ;}
@覆盖public int hashCode(){final int prime = 31 ;int result = 1 ;长 温度 ;temp = Double。doubleToLongBits(lat);result = prime * result +(int)(temp ^(temp >>> 32));temp = Double。doubleToLongBits(lng);result = prime * result +(int)(temp ^(temp >>> 32));结果 = 黄金 * 结果 +((名称 == 空)? 0:名称。的hashCode());结果 = 素 * 结果 +((的timeZone == 空)? 0:的timeZone。的hashCode());返回 结果 ;}
@覆盖public boolean equals(Object obj){if(this == obj)返回 true ;if(obj == null)返回 虚假 ;如果(的getClass()!= OBJ。的getClass())返回 虚假 ;BCity other =(BCity)obj ;如果(双。doubleToLongBits的(LAT)!= 双。doubleToLongBits的(其他的。LAT))返回 虚假 ;如果(双。doubleToLongBits的(LNG)!= 双。doubleToLongBits的(其他的。LNG))返回 虚假 ;if(name == null){如果(其他。名字 != 空)返回 虚假 ;} 否则 如果(!名字。平等(其它。名))返回 虚假 ;if(timeZone == null){如果(其他。的timeZone != 空)返回 虚假 ;} 否则 如果(!的timeZone。平等(其它。的timeZone))返回 虚假 ;返回 true ;}
}批量阅读器实现@LineMapper。您可以使读者适应我们的数据源(示例):
导入 java。util。清单 ;
进口 组织。弹簧框架。批次。项目。档案。LineMapper ;
进口 com。ahajri。批次。豆子。BCountry ;进口 com。fastxml。杰克逊。数据绑定。ObjectMapper ;进口 com。fastxml。杰克逊。数据绑定。类型。CollectionType ;
公共 类 BCountryJsonLineMapper 实现了 LineMapper < List < BCountry >> {
private final ObjectMapper mapper = new ObjectMapper();
@覆盖public List < BCountry > mapLine(String line,int lineNumber)throws Exception {CollectionType collectionType = mapper。getTypeFactory()。constructCollectionType(列表。类,BCountry。类);返回 映射器。readValue(line,collectionType);}
}处理数据批处理时,检查同一天某个城市的业务处理数据是否已保存在数据库中。在MongoDB的搜索数据的方式就是在这个详细的岗位。
ItemProcessor将@BCountry对象转换为MongoDB Document对象。该过程详述如下:
public class BCountryPrayTimeEventItemProcessor 实现 ItemProcessor < List < BCountry >,List < Document >> {
private static final String EVENTS_COLLECTION_NAME = “event” ;
private static final Logger LOG = LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。类);
@Autowiredprivate PrayTimeService prayTimeService ;
@Autowiredprivate CloudMongoService cloudMongoService ;
@覆盖public List < Document > 进程(List < BCountry > items)抛出 Exception {final List < Document > docs = new ArrayList <>();
物品。stream()。forEach(item - > {
final String countryName = item。getName();
项目。getCities()。stream()。forEach(c - > {final Document prayTimeCityEventDoc = new Document();//循环城市并为今天提取祈祷时间final String cityName = c。getName();final String cityTimeZone = c。getTimeZone();final double lat = c。getLat();final double lng = c。getLng();
final LocalDateTime nowOfCity = LocalDateTime。现在(了zoneid。的(cityTimeZone));
final QueryParam [] queryParams = new QueryParam [ 5 ];
queryParams [ 0 ] = 新 QueryParam(“CITY_NAME” ,OperatorEnum。EQ。名称(),的cityName);queryParams [ 1 ] = 新 QueryParam(“EVENT_TYPE” ,OperatorEnum。EQ。名称(),事件类型。PRAY_TIME。名称());queryParams [ 2 ] = 新 QueryParam(“月”,OperatorEnum。EQ。名称(),nowOfCity。getMonthValue());queryParams [ 3 ] = 新 QueryParam(“DAY_OF_MONTH” ,OperatorEnum。EQ。名称(),nowOfCity。getDayOfMonth());queryParams [ 4 ] = 新 QueryParam(“COUNTRY_NAME” ,OperatorEnum。EQ。名称(),国家名称);List < Document > foundEvents = null ;尝试 {foundEvents = cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);} catch(BusinessException e1){记录。错误(“====>未找到城市祈祷时间” + 的cityName + “对” + nowOfCity。getDayOfMonth()+ “/”+ nowOfCity。getMonthValue());}
尝试 {如果(CollectionUtils。的isEmpty(foundEvents)){//祈祷时间尚未创建prayTimeCityEventDoc。put(“country_name”,countryName);prayTimeCityEventDoc。put(“city_name”,cityName);prayTimeCityEventDoc。把(“EVENT_TYPE” ,事件类型。PRAY_TIME。名称());prayTimeCityEventDoc。把(“复发”,RecurringEnum。YEARLY。名称());prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());prayTimeCityEventDoc。把(“DAY_OF_MONTH” ,nowOfCity。getDayOfMonth());prayTimeCityEventDoc。put(“lat”,lat);prayTimeCityEventDoc。put(“lng”,lng);prayTimeCityEventDoc。把(“CREATION_DATE” ,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));
final Map < String,Object > prayInfos = prayTimeService。getPrayTimeByLatLngDate(lat,lng,日期。从(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);
prayTimeCityEventDoc。把(“pray_infos” ,文件。解析(新 GSON()的toJSON(prayInfos)));
docs。add(prayTimeCityEventDoc);
} else {记录。信息(字符串。格式(“====>祈祷的时间已经存在的城市:%S,月:%d,日:%d” ,cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));}} catch(BusinessException e){记录。错误(“计算祈祷时间时出问题:”,e);抛出 新的 RuntimeException(e);}});});返回 文档 ;}
}批量配置类:
@组态@EnableBatchProcessing@EnableScheduling公共 类 BatchConfiguration {
private static final String SCANDINAVIAN_COUNTRIES_JSON_FILE = “scandinavian-countries.json” ;private static final String EVENT_COLLECTION_NAME = “event_collection” ;
private static final Logger LOG = LoggerFactory。getLogger(BatchConfiguration。类);
@Autowiredprivate JobBuilderFactory jobBuilderFactory ;
@Autowiredprivate StepBuilderFactory stepBuilderFactory ;
@Autowired私有的 MLabMongoService mlabMongoService ;
@豆public ResourcelessTransactionManager transactionManager(){返回 新的 ResourcelessTransactionManager();}
@豆public MapJobRepositoryFactoryBean mapJobRepositoryFactory(ResourcelessTransactionManager txManager)抛出 异常 {MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(txManager);工厂。afterPropertiesSet();返回 工厂 ;}
@豆public JobRepository jobRepository(MapJobRepositoryFactoryBean factory)抛出 异常 {return(JobRepository)工厂。getObject();}
private SimpleJobLauncher jobLauncher ;
@豆public SimpleJobLauncher jobLauncher(JobRepository jobRepository){jobLauncher。setJobRepository(jobRepository);return jobLauncher ;}
@PostConstructprivate void initJobLauncher(){jobLauncher = new SimpleJobLauncher();}
@豆FlatFileItemReader < List < BCountry >> reader(){FlatFileItemReader < List < BCountry >> reader = new FlatFileItemReader <>();读者。setName(“scandinaviandCountriesReader”);读者。setResource(new ClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));读者。setLineMapper(new BCountryJsonLineMapper());回报 读者 ;}
@豆public ItemWriter < List < Document >> writer(){返回 新的 ItemWriter < List < Document >>(){@覆盖public void write(List <? extends List < Document >> items)抛出 Exception {尝试 {如果(!CollectionUtils。的isEmpty(项目)&& 项目。大小()> 0){List < Document > flatDocs = items。stream()。flatMap(List :: stream)。收集(收藏家。toList());mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);} else {记录。警告(“没有事件可以救......”);}} catch(BusinessException e){抛出 新的 RuntimeException(e);}}};}
@豆public BCountryTimeEventItemProcessor processor(){返回 新的 BCountryTimeEventItemProcessor();}
@豆public Job scandvTimeJob(){返回 jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(new RunIdIncrementer())。流程(step1())。结束()。build();}
@豆public Step step1(){返回 stepBuilderFactory。得到(“step1”)。< List < BCountry >,List < Document >> chunk(10)。读者(读者())。处理器(处理器())。作家(作家())。build();}// end :: jobstep []
//每天午夜15分钟@Scheduled(cron = “0 15 0 * * *”)public void startScandvEventTimeJob()throws Exception {记录。info(“====>工作开始时间:” + 新 日期());JobParameters param = new JobParametersBuilder()。addString(“作业ID” ,字符串。的valueOf(系统。的currentTimeMillis()))。toJobParameters();JobExecution 执行 = jobLauncher。run(scandvPrayTimeJob(),param);记录。信息(“====>工作完成了状态:” + 执行。的getStatus());}}部署de Batch到Heroku:
git add。git commit -m “Deploy Batch”git push heroku master注意:要禁用默认批量启动,请将此添加到application.yml
s p r i n g:b a t c h:j o b: Ë Ñ 一个b 升é d:˚F 一升小号ë
标签:返回,Web,String,Spring,List,Batch,final,public,name
From: https://blog.51cto.com/u_16145034/6515631