Java基础
ArrayList
ArrayList底层数据是动态数组,初始长度为10,每次扩容为原来的1.5倍。
扩容流程:
首先会创建一个新的长度的数组,然后使用Arrays.copyOf()方法将旧的数组中的元素复制到新的数组中,最后会将新插入的数据插入到新的数组中。
IO和NIO的区别
io指的是io流。可以实现数据从磁盘中读取和写入。除了磁盘以外内存、网络都可以作为io流的数据来源或目的地。
java中提供了字符流和字节流两种方式来实现数据流的操作,当程序面向网络进行数据io操作时,java里提供了socket来实现网络的io通信,通过这种方式可以实现数据的网络传输,它属于阻塞io。jdk1.4新增了NIO,对比io来说nio做了优化,也叫做非阻塞io。
HashMap
根据key的hash值取模得到这个插入的下标位置,这种设计可能存在hash冲突问题,导致不同的数据落入到一个下标上。hashmap采用链式寻址法解决hash冲突。对于存在冲突的key,hashmap会存储为一个单向链表,当数组长度超出64并且这个链表长度超出8,链表会进行树化,这种结构转化可以降低时间复杂度。
多线程start()方法和run()方法的区别
1,start方法是用来启动对应的线程的,线程会进入就绪状态,被分配到cpu后就开始执行
2,run方法是Thread里普通方法,将需要并行处理的代码放在run方法里
3,start方法启动线程后自动调用run方法。
一个线程调用两次start方法会出现什么:
会抛出异常,java中规定了线程只能调用一次start。第一次调用start线程会进入就绪状态。再次调用start的意思就是将正在运行的线程重新去运行,从线程安全角度来讲这样都是不可取的,为了避免重复调用start,当多次调用start方法时,会中断此次操作并且抛出异常。
Spring 源码分析
手动实现 IOC
1,创建一个类
public class MaYun {
public void say(){
System.out.println("我对钱不感兴趣");
}
}
2,bean.xml中定义该类
<?xml version="1.0" encoding="utf-8" ?>
<beans>
<bean id="MaYun" class="com.xyt.MaYun"></bean>
</beans>
3,添加解析xml的依赖
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
<version>1.1.6</version>
</dependency>
4,自定义一个IOC容器
public class Ioc {
static HashMap<String,Object> beanMap = new HashMap<>();
// 需要程序启动就获取bean.xml中对象实例,所以放在静态代码块中
static {
InputStream xmlStream = Ioc.class.getClassLoader().getResourceAsStream("bean.xml");
SAXReader saxReader = new SAXReader();
try {
Document document = saxReader.read(xmlStream);
Element rootElement = document.getRootElement();
List<Element> list = rootElement.selectNodes("//bean");
for(Element element:list){
String id = element.attributeValue("id");
String clazz = element.attributeValue("class");
Class<?> aClass = Class.forName(clazz);
Object o = aClass.newInstance();
beanMap.put(id,o);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static Object getBean(String id){
return beanMap.get(id);
}
}
5,测试类中测试自己写的Ioc容器
public class Test {
public static void main(String[] args) {
MaYun maYun = (MaYun)Ioc.getBean("MaYun");
maYun.say();
}
}
Spring 事务
spring事务特性:
原子性:一个事务中 的操作,要么全部执行成功要么全部失败
一致性:事务前后,数据的完整性不变。比如两个账户转账 转账之后两个账户总金额是不会变的
隔离性:允许多个事务,互相隔离
持久性:事务结束后数据会持久化到本地
隔离级别:
defaul默认以连接的数据库隔离级别为准
读未提交 -> 脏读,不可重复读,幻读
读已提交 -> 不可重复读,幻读
可重复读 -> 幻读
串行化 -> 效率低,一般不用
SpringIOC 工作流程
ioc的意思是控制反转,它的核心思想是将对象的控制权交给容器。当需要使用某个对象的实例时,直接从ioc容器中获取即可。这样设计可以降低对象之间的耦合性。spring中提供了很多方式声明bean,比如xml文件中,@Service注解,或者是@Configuration配置类里加@Bean。
SpringIOC容器工作流程可以分为两个阶段。
ioc容器初始化阶段,这个阶段主要是根据程序中定义的xml,或者注解等 bean的声明方式,通过解析和加载后生成BeanDefinition。然后将BeanDefinition注册进IOC容器中。并且会将得到的BeanDefinition的实体放入到Map集合里,从而完成IOC的初始化。IOC容器的作用就是对这些注册的Bean定义信息的管理。
第二阶段是完成Bean的初始化和依赖注入。通过java反射机制将没有设置lazy-init属性的单例bean进行初始化。然后进行依赖注入。最后就可以使用了,通常可以通过@Autowired或者通过BeanFactory里的geBean方法从IOC容器中获取bean的实例。
Spring中有哪些注入方式
1,属性注入 @Autowired @Resource
@Autowired
Service service;
2,setter方法注入
Service service;
@Autowired
public void setService(Service service){
this.service = service;
}
3,构造方法注入: 如果类只有一个构造方法,那么@Autowired注解可以省略。如果类中有多个构造方法,那么需要添加上@Autowired来明确指定到底使用哪个构造方法。
public class AService{
Service service;
@Autowired
public Aservice(Service service){
this.service = service;
}
}
SpringMVC理解
首先springmvc它是属于springframework里的一个模块。它是在servlet基础上构建并且实现了mvc设计模式的web框架。简化了传统的servlet+jsp的开发方式。提高了mvc模式下开发的效率。
Mybatis面试题
#和$的区别
mybatis中提供了#和$两种动态传参的方式。#它相当于是jdbc里的?占位符,通过perparedStatement进行预编译处理的,能够对特殊字符进行转义。可以预防sql注入问题。而$是通过statement赋值的,相当于是字符串的直接拼接,不会进行特殊处理,无法预防sql注入问题。正常情况使用#即可,在动态传递表名和字段名的情况下我们可以使用$。
Mybatis 如何实现分页
1,直接在select语句上添加数据库提供的分页关键字limit,然后在应用程序里传递当前页以及每页的长度
2,使用Mybatis提供的RowBounds(肉 帮的)对象,实现内存级别的分页。
3,通过Mybatis里的Interceptor拦截器实现,在select语句执行前动态拼接分页关键字
Mybatis 缓存机制
一级缓存sqlSession也叫本地缓存,sql在执行前如果命中一级缓存就可以直接读取到缓存中的数据。
如果想要实现跨sqlSession级别的缓存,那一级缓存则无法实现。需要开启二级缓存,使用二级缓存就是当多个用户查询数据的时候。有一个用户查询到了就会把数据放到二级缓存中,这样其他sqlSession就可以读取到二级缓存中的数据。
MYSQL
库操作
create database 库名;
drop database 库名;
use 库名; //使用该库
alter database 库名 character set gbk; //修改数据库编码为gbk
表操作
create table 表名(
id int(20),
name varchar(50)
);
show tables; //查看表
desc 表名; //查看表结构
show create table 表名; //查看建表语句
create table 新表名 like 旧表名; //根据旧表创建相同表
drop table 表名;
alter table 表名 add 字段名 类型; //添加字段
alter table 表名 drop 字段名
慢SQL:
show variables like 'slow_query_log%'; //查看是否开启慢SQL监控,默认OFF关闭
set global slow_query_log=1; // 开启慢SQL监控,1开启,0关闭
增删改查
insert into 表名 values (值1,值2...); //全字段插入
insert into 表名 (字段1,字段2) values (值1,值2); //指定字段插入
update 表名 set 字段名 = 值,字段名 = 值; //修改记录
update 表名 set age = age+1; //修改时加入运算
delete from 表名;
select * from 表名;
其他:去重distinct,升序asc,降序desc,分组group by,
左连接left join左表为基表查出所有 右表查出关联部分
右连接right join右表为基表查出所有 左表查出关联部分
内连接inner join查出两张表交集
面试题
自增主键可能遇到的问题:
使用自增主键分库分表后,会出现主键重复问题,可以考虑使用UUID
可能会出现表锁,主键用完...等问题
InnoDB引擎行锁如何实现的:
InnoDB基于索引实现行锁,有索引的列作为where查询条件,并且使用for update完成行锁,使用不是索引列将完成表锁
索引失效情况:
like以%开头;or前后有不是索引字段时;使用is null,is not null,not,<>,!= 索引失效
索引字段上使用函数时; 全表扫描速度比索引快时
组合索引:alter table tName add index index_name ('col_1','col_2','col_3')
根据col_1,col_2,col_3三列创建名为index_name的组合索引,最左匹配规则。
mysql性能优化
mysql是一个磁盘io访问 非常频繁的关系型数据库。在高并发和高性能的场景中mysql优化方式主要分为几种,
1,搭建mysql主从集群,保证高可用(binlog日志)
2,读写分离设计,在读多写少的场景下通过读写分离,可以避免读写冲突带来的性能问题。实现:mysql-proxy,mycat
3,引入分库分表机制,通过分库可以降低单个服务器节点的io压力,通过分表可以降低单表数据量,从而提升sql执行效率。实现:mycat
4,热点数据可以引入内存数据库,像redis这种,不仅可以缓解mysql数据库的压力,同时还能提升数据检索效率
聚簇索引和非聚簇索引
mysql中主键索引叫做聚簇索引。除了主键索引以为的,其他的比如唯一索引,联合索引,都叫做非聚簇索引,也叫做二级索引。
mysql索引失效情况
1,没有使用索引列作为where的查询条件
2,对索引列进行函数操作
3,对索引列进行类型转换
4,like关键字以%开头
5,使用or关键字前后有一个不是索引列时,索引会失效
6,使用 is null,is not null,not,<>,!=
mysql中各种锁
MVCC 多版本控制
间隙锁:间隙锁可以锁定范围内数据,主要是为了解决幻读问题。
实现:select * from tName where id > 1 and id < 4 for update;
Oracle
建表
create table 表名(
id number,
name varchar2(50),
primary key(id)
)
create table table1 as (select * from table2); //根据table2创建table1 包含数据
插入
insert into tableName(col_1,col_2) select col_1,col_2 from tab2; //要求源表和目标表数据类型匹配
分页查询
** 注意rownum是行数,行数没有0,所以加等于号 **
select * from (
select A.*,rownum rows from tabName A where rownum <= 页数*页长
)
where rows >= (页数-1)*页长+1
Redis
redis哨兵机制和集群的区别
redis集群有几种实现方式,一个是主从集群,一个是Redis Cluster。
主从集群就是在redis集群中包含一个Master节点和多个Slave节点,Master负责数据的读写,而slave负责数据读取。当Master节点收到数据变更会自动同步到slave节点上。通过这样一个架构可以实现redis读写分离,提升数据的查询效率。但是redis的主从集群不提供容错和恢复的功能,一旦Master节点挂掉不会自动选取出新的Master。所以redis还提供哨兵机制,哨兵会监控主从节点的状态,当Master节点出现故障会自动从剩下的slave节点中选举出新的Master。
Redis Cluster实现了redis分布式存储,就是说它每个节点储存不同的数据,实现数据的分片存储功能。redis cluster默认实现哨兵机制。
redis使用场景
1,数据库中的热点数据的缓存
2,排行榜数据
3,作为计数器,比如阅读量,点赞数
4,集群模式下保存session、token等
SpringCloudAlibaba
Nacos注册中心使用
1,下载nacos应用,启动后可以向nacos中注册自己的微服务,启动:startup.cmd -m standalone
2,微服务中添加nacos依赖
3,yml配置文件中添加nacos注册中心地址
4,启动类中添加nacos注册的 注解@EnableDiscoveryClient
Feign客户端使用
1,添加openfeign依赖
2,启动类上加@EnableFeignClients
3,在需要调用其他微服务的接口上添加注解@EnableFeignClient("微服务名")
Sentinel熔断器
常见容错思路:
要防止服务雪崩的扩散,我们就要做好服务的容错。容错说白了就是保护自己不被其他服务拖垮的一些措施。
常见容错思路:
隔离:给调用的每个服务分配最大线程数,即使其中一个服务挂了,也不会导致调用其他服务出问题。
超时:服务超出时间未做出响应,就断开请求释放线程。
限流:限制系统流量的输入和输出
熔断:当下游服务访问压力过大,上游为了保护系统整体可用性暂时切断对下游服务的调用,
熔断三种状态:
熔断关闭:服务没有故障时熔断器所处的状态,对调用不做任何限制
熔断开启:服务接口的调用不再经过网络,直接执行本地fallback方法
半熔断:尝试恢复调用,允许有限的流量调用该服务,监控成功率达到预期则熔断关闭,否则重新进入熔断开启状态
降级:当服务出错时提供一个兜底方案,一旦服务无法调用则使用该方案
Sentinel 引入:
1,添加sentinel依赖
2,下载sentinel应用,是一个jar包,启动后可以访问sentinel控制台
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.0.jar
3,yml文件中添加sentinel配置
spring:
cloud:
sentinel:
transport:
port: 9999 //与sentinel控制台交互的端口
dashboard: localhost:8080 //指定sentinel控制台地址
QPS:每秒查询率
Sentinel 控制台使用:
Sentinel链路控流
1,yml中添加sentinel链路控流配置
sentinel:
web-context-unify: false
2,需要限流的接口/方法上添加注解 @SentinelResource("自定义名称")
@RequestMapping("/trance1")
public String trance1(){
sentinelService.resourceMethod();
return "trance1";
}
@RequestMapping("/trance2")
public String trance2(){
sentinelService.resourceMethod();
return "trance2";
}
@Service
public class SentinelServiceImpl {
@SentinelResource(value = "tranceService")
public String resourceMethod(){
return "业务资源";
}
}
3,trance2和trance1都调用了servic层的resourceMethod方法,现在对trance2的请求控流,设置如下:
服务降级:
最小请求数:每秒钟的最小请求数
慢调用比例
异常比例
sentinel授权
比如对一个接口,只能PC端访问,ios,android不能访问。请求的PC还是ios类型会再header里,可以获取这些信息判断
添加配置类 @Component public class RequestOriginParserDefinition implements RequestOriginParser { @Override public String parseOrigin(HttpServletRequest request) { //在请求中如何获取授权信息 //request.getHeader("type"); String type = request.getParameter("type"); System.out.println("type="+type); return type; } }
对访问auth1接口的终端做限制:只能PC访问
@RequestMapping("auth1") public String auth1(String type){ System.err.println("auth1--"+type); return "auth1"; }
自定义异常返回:
@Component public class ExceptionHandlerPage implements BlockExceptionHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception { response.setContentType("application/json;charset=utf-8"); ResultData data = null; if(e instanceof FlowException){ data = new ResultData(-1,"接口被限流了"); }else if(e instanceof DegradeException){ data = new ResultData(-2,"接口被降级了"); }else if(e instanceof ParamFlowException){ data = new ResultData(-3,"参数限流异常"); }else if(e instanceof AuthorityException){ data = new ResultData(-4,"授权异常"); }else if(e instanceof SystemBlockException){ data = new ResultData(-5,"接口被降级了"); } response.getWriter().write(JSON.toJSONString(data)); } } @Data @AllArgsConstructor @NoArgsConstructor class ResultData{ private int code; private String message; }
接口上使用@SentinelResource:
@RestController public class AnnoController { @RequestMapping("/anno1") @SentinelResource( value = "anno1", blockHandler = "anno1BlockHandler", //限流或降级走这里 fallback = "anno1Fallback" //接口出错兜底fallback方案 ) public String anno1(String name){ if("wolfcode".equals(name)){ throw new RuntimeException(); } return "anno1"; } public String anno1BlockHandler(String name, BlockException e){ return "接口被限流或降级了"; } public String anno1Fallback(String name, Throwable throwable){ return "接口报错"; } }
Feign远程调用时整合Sentinel:
1,yml中添加feing整合sentinel配置
feign: sentinel: enabled: true #开启feign整合sentinel
2,feign远程调用 接口处使用sentinel
@FeignClient(value = "product-service",fallback = ProductFeignFallback.class) public interface ProductFeignApi { @RequestMapping("/product/{pid}") Product findByPid(@PathVariable("pid") Long pid); } ####################################################################################### @Component public class ProductFeignFallback implements ProductFeignApi { @Override public Product findByPid(Long pid) { System.out.println("兜底方案"); return new Product(); } }
Gateway网关
网关的作用:对外暴露访问接口,访问其他微服务类似于在内网。这样其他微服务无法直接访问,只能通过网关。
引入gateway服务:
1,添加gateway依赖,和nacos依赖(因为gateway服务也需要注册进注册中心) 2,yml添加配置 spring: cloud: gateway: discovery: locator: enabled: true # 让gateway网关可以发现nacos中的微服务地址 3,引入gateway网关后,网关默认的转发规则,也可以手动配置路由规则 4,启动类上加不加@EnableDiscoveryClient好像都能注册进注册中心???
自定义网关路由规则:
# 当请求的url,匹配上断言规则 /product-serv/**时,就会将请求转发给product-service服务 spring: cloud: gateway: routes: - id: product_route # 自定义 唯一即可 uri: lb://product-service # 请求的微服务名称,且加了load balance predicates: - Path=/product-serv/** # 请求的路由规则 filters: - StripPrefix=1 # 网关真实请求地址时忽略第一个/product-service/ - id: order_route uri: lb://order-service predicates: - Path=/order-serv/** filters: - StripPrefix=1
自定义Gateway局部过滤器:
** 我们编写完自定义局部过滤器后,需要在yml配置文件 filters: 里添加。编写Filter类名称是有固定格式的 xxGatewayFilterFactory
filters: - StripPrefix=1 #Filter类名称格式固定,所以一定有叫StripPrefixGatewayFilterFactory的类,并且yml这里配置名称也要固定写成 StripPrefix
这里测试,自定义一个时间打印过滤器TimeGatewayFilterFactory.class
filters: - StripPrefix=1 - Time=true,1 # 此处可以写一个或多个参数
然后编写Filter过滤器类
@Component public class TimeGatewayFilterFactory extends AbstractGatewayFilterFactory<TimeGatewayFilterFactory.Config> { public TimeGatewayFilterFactory() { super(Config.class); } @Override public List<String> shortcutFieldOrder() { //return Arrays.asList("show"); 只有一个参数时这样写 return Arrays.asList("show","num"); } @Override public GatewayFilter apply(Config config) { return new GatewayFilter() { public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //过滤器前置执行的逻辑 config.show; config.num; //获取yml中配的数据 ... return chain.filter(exchange).then(Mono.fromRunnable(() -> { //过滤器后置执行的逻辑 ... })); } }; } @Getter @Setter static class Config{ //此处要和yml文件中filters配置的参数数量一致 private boolean show; private Long num; } }
Gateway全局过滤器:
全局过滤器默认拦截所有请求,可以用于身份认证/授权等。我们这里模拟验证token
// 全局过滤器实现GlobalFilter接口,重写方法即可 @Component public class AuthGlobalFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //前置逻辑,这里模拟验证token String token = exchange.getRequest().getQueryParams().getFirst("token"); if(StringUtils.isEmpty(token) || !"模拟值".equals(token)){ //验证token失败,返回401身份认证错误码 System.out.println("token验证失败"); exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } }
网关集成Sentinel:
gateway集成sentinel主要就是,添加依赖和配置,然后在sentinel控制台对gateway模块进行流控规则的添加
1,添加依赖 <dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId> </dependency> 2,网关服务 yml配置sentinel spring: cloud: sentinel: transport: port: 9999 # 与sentinel控制台交互的端口 dashboard: localhost:8080 # 指定sentinel控制台地址
1,网关限流,直接对指定路由名称限流
2,API分组限流,先在控制台的API管理中添加限流规则,然后在请求链路中指定刚添加的API分组
Gateway对错误返回信息 自定义处理
@Component public class GatewayConfiguration { @PostConstruct public void initBlockHandlers(){ BlockRequestHandler blockRequestHandler = new BlockRequestHandler() { @Override public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable throwable) { Map map = new HashMap<>(); map.put("code",0); map.put("message","接口被限流了"); return ServerResponse.status(HttpStatus.OK). contentType(MediaType.APPLICATION_JSON). body(BodyInserters.fromValue(map)); } }; GatewayCallbackManager.setBlockHandler(blockRequestHandler); } }
链路追踪组件
Sleuth日志链:
微服务模式下,一次请求可能会经过多个服务,如果没有日志链将单次请求的日志串起来,定位问题时很容易陷入海量的日志中,无法快速定位问题。
Sleuth使用:
1,在一次请求链路上的服务,添加以下依赖即可在日志中区别这是一次请求 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency> 2,多个服务中可以看到关联日志,网关 -> Order订单服务 -> Product商品服务
ZipKin控制台 使用:
ZipKin一般配合sleuth来用,启动后可在ZipKin应用中查看日志
1,启动zipkin jar包,java -jar xx.jar; localhost:9411 2,添加zipkin依赖,因为zipkin默认集成了sleuth,所以把sleuth的依赖替换成zipkin的即可 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> </dependency> 3,服务的yml文件中添加zipkin服务地址 spring: zipkin: base-url: http://127.0.0.1:9411/ # zipkin应用地址 discoveryClientEnabled: false # 让nacos把他当作url不要当作服务名去解析 sleuth: sampler: probability: 1.0 # 对日志的采样比例,因为日志太大不可能全部采集
面试题:请求你们是如何定位微服务项目中出现的问题?
可以使用ZipKin+Sleuth的链路追踪组件
Nacos配置中心
微服务架构下配置文件出现的问题:
1,配置文件分散,随着微服务的增多配置文件越来越多,分散在各个微服务中不好统一配置和管理 2,配置文件一旦需要修改,就要手动去各个微服务上维护 3,配置文件服务实时更新,修改后需要重启项目
nacos配置中心的使用:
1,添加nacos配置中心依赖 <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> 2,添加nacos的配置,服务的配置文件不能使用原来的application.yml了,需要使用bootstrap.yml 优先级:bootstrap.properties -> bootstrap.yml -> application.properties -> application.yml spring: application: name: product-service cloud: nacos: config: server-addr: 127.0.0.1:8848 # nacos服务地址 file-extension: yaml # 配置文件格式 profiles: active: dev # 环境标识
3,然后在nacos控制台,添加配置文件。命名根据bootstrap.yml中的application.name和profiles.active组成
@RefreshScope注解可以动态获取nacos配置文件中热更新数据
nacos配置热更新的一些使用场景:
比如系统上线新功能,可以在配置文件中添加一个true/false的开关,这样代码中就可以根据这个参数执行新功能代码还是旧代码(因为怕上线的新功能可能出现问题,好及时切换为之前旧代码)
bootstrap.yml加载后,将加载nacos配置中心的配置文件。applicationName.yml这种不区分环境的先加载,然后会加载类似applicationName-dev.yml或者applicationName-test.yml文件。可以把一些公共的参数放在所有环境都会加载的配置文件中。
读取nacos中其他配置文件:
1,比如读取redis-config.yml文件,需要在bootstrap.yml中指定读取的文件名 spring: cloud: nacos: config: shared-configs: - data-id: redis-config.yml #要读取nacos中的配置 refresh: true #是否热更新
分布式调度Elastic-Job
Elastic-Job的使用依赖zookeeper,需要zookeeper作为注册中心
为什么使用分布式调度:
因为使用Spring定时器,在集群情况下可能会导致任务重复执行。使用分布式锁,保证任务不会重复执行。
Elastic-Job入门使用
1,导入Elastic-Job依赖 <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> 2,自定义任务类,需要实现SimpleJob public class MyElasticJob implements SimpleJob { public void execute(ShardingContext shardingContext) { System.out.println("执行任务:"+new Date()); } } 3,定义配置zookeeper等信息 public class JobDemo { public static void main(String[] args) { // 2个参数:注册中心对象,任务配置对象 new JobScheduler(createRegistryCenter(),createJobConfiguration()).init(); } //注册中心的配置 private static CoordinatorRegistryCenter createRegistryCenter() { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"); zookeeperConfiguration.setSessionTimeoutMilliseconds(100); ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } //定时任务配置 private static LiteJobConfiguration createJobConfiguration() { JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("myElasticJob","0/5 * * * * ?",1).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,MyElasticJob.class.getCanonicalName()); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); return simpleJobRootConfig; } } 4,任务调度代码写好后,需要启动zookeeper服务,这也是一个控制台应用,启动后可以用可视化工具连接
Springboot集成Elastic-Job
1,添加springboot整合elastic-job的依赖
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
2,自定义任务类
@Component public class MyElasticJob implements SimpleJob { public void execute(ShardingContext shardingContext) { System.out.println("执行任务:"+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
3,定义配置类:zookeeper地址,任务执行规则
@Configuration public class JobConfig { @Bean(initMethod = "init") public SpringJobScheduler testScheduler(CoordinatorRegistryCenter registryCenter, MyElasticJob job) { return new SpringJobScheduler(job,registryCenter,createJobConfiguration(job.getClass(),"0/5 * * * * ?",1)); } @Bean public CoordinatorRegistryCenter registryCenter(@Value("${zookeeper.url}") String url, @Value("${zookeeper.groupName}") String groupName) { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(url, groupName); zookeeperConfiguration.setSessionTimeoutMilliseconds(100); ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } public LiteJobConfiguration createJobConfiguration(Class clazz,String cron,int shardingCount) { JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(clazz.getSimpleName(),cron,shardingCount).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,clazz.getCanonicalName()); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; } }
Elastic-Job分片案例
案例:一个集群有有两台机器,现在要处理数据库中20条数据。我们不能只让一台机器处理,需要两台机器分配这20条数据,如何分配数据是我们自己定义的,这就叫做分片。比如下面数据,我们按照type类型,分成text / image / vedio / radio 四种数据。
任务调度配置类:
@Configuration public class JobConfig { @Bean(initMethod = "init") public SpringJobScheduler fileScheduler(CoordinatorRegistryCenter registryCenter, FileCustomElasticJob job) { // 此处根据分片为4 ,分片参数为“0=text,1=image,2=radio,3=vedio” return new SpringJobScheduler(job,registryCenter,createJobConfiguration(job.getClass(),"0 0/1 * * * ?",4,"0=text,1=image,2=radio,3=vedio")); } @Bean public CoordinatorRegistryCenter registryCenter(@Value("${zookeeper.url}") String url, @Value("${zookeeper.groupName}") String groupName) { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(url, groupName); zookeeperConfiguration.setSessionTimeoutMilliseconds(100); ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } public LiteJobConfiguration createJobConfiguration(Class clazz,String cron,int shardingCount, String shardingParam) { JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingCount); if(!StringUtils.isEmpty(shardingParam)){ builder.shardingItemParameters(shardingParam); } JobCoreConfiguration simpleCoreConfig = builder.build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,clazz.getCanonicalName()); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; } }
任务调度:
@Component @Slf4j public class FileCustomElasticJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info("线程ID:{},任务名称:{},任务参数:{},分片个数:{},分片索引:{},分片参数:{}", Thread.currentThread().getId(), shardingContext.getJobName(), shardingContext.getJobParameter(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()); String type = shardingContext.getShardingParameter(); // 获取分片参数,根据参数 执行不同任务 if(type.equals("image")){ ... }else if(type.equals("text")){ ... } ... } }
Dataflow类型任务调度
使用场景:当数据量很大,我们不可能一次性取出所有数据去处理
Dataflow类型的定时任务需要实现DataflowJob接口。该接口提供2个覆盖方法,分别用于抓取数据和处理数据。以数据流的方式执行调用fetchData抓取数据,直到抓取不到才会停止此次任务
1,编写Dataflow任务代码,需要实现DataflowJob接口 重写抓取和处理2方法 @Component public class FileDataflowJob implements DataflowJob<FileCustom> { @Autowired private FileCustomMapper fileCustomMapper; //抓取数据 @Override public List fetchData(ShardingContext shardingContext) { return fileCustomMapper.selectLimit(2); } //处理数据 @Override public void processData(ShardingContext shardingContext, List<FileCustom> list) { for (FileCustom fileCustom : list) { backUp(fileCustom); } } private void backUp(FileCustom fileCustom){ System.out.println("备份的方法名:"+fileCustom.getName()+";备份的类型:"+fileCustom.getType()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } fileCustomMapper.changeState(fileCustom.getId(),1); } } 2,配置类需要修改成DataflowJob类型 @Configuration public class JobConfig { @Bean(initMethod = "init") public SpringJobScheduler fileDataflowScheduler(CoordinatorRegistryCenter registryCenter, FileDataflowJob job) { return new SpringJobScheduler(job,registryCenter,createJobConfiguration(job.getClass(),"0 0/1 * * * ?",1,null,true)); } @Bean public CoordinatorRegistryCenter registryCenter(@Value("${zookeeper.url}") String url, @Value("${zookeeper.groupName}") String groupName) { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(url, groupName); zookeeperConfiguration.setSessionTimeoutMilliseconds(100); ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); regCenter.init(); return regCenter; } public LiteJobConfiguration createJobConfiguration(Class clazz,String cron,int shardingCount, String shardingParam,boolean isDataFlowJob) { JobCoreConfiguration.Builder builder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingCount); if(!StringUtils.isEmpty(shardingParam)){ builder.shardingItemParameters(shardingParam); } JobCoreConfiguration simpleCoreConfig = builder.build(); JobTypeConfiguration jobConfiguration; if(isDataFlowJob){ jobConfiguration = new DataflowJobConfiguration(simpleCoreConfig,clazz.getCanonicalName(),true); }else{ jobConfiguration = new SimpleJobConfiguration(simpleCoreConfig,clazz.getCanonicalName()); } LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true).build(); return simpleJobRootConfig; } }
日志保存到数据库
@Autowired private DataSource dataSource; @Bean(initMethod = "init") public SpringJobScheduler fileDataflowScheduler(CoordinatorRegistryCenter registryCenter, FileDataflowJob job) { //会在任务执行的时候,将日志自动持久化到数据源中 JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource); return new SpringJobScheduler(job,registryCenter,createJobConfiguration(job.getClass(),"0 0/1 * * * ?",1,null,true),jobEventRdbConfiguration); }
消息中间件
RocketMQ入门案例
1,消息生产者
public class Product { public static void main(String[] args) throws Exception { //定义一个生产对象,名称自定义不重复即可 DefaultMQProducer producer = new DefaultMQProducer("helloGroup"); producer.setNamesrvAddr("127.0.0.1:9876");//rocketmq服务地址 producer.start(); String topic = "helloTopic";//主题 for(int i=0;i<10;i++){ Message msg = new Message(topic,("RocketMQ普通消息:"+i).getBytes(Charset.defaultCharset())); SendResult send = producer.send(msg); System.out.println("发送状态:"+send.getSendStatus()); } producer.shutdown(); } }
2,消息消费者
public class Consumer { public static void main(String[] args) throws Exception{ //定义消息消费者(消费者的名称不能重复) DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //设置订阅主题,和生产者设置的主题一样 consumer.subscribe("helloTopic","*"); //设置消息的监听器 consumer.setMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { System.out.println("线程:"+Thread.currentThread()+ ",消息内容:"+new String(messageExt.getBody(), Charset.defaultCharset())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }
三种发送消息方式
同步发送:生产者发送消息给MQ,需要等待消息中间件将消息存储完后,才会响应回去,代码才能继续往下执行
异步发送:发送给MQ后,消息中间件直接响应(此时消息还未完成存储),存储完成后通过回调函数通知存储的结果(成功/失败)
public class Product { public static void main(String[] args) throws Exception { //定义一个生产对象 DefaultMQProducer producer = new DefaultMQProducer("helloGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String topic = "helloTopic"; Message msg = new Message(topic,("RocketMQ普通消息:"+i).getBytes(Charset.defaultCharset())); producer.send(msg, new SendCallback(){ //回调函数 public void onSuccess(SendResult sendResult) { System.out.println("消息存储状态"+sendResult.getSendStatus()); } public void onException(Throwable throwable) { System.out.println("消息发送出现异常"); } }); TimeUnit.SECONDS.sleep(5); producer.shutdown(); } }
一次性发送:没有响应结果,不需要知道消息是否存储在消息中间件中。用于日志存储
Message msg = new Message(topic,"RocketMQ一次性消息:".getBytes(Charset.defaultCharset())); producer.sendOneway(msg);
消息发送模式:
默认就是集群模式(不用添加任何代码)
广播模式:一个消息可以多个消费者同时消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup"); consumer.setNamesrvAddr("127.0.0.1:9876"); //设置订阅主题 consumer.subscribe("helloTopic","*"); //设置消费模式 consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
顺序消费 的实现
将需要顺序消费的消息存储到同一个队列中。
默认情况下RocketMQ没有实现顺序消费,因为RocketMQ里有四个队列 并且它是多线程的去消费消息的,没办法保证顺序消费。 首先要保证同一组的消息发送到同一个队列中,并且该队列只有一个消费者
SpringBoot 整合RocketMQ
1,RocketMQ应用启动:start mqnamesrv.cmd; start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,
RocketMQ启动后需要再启动管控台页面去连接rocketmq,这样就可以进入可视化页面
2,创建消息生产者服务,添加springboot整合rocketmq的依赖,并且yml配置RocketMQ服务地址
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
rocketmq: name-server: 127.0.0.1:9876 producer: group: my-group # 自定义名称
3,生产消息
@SpringBootTest public class RocketMQTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void sendMsg(){ Message<String> msg = MessageBuilder.withPayload("boot发送同步消息").build(); rocketMQTemplate.send("helloTopicBoot",msg); } }
4,创建消费者服务,添加springboot整合rocketmq的依赖,并且yml配置RocketMQ服务地址
5,消费者监听器
@Component @RocketMQMessageListener(consumerGroup = "conGroup",topic = "helloTopic") public class HelloTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt ext) { System.out.println("监听到的消息:"+new String(ext.getBody(), Charset.defaultCharset())); } }
RabbitMQ和RocketMQ的区别
标签:Java,String,202306,class,return,面试,sentinel,new,public From: https://www.cnblogs.com/xyt666/p/17478667.html