
wukong engine source code analysis of indexing, part 1: the posting list is essentially stored as a sorted array


Indexing a document in wukong starts with a call like this:

searcher.IndexDocument(0, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"})

The implementation in engine.go:

// Add a document to the index.
//
// Arguments:
//  docId   unique document identifier
//  data    see the comments on DocumentIndexData
//
// Notes:
//  1. This function is thread-safe; call it concurrently to speed up indexing.
//  2. The call is asynchronous, so when it returns the document may not have been
//     added to the index yet, and an immediate Search may miss it. Call FlushIndex
//     to force the index to flush.
func (engine *Engine) IndexDocument(docId uint64, data types.DocumentIndexData) {
    if !engine.initialized {
        log.Fatal("the engine must be initialized first")
    }
    atomic.AddUint64(&engine.numIndexingRequests, 1)
    shard := int(murmur.Murmur3([]byte(fmt.Sprint("%d", docId))) % uint32(engine.initOptions.NumShards))
    engine.segmenterChannel <- segmenterRequest{
        docId: docId, shard: shard, data: data}
}
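IndexDocument routes each document to a shard by hashing its docId, so every operation on the same document lands on the same indexer worker. (Note that the source calls fmt.Sprint rather than fmt.Sprintf, so the hashed bytes are the literal string "%d" followed by the decimal docId; the mapping is still deterministic per document.) Below is a minimal sketch of the same routing idea; it substitutes the standard library's FNV hash for wukong's murmur package and assumes an arbitrary shard count, so treat it as an illustration rather than the engine's actual code.

package main

import (
    "fmt"
    "hash/fnv"
)

// shardFor maps a document id onto one of numShards buckets.
// wukong hashes with murmur3; FNV-1a is used here only to keep
// the sketch free of external dependencies.
func shardFor(docId uint64, numShards int) int {
    h := fnv.New32a()
    fmt.Fprintf(h, "%d", docId)
    return int(h.Sum32() % uint32(numShards))
}

func main() {
    for docId := uint64(0); docId < 5; docId++ {
        fmt.Printf("docId %d -> shard %d\n", docId, shardFor(docId, 2))
    }
}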

Inside IndexDocument, the statement

engine.segmenterChannel <- segmenterRequest{
        docId: docId, shard: shard, data: data}

sends the request to segmenterChannel, which is declared as:

// communication channel feeding the segmenter workers
    segmenterChannel chan segmenterRequest

The code that receives and handles these requests lives in segmenter_worker.go:

func (engine *Engine) segmenterWorker() {
    for {
        request := <-engine.segmenterChannel // key: block here until a request arrives

        tokensMap := make(map[string][]int)
        numTokens := 0
        if !engine.initOptions.NotUsingSegmenter && request.data.Content != "" {
            // When the document content is non-empty, derive the keywords by segmenting the content
            segments := engine.segmenter.Segment([]byte(request.data.Content))
            for _, segment := range segments {
                token := segment.Token().Text()
                if !engine.stopTokens.IsStopToken(token) {
                    tokensMap[token] = append(tokensMap[token], segment.Start())
                }
            }
            numTokens = len(segments)
        } else {
            // Otherwise use the keywords supplied by the caller
            for _, t := range request.data.Tokens {
                if !engine.stopTokens.IsStopToken(t.Text) {
                    tokensMap[t.Text] = t.Locations
                }
            }
            numTokens = len(request.data.Tokens)
        }

        // Add the document labels, which bypass segmentation
        for _, label := range request.data.Labels {
            if !engine.initOptions.NotUsingSegmenter {
                if !engine.stopTokens.IsStopToken(label) {
                    tokensMap[label] = []int{}
                }
            } else {
                tokensMap[label] = []int{}
            }
        }

        indexerRequest := indexerAddDocumentRequest{
            document: &types.DocumentIndex{
                DocId:       request.docId,
                TokenLength: float32(numTokens),
                Keywords:    make([]types.KeywordIndex, len(tokensMap)),
            },
        }
        iTokens := 0
        for k, v := range tokensMap {
            indexerRequest.document.Keywords[iTokens] = types.KeywordIndex{
                Text: k,
                // Labels carry no positions, so their frequency is 0 and they take no part in the TF-IDF computation
                Frequency: float32(len(v)),
                Starts:    v}
            iTokens++
        }

        var dealDocInfoChan = make(chan bool, 1)

        indexerRequest.dealDocInfoChan = dealDocInfoChan
        engine.indexerAddDocumentChannels[request.shard] <- indexerRequest

        rankerRequest := rankerAddDocRequest{
            docId:           request.docId,
            fields:          request.data.Fields,
            dealDocInfoChan: dealDocInfoChan,
        }
        engine.rankerAddDocChannels[request.shard] <- rankerRequest
    }
}

The code above counts term frequencies and token positions (note that labels are also indexed as search terms, but with a frequency of 0, so they cannot take part in the TF-IDF computation), wraps the result into an indexerRequest, and sends it to engine.indexerAddDocumentChannels[request.shard].
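To make that concrete, here is a hedged sketch of how a tokensMap could be flattened into the Keywords slice. The tokens and byte offsets are invented for illustration, and the local keywordIndex struct merely mirrors the fields of types.KeywordIndex used above.

package main

import "fmt"

// keywordIndex is a local stand-in mirroring the fields of
// types.KeywordIndex that segmenterWorker fills in.
type keywordIndex struct {
    Text      string
    Frequency float32
    Starts    []int
}

func main() {
    // Hypothetical segmenter output: token -> byte offsets in the text,
    // plus one label that carries no positions.
    tokensMap := map[string][]int{
        "百度": {6},
        "收购": {12},
        "并购": {45},
        "tag1": {}, // a label: empty positions, so Frequency stays 0
    }

    keywords := make([]keywordIndex, 0, len(tokensMap))
    for text, starts := range tokensMap {
        keywords = append(keywords, keywordIndex{
            Text:      text,
            Frequency: float32(len(starts)), // labels end up with frequency 0
            Starts:    starts,
        })
    }
    fmt.Println(keywords)
}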

------------------------------------------------

One additional note: the code above only gets to run because of this setup call:

searcher = engine.Engine{}
// initialize
searcher.Init(types.EngineInitOptions{SegmenterDictionaries: "../data/dictionary.txt"})

Inside searcher's initialization code there is this section:

// start the segmenter workers
    for iThread := 0; iThread < options.NumSegmenterThreads; iThread++ {
        go engine.segmenterWorker()
    }
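This is an ordinary fan-out worker pool: Init launches NumSegmenterThreads goroutines that all block on the same segmenterChannel, and whichever worker is idle picks up the next request. The self-contained sketch below shows the pattern with an invented request type and worker count standing in for wukong's own types.

package main

import (
    "fmt"
    "sync"
)

type request struct{ docId uint64 }

func main() {
    requests := make(chan request)
    var wg sync.WaitGroup

    // Start a fixed number of workers that all consume the same channel,
    // just as Init starts NumSegmenterThreads segmenterWorker goroutines.
    const numWorkers = 3
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for req := range requests {
                fmt.Printf("worker %d handled doc %d\n", id, req.docId)
            }
        }(i)
    }

    for docId := uint64(1); docId <= 5; docId++ {
        requests <- request{docId: docId}
    }
    close(requests)
    wg.Wait()
}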

------------------------------------------------

The code that receives the indexerRequest is in index_worker.go:

func (engine *Engine) indexerAddDocumentWorker(shard int) {
    for {
        request := <-engine.indexerAddDocumentChannels[shard] // key: receive the next request for this shard
        addInvertedIndex := engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan)
        // save
        if engine.initOptions.UsePersistentStorage {
            for k, v := range addInvertedIndex {
                engine.persistentStorageIndexDocumentChannels[shard] <- persistentStorageIndexDocumentRequest{
                    typ:            "index",
                    keyword:        k,
                    keywordIndices: v,
                }
            }
        }

        atomic.AddUint64(&engine.numTokenIndexAdded,
            uint64(len(request.document.Keywords)))
        atomic.AddUint64(&engine.numDocumentsIndexed, 1)
    }
}

-----------------------------------------------

This worker runs, in turn, because searcher's initialization function also contains the following:

// start the indexers and rankers
    for shard := 0; shard < options.NumShards; shard++ {
        go engine.indexerAddDocumentWorker(shard) // key: one indexing worker per shard
        go engine.indexerRemoveDocWorker(shard)
        go engine.rankerAddDocWorker(shard)
        go engine.rankerRemoveDocWorker(shard)

        for i := 0; i < options.NumIndexerThreadsPerShard; i++ {
            go engine.indexerLookupWorker(shard)
        }
        for i := 0; i < options.NumRankerThreadsPerShard; i++ {
            go engine.rankerRankWorker(shard)
        }
    }

------------------------------------------------

The core of engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan) is in indexer.go:

// Add a document to the inverted index
func (indexer *Indexer) AddDocument(document *types.DocumentIndex, dealDocInfoChan chan<- bool) (addInvertedIndex map[string]*types.KeywordIndices) {
    if indexer.initialized == false {
        log.Fatal("the indexer has not been initialized")
    }

    indexer.InvertedIndexShard.Lock()
    defer indexer.InvertedIndexShard.Unlock()

    // Update the document count and the total token length
    indexer.DocInfosShard.Lock()
    if _, found := indexer.DocInfosShard.DocInfos[document.DocId]; !found {
        indexer.DocInfosShard.DocInfos[document.DocId] = new(types.DocInfo)
        indexer.DocInfosShard.NumDocuments++
    }
    if document.TokenLength != 0 {
        originalLength := indexer.DocInfosShard.DocInfos[document.DocId].TokenLengths
        indexer.DocInfosShard.DocInfos[document.DocId].TokenLengths = float32(document.TokenLength)
        indexer.InvertedIndexShard.TotalTokenLength += document.TokenLength - originalLength
    }
    indexer.DocInfosShard.Unlock()
    close(dealDocInfoChan)

    // docIdIsNew := true
    foundKeyword := false
    addInvertedIndex = make(map[string]*types.KeywordIndices)
    for _, keyword := range document.Keywords {
        addInvertedIndex[keyword.Text], foundKeyword = indexer.InvertedIndexShard.InvertedIndex[keyword.Text]
        if !foundKeyword {
            addInvertedIndex[keyword.Text] = new(types.KeywordIndices)
        }
        indices := addInvertedIndex[keyword.Text]

        if !foundKeyword {
            // The keyword is not in the index yet; add a new posting list for it
            switch indexer.initOptions.IndexType {
            case types.LocationsIndex:
                indices.Locations = [][]int{keyword.Starts}
            case types.FrequenciesIndex:
                indices.Frequencies = []float32{keyword.Frequency}
            }
            indices.DocIds = []uint64{document.DocId}
            indexer.InvertedIndexShard.InvertedIndex[keyword.Text] = indices
            continue
        }

        // Find the position where this docId should go
        position, found := indexer.searchIndex(
            indices, 0, indexer.getIndexLength(indices)-1, document.DocId)
        if found {
            // docIdIsNew = false

            // Overwrite the existing entry
            switch indexer.initOptions.IndexType {
            case types.LocationsIndex:
                indices.Locations[position] = keyword.Starts
            case types.FrequenciesIndex:
                indices.Frequencies[position] = keyword.Frequency
            }
            continue
        }

        // The docId is not in the posting list yet; insert a new entry
        switch indexer.initOptions.IndexType {
        case types.LocationsIndex:
            indices.Locations = append(indices.Locations, []int{})
            copy(indices.Locations[position+1:], indices.Locations[position:])
            indices.Locations[position] = keyword.Starts
        case types.FrequenciesIndex:
            indices.Frequencies = append(indices.Frequencies, float32(0))
            copy(indices.Frequencies[position+1:], indices.Frequencies[position:])
            indices.Frequencies[position] = keyword.Frequency
        }
        indices.DocIds = append(indices.DocIds, 0)
        copy(indices.DocIds[position+1:], indices.DocIds[position:])
        indices.DocIds[position] = document.DocId
    }
    return
}
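This is the point the title makes: a posting list here is not a tree or a skip list but a set of parallel slices (DocIds plus, depending on IndexType, Locations or Frequencies) kept sorted by docId, with new entries spliced in at the position the search returns. The simplified, self-contained sketch below shows that sorted splice; postingList is only a stand-in for types.KeywordIndices, and sort.Search replaces indexer.searchIndex.

package main

import (
    "fmt"
    "sort"
)

// postingList is a stand-in for types.KeywordIndices: parallel slices
// kept sorted by DocIds.
type postingList struct {
    DocIds      []uint64
    Frequencies []float32
}

// add inserts (docId, freq) at its sorted position, or overwrites an
// existing entry, mirroring the splice logic in Indexer.AddDocument.
func (p *postingList) add(docId uint64, freq float32) {
    pos := sort.Search(len(p.DocIds), func(i int) bool { return p.DocIds[i] >= docId })
    if pos < len(p.DocIds) && p.DocIds[pos] == docId {
        p.Frequencies[pos] = freq // overwrite the existing entry
        return
    }
    // Grow by one, shift the tail right, and drop the new entry in place.
    p.DocIds = append(p.DocIds, 0)
    copy(p.DocIds[pos+1:], p.DocIds[pos:])
    p.DocIds[pos] = docId

    p.Frequencies = append(p.Frequencies, 0)
    copy(p.Frequencies[pos+1:], p.Frequencies[pos:])
    p.Frequencies[pos] = freq
}

func main() {
    var p postingList
    for _, id := range []uint64{42, 7, 19, 7} {
        p.add(id, 1)
    }
    fmt.Println(p.DocIds) // [7 19 42], kept sorted as documents arrive
}

Keeping DocIds sorted is what makes the binary search shown next possible.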

Checking whether a docId already exists in a posting list is done by binary search:

// Binary-search for a document's entry in indices.
// The first return value is the position found, or the position at which the entry should be inserted.
// The second return value indicates whether the docId was found.
func (indexer *Indexer) searchIndex(
    indices *types.KeywordIndices, start int, end int, docId uint64) (int, bool) {
    // Special cases
    if indexer.getIndexLength(indices) == start {
        return start, false
    }
    if docId < indexer.getDocId(indices, start) {
        return start, false
    } else if docId == indexer.getDocId(indices, start) {
        return start, true
    }
    if docId > indexer.getDocId(indices, end) {
        return end + 1, false
    } else if docId == indexer.getDocId(indices, end) {
        return end, true
    }

    // Binary search over the remaining range
    var middle int
    for end-start > 1 {
        middle = (start + end) / 2
        if docId == indexer.getDocId(indices, middle) {
            return middle, true
        } else if docId > indexer.getDocId(indices, middle) {
            start = middle
        } else {
            end = middle
        }
    }
    return end, false
}
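As a worked illustration of that contract (a position or insertion point, plus a found flag), the sketch below reproduces the same behaviour on a plain sorted slice using sort.Search; the expected results in the comments were traced by hand from searchIndex above for a hypothetical posting list whose DocIds are [3, 8, 15].

package main

import (
    "fmt"
    "sort"
)

// lookup mimics the contract of Indexer.searchIndex on a plain sorted
// slice: the position of docId if present, or the position where it
// should be inserted, plus a found flag. It is a simplified stand-in,
// not wukong's code.
func lookup(docIds []uint64, docId uint64) (int, bool) {
    pos := sort.Search(len(docIds), func(i int) bool { return docIds[i] >= docId })
    return pos, pos < len(docIds) && docIds[pos] == docId
}

func main() {
    docIds := []uint64{3, 8, 15}
    for _, id := range []uint64{8, 10, 1, 20} {
        pos, found := lookup(docIds, id)
        fmt.Printf("docId %-2d -> position %d, found %v\n", id, pos, found)
    }
    // Expected output:
    // docId 8  -> position 1, found true
    // docId 10 -> position 2, found false (insert before 15)
    // docId 1  -> position 0, found false
    // docId 20 -> position 3, found false (append at the end)
}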

 

TODO, still to analyze: the details of the index structures inside indexer, and the scoring logic around:

rankerRequest := rankerAddDocRequest{
            docId:           request.docId,
            fields:          request.data.Fields,
            dealDocInfoChan: dealDocInfoChan,
        }
        engine.rankerAddDocChannels[request.shard] <- rankerRequest

From: https://blog.51cto.com/u_11908275/6382339

     首先找到单机安装linkis的教程,可以看到第一次先执行install.sh 我们就从这个install.sh作为突破口,看看安装的时候,做了什么?   看到install.sh的前面就是设置了一些变量和环境检测,然后就执行了common.sh了,所以先暂时暂停到install.sh的48行,先去看一下......