首页 > 数据库 >通过Redis+Mysql来自定义Spring-Statemachine的持久化

通过Redis+Mysql来自定义Spring-Statemachine的持久化

时间:2023-07-30 17:14:48浏览次数:52  
标签:return Statemachine val workflow Spring Redis fun id WAIT

我们在使用Spring状态机的时候,往往需要对于StateMachine持久化操作,但是官方为我们提供的基于redis的持久化并不是特别好,一方面是因为只存redis容易导致数据丢失,另一方面因为状态机的特性需要对应的StateMachine的数据永久有效,导致redis中的key永不过期。

我现在希望实现将StateMachine持久化到数据库跟redis中,redis的有效期为3天,查询redis中没有再去查询数据库,然后更新到redis中。

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>1.6.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.statemachine</groupId>
            <artifactId>spring-statemachine-kryo</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.statemachine</groupId>
            <artifactId>spring-statemachine-starter</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

我们这里拿一个简单的审批流程举例,流程有这么几种状态CREATE、WAIT_REVIEW、WAIT_MODIFY、COMPLETED

enum class WorkflowState(val key: Int, val desc: String) {
    CREATE(1,"已创建"),
    WAIT_REVIEW(2,"已提交,待审核"),
    WAIT_MODIFY(3,"已驳回,待提交"),
    COMPLETED(4,"已完成");

    companion object {
        fun getByKey(key: Int): WorkflowState {
            for (e in WorkflowState.values()) {
                if (e.key == key) {
                    return e
                }
            }
            throw RuntimeException("enum not exists.")
        }
    }
}

对于流程的推进有这么几个事件

enum class WorkflowStateChangeEvent {
    /**
     *  提交
     */
    SUBMIT,

    /**
     *  拒绝
     */
    REJECT,

    /**
     *  重新提交
     */
    RE_SUBMIT,

    /**
     *  同意
     */
    AGREE

}

我们来配置一下流程中各个状态与事件的关系

@Configuration
@EnableStateMachine(name = ["workflowStateMachine"])
class WorkflowStateMachineConfig:StateMachineConfigurerAdapter<WorkflowState, WorkflowStateChangeEvent>() {
    override fun configure(states: StateMachineStateConfigurer<WorkflowState, WorkflowStateChangeEvent>) {
        states.withStates()
      // 初始状态为CREATE
            .initial(WorkflowState.CREATE)
            .states(EnumSet.allOf(WorkflowState::class.java))
    }

    override fun configure(transitions: StateMachineTransitionConfigurer<WorkflowState, WorkflowStateChangeEvent>) {
     // CREATE->WAIT_REVIEW 通过SUBMIT事件
      transitions
           .withExternal().source(WorkflowState.CREATE).target(WorkflowState.WAIT_REVIEW).event(WorkflowStateChangeEvent.SUBMIT)
            .and() // WAIT_REVIEW->WAIT_MODIFY 通过REJECT事件
           .withExternal().source(WorkflowState.WAIT_REVIEW).target(WorkflowState.WAIT_MODIFY).event(WorkflowStateChangeEvent.REJECT)
            .and() // WAIT_MODIFY->WAIT_REVIEW 通过RE_SUBMIT事件
            .withExternal().source(WorkflowState.WAIT_MODIFY).target(WorkflowState.WAIT_REVIEW).event(WorkflowStateChangeEvent.RE_SUBMIT)
            .and() // WAIT_REVIEW->COMPLETED 通过AGREE事件
            .withExternal().source(WorkflowState.WAIT_REVIEW).target(WorkflowState.COMPLETED).event(WorkflowStateChangeEvent.AGREE)

    }

}

自定义StateMachinePersister来持久化

@Component
class CustomStateMachinePersister<S,E>(
    stateMachinePersist:CustomStateMachinePersist<S,E>
): AbstractStateMachinePersister<S, E, String>(stateMachinePersist)
@Component
class CustomStateMachinePersist<S,E>(
    private val processStateMapper: ProcessStateMapper,
    redisConnectionFactory: RedisConnectionFactory
) :StateMachinePersist<S,E,String>{
    val persistenceRepository = PersistenceRepository<S,E>(redisConnectionFactory)

    override fun write(context: StateMachineContext<S, E>, contextObj: String) {
        val values = persistenceRepository.serialize(context)
        persistenceRepository.save(context,contextObj,3L,TimeUnit.DAYS)

        val wrapper = KtQueryWrapper(ProcessState::class.java).eq(ProcessState::code,contextObj)

        val processState = processStateMapper.selectOne(wrapper)
        if (Objects.nonNull(processState)){
            processState.value = values
            processStateMapper.updateById(processState)
            return
        }
        processStateMapper.insert(ProcessState(null,contextObj,values))

    }

    override fun read(contextObj: String): StateMachineContext<S, E>? {
        val context = persistenceRepository.getContext(contextObj)
        if (Objects.nonNull(context)){
            return context
        }

        val wrapper = KtQueryWrapper(ProcessState::class.java).eq(ProcessState::code,contextObj)
        val result = processStateMapper.selectOne(wrapper)
        if (Objects.isNull(result)){
            return null
        }
        val str = result.value
        persistenceRepository.save(str,contextObj,3L,TimeUnit.DAYS)
        return persistenceRepository.deserialize(str)
    }


}

