首页 > 其他分享 >实现阻塞读且并发安全的map

实现阻塞读且并发安全的map

时间:2023-02-28 11:58:02浏览次数:32  
标签:map en hashmap 读且 Get mux 并发 key entry

实现阻塞读且并发安全的 map

需要实现以下接口

type sp interface {
    // 存入k-v, 此方法不会阻塞, 时刻都可以立即执行并返回.
    Put(key string, val interface{})  
    // 读取一个 k-v, 如果 key 不存在则阻塞, 等待 key 存在或者超时.
    Get(key string, timeout time.Duration) (interface{}, bool)
}

看到这个接口要求, 第一个反应就是用 channel 来进行信息的传递, 大概的模型差不多就是: 一个 Put 操作会向所有监听当前 key 的 Get 操作的 channel 中发送数据. 好了, 有了一个大概的思路, 我们开始实现这个接口.

首先定义 map 结构体和 entry 的结构体:

type Map struct {
	mux     sync.RWMutex      // 保护 hashmap
	hashmap map[string]*entry // hashmap
}

type entry struct {
	mux sync.RWMutex    // 用来保护 entry
	valid bool          // 结果是否有效, true 有效, false 无效
	value interface{}   // 结果
	chans []chan Result // Get 操作监听的通道
}

type Result struct {
	val  interface{} // 读取的结果
	read bool        // 读到了: true; 没读到: false
}

这里使用了读写锁来保护 Map 和 entry, 用来保证并发安全. chans 就是 Get 操作监听的通道, 这里使用了一个 valid 标志位来判断这个值是否有效, 为什么这么设计后文会讲.

我们先考虑 Put 的实现方法:

  • Put 首先如果 key 对应的 entry 不存在, 那么就需要 new 一个实例. 如果已经存在了, 需要将 value 和 valid 更新.
  • 接着将这个值发送给所有的 Get 监听通道, 发送一个关闭一个, 最后清空监听的通道.

来看看具体的实现:

func (this *Map) Put(key string, value interface{}) {
    go func() {
        // 1
        this.mux.RLock()
        en, exist := this.hashmap[key]
        if !exist {
            this.mux.RUnlock()
            this.mux.Lock()
            en = &entry{
                value: nil,
                valid: false,
                chans: make([]chan Result, 0),
            }
            this.hashmap[key] = en
            this.mux.Unlock()
        } else {
            this.mux.RUnlock()
        }

        // 2
        // en 是指针值, 即使 entry 指向的地址中的结构体内容改变, 这个地址还是不变的.
        // 所以不需要保护 hashmap, 只需要保护 entry 即可.
        en.mux.Lock()
        en.value = value // 赋值
        en.valid = true  // 结果有效
        // 向阻塞接收方发送消息
        for _, ch := range en.chans {
            ch <- Result{val: value, read: true} // 返回读取的信息
            close(ch)                            // 从发送方将 channel 全部关闭
        }
        en.chans = make([]chan Result, 0) // 发送完消息后清除掉所有的 channel
        en.mux.Unlock()
    }()
}

我们接着考虑 Get 的实现方法:

  • 首先 Get 方法需要查询 hashmap 中是否存在 key 对应的 entry, 如果不存在那么 new 一个实例.
  • 接着查看 entry 中的值是否有效, 如果有效那么直接返回.
  • 如果无效, 那么就将自己监听结果的 channel 注册, 然后监听两个 channel (timer 和 监听结果的) 即可.

来看看具体实现:

func (this *Map) Get(key string, timeout time.Duration) (interface{}, bool) {
    // 1
    this.mux.RLock()
    en, exist := this.hashmap[key]
    if !exist {
        this.mux.RUnlock()
        this.mux.Lock()
        en = &entry{
            value: nil,
            valid: false, // 消息无效
            chans: make([]chan Result, 0),
        }
        this.hashmap[key] = en
        this.mux.Unlock()
    } else {
        this.mux.RUnlock()
    }

    // 2
    en.mux.RLock()
    if en.valid {
        en.mux.RUnlock()
        return en.value, true
    }
    en.mux.RUnlock()

    // 3
    en.mux.Lock()
    readChan := make(chan Result, 1)      // 设置缓冲为 1 的 channel
    en.chans = append(en.chans, readChan) // 增加一个异步读取队列
    en.mux.Unlock()

    timer := time.NewTimer(timeout)
    select {
    case <-timer.C:
        return nil, false
    case result := <-readChan:
        return result.val, result.read
    }
}

