searcher.IndexDocument(0, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"})
engine.go中的源码实现:
// 将文档加入索引
//
// 输入参数:
// docId 标识文档编号,必须唯一
// data 见DocumentIndexData注释
//
// 注意:
// 1. 这个函数是线程安全的,请尽可能并发调用以提高索引速度
// 2. 这个函数调用是非同步的,也就是说在函数返回时有可能文档还没有加入索引中,因此
// 如果立刻调用Search可能无法查询到这个文档。强制刷新索引请调用FlushIndex函数。
func (engine *Engine) IndexDocument(docId uint64, data types.DocumentIndexData) {
if !engine.initialized {
log.Fatal("必须先初始化引擎")
}
atomic.AddUint64(&engine.numIndexingRequests, 1)
shard := int(murmur.Murmur3([]byte(fmt.Sprint("%d", docId))) % uint32(engine.initOptions.NumShards))
engine.segmenterChannel <- segmenterRequest{
docId: docId, shard: shard, data: data}
}
而其中:
engine.segmenterChannel <- segmenterRequest{
docId: docId, shard: shard, data: data}
将请求发送给segmenterChannel,其定义:
// 建立分词器使用的通信通道
segmenterChannel chan segmenterRequest
而接受请求处理的代码在segmenter_worker.go里:
func (engine *Engine) segmenterWorker() {
for {
request := <-engine.segmenterChannel //关键
tokensMap := make(map[string][]int)
numTokens := 0
if !engine.initOptions.NotUsingSegmenter && request.data.Content != "" {
// 当文档正文不为空时,优先从内容分词中得到关键词
segments := engine.segmenter.Segment([]byte(request.data.Content))
for _, segment := range segments {
token := segment.Token().Text()
if !engine.stopTokens.IsStopToken(token) {
tokensMap[token] = append(tokensMap[token], segment.Start())
}
}
numTokens = len(segments)
} else {
// 否则载入用户输入的关键词
for _, t := range request.data.Tokens {
if !engine.stopTokens.IsStopToken(t.Text) {
tokensMap[t.Text] = t.Locations
}
}
numTokens = len(request.data.Tokens)
}
// 加入非分词的文档标签
for _, label := range request.data.Labels {
if !engine.initOptions.NotUsingSegmenter {
if !engine.stopTokens.IsStopToken(label) {
tokensMap[label] = []int{}
}
} else {
tokensMap[label] = []int{}
}
}
indexerRequest := indexerAddDocumentRequest{
document: &types.DocumentIndex{
DocId: request.docId,
TokenLength: float32(numTokens),
Keywords: make([]types.KeywordIndex, len(tokensMap)),
},
}
iTokens := 0
for k, v := range tokensMap {
indexerRequest.document.Keywords[iTokens] = types.KeywordIndex{
Text: k,
// 非分词标注的词频设置为0,不参与tf-idf计算
Frequency: float32(len(v)),
Starts: v}
iTokens++
}
var dealDocInfoChan = make(chan bool, 1)
indexerRequest.dealDocInfoChan = dealDocInfoChan
engine.indexerAddDocumentChannels[request.shard] <- indexerRequest
rankerRequest := rankerAddDocRequest{
docId: request.docId,
fields: request.data.Fields,
dealDocInfoChan: dealDocInfoChan,
}
engine.rankerAddDocChannels[request.shard] <- rankerRequest
}
}
上面代码的作用就是在统计词频和单词位置(注意:tag也是作为搜索的单词,不过其词频是0,而无法参与tf-idf计算),并封装为indexerRequest,发送给engine.indexerAddDocumentChannels[request.shard]
------------------------------------------------
补充一点,上述代码之所以得以执行是因为在:
searcher = engine.Engine{}
// 初始化
searcher.Init(types.EngineInitOptions{SegmenterDictionaries: "../data/dictionary.txt"})
searcher的初始化代码里有这么一段:
// 启动分词器
for iThread := 0; iThread < options.NumSegmenterThreads; iThread++ {
go engine.segmenterWorker()
}
------------------------------------------------
接收indexerRequest的代码在index_worker.go里:
func (engine *Engine) indexerAddDocumentWorker(shard int) {
for {
request := <-engine.indexerAddDocumentChannels[shard] //关键
addInvertedIndex := engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan)
// save
if engine.initOptions.UsePersistentStorage {
for k, v := range addInvertedIndex {
engine.persistentStorageIndexDocumentChannels[shard] <- persistentStorageIndexDocumentRequest{
typ: "index",
keyword: k,
keywordIndices: v,
}
}
}
atomic.AddUint64(&engine.numTokenIndexAdded,
uint64(len(request.document.Keywords)))
atomic.AddUint64(&engine.numDocumentsIndexed, 1)
}
}
-----------------------------------------------
而上述函数之所以得以执行,还是因为在searcher的初始化函数里有这么一句:
// 启动索引器和排序器
for shard := 0; shard < options.NumShards; shard++ {
go engine.indexerAddDocumentWorker(shard) //关键
go engine.indexerRemoveDocWorker(shard)
go engine.rankerAddDocWorker(shard)
go engine.rankerRemoveDocWorker(shard)
for i := 0; i < options.NumIndexerThreadsPerShard; i++ {
go engine.indexerLookupWorker(shard)
}
for i := 0; i < options.NumRankerThreadsPerShard; i++ {
go engine.rankerRankWorker(shard)
}
}
------------------------------------------------
其中,engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan)的核心代码在indexer.go里:
// 向反向索引表中加入一个文档
func (indexer *Indexer) AddDocument(document *types.DocumentIndex, dealDocInfoChan chan<- bool) (addInvertedIndex map[string]*types.KeywordIndices) {
if indexer.initialized == false {
log.Fatal("索引器尚未初始化")
}
indexer.InvertedIndexShard.Lock()
defer indexer.InvertedIndexShard.Unlock()
// 更新文档总数及关键词总长度
indexer.DocInfosShard.Lock()
if _, found := indexer.DocInfosShard.DocInfos[document.DocId]; !found {
indexer.DocInfosShard.DocInfos[document.DocId] = new(types.DocInfo)
indexer.DocInfosShard.NumDocuments++
}
if document.TokenLength != 0 {
originalLength := indexer.DocInfosShard.DocInfos[document.DocId].TokenLengths
indexer.DocInfosShard.DocInfos[document.DocId].TokenLengths = float32(document.TokenLength)
indexer.InvertedIndexShard.TotalTokenLength += document.TokenLength - originalLength
}
indexer.DocInfosShard.Unlock()
close(dealDocInfoChan)
// docIdIsNew := true
foundKeyword := false
addInvertedIndex = make(map[string]*types.KeywordIndices)
for _, keyword := range document.Keywords {
addInvertedIndex[keyword.Text], foundKeyword = indexer.InvertedIndexShard.InvertedIndex[keyword.Text]
if !foundKeyword {
addInvertedIndex[keyword.Text] = new(types.KeywordIndices)
}
indices := addInvertedIndex[keyword.Text]
if !foundKeyword {
// 如果没找到该搜索键则加入
switch indexer.initOptions.IndexType {
case types.LocationsIndex:
indices.Locations = [][]int{keyword.Starts}
case types.FrequenciesIndex:
indices.Frequencies = []float32{keyword.Frequency}
}
indices.DocIds = []uint64{document.DocId}
indexer.InvertedIndexShard.InvertedIndex[keyword.Text] = indices
continue
}
// 查找应该插入的位置
position, found := indexer.searchIndex(
indices, 0, indexer.getIndexLength(indices)-1, document.DocId)
if found {
// docIdIsNew = false
// 覆盖已有的索引项
switch indexer.initOptions.IndexType {
case types.LocationsIndex:
indices.Locations[position] = keyword.Starts
case types.FrequenciesIndex:
indices.Frequencies[position] = keyword.Frequency
}
continue
}
// 当索引不存在时,插入新索引项
switch indexer.initOptions.IndexType {
case types.LocationsIndex:
indices.Locations = append(indices.Locations, []int{})
copy(indices.Locations[position+1:], indices.Locations[position:])
indices.Locations[position] = keyword.Starts
case types.FrequenciesIndex:
indices.Frequencies = append(indices.Frequencies, float32(0))
copy(indices.Frequencies[position+1:], indices.Frequencies[position:])
indices.Frequencies[position] = keyword.Frequency
}
indices.DocIds = append(indices.DocIds, 0)
copy(indices.DocIds[position+1:], indices.DocIds[position:])
indices.DocIds[position] = document.DocId
}
return
}
查找docID是否存在于倒排列表的时候是二分:
// 二分法查找indices中某文档的索引项
// 第一个返回参数为找到的位置或需要插入的位置
// 第二个返回参数标明是否找到
func (indexer *Indexer) searchIndex(
indices *types.KeywordIndices, start int, end int, docId uint64) (int, bool) {
// 特殊情况
if indexer.getIndexLength(indices) == start {
return start, false
}
if docId < indexer.getDocId(indices, start) {
return start, false
} else if docId == indexer.getDocId(indices, start) {
return start, true
}
if docId > indexer.getDocId(indices, end) {
return end + 1, false
} else if docId == indexer.getDocId(indices, end) {
return end, true
}
// 二分
var middle int
for end-start > 1 {
middle = (start + end) / 2
if docId == indexer.getDocId(indices, middle) {
return middle, true
} else if docId > indexer.getDocId(indices, middle) {
start = middle
} else {
end = middle
}
}
return end, false
}
TODO,待分析:indexer里索引的细节,以及评分相关的逻辑:
rankerRequest := rankerAddDocRequest{
docId: request.docId,
fields: request.data.Fields,
dealDocInfoChan: dealDocInfoChan,
}
engine.rankerAddDocChannels[request.shard] <- rankerRequest
标签:engine,docId,shard,indexer,wukong,start,part,go,源码
From: https://blog.51cto.com/u_11908275/6382339