首页 > 编程语言 > Go源码阅读——github.com/medcl/esm —— v0.go

Go源码阅读——github.com/medcl/esm —— v0.go

时间:2023-05-12 10:11:25浏览次数:51  
标签:body github err nil settings 源码 medcl interface string

esm(An Elasticsearch Migration Tool)—— v0.go

https://github.com/medcl/esm release: 8.7.1

通过阅读好的源代码,细致思考,理性分析并借鉴优秀实践经验,提高 zuoyang 的编程水平,所谓 "他山之石,可以攻玉"  该是如此吧。 

/*
Copyright 2016 Medcl (m AT medcl.net)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"regexp"
	"strings"

	log "github.com/cihub/seelog"
)

type ESAPIV0 struct {
	Host      string //eg: http://localhost:9200
	Auth      *Auth  //eg: user:pass
	HttpProxy string //eg: http://proxyIp:proxyPort
	Compress  bool
}

/*获取 Elasticsearch 集群的健康状况。*/
func (s *ESAPIV0) ClusterHealth() *ClusterHealth {

	/*使用 fmt.Sprintf() 函数构造了请求 URL,并调用了 Get() 方法发起了 GET 请求。*/
	url := fmt.Sprintf("%s/_cluster/health", s.Host)
	/*Get() 方法返回的 r、body 和 errs 变量分别代表 HTTP 响应、响应体和请求错误。*/
	r, body, errs := Get(url, s.Auth, s.HttpProxy)

	if r != nil && r.Body != nil {
		//如果 r 的值不为空,同时 r.Body 也不为空,则说明请求成功,并通过 io.Copy()函数把 r.Body 中的数据流扔到 ioutil.Discard 中丢弃。
		//ioutil.Discard 是一个 Go 语言标准库中的变量,它是一个 io.Writer 接口的实现,可以用于写入任何数据但会直接丢弃它们。
		//这个变量通常在执行某些操作时不需要输出结果时使用,例如在日志库中,当日志级别低于某个阈值时,可以将日志输出重定向到 ioutil.Discard,这样就可以避免不必要的输出
		//io.Copy() 是一个 Go 语言标准库中的函数,用于复制一个 Reader 的内容到一个 Writer 中,它的函数签名如下: func Copy(dst Writer, src Reader) (written int64, err error)
		io.Copy(ioutil.Discard, r.Body)
		defer r.Body.Close()
	}

	/*判断 errs 变量是否为空,如果存在请求错误,则返回一个 ClusterHealth 对象,将集群状态设置为 "unreachable"。*/
	if errs != nil {
		return &ClusterHealth{Name: s.Host, Status: "unreachable"}
	}

	log.Debug(url)
	log.Debug(body)

	health := &ClusterHealth{}
	/*
	   使用 json.Unmarshal() 函数解析 HTTP 响应体,将响应体中的数据解析为一个 ClusterHealth struct对象
	*/
	err := json.Unmarshal([]byte(body), health)

	if err != nil {
		log.Error(body)
		return &ClusterHealth{Name: s.Host, Status: "unreachable"}
	}
	return health
}

func (s *ESAPIV0) Bulk(data *bytes.Buffer) {
	if data == nil || data.Len() == 0 {
		log.Trace("data is empty, skip")
		return
	}
	data.WriteRune('\n')
	url := fmt.Sprintf("%s/_bulk", s.Host)

	body, err := DoRequest(s.Compress, "POST", url, s.Auth, data.Bytes(), s.HttpProxy)

	if err != nil {
		log.Error(err)
		return
	}
	response := BulkResponse{}
	err = DecodeJson(body, &response)
	if err == nil {
		if response.Errors {
			fmt.Println(body)
		}
	}

	data.Reset()
}

func (s *ESAPIV0) GetIndexSettings(indexNames string) (*Indexes, error) {

	// get all settings
	allSettings := &Indexes{}

	url := fmt.Sprintf("%s/%s/_settings", s.Host, indexNames)
	resp, body, errs := Get(url, s.Auth, s.HttpProxy)

	if resp != nil && resp.Body != nil {
		io.Copy(ioutil.Discard, resp.Body)
		defer resp.Body.Close()
	}

	if errs != nil {
		return nil, errs[0]
	}

	if resp.StatusCode != 200 {
		return nil, errors.New(body)
	}

	log.Debug(body)

	err := json.Unmarshal([]byte(body), allSettings)
	if err != nil {
		panic(err)
		return nil, err
	}

	return allSettings, nil
}