这里的ProcessStateMapper为我们存储数据库中的mapper

interface ProcessStateMapper :BaseMapper<ProcessState>{
}

@TableName("t_process_state")
data class ProcessState(
    @TableId(type = IdType.AUTO)
    var id:Long?,
    var code:String,
    var value:ByteArray
)

PersistenceRepository是用来序列化跟反序列化StateMachineContext,以及存入redis中的方法。

class PersistenceRepository<S, E>(redisConnectionFactory: RedisConnectionFactory) {
    private val kryoThreadLocal = ThreadLocal.withInitial {
        val kryo = Kryo()
        kryo.addDefaultSerializer(
            StateMachineContext::class.java,
            StateMachineContextSerializer<S, E>()
        )
        kryo.addDefaultSerializer(MessageHeaders::class.java, MessageHeadersSerializer())
        kryo.addDefaultSerializer(UUID::class.java, UUIDSerializer())
        kryo
    }
    private val redisOperations: RedisOperations<String, ByteArray>

    init {
        redisOperations = createDefaultTemplate(redisConnectionFactory)
    }
 		fun save(byteArray: ByteArray,id: String, time: Long, timeUnit: TimeUnit){
        redisOperations.opsForValue()[id] = byteArray
        redisOperations.expire(id, time, timeUnit)
    }
    fun save(context: StateMachineContext<S, E>, id: String, time: Long, timeUnit: TimeUnit) {
        redisOperations.opsForValue()[id] = serialize(context)
        redisOperations.expire(id, time, timeUnit)
    }

    fun getContext(id: String): StateMachineContext<S, E>? {
        return deserialize(redisOperations.opsForValue()[id])
    }

    private fun createDefaultTemplate(connectionFactory: RedisConnectionFactory): RedisTemplate<String, ByteArray> {
        val template = RedisTemplate<String, ByteArray>()
        template.keySerializer = StringRedisSerializer()
        template.hashKeySerializer = StringRedisSerializer()
        template.setConnectionFactory(connectionFactory)
        template.afterPropertiesSet()
        return template
    }

    fun serialize(context: StateMachineContext<S, E>): ByteArray {
        val kryo = kryoThreadLocal.get()
        val out = ByteArrayOutputStream()
        val output = Output(out)
        kryo.writeObject(output, context)
        output.close()
        return out.toByteArray()
    }

    fun <S, E> deserialize(data: ByteArray?): StateMachineContext<S, E>? {
        return if ((data != null) && data.isNotEmpty()) {
            val kryo: Kryo = kryoThreadLocal.get()
            val input = ByteArrayInputStream(data)
            val kryoInput = Input(input)
            kryo.readObject(kryoInput, StateMachineContext::class.java) as StateMachineContext<S, E>
        } else {
            null
        }
    }
}

到此我们就已经实现了redis+mysql的持久化功能,简单的测试一下