测试代码:

func NewSP() SP {
	return &Map{
		mux:     sync.RWMutex{},
		hashmap: make(map[string]*entry),
	}
}

func TestMap(t *testing.T) {
	testCases := []func(){
		func() {
			wg := &sync.WaitGroup{}
			sp := NewSP()
			wg.Add(1)
			go func() {
				sp.Put("key1", "value1")
				time.Sleep(3 * time.Second)
				sp.Put("key2", "value2")
				wg.Done()
			}()
			wg.Add(1)
			go func() {
				res1, ok1 := sp.Get("key1", 1*time.Second)
				if !ok1 {
					t.Fail()
				}
				fmt.Println("res1: ", res1)
				res2, ok2 := sp.Get("key2", 1*time.Second)
				if ok2 {
					t.Fail()
				}
				fmt.Println("res2: ", res2)
				res3, ok3 := sp.Get("key2", 5*time.Second)
				if !ok3 {
					t.Fail()
				}
				fmt.Println("res3: ", res3)
				res4, ok4 := sp.Get("key1", 5*time.Second)
				if !ok4 {
					t.Fail()
				}
				fmt.Println("res4: ", res4)
				wg.Done()
			}()
			wg.Wait()
		},
	}

	for _, f := range testCases {
		f()
	}
}

测试结果:

res1:  value1
res2:  <nil>
res3:  value2
res4:  value1

标签:map,en,hashmap,读且,Get,mux,并发,key,entry
From: https://www.cnblogs.com/geraldkohn/p/17163520.html

相关文章

  • 谷歌Gmail邮箱开启SMTP/IMAP服务流程[转载]
    前言:本篇专门定向讲解谷歌Gmail邮箱,如何开通SMTP协议的流程,在讲篇幅前,我需要你确定3件事:1.你已经有谷歌账号了2.你很清楚自己为什么想要开通SMTP服务3.你已经掌握一定......
  • java网络编程-并发服务端
    上次的服务端一次只能接受一个客户端的连接,性能实在堪忧,我们对服务端进行了改造,每接到一个客户端的请求,就新建一个线程,让新线程跟客户端进行交互,主线程可以继续等待其他客......
  • java8 flatmap的使用
    Useruser=newUser(“[email protected]”,“1234”);user.setPosition(“Developer”);Stringposition=Optional.ofNullable(user).flatMap(u->u.getPosition()).......
  • 高并发linux内核参数调优
    高并发linux内核参数调优内核参数说明#【net】########################cat/proc/sys/net/ipv4/tcp_syncookies#默认值:1#作用:是否打开SYNCookie功能,该功能可以防......
  • 并发replace操作导致的死锁问题
    背景批量对一张表进行replaceinto操作,每个SQL操作1000条数据,最近有同事反馈使用并发replace操作的时候,遇到了死锁的问题。针对这个问题,我看了看表的结构,发现表中有一个主......
  • 三天吃透Java并发八股文!
    本文已经收录到Github仓库,该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校......
  • colmap相关博客与材料
    colmap教程colmap简易教程(命令行模式)colmap导出colmap结果导出已知相机位姿重建有点没太看懂。需要再搞搞。【colmap】已知相机位姿情况下进行三维重建COLMAP......
  • 并发多线程学习(二)上下文切换
    上下文切换(有时也称做进程切换或任务切换)是指CPU从一个进程(或线程)切换到另一个进程(或线程)。上下文是指某一时间点CPU寄存器和程序计数器的内容。寄存器是cpu内部的少......
  • 并发多线程学习(三)Java多线程入门类和接口
    1Thread类和Runnable接口上一章我们了解了操作系统中多线程的基本概念。那么在Java中,我们是如何使用多线程的呢?首先,我们需要有一个“线程”类。JDK提供了Thread类和Runn......
  • 并发多线程学习(五)Java线程的状态及主要转化方法
    1操作系统中的线程状态转换首先我们来看看操作系统中的线程状态转换。在现在的操作系统中,线程是被视为轻量级进程的,所以操作系统线程的状态其实和操作系统进程的状态是......