func (s *ESAPIV0) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) {
	url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
	resp, body, errs := Get(url, s.Auth, s.HttpProxy)

	if resp != nil && resp.Body != nil {
		io.Copy(ioutil.Discard, resp.Body)
		defer resp.Body.Close()
	}

	if errs != nil {
		log.Error(errs)
		return "", 0, nil, errs[0]
	}

	if resp.StatusCode != 200 {
		return "", 0, nil, errors.New(body)
	}

	idxs := Indexes{}
	er := json.Unmarshal([]byte(body), &idxs)

	if er != nil {
		log.Error(body)
		return "", 0, nil, er
	}

	// remove indexes that start with . if user asked for it
	//if copyAllIndexes == false {
	//      for name := range idxs {
	//              switch name[0] {
	//              case '.':
	//                      delete(idxs, name)
	//              case '_':
	//                      delete(idxs, name)
	//
	//
	//                      }
	//              }
	//      }

	// if _all indexes limit the list of indexes to only these that we kept
	// after looking at mappings
	if indexNames == "_all" {

		var newIndexes []string
		for name := range idxs {
			newIndexes = append(newIndexes, name)
		}
		indexNames = strings.Join(newIndexes, ",")

	} else if strings.Contains(indexNames, "*") || strings.Contains(indexNames, "?") {

		r, _ := regexp.Compile(indexNames)

		//check index patterns
		var newIndexes []string
		for name := range idxs {
			matched := r.MatchString(name)
			if matched {
				newIndexes = append(newIndexes, name)
			}
		}
		indexNames = strings.Join(newIndexes, ",")

	}

	i := 0
	// wrap in mappings if moving from super old es
	for name, idx := range idxs {
		i++
		if _, ok := idx.(map[string]interface{})["mappings"]; !ok {
			(idxs)[name] = map[string]interface{}{
				"mappings": idx,
			}
		}
	}

	return indexNames, i, &idxs, nil
}

func getEmptyIndexSettings() map[string]interface{} {
	tempIndexSettings := map[string]interface{}{}
	tempIndexSettings["settings"] = map[string]interface{}{}
	tempIndexSettings["settings"].(map[string]interface{})["index"] = map[string]interface{}{}
	return tempIndexSettings
}

func cleanSettings(settings map[string]interface{}) {
	//clean up settings
	delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "creation_date")
	delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "uuid")
	delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "version")
	delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "provided_name")
}

func (s *ESAPIV0) UpdateIndexSettings(name string, settings map[string]interface{}) error {

	log.Debug("update index: ", name, settings)
	cleanSettings(settings)
	url := fmt.Sprintf("%s/%s/_settings", s.Host, name)

	if _, ok := settings["settings"].(map[string]interface{})["index"]; ok {
		if set, ok := settings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"]; ok {
			log.Debug("update static index settings: ", name)
			staticIndexSettings := getEmptyIndexSettings()
			staticIndexSettings["settings"].(map[string]interface{})["index"].(map[string]interface{})["analysis"] = set
			Post(fmt.Sprintf("%s/%s/_close", s.Host, name), s.Auth, "", s.HttpProxy)
			body := bytes.Buffer{}
			enc := json.NewEncoder(&body)
			enc.Encode(staticIndexSettings)
			bodyStr, err := Request("PUT", url, s.Auth, &body, s.HttpProxy)
			if err != nil {
				log.Error(bodyStr, err)
				panic(err)
				return err
			}
			delete(settings["settings"].(map[string]interface{})["index"].(map[string]interface{}), "analysis")
			Post(fmt.Sprintf("%s/%s/_open", s.Host, name), s.Auth, "", s.HttpProxy)
		}
	}

	log.Debug("update dynamic index settings: ", name)

	body := bytes.Buffer{}
	enc := json.NewEncoder(&body)
	enc.Encode(settings)
	_, err := Request("PUT", url, s.Auth, &body, s.HttpProxy)

	return err
}