@Service
class WorkflowServiceImpl(
    private val workflowStateMachine: StateMachine<WorkflowState, WorkflowStateChangeEvent>,
    private val stateMachineMemPersister: StateMachinePersister<WorkflowState, WorkflowStateChangeEvent, String>,
    private val workflowMapper: WorkflowMapper
) : ServiceImpl<WorkflowMapper, Workflow>(), IWorkflowService {

    @Synchronized
    fun sendEvent(changeEvent: WorkflowStateChangeEvent, workflow: Workflow): Boolean {
        var result = false

        try {
            workflowStateMachine.start()
            stateMachineMemPersister.restore(workflowStateMachine, workflow.code)
            val message = MessageBuilder.withPayload(changeEvent).setHeader("workflow", workflow).build()
            result = workflowStateMachine.sendEvent(message)
            if (!result) {
                return false
            }
            stateMachineMemPersister.persist(workflowStateMachine, workflow.code)


        } catch (e: Exception) {
            slog.error("流程操作失败:$e")
        } finally {
            workflowStateMachine.stop()
        }
        return result
    }

    override fun create(workflow: Workflow): Workflow {
        workflow.status = WorkflowState.WAIT_REVIEW.key
        workflow.code = UUID.randomUUID().toString()
        val result = sendEvent(WorkflowStateChangeEvent.SUBMIT, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.insert(workflow)
        return workflow
    }

    override fun reject(id: Long): Workflow {
        val workflow = workflowMapper.selectById(id)
        if (Objects.isNull(workflow)) {
            throw RuntimeException("流程不存在")
        }
        workflow.status = WorkflowState.WAIT_MODIFY.key
        val result = sendEvent(WorkflowStateChangeEvent.REJECT, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.updateById(workflow)
        return workflow
    }

    override fun agree(id: Long): Workflow {
        val workflow = workflowMapper.selectById(id)
        if (Objects.isNull(workflow)) {
            throw RuntimeException("流程不存在")
        }
        workflow.status = WorkflowState.COMPLETED.key
        val result = sendEvent(WorkflowStateChangeEvent.AGREE, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.updateById(workflow)
        return workflow
    }

    override fun reSubmit(id: Long): Workflow {
        val workflow = workflowMapper.selectById(id)
        if (Objects.isNull(workflow)) {
            throw RuntimeException("流程不存在")
        }
        workflow.status = WorkflowState.WAIT_REVIEW.key
        val result = sendEvent(WorkflowStateChangeEvent.RE_SUBMIT, workflow)
        if (!result){
            throw RuntimeException("流程节点错误")
        }
        workflowMapper.updateById(workflow)
        return workflow
    }
}
@RestController
@RequestMapping("/workflow")
@Tag(name = "流程控制")
class WorkflowController(
    private val workflowService: IWorkflowService
) {


    @Operation(summary = "创建流程")
    @PostMapping("/create")
    fun create(@RequestBody workflow: Workflow):String{
        workflowService.create(workflow)
        return "success"
    }

    @GetMapping("/agree/{id}")
    fun agree(@PathVariable("id") id:Long):String{
        workflowService.agree(id)
        return "success"
    }

    @GetMapping("/reject/{id}")
    fun reject(@PathVariable("id") id:Long):String{
        workflowService.reject(id)
        return "success"
    }

    @GetMapping("/reSubmit/{id}")
    fun complete(@PathVariable("id") id:Long):String{
        workflowService.reSubmit(id)
        return "success"
    }
}

先创建一个流程,这个时候流程的状态应该是WAIT_REVIEW已提交,待审核,这个时候只能进行REJECTAGREE操作。

我们试一下RE_SUBMIT操作应该是不行的。

我们进行一下REJECT操作,应该是可以的。REJECT操作完成之后我们的状态变成了WAIT_MODIFY,只能进行RE_SUBMIT操作。

并且redis中也是存在的,删除redis中的数据也不影响我们状态的正确推进。

但是不能删除数据库中的持久化的数据,如果删除了并且redis中也过期后,会影响我们流程的正常推进。

标签:return,Statemachine,val,workflow,Spring,Redis,fun,id,WAIT
From: https://www.cnblogs.com/loveletters/p/17591677.html

相关文章

  • Redis
    1.什么是Redis?它主要用来什么的?Redis,英文全称是RemoteDictionaryServer(远程字典服务),是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。与MySQL数据库不同的是,Redis的数据是存在内存中的。它的读写速度非常快,每......
  • 02:SpringBoot2 整合 Redis 详细步骤
    1、pom文件中添加redis引用1<dependency>2<groupId>org.springframework.boot</groupId>3<artifactId>spring-boot-starter-web</artifactId>4</dependency>5<dependency>6......
  • 2023Spring project0
    Task1:copy-on-writetrie第一个task实现一个写时复制Trie树,个人理解,这个概念类似于OI中的可持久化Trie树首先大体框架已经给出来了,主要实现三个功能,分别是Get,Put和Remove。Get给定一个key,返回key所对应的value。有以下三种情况:对应的key在Trie树中不存在,那么应该提前退出......
  • 《Redis 设计与实现》的总结
    17.集群cluster-enabled#是否开启集群模式的选项CLUSTERMEET<ip><port>#告诉当前节点将ip:port节点加入到集群中三个数据结构:clusterNode:clusterNode结构保存了一个节点的当前状态,比如节点的创建时间、节点的名字、节点当前的配置纪元、节点的IP地址和端口号等等。......
  • 15_Spring_JDBCTemplate批操作
    15_Spring_JDBCTemplate批操作一次连接,操作表格里的多条数据,就是批量操作1批量增加2批量修改3批量删除实体类packagecom.msb.pojo;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjava.io.Serializable;/**@......
  • 17_Spring_事务环境搭建
    17_Spring_事务环境搭建通过张三给李四转账案例演示事务的控制1数据库中准备表格applicationContext.xmljdbc.properties见上节课2项目中准备实体类packagecom.msb.pojo;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;i......
  • 18_Spring_事务管理注解方式
    18_Spring_事务管理注解方式事务的管理应该放在我们的service层进行处理spring中有两种事务的管理方式1编程式事务管理(了解)2声明式事务管理(掌握)基于注解方式实现(掌握)XML方式实现(了解)Spring声明式事务的实现方式,底层就是AOP,AOP的底层就是动态代理Spring事务管......
  • 19_Spring_事务管理XML配置方式
    19_Spring_事务管理XML配置方式applicationContext中,通过AOP实现事务的控制<beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:p="http://www.springframework.org/sche......
  • 20_Spring_零XML事务控制
    20_Spring_零XML事务控制创建配置类packagecom.msb.config;importcom.alibaba.druid.pool.DruidDataSource;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annot......
  • 21_Spring_日志框架和测试支持
    21_Spring_日志框架和测试支持spring5框架自带了通用的日志封装,也可以整合自己的日志1)spring移除了LOG4jConfigListener,官方建议使用log4j22)spring5整合log4j2导入log4j2依赖<!--log4j2依赖--><!--<dependency><groupId>org.apache.logg......