func (s *ESAPIV0) UpdateIndexMapping(indexName string, settings map[string]interface{}) error {

	log.Debug("start update mapping: ", indexName, settings)

	for name, mapping := range settings {

		log.Debug("start update mapping: ", indexName, name, mapping)

		url := fmt.Sprintf("%s/%s/%s/_mapping", s.Host, indexName, name)

		body := bytes.Buffer{}
		enc := json.NewEncoder(&body)
		enc.Encode(mapping)
		res, err := Request("POST", url, s.Auth, &body, s.HttpProxy)
		if err != nil {
			log.Error(url)
			log.Error(body.String())
			log.Error(err, res)
			panic(err)
		}
	}
	return nil
}

func (s *ESAPIV0) DeleteIndex(name string) (err error) {

	log.Debug("start delete index: ", name)

	url := fmt.Sprintf("%s/%s", s.Host, name)

	Request("DELETE", url, s.Auth, nil, s.HttpProxy)

	log.Debug("delete index: ", name)

	return nil
}

func (s *ESAPIV0) CreateIndex(name string, settings map[string]interface{}) (err error) {
	cleanSettings(settings)

	body := bytes.Buffer{}
	enc := json.NewEncoder(&body)
	enc.Encode(settings)
	log.Debug("start create index: ", name, settings)

	url := fmt.Sprintf("%s/%s", s.Host, name)

	resp, err := Request("PUT", url, s.Auth, &body, s.HttpProxy)
	log.Debugf("response: %s", resp)

	return err
}

func (s *ESAPIV0) Refresh(name string) (err error) {

	log.Debug("refresh index: ", name)

	url := fmt.Sprintf("%s/%s/_refresh", s.Host, name)

	resp, _, _ := Post(url, s.Auth, "", s.HttpProxy)
	if resp != nil && resp.Body != nil {
		io.Copy(ioutil.Discard, resp.Body)
		defer resp.Body.Close()
	}

	return nil
}

func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, slicedId, maxSlicedCount int, fields string) (scroll interface{}, err error) {

	// curl -XGET 'http://es-0.9:9200/_search?search_type=scan&scroll=10m&size=50'
	url := fmt.Sprintf("%s/%s/_search?search_type=scan&scroll=%s&size=%d", s.Host, indexNames, scrollTime, docBufferCount)

	var jsonBody []byte
	if len(query) > 0 || len(fields) > 0 {
		queryBody := map[string]interface{}{}
		if len(fields) > 0 {
			if !strings.Contains(fields, ",") {
				queryBody["_source"] = fields
			} else {
				queryBody["_source"] = strings.Split(fields, ",")
			}
		}

		if len(query) > 0 {
			queryBody["query"] = map[string]interface{}{}
			queryBody["query"].(map[string]interface{})["query_string"] = map[string]interface{}{}
			queryBody["query"].(map[string]interface{})["query_string"].(map[string]interface{})["query"] = query
		}

		jsonBody, err = json.Marshal(queryBody)
		if err != nil {
			log.Error(err)
			return nil, err
		}

	}
	//resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy)
	body, err := DoRequest(s.Compress, "POST", url, s.Auth, jsonBody, s.HttpProxy)
	if err != nil {
		log.Error(err)
		return nil, err
	}

	scroll = &Scroll{}
	err = DecodeJson(body, scroll)
	if err != nil {
		log.Error(err)
		return nil, err
	}

	return scroll, err
}

func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (interface{}, error) {
	//  curl -XGET 'http://es-0.9:9200/_search/scroll?scroll=5m'
	id := bytes.NewBufferString(scrollId)
	url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
	body, err := DoRequest(s.Compress, "GET", url, s.Auth, nil, s.HttpProxy)

	if err != nil {
		log.Error(err)
		return nil, err
	}

	// decode elasticsearch scroll response
	scroll := &Scroll{}
	err = DecodeJson(body, &scroll)
	if err != nil {
		log.Error(err)
		return nil, err
	}

	return scroll, nil
}

标签:body,github,err,nil,settings,源码,medcl,interface,string
From: https://www.cnblogs.com/zuoyang/p/17392999.html

相关文章

  • java基于springboot+html的学生就业管理系统的设计与实现,附源码+数据库+文档,包安装调
    1、项目介绍本系统是利用现代化的计算机网络技术将传统信息宣传方式整合,按照实践过程设计完成的。同时完善服务,初步设计一个学生就业管理系统平台以利于相关的事务操作。为了使系统在各项管理中发挥更大的作用,实现计算机信息化高效的管理,现将开发目标功能需求介绍如下:(1)管理员模......
  • 源码环境搭建-唯一客服系统文档中心
    运行源码环境golang语言为跨平台的开发语言,使用唯一客服系统全源码版本,进行二次开发,需要搭建golang运行环境,并且开启gomodule依赖管理Windows系统首先下载golang压缩包,在下面这个地址下载https://studygolang.com/dl找到windows的安装包,msi的就可以,这样下一步下一步的直接就......
  • Go源码阅读——github.com/medcl/esm —— scroll.go
    esm(AnElasticsearchMigrationTool)——log.gohttps://github.com/medcl/esmrelease:8.7.1通过阅读好的源代码,细致思考,理性分析并借鉴优秀实践经验,提高zuoyang的编程水平,所谓"他山之石,可以攻玉" 该是如此吧。 /*Copyright2016Medcl(mATmedcl.net)Licensedu......
  • APIView执行流程(源码分析)、Request对象源码分析
    目录一、APIView执行流程——源码分析(难,了解)1.1基于APIView+JsonResponse编写接口1.2基于APIView+Response写接口1.3APIView的执行流程二、Request对象源码分析(难,了解)一、APIView执行流程——源码分析(难,了解)1.1基于APIView+JsonResponse编写接口#原来基于django原生的Vi......
  • https://pengchenggang.github.io/vuejsdev-com-github 备份发布
    https://pengchenggang.github.io/vuejsdev-com-github备份发布现在还没有解决的就是开clash,代码提交不上去,只能关了提,但是关了提交,也得赶运气。提交代码体验很差~......
  • 通过ssh的方式提交github
    通过ssh的方式提交github原因:github的https的clone项目报错,所以改用ssh的方式1本地创建ssh秘钥目录是.ssh我电脑的目录在C:\Users\Reciter\.ssh生成秘钥文件id_rsaid_rsa.pubssh-keygen-t-rsa-C'second@mail.com'id_rsa就是你的秘钥id_rsa.pub就是你的......
  • 直播网站程序源码,【openpyxl】只读模式、只写模式
    直播网站程序源码,【openpyxl】只读模式、只写模式1.只读模式只读模式,如果你需要读取很大的Excel文件,但是又不改变和保存,例如只读取数值用于其他数据分析,这时候我们完全可以使用只读模式提供性能 fromopenpyxlimportload_workbook#加载Excel文件时使用read_only指定只读模......
  • 成品直播源码推荐,js点击让窗口抖动动画效果
    成品直播源码推荐,js点击让窗口抖动动画效果比如说用户的未输入密码就点击登录按钮,则输入框会晃动一下提示用户需要输入,实现这种效果很简单,只需要给元素添加一个类,然后做一个关键帧动画即可css代码 .shake{   animation:shake800msease-in-out; }@keyframesshake{......
  • Go源码阅读——github.com/medcl/esm —— http.go
    esm(AnElasticsearchMigrationTool)——http.gohttps://github.com/medcl/esmrelease:8.7.1通过阅读好的源代码,细致思考,理性分析并借鉴优秀实践经验,提高zuoyang的编程水平,所谓"他山之石,可以攻玉" 该是如此吧。 /*Copyright2016Medcl(mATmedcl.net)Licensed......
  • Github 自动部署(docker)
    githubaction自动化部署(docker)上一篇博客pm2方式自动部署方式类型一个利用pm2方式本文利用docker方式配置文件name:github-action-demo#工作流名称on:push:branches:-develop#生效分支jobs:first-github-job:#任务名称自定义runs-on:......