Go 分布式计算(全)
原文:
zh.annas-archive.org/md5/BF0BD04A27ACABD0F3CDFCFC72870F45
译者:飞龙
前言
Go 编程语言是在 Google 开发的,用于解决他们在为其基础设施开发软件时遇到的问题。他们需要一种静态类型的语言,不会减慢开发人员的速度,可以立即编译和执行,利用多核处理器,并使跨分布式系统的工作变得轻松。
《使用 Go 进行分布式计算》的使命是使并发和并行推理变得轻松,并为读者提供设计和实现此类程序的信心。我们将首先深入探讨 goroutines 和 channels 背后的核心概念,这是 Go 语言构建的两个基本概念。接下来,我们将使用 Go 和 Go 标准库设计和构建一个分布式搜索引擎。
这本书是为谁准备的
这本书适用于熟悉 Golang 语法并对基本 Go 开发有一定了解的开发人员。如果您经历过 Web 应用程序产品周期,将会更有优势,尽管这并非必需。
本书涵盖的内容
第一章《Go 的开发环境》涵盖了开始使用 Go 和本书其余部分所需的一系列主题和概念。其中一些主题包括 Docker 和 Go 中的测试。
第二章《理解 Goroutines》介绍了并发和并行主题,然后深入探讨了 goroutines 的实现细节、Go 的运行时调度器等。
第三章《Channels and Messages》首先解释了控制并行性的复杂性,然后介绍了使用不同类型的通道来控制并行性的策略。
第四章《RESTful Web》提供了开始在 Go 中设计和构建 REST API 所需的所有上下文和知识。我们还将讨论使用不同可用方法与 REST API 服务器进行交互。
第五章《介绍 Goophr》开始讨论分布式搜索引擎的含义,使用 OpenAPI 规范描述 REST API,并描述搜索引擎组件的责任。最后,我们将描述项目结构。
第六章《Goophr Concierge》深入介绍了 Goophr 的第一个组件,详细描述了该组件应该如何工作。借助架构和逻辑流程图,进一步强化了这些概念。最后,我们将看看如何实现和测试该组件。
第七章《Goophr 图书管理员》详细介绍了负责维护搜索词索引的组件。我们还将讨论如何搜索给定的词语以及如何对搜索结果进行排序等。最后,我们将看看如何实现和测试该组件。
第八章《部署 Goophr》将前三章中实现的所有内容汇集起来,并在本地系统上启动应用程序。然后,我们将通过 REST API 添加一些文档并对其进行搜索,以测试我们的设计。
第九章《Web 规模架构的基础》是一个广泛而复杂的主题介绍,讨论如何设计和扩展系统以满足 Web 规模的需求。我们将从单个运行在单个服务器上的单体实例开始,并将其扩展到跨越多个区域,具有冗余保障以确保服务永远不会中断等。
充分利用本书
-
本书中的材料旨在实现动手操作。在整本书中,我们都在努力提供所有相关信息,以便读者可以选择自己尝试解决问题,然后再参考书中提供的解决方案。
-
书中的代码除了标准库外没有任何 Go 依赖。这样做是为了确保书中提供的代码示例永远不会改变,也让我们能够探索标准库。
-
书中的源代码应放置在
$GOPATH/src/distributed-go
目录下。给出的示例源代码将位于$GOPATH/src/distributed-go/chapterX
文件夹中,其中X
代表章节编号。 -
从
golang.org/
和www.docker.com/community-edition
网站下载并安装 Go 和 Docker
下载示例代码文件
您可以从www.packtpub.com
的帐户中下载本书的示例代码文件。如果您在其他地方购买了本书,可以访问www.packtpub.com/support
并注册,文件将直接发送到您的邮箱。
您可以按照以下步骤下载代码文件:
-
在
www.packtpub.com
登录或注册。 -
选择“支持”选项卡。
-
点击“代码下载和勘误”。
-
在搜索框中输入书名,然后按照屏幕上的说明操作。
下载文件后,请确保使用以下最新版本解压或提取文件夹:
-
WinRAR / 7-Zip for Windows
-
Zipeg / iZip / UnRarX for Mac
-
7-Zip / PeaZip for Linux
本书的代码包也托管在 GitHub 上,网址为github.com/PacktPublishing/Distributed-Computing-with-Go
。如果代码有更新,将在现有的 GitHub 存储库中进行更新。
我们还有其他代码包来自我们丰富的图书和视频目录,可在github.com/PacktPublishing/
上找到。快去看看吧!
下载彩色图片
我们还提供了一个 PDF 文件,其中包含本书中使用的屏幕截图/图表的彩色图片。您可以在这里下载:www.packtpub.com/sites/default/files/downloads/DistributedComputingwithGo_ColorImages.pdf
。
使用的约定
本书中使用了许多文本约定。
CodeInText
:表示文本中的代码单词、数据库表名、文件夹名、文件名、文件扩展名、路径名、虚拟 URL、用户输入和 Twitter 句柄。例如,“现在我们已经准备好所有的代码,让我们使用Dockerfile
文件构建 Docker 镜像。”
代码块设置如下:
// addInt.go
package main
func addInt(numbers ...int) int {
sum := 0
for _, num := range numbers {
sum += num
}
return sum
}
当我们希望引起您对代码块的特定部分的注意时,相关行或项目会以粗体显示:
// addInt.go
package main
func addInt(numbers ...int) int {
sum := 0
for _, num := range numbers {
sum += num
}
return sum
}
任何命令行输入或输出都将按以下方式编写:
$ cd docker
粗体:表示新术语、重要单词或屏幕上看到的单词,例如在菜单或对话框中,也会在文本中出现。例如,“从管理面板中选择系统信息。”
警告或重要提示会这样出现。
提示和技巧会这样出现。
第一章:Go 的开发环境
Go 是为 21 世纪应用程序开发而构建的现代编程语言。在过去的十年里,硬件和技术有了显著的进步,大多数其他语言没有利用这些技术进步。正如我们将在整本书中看到的,Go 允许我们构建利用多核系统提供的并发性和并行性的网络应用程序。
在本章中,我们将看一些在书的其余部分工作所需的主题,比如:
-
Go 配置——
GOROOT
、GOPATH
等。 -
Go 包管理
-
整本书中使用的项目结构
-
容器技术以及如何使用 Docker
-
在 Go 中编写测试
GOROOT
为了运行或构建一个 Go 项目,我们需要访问 Go 二进制文件及其库。在 Unix 系统上,典型的 Go 安装(安装说明可以在golang.org/dl/
找到)会将 Go 二进制文件放在/usr/bin/go
。然而,也可以在不同的路径上安装 Go。在这种情况下,我们需要设置GOROOT
环境变量指向我们的 Go 安装路径,并将其附加到我们的PATH
环境变量中。
GOPATH
程序员倾向于在许多项目上工作,将源代码与非编程相关文件分开是一个好习惯。将源代码放在一个单独的位置或工作空间是一个常见的做法。每种编程语言都有其自己的约定,规定语言相关项目应该如何设置,Go 也不例外。
GOPATH
是开发人员必须设置的最重要的环境变量。它告诉 Go 编译器在哪里找到项目和其依赖项的源代码。GOPATH
中有一些需要遵循的约定,它们与文件夹层次结构有关。
src/
这个目录将包含我们项目和它们依赖项的源代码。一般来说,我们希望我们的源代码有版本控制,并且托管在云上。如果我们或其他人能够轻松地使用我们的项目,那将是很好的。这需要我们做一些额外的设置。
假设我们的项目托管在http://git-server.com/user-name/my-go-project
。我们想要在本地系统上克隆和构建这个项目。为了使其正常工作,我们需要将其克隆到$GOPATH/src/git-server.com/user-name/my-go-project
。当我们第一次为 Go 项目构建依赖项时,我们会看到src/
文件夹中有许多包含我们项目依赖项的目录和子目录。
pkg/
Go 是一种编译型编程语言;我们有我们想要在项目中使用的源代码和依赖项的代码。一般来说,每次构建一个二进制文件,编译器都必须读取我们项目和依赖项的源代码,然后将其编译成机器代码。每次编译我们的主程序时编译未更改的依赖项会导致非常缓慢的构建过程。这就是目标文件存在的原因;它们允许我们将依赖项编译成可重用的机器代码,可以直接包含在我们的 Go 二进制文件中。
这些目标文件存储在$GOPATH/pkg
中;它们遵循与src/
类似的目录结构,只是它们位于一个子目录中。这些目录往往遵循<OS>_<CPU-Architecture>
的命名模式,因为我们可以为多个系统构建可执行二进制文件:
$ tree $GOPATH/pkg
pkg
└── linux_amd64
├── github.com
│ ├── abbot
│ │ └── go-http-auth.a
│ ├── dimfeld
│ │ └── httppath.a
│ ├── oklog
│ │ └── ulid.a
│ ├── rcrowley
│ │ └── go-metrics.a
│ ├── sirupsen
│ │ └── logrus.a
│ ├── sony
│ │ └── gobreaker.a
└── golang.org
└── x
├── crypto
│ ├── bcrypt.a
│ ├── blowfish.a
│ └── ssh
│ └── terminal.a
├── net
│ └── context.a
└── sys
bin/
Go 将我们的项目编译和构建成可执行二进制文件,并将它们放在这个目录中。根据构建规范,它们可能在当前系统或其他系统上可执行。为了使用bin/
目录中可用的二进制文件,我们需要设置相应的GOBIN=$GOPATH/bin
环境变量。
包管理
在过去,所有程序都是从头开始编写的——每个实用函数和运行代码的库都必须手工编写。现在,我们不希望经常处理低级细节;从头开始编写所有所需的库和实用程序是不可想象的。Go 带有丰富的库,这对于我们大多数需求来说已经足够了。然而,可能我们需要一些标准库提供的额外库或功能。这样的库应该可以在互联网上找到,并且我们可以下载并将它们添加到我们的项目中以开始使用它们。
在前一节GOPATH中,我们讨论了所有项目都保存在$GOPATH/src/git-server.com/user-name/my-go-project
形式的合格路径中。这对于我们可能拥有的任何依赖项都是正确的。在 Go 中处理依赖项有多种方法。让我们看看其中一些。
go get
go get
是标准库提供的用于包管理的实用程序。我们可以通过运行以下命令来安装新的包/库:
$ go get git-server.com/user-name/library-we-need
这将下载并构建源代码,然后将其安装为二进制可执行文件(如果可以作为独立可执行文件使用)。go get
实用程序还会安装我们项目所需的所有依赖项。
go get
实用程序是一个非常简单的工具。它将安装 Git 存储库上的最新主提交。对于简单的项目,这可能足够了。然而,随着项目在大小和复杂性上的增长,跟踪使用的依赖版本可能变得至关重要。不幸的是,go get
对于这样的项目并不是很好,我们可能需要看看其他包管理工具。
glide
glide
是 Go 社区中最广泛使用的包管理工具之一。它解决了go get
的限制,但需要开发人员手动安装。以下是安装和使用glide
的简单方法:
$ curl https://glide.sh/get | sh
$ mkdir new-project && cd new-project
$ glide create
$ glide get github.com/last-ent/skelgor # A helper project to generate project skeleton.
$ glide install # In case any dependencies or configuration were manually added.
$ glide up # Update dependencies to latest versions of the package.
$ tree
.
├── glide.lock
├── glide.yaml
└── vendor
└── github.com
└── last-ent
└── skelgor
├── LICENSE
├── main.go
└── README.md
如果您不希望通过curl
和sh
安装glide
,还有其他选项可在项目页面上更详细地描述,该页面位于github.com/masterminds/glide
。
go dep
go dep
是 Go 社区正在开发的新的依赖管理工具。现在,它需要 Go 1.7 或更新版本进行编译,并且已经准备好供生产使用。然而,它仍在进行更改,并且尚未合并到 Go 的标准库中。
项目结构
一个项目可能不仅仅包括项目的源代码,例如配置文件和项目文档。根据偏好,项目的结构方式可能会发生很大变化。然而,最重要的是要记住整个程序的入口是通过main
函数,这是在main.go
中作为约定实现的。
本书中将构建的应用程序将具有以下初始结构:
$ tree
.
├── common
│ ├── helpers.go
│ └── test_helpers.go
└── main.go
使用书中的代码
本书中讨论的源代码可以通过两种方式获得:
-
使用
go get -u github.com/last-ent/distributed-go
-
从网站下载代码包并将其提取到
$GOPATH/src/github.com/last-ent/distributed-go
完整书籍的代码现在应该可以在$GOPATH/src/github.com/last-ent/distributed-go
中找到,每章的特定代码将在该特定章节编号的目录中找到。
例如,
第一章的代码 -> $GOPATH/src/github.com/last-ent/distributed-go/chapter1
第二章的代码 -> $GOPATH/src/github.com/last-ent/distributed-go/chapter2
等等。
每当我们在任何特定章节中讨论代码时,都意味着我们在相应章节的文件夹中。
容器
在整本书中,我们将编写 Go 程序,这些程序将被编译为二进制文件,并直接在我们的系统上运行。然而,在后面的章节中,我们将使用docker-compose
来构建和运行多个 Go 应用程序。这些应用程序可以在我们的本地系统上运行而没有任何真正的问题;然而,我们的最终目标是能够在服务器上运行这些程序,并能够通过互联网访问它们。
在 20 世纪 90 年代和 21 世纪初,将应用程序部署到互联网的标准方式是获取服务器实例,将代码或二进制文件复制到实例上,然后启动程序。这在一段时间内运行良好,但很快就开始出现了复杂性。以下是其中一些:
-
在开发人员的机器上运行的代码可能在服务器上无法运行。
-
在服务器实例上运行良好的程序可能在将最新补丁应用到服务器操作系统时失败。
-
作为服务的一部分添加每个新实例时,必须运行各种安装脚本,以便我们可以使新实例与所有其他实例保持一致。这可能是一个非常缓慢的过程。
-
必须特别注意确保新实例及其上安装的所有软件版本与我们的程序使用的 API 兼容。
-
还必须确保所有配置文件和重要的环境变量都被复制到新实例;否则,应用程序可能会在没有或几乎没有线索的情况下失败。
-
通常在本地系统上运行的程序版本与测试系统上运行的程序版本与生产系统上运行的程序版本都配置不同,这意味着我们的应用程序可能会在这三种类型的系统中的一种上失败。如果发生这种情况,我们最终将不得不花费额外的时间和精力来尝试弄清楚问题是否特定于某个实例、某个系统等等。
如果我们能以明智的方式避免这种情况发生,那将是很好的。容器试图使用操作系统级别的虚拟化来解决这个问题。这是什么意思呢?
所有程序和应用程序都在称为用户空间的内存部分中运行。这使操作系统能够确保程序无法引起重大的硬件或软件问题。这使我们能够从用户空间应用程序中可能发生的任何程序崩溃中恢复过来。
容器的真正优势在于它们允许我们在隔离的用户空间中运行应用程序,我们甚至可以自定义用户空间的以下属性:
-
连接的设备,如网络适配器和 TTY
-
CPU 和 RAM 资源
-
主机操作系统可访问的文件和文件夹
然而,这如何帮助我们解决之前提到的问题呢?为此,让我们深入了解一下Docker。
Docker
现代软件开发在产品开发和产品部署到服务器实例中广泛使用容器技术。Docker 是 Docker, Inc(www.docker.com
)推广的容器技术,截至目前为止,它是最广泛使用的容器技术。另一个主要的替代品是由 CoreOS 开发的rkt(coreos.com/rkt
),但在本书中,我们只会关注 Docker。
Docker 与虚拟机(VM)相比
迄今为止,看了 Docker 的描述,我们可能会想它是否是另一个虚拟机。然而,这并不是这样,因为虚拟机需要我们在机器或超级用户之上运行完整的客户操作系统,以及所有所需的二进制文件。在 Docker 的情况下,我们使用操作系统级别的虚拟化,这允许我们在隔离的用户空间中运行我们的容器。
VM 的最大优势是我们可以在系统上运行不同类型的操作系统,例如 Windows、FreeBSD 和 Linux。然而,在 Docker 的情况下,我们可以运行任何 Linux 版本,唯一的限制是它必须是 Linux:
Docker 容器与虚拟机
Docker 容器的最大优势是,由于它在 Linux 上作为一个独立的进程运行,因此它轻量级且不知道主机操作系统的所有功能。
理解 Docker
在我们开始使用 Docker 之前,让我们简要了解一下 Docker 的使用方式,结构以及完整系统的主要组件是什么。
以下列表和附带的图片应该有助于理解 Docker 管道的架构:
-
Dockerfile:它包含了构建运行我们程序的镜像的指令。
-
Docker 客户端:这是用户用来与 Docker 守护程序交互的命令行程序。
-
Docker 守护程序:这是一个守护程序应用程序,用于监听管理构建或运行容器以及将容器推送到 Docker 注册表的命令。它还负责配置容器网络、卷等。
-
Docker 镜像:Docker 镜像包含构建可在安装了 Docker 的任何 Linux 机器上执行的容器二进制文件所需的所有步骤。
-
Docker 注册表:Docker 注册表负责存储和检索 Docker 镜像。我们可以使用公共 Docker 注册表或私有注册表。Docker Hub 被用作默认的 Docker 注册表。
-
Docker 容器:Docker 容器与我们迄今讨论的容器不同。Docker 容器是 Docker 镜像的可运行实例。Docker 容器可以被创建、启动、停止等。
-
Docker API:我们之前讨论过的 Docker 客户端是与 Docker API 交互的命令行界面。这意味着 Docker 守护程序不需要在与 Docker 客户端相同的机器上运行。本书中将使用的默认设置是使用 UNIX 套接字或网络接口与本地系统上的 Docker 守护程序通信:
Docker 架构
测试 Docker 设置
让我们确保我们的 Docker 设置完美运行。对于我们的目的,Docker 社区版应该足够了(www.docker.com/community-edition
)。安装完成后,我们将通过运行一些基本命令来检查它是否正常工作。
让我们首先检查我们安装了什么版本:
$ docker --version
Docker version 17.12.0-ce, build c97c6d6
让我们试着深入了解一下我们的 Docker 安装的细节:
$ docker info
Containers: 38
Running: 0
Paused: 0
Stopped: 38
Images: 24
Server Version: 17.12.0-ce
在 Linux 上,当您尝试运行 docker 命令时,可能会出现Permission denied错误。为了与 Docker 交互,您可以在命令前加上sudo
,或者您可以创建一个“docker”用户组并将您的用户添加到该组中。有关更多详细信息,请参阅链接docs.docker.com/install/linux/linux-postinstall/.
让我们尝试运行一个 Docker 镜像。如果您还记得关于 Docker 注册表的讨论,您就知道我们不需要使用 Dockerfile 构建 Docker 镜像,就可以运行 Docker 容器。我们可以直接从 Docker Hub(默认的 Docker 注册表)拉取它并将镜像作为容器运行:
$ docker run docker/whalesay cowsay Welcome to GopherLand!
Unable to find image 'docker/whalesay:latest' locally
Trying to pull repository docker.io/docker/whalesay ...
sha256:178598e51a26abbc958b8a2e48825c90bc22e641de3d31e18aaf55f3258ba93b: Pulling from docker.io/docker/whalesay
e190868d63f8: Pull complete
909cd34c6fd7: Pull complete
0b9bfabab7c1: Pull complete
a3ed95caeb02: Pull complete
00bf65475aba: Pull complete
c57b6bcc83e3: Pull complete
8978f6879e2f: Pull complete
8eed3712d2cf: Pull complete
Digest: sha256:178598e51a26abbc958b8a2e48825c90bc22e641de3d31e18aaf55f3258ba93b
Status: Downloaded newer image for docker.io/docker/whalesay:latest
________________________
< Welcome to GopherLand! >
------------------------
\
\
\
## .
## ## ## ==
## ## ## ## ===
/""""""""""""""""___/ ===
~~~ {~~ ~~~~ ~~~ ~~~~ ~~ ~ / ===- ~~~
\______ o __/
\ __/
\__________/
前面的命令也可以像这样执行,只需使用docker run ...
,这更方便:
$ docker pull docker/whalesay & docker run docker/whalesay cowsay Welcome to GopherLand!
一旦我们有了一长串构建的镜像,我们可以列出它们所有,同样也适用于 Docker 容器:
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
docker.io/docker/whalesay latest 6b362a9f73eb 2 years ago 247 MB
$ docker container ls --all
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a1b1efb42130 docker/whalesay "cowsay Welcome to..." 5 minutes ago Exited (0) 5 minutes ago frosty_varahamihira
最后,值得注意的是,随着我们不断使用 docker 来构建和运行镜像和容器,我们将开始创建一堆“悬空”的镜像,我们可能不会再真正使用。但是,它们最终会占用存储空间。为了摆脱这样的“悬空”镜像,我们可以使用以下命令:
$ docker rmi --force 'docker images -q -f dangling=true'
# list of hashes for all deleted images.
Dockerfile
现在我们已经掌握了 Docker 的基础知识,让我们来看看在本书中将用作模板的Dockerfile
文件。
接下来,让我们看一个例子:
FROM golang:1.10
# The base image we want to use to build our docker image from.
# Since this image is specialized for golang it will have GOPATH = /go
ADD . /go/src/hello
# We copy files & folders from our system onto the docker image
RUN go install hello
# Next we can create an executable binary for our project with the command,
'go install' ENV NAME Bob
# Environment variable NAME will be picked up by the program 'hello'
and printed to console.ENTRYPOINT /go/bin/hello
# Command to execute when we start the container # EXPOSE 9000 # Generally used for network applications. Allows us to connect to the
application running inside the container from host system's localhost.
main.go
让我们创建一个最基本的 Go 程序,这样我们就可以在 Docker 镜像中使用它。它将获取NAME
环境变量并打印<NAME> is your uncle.
然后退出:
package main
import (
"fmt"
"os"
)
func main() {
fmt.Println(os.Getenv("NAME") + " is your uncle.")
}
现在我们已经把所有的代码都放好了,让我们使用Dockerfile
文件构建 Docker 镜像:
$ cd docker
$ tree
.
├── Dockerfile
└── main.go"
0 directories, 2 files $ # -t tag lets us name our docker images so that we can easily refer to them $ docker build . -t hello-uncle Sending build context to Docker daemon 3.072 kB Step 1/5 : FROM golang:1.9.1 ---> 99e596fc807e Step 2/5 : ADD . /go/src/hello ---> Using cache ---> 64d080d7eb39 Step 3/5 : RUN go install hello ---> Using cache ---> 13bd4a1f2a60 Step 4/5 : ENV NAME Bob ---> Using cache ---> cc432fe8ffb4 Step 5/5 : ENTRYPOINT /go/bin/hello ---> Using cache ---> e0bbfb1fe52b Successfully built e0bbfb1fe52b $ # Let's now try to run the docker image. $ docker run hello-uncle Bob is your uncle. $ # We can also change the environment variables on the fly. $ docker run -e NAME=Sam hello-uncle Sam is your uncle.
在 Go 中进行测试
测试是编程的重要部分,无论是在 Go 中还是在任何其他语言中。Go 有一种直接的方法来编写测试,在本节中,我们将看一些重要的工具来帮助测试。
我们需要遵循一些规则和约定来测试我们的代码。它们可以列举如下:
-
源文件和相关的测试文件放置在同一个包/文件夹中
-
任何给定源文件的测试文件的名称是
<source-file-name>_test.go
-
测试函数需要以"Test"前缀开头,并且函数名的下一个字符应该是大写的
在本节的其余部分,我们将查看三个文件及其相关的测试:
-
variadic.go
和variadic_test.go
-
addInt.go
和addInt_test.go
-
nil_test.go
(这些测试没有任何源文件)
在此过程中,我们将介绍我们可能使用的任何进一步的概念。
variadic.go
为了理解第一组测试,我们需要了解什么是变参函数以及 Go 如何处理它。让我们从定义开始:
Variadic 函数是在函数调用期间可以接受任意数量的参数的函数。
鉴于 Go 是一种静态类型语言,对变参函数的唯一限制是传递给它的不定数量的参数应该是相同的数据类型。但是,这并不限制我们传递其他变量类型。如果传递了参数,则函数将接收到一个元素的切片,否则为nil
。
让我们看一下代码,以便更好地理解:
// variadic.go
package main
func simpleVariadicToSlice(numbers ...int) []int {
return numbers
}
func mixedVariadicToSlice(name string, numbers ...int) (string, []int) {
return name, numbers
}
// Does not work.
// func badVariadic(name ...string, numbers ...int) {}
我们在数据类型之前使用...
前缀来定义函数作为变参函数。请注意,每个函数只能有一个变参参数,并且它必须是最后一个参数。如果我们取消注释badVariadic
行并尝试测试代码,我们会看到这个错误。
variadic_test.go
我们想要测试两个有效的函数,simpleVariadicToSlice
和mixedVariadicToSlice
,以验证前一节中定义的各种规则。但是,为了简洁起见,我们将测试这些:
-
simpleVariadicToSlice
:这是为了没有参数,三个参数,以及查看如何将切片传递给变参函数 -
mixedVariadicToSlice
:这是为了接受一个简单的参数和一个变参参数
现在让我们看一下测试这两个函数的代码:
// variadic_test.go
package main
import "testing"
func TestSimpleVariadicToSlice(t *testing.T) {
// Test for no arguments
if val := simpleVariadicToSlice(); val != nil {
t.Error("value should be nil", nil)
} else {
t.Log("simpleVariadicToSlice() -> nil")
}
// Test for random set of values
vals := simpleVariadicToSlice(1, 2, 3)
expected := []int{1, 2, 3}
isErr := false
for i := 0; i < 3; i++ {
if vals[i] != expected[i] {
isErr = true
break
}
}
if isErr {
t.Error("value should be []int{1, 2, 3}", vals)
} else {
t.Log("simpleVariadicToSlice(1, 2, 3) -> []int{1, 2, 3}")
}
// Test for a slice
vals = simpleVariadicToSlice(expected...)
isErr = false
for i := 0; i < 3; i++ {
if vals[i] != expected[i] {
isErr = true
break
}
}
if isErr {
t.Error("value should be []int{1, 2, 3}", vals)
} else {
t.Log("simpleVariadicToSlice([]int{1, 2, 3}...) -> []int{1, 2, 3}")
}
}
func TestMixedVariadicToSlice(t *testing.T) {
// Test for simple argument & no variadic arguments
name, numbers := mixedVariadicToSlice("Bob")
if name == "Bob" && numbers == nil {
t.Log("Recieved as expected: Bob, <nil slice>")
} else {
t.Errorf("Received unexpected values: %s, %s", name, numbers)
}
}
在variadic_test.go
中运行测试
让我们运行这些测试并查看输出。在运行测试时,我们将使用-v
标志来查看每个单独测试的输出:
$ go test -v ./{variadic_test.go,variadic.go}
=== RUN TestSimpleVariadicToSlice
--- PASS: TestSimpleVariadicToSlice (0.00s)
variadic_test.go:10: simpleVariadicToSlice() -> nil
variadic_test.go:26: simpleVariadicToSlice(1, 2, 3) -> []int{1, 2, 3}
variadic_test.go:41: simpleVariadicToSlice([]int{1, 2, 3}...) -> []int{1, 2, 3}
=== RUN TestMixedVariadicToSlice
--- PASS: TestMixedVariadicToSlice (0.00s)
variadic_test.go:49: Received as expected: Bob, <nil slice>
PASS
ok command-line-arguments 0.001s
addInt.go
variadic_test.go
中的测试详细说明了变参函数的规则。但是,您可能已经注意到TestSimpleVariadicToSlice
在其函数体中运行了三个测试,但go test
将其视为单个测试。Go 提供了一种很好的方法来在单个函数内运行多个测试,我们将在addInt_test.go
中查看它们。
对于这个例子,我们将使用一个非常简单的函数,如下所示:
// addInt.go
package main
func addInt(numbers ...int) int {
sum := 0
for _, num := range numbers {
sum += num
}
return sum
}
addInt_test.go
您可能还注意到在TestSimpleVariadicToSlice
中,我们重复了很多逻辑,而唯一变化的因素是输入和期望值。一种测试风格,称为表驱动开发,定义了运行测试所需的所有数据的表,迭代表的“行”,并对它们运行测试。
让我们看一下我们将要测试的没有参数和变参参数:
// addInt_test.go
package main
import (
"testing"
)
func TestAddInt(t *testing.T) {
testCases := []struct {
Name string
Values []int
Expected int
}{
{"addInt() -> 0", []int{}, 0},
{"addInt([]int{10, 20, 100}) -> 130", []int{10, 20, 100}, 130},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
sum := addInt(tc.Values...)
if sum != tc.Expected {
t.Errorf("%d != %d", sum, tc.Expected)
} else {
t.Logf("%d == %d", sum, tc.Expected)
}
})
}
}
在 addInt_test.go 中运行测试
现在让我们运行这个文件中的测试,并且我们期望testCases
表中的每一行被视为一个单独的测试:
$ go test -v ./{addInt.go,addInt_test.go}
=== RUN TestAddInt
=== RUN TestAddInt/addInt()_->_0
=== RUN TestAddInt/addInt([]int{10,_20,_100})_->_130
--- PASS: TestAddInt (0.00s)
--- PASS: TestAddInt/addInt()_->_0 (0.00s)
addInt_test.go:23: 0 == 0
--- PASS: TestAddInt/addInt([]int{10,_20,_100})_->_130 (0.00s)
addInt_test.go:23: 130 == 130
PASS
ok command-line-arguments 0.001s
nil_test.go
我们还可以创建不特定于任何特定源文件的测试;唯一的标准是文件名需要采用<text>_test.go
的形式。nil_test.go
中的测试阐明了语言的一些有用特性,开发人员在编写测试时可能会发现有用。它们如下:
-
httptest.NewServer
: 想象一下我们需要针对发送数据的服务器测试我们的代码的情况。启动和协调一个完整的服务器来访问一些数据是困难的。http.NewServer
为我们解决了这个问题。 -
t.Helper
:如果我们使用相同的逻辑来通过或失败很多testCases
,将这个逻辑分离到一个单独的函数中是有意义的。然而,这会扭曲测试运行调用堆栈。我们可以通过注释测试中的t.Helper()
并重新运行go test
来看到这一点。
我们还可以格式化我们的命令行输出以打印漂亮的结果。我们将展示一个简单的例子,为通过的案例添加一个勾号,为失败的案例添加一个叉号。
在测试中,我们将运行一个测试服务器,在其上进行 GET 请求,然后测试预期输出与实际输出:
// nil_test.go
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)
const passMark = "\u2713"
const failMark = "\u2717"
func assertResponseEqual(t *testing.T, expected string, actual string) {
t.Helper() // comment this line to see tests fail due to 'if expected != actual'
if expected != actual {
t.Errorf("%s != %s %s", expected, actual, failMark)
} else {
t.Logf("%s == %s %s", expected, actual, passMark)
}
}
func TestServer(t *testing.T) {
testServer := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
path := r.RequestURI
if path == "/1" {
w.Write([]byte("Got 1."))
} else {
w.Write([]byte("Got None."))
}
}))
defer testServer.Close()
for _, testCase := range []struct {
Name string
Path string
Expected string
}{
{"Request correct URL", "/1", "Got 1."},
{"Request incorrect URL", "/12345", "Got None."},
} {
t.Run(testCase.Name, func(t *testing.T) {
res, err := http.Get(testServer.URL + testCase.Path)
if err != nil {
t.Fatal(err)
}
actual, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
t.Fatal(err)
}
assertResponseEqual(t, testCase.Expected, fmt.Sprintf("%s", actual))
})
}
t.Run("Fail for no reason", func(t *testing.T) {
assertResponseEqual(t, "+", "-")
})
}
在 nil_test.go 中运行测试
我们运行了三个测试,其中两个测试案例将通过,一个将失败。这样我们就可以看到勾号和叉号的效果。
$ go test -v ./nil_test.go
=== RUN TestServer
=== RUN TestServer/Request_correct_URL
=== RUN TestServer/Request_incorrect_URL
=== RUN TestServer/Fail_for_no_reason
--- FAIL: TestServer (0.00s)
--- PASS: TestServer/Request_correct_URL (0.00s)
nil_test.go:55: Got 1\. == Got 1\. ![](https://gitee.com/OpenDocCN/freelearn-golang-zh/raw/master/docs/dist-cmp-go/img/1a82adfd-2d48-47fe-8d7d-776e1ae5d133.png)
--- PASS: TestServer/Request_incorrect_URL (0.00s)
nil_test.go:55: Got None. == Got None. ![](https://gitee.com/OpenDocCN/freelearn-golang-zh/raw/master/docs/dist-cmp-go/img/1a82adfd-2d48-47fe-8d7d-776e1ae5d133.png)
--- FAIL: TestServer/Fail_for_no_reason (0.00s)
nil_test.go:59: + != - ![](https://gitee.com/OpenDocCN/freelearn-golang-zh/raw/master/docs/dist-cmp-go/img/5270c9e7-2a17-4ce4-bdd5-4b72eb407085.jpg)
FAIL exit status 1 FAIL command-line-arguments 0.003s
总结
在本章中,我们首先看了成功运行 Go 项目的基本设置。然后我们看了如何为我们的 Go 项目安装依赖以及如何构建项目结构。我们还研究了容器背后的重要概念,它们解决了什么问题,以及我们将如何在本书中使用它们以及一个示例。接下来,我们看了如何在 Go 中编写测试,并且在这个过程中,我们学到了一些有趣的概念,比如处理可变参数函数和其他有用的测试函数。
在下一章中,我们将开始研究 Go 编程的核心基础之一——goroutines 以及在使用它们时需要牢记的重要细节。
第二章:理解 Goroutines
在过去的十年里,软件开发和编程已经取得了相当大的进步。许多以前被认为是学术和低效的概念开始在现代软件解决方案中找到位置。其中两个概念是协程(Go 中的 goroutines)和通道。从概念上讲,它们随着时间的推移而发展,并且它们在每种编程语言中的实现方式也不同。在许多编程语言中,比如 Ruby 或 Clojure,它们被实现为库,但在 Go 中,它们作为一种本地特性在语言中实现。正如我们将看到的,这使得该语言真正现代化,相当高效,并且是一种先进的编程语言。
在本章中,我们将通过查看 goroutines 和以下主题来尝试理解 Go:
-
并发和并行
-
Go 的运行时调度程序
-
在使用 goroutines 时要注意的事项
并发和并行
计算机和软件程序很有用,因为它们可以快速完成大量繁重的工作,还可以同时做多件事情。我们希望我们的程序能够同时做多件事情,也就是说,多任务处理,编程语言的成功可能取决于编写和理解多任务处理程序的难易程度。
并发和并行是我们在研究多任务处理时经常遇到的两个术语,它们经常被互换使用。然而,它们意味着两个截然不同的事情。
Go 博客上给出的标准定义(blog.golang.org/concurrency-is-not-parallelism
)如下:
-
并发性:并发性是指同时处理很多事情。这意味着我们在一段时间内设法同时完成多项任务。但是,我们一次只做一件事。这往往发生在一个任务在等待时,程序决定在空闲时间运行另一个任务。在下图中,这是通过在蓝色任务的空闲时段运行黄色任务来表示的。
-
并行性:并行性是指同时做很多事情。这意味着即使我们有两个任务,它们也在不间断地工作,没有任何间断。在图中,这表明绿色任务是独立运行的,并且不受红色任务的影响:
重要的是要理解这两个术语之间的区别。让我们通过一些具体的例子来进一步阐述两者之间的区别。
并发
让我们通过一个简单的例子来看看并发的概念,以及我们如何执行一些日常例行任务。
想象一下你开始一天,需要完成六件事:
-
预订酒店。
-
预订机票
-
订购一件连衣裙
-
支付信用卡账单
-
写电子邮件
-
听有声读物
完成它们的顺序并不重要,对于一些任务,比如写电子邮件或听有声读物,你不需要一次完成它们。以下是完成任务的一种可能方式:
-
订购一件连衣裙。
-
写电子邮件的三分之一。
-
预订酒店。
-
听 10 分钟的有声读物。
-
支付信用卡账单。
-
写电子邮件的另外三分之一。
-
预订机票。
-
听 20 分钟的有声读物。
-
完成写电子邮件。
-
继续听有声读物直到入睡。
在编程术语中,我们同时执行了上述任务。我们度过了一整天,从任务列表中选择了特定的任务,并开始处理它们。对于某些任务,我们甚至决定将它们分成几部分,在其他任务之间处理这些部分。
最终我们将编写一个程序,以并发的方式执行所有前面的步骤,但让我们一步一步来。让我们首先构建一个按顺序执行任务的程序,然后逐渐修改它,直到它成为纯并发代码并使用 goroutines。程序的进展将分为三个步骤:
-
串行任务执行。
-
使用 goroutines 的串行任务执行。
-
并发任务执行。
代码概述
代码将由一组打印出其分配任务的函数组成。在写电子邮件或听有声读物的情况下,我们进一步将任务细分为更多函数。具体如下:
-
writeMail
,continueWritingMail1
,continueWritingMail2
-
listenToAudioBook
,continueListeningToAudioBook
串行任务执行
让我们首先实现一个以线性方式执行所有任务的程序。根据我们之前讨论的代码概述,以下代码应该很简单:
package main
import (
"fmt"
)
// Simple individual tasks
func makeHotelReservation() {
fmt.Println("Done making hotel reservation.")
}
func bookFlightTickets() {
fmt.Println("Done booking flight tickets.")
}
func orderADress() {
fmt.Println("Done ordering a dress.")
}
func payCreditCardBills() {
fmt.Println("Done paying Credit Card bills.")
}
// Tasks that will be executed in parts
// Writing Mail
func writeAMail() {
fmt.Println("Wrote 1/3rd of the mail.")
continueWritingMail1()
}
func continueWritingMail1() {
fmt.Println("Wrote 2/3rds of the mail.")
continueWritingMail2()
}
func continueWritingMail2() {
fmt.Println("Done writing the mail.")
}
// Listening to Audio Book
func listenToAudioBook() {
fmt.Println("Listened to 10 minutes of audio book.")
continueListeningToAudioBook()
}
func continueListeningToAudioBook() {
fmt.Println("Done listening to audio book.")
}
// All the tasks we want to complete in the day.
// Note that we do not include the sub tasks here.
var listOfTasks = []func(){
makeHotelReservation, bookFlightTickets, orderADress,
payCreditCardBills, writeAMail, listenToAudioBook,
}
func main() {
for _, task := range listOfTasks {
task()
}
}
我们接受每个主要任务,并按简单的顺序开始执行它们。执行上述代码应该产生预期之外的输出,如下所示:
Done making hotel reservation.
Done booking flight tickets.
Done ordering a dress.
Done paying Credit Card bills.
Wrote 1/3rd of the mail.
Wrote 2/3rds of the mail.
Done writing the mail.
Listened to 10 minutes of audio book.
Done listening to audio book.
使用 goroutines 进行串行任务执行
我们列出了一系列任务,并编写了一个程序以线性和顺序的方式执行它们。但是,我们希望同时执行这些任务!让我们首先为分割任务引入 goroutines,看看效果如何。我们只会展示代码片段,其中代码实际上发生了变化:
/********************************************************************
We start by making Writing Mail & Listening Audio Book concurrent.
*********************************************************************/
// Tasks that will be executed in parts
// Writing Mail
func writeAMail() {
fmt.Println("Wrote 1/3rd of the mail.")
go continueWritingMail1() // Notice the addition of 'go' keyword.
}
func continueWritingMail1() {
fmt.Println("Wrote 2/3rds of the mail.")
go continueWritingMail2() // Notice the addition of 'go' keyword.
}
func continueWritingMail2() {
fmt.Println("Done writing the mail.")
}
// Listening to Audio Book
func listenToAudioBook() {
fmt.Println("Listened to 10 minutes of audio book.")
go continueListeningToAudioBook() // Notice the addition of 'go' keyword.
}
func continueListeningToAudioBook() {
fmt.Println("Done listening to audio book.")
}
以下是可能的输出:
Done making hotel reservation.
Done booking flight tickets.
Done ordering a dress.
Done paying Credit Card bills.
Wrote 1/3rd of the mail.
Listened to 10 minutes of audio book.
哎呀!这不是我们期望的。continueWritingMail1
,continueWritingMail2
和continueListeningToAudioBook
函数的输出缺失;原因是我们使用了 goroutines。由于 goroutines 没有等待,main
函数中的代码继续执行,一旦控制流到达main
函数的末尾,程序就会结束。我们真正想做的是在main
函数中等待,直到所有 goroutines 都执行完毕。我们可以通过两种方式实现这一点——使用通道或使用WaitGroup
。由于我们有第三章,通道和消息专门讨论通道,让我们在本节中使用WaitGroup
。
为了使用WaitGroup
,我们必须记住以下几点:
-
使用
WaitGroup.Add(int)
来计算我们将作为逻辑的一部分运行多少 goroutines。 -
使用
WaitGroup.Done()
来表示 goroutine 完成了其任务。 -
使用
WaitGroup.Wait()
来等待直到所有 goroutines 都完成。 -
将
WaitGroup
实例传递给 goroutines,以便它们可以调用Done()
方法。
基于这些观点,我们应该能够修改源代码以使用WaitGroup
。以下是更新后的代码:
package main
import (
"fmt"
"sync"
)
// Simple individual tasks
func makeHotelReservation(wg *sync.WaitGroup) {
fmt.Println("Done making hotel reservation.")
wg.Done()
}
func bookFlightTickets(wg *sync.WaitGroup) {
fmt.Println("Done booking flight tickets.")
wg.Done()
}
func orderADress(wg *sync.WaitGroup) {
fmt.Println("Done ordering a dress.")
wg.Done()
}
func payCreditCardBills(wg *sync.WaitGroup) {
fmt.Println("Done paying Credit Card bills.")
wg.Done()
}
// Tasks that will be executed in parts
// Writing Mail
func writeAMail(wg *sync.WaitGroup) {
fmt.Println("Wrote 1/3rd of the mail.")
go continueWritingMail1(wg)
}
func continueWritingMail1(wg *sync.WaitGroup) {
fmt.Println("Wrote 2/3rds of the mail.")
go continueWritingMail2(wg)
}
func continueWritingMail2(wg *sync.WaitGroup) {
fmt.Println("Done writing the mail.")
wg.Done()
}
// Listening to Audio Book
func listenToAudioBook(wg *sync.WaitGroup) {
fmt.Println("Listened to 10 minutes of audio book.")
go continueListeningToAudioBook(wg)
}
func continueListeningToAudioBook(wg *sync.WaitGroup) {
fmt.Println("Done listening to audio book.")
wg.Done()
}
// All the tasks we want to complete in the day.
// Note that we do not include the sub tasks here.
var listOfTasks = []func(*sync.WaitGroup){
makeHotelReservation, bookFlightTickets, orderADress,
payCreditCardBills, writeAMail, listenToAudioBook,
}
func main() {
var waitGroup sync.WaitGroup
// Set number of effective goroutines we want to wait upon
waitGroup.Add(len(listOfTasks))
for _, task := range listOfTasks{
// Pass reference to WaitGroup instance
// Each of the tasks should call on WaitGroup.Done()
task(&waitGroup)
}
// Wait until all goroutines have completed execution.
waitGroup.Wait()
}
以下是一种可能的输出顺序;请注意continueWritingMail1
和continueWritingMail2
在listenToAudioBook
和continueListeningToAudioBook
之后执行:
Done making hotel reservation.
Done booking flight tickets.
Done ordering a dress.
Done paying Credit Card bills.
Wrote 1/3rd of the mail.
Listened to 10 minutes of audio book.
Done listening to audio book.
Wrote 2/3rds of the mail.
Done writing the mail.
并发任务执行
在上一节的最终输出中,我们可以看到listOfTasks
中的所有任务都是按顺序执行的,最大并发的最后一步是让顺序由 Go 运行时决定,而不是由listOfTasks
中的顺序。这听起来可能是一项费力的任务,但实际上这是非常简单实现的。我们只需要在task(&waitGroup)
前面加上go
关键字:
func main() {
var waitGroup sync.WaitGroup
// Set number of effective goroutines we want to wait upon
waitGroup.Add(len(listOfTasks))
for _, task := range listOfTasks {
// Pass reference to WaitGroup instance
// Each of the tasks should call on WaitGroup.Done()
go task(&waitGroup) // Achieving maximum concurrency
}
// Wait until all goroutines have completed execution.
waitGroup.Wait()
以下是可能的输出:
Listened to 10 minutes of audio book.
Done listening to audio book.
Done booking flight tickets.
Done ordering a dress.
Done paying Credit Card bills.
Wrote 1/3rd of the mail.
Wrote 2/3rds of the mail.
Done writing the mail.
Done making hotel reservation.
如果我们看一下这种可能的输出,任务是按以下顺序执行的:
-
听有声读物。
-
预订机票。
-
订购一件连衣裙。
-
支付信用卡账单。
-
写一封电子邮件。
-
预订酒店。
现在我们对并发是什么以及如何使用goroutines
和WaitGroup
编写并发代码有了一个很好的了解,让我们深入了解并行性。
并行性
想象一下,你需要写几封电子邮件。它们将会很长、很费力,而让自己保持愉快的最好方法是在写邮件的同时听音乐,也就是说,在“并行”写邮件的同时听音乐。如果我们想编写一个模拟这种情况的程序,以下是一种可能的实现:
package main
import (
"fmt"
"sync"
"time"
)
func printTime(msg string) {
fmt.Println(msg, time.Now().Format("15:04:05"))
}
// Task that will be done over time
func writeMail1(wg *sync.WaitGroup) {
printTime("Done writing mail #1.")
wg.Done()
}
func writeMail2(wg *sync.WaitGroup) {
printTime("Done writing mail #2.")
wg.Done()
}
func writeMail3(wg *sync.WaitGroup) {
printTime("Done writing mail #3.")
wg.Done()
}
// Task done in parallel
func listenForever() {
for {
printTime("Listening...")
}
}
func main() {
var waitGroup sync.WaitGroup
waitGroup.Add(3)
go listenForever()
// Give some time for listenForever to start
time.Sleep(time.Nanosecond * 10)
// Let's start writing the mails
go writeMail1(&waitGroup)
go writeMail2(&waitGroup)
go writeMail3(&waitGroup)
waitGroup.Wait()
}
程序的输出可能如下:
Done writing mail #3\. 19:32:57
Listening... 19:32:57
Listening... 19:32:57
Done writing mail #1\. 19:32:57
Listening... 19:32:57
Listening... 19:32:57
Done writing mail #2\. 19:32:57
数字代表时间,以小时:分钟:秒
表示,可以看到它们是并行执行的。您可能已经注意到,并行代码看起来几乎与最终并发示例的代码相同。然而,在listenForever
函数中,我们在一个无限循环中打印Listening...
。如果前面的示例没有使用协程编写,输出将继续打印Listening...
,永远不会到达writeMail
函数调用。
现在我们了解了如何使用协程来运行并发程序,让我们看看 Go 是如何允许我们做到这一点的。接下来我们将看一下 Go 运行时使用的调度器。
Go 的运行时调度器
Go 程序连同运行时在多个 OS 线程上进行管理和执行。运行时使用一种称为M:N调度器的调度策略,它将 M 个协程调度到 N 个 OS 线程上。因此,每当我们需要运行或切换到不同的协程时,上下文切换将会很快,这也使我们能够利用 CPU 的多个核进行并行计算。
对 Go 的运行时和调度器有一个扎实的理解会非常有趣和有用,现在是一个详细了解它们的好时机。
从 Go 调度器的角度来看,主要有三个实体:
-
协程(G)
-
OS 线程或机器(M)
-
上下文或处理器(P)
让我们看看它们做了什么。我们还将查看这些实体的部分结构定义,以便更好地了解调度是如何实现和运行的。
协程
它是包含程序/函数实际指令的逻辑执行单元。它还包含有关协程的其他重要信息,例如堆栈内存、它正在运行的机器(M)以及调用它的 Go 函数。以下是协程结构中可能有用的一些元素:
// Denoted as G in runtime
type g struct {
stack stack // offset known to runtime/cgo
m *m // current m; offset known to arm liblink
goid int64
waitsince int64 // approx time when the g become blocked
waitreason string // if status==Gwaiting
gopc uintptr // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
timer *timer // cached timer for time.Sleep
// ...
}
一个有趣的事情是,当我们的 Go 程序启动时,首先启动一个名为主协程的协程,它负责在启动我们的程序之前设置运行时空间。典型的运行时设置可能包括最大堆栈大小、启用垃圾回收等。
OS 线程或机器
最初,OS 线程或机器由 OS 创建和管理。随后,调度器可以请求创建或销毁更多的 OS 线程或机器。这是协程将要执行的实际资源。它还维护有关主协程、当前正在其上运行的 G、线程本地存储(tls)等信息:
// Denoted as M in runtime
type m struct {
g0 *g // goroutine with scheduling stack
tls [6]uintptr // thread-local storage (for x86 extern register)
curg *g // current running goroutine
p puintptr // attached p for executing go code (nil if not executing go code)
id int32
createstack [32]uintptr // stack that created this thread.
spinning bool // m is out of work and is actively looking for work
// ...
}
上下文或处理器
我们有一个全局调度器负责启动新的 M,注册 G 和处理系统调用。然而,它不处理协程的实际执行。这是由一个名为处理器的实体来完成的,它有自己的内部调度器和一个名为运行队列(代码中的runq
)的队列,其中包含将在当前上下文中执行的协程。它还处理在各种协程之间的切换等:
// Denoted as P in runtime code
type p struct {
id int32
m muintptr // back-link to associated m (nil if idle)
runq [256]guintptr
//...
}
从 Go 1.5 开始,Go 运行时可以在程序生命周期的任何时刻运行最大数量的GOMAXPROCS
Ps。当然,我们可以通过设置GOMAXPROCS
环境变量或调用GOMAXPROCS()
函数来更改这个数字。
使用 G、M 和 P 进行调度
当程序准备开始执行时,运行时已经设置好了机器和处理器。运行时会请求操作系统启动足够数量的机器(M),GOMAXPROCS 数量的处理器来执行 goroutine(G)。重要的是要理解 M 是实际的执行单元,G 是逻辑执行单元。然而,它们需要 P 来实际执行 G 对 M。让我们看一个可能的场景来更好地解释调度过程。首先让我们看看我们将在场景中使用的组件:
-
我们有一组准备运行的 M:M1...Mn
-
我们还有两个 P:P1 和 P2,分别带有运行队列—runq1 和 runq2
-
最后但并非最不重要的,我们还有 20 个 goroutine,G1...G20,我们希望作为程序的一部分执行
Go 的运行时和所有组件,M1...Mn,P1 和 P2,以及 G1...G20,如下图所示:
鉴于我们有两个处理器,全局调度器理想情况下会在两个处理器之间平均分配 goroutine。假设 P1 被分配为处理 G1...G10 并将它们放入其运行队列,同样 P2 将 G11...G20 放入其运行队列。接下来,P1 的调度器从其运行队列中弹出一个 goroutine 来运行,G1,选择一个机器来运行它,M1,同样 P2 在 M2 上运行 G11。这可以通过以下图示进行说明:
一个进程的内部调度器还负责将当前的 goroutine 与它想要执行的下一个 goroutine 进行切换。如果一切顺利,调度器会出于以下三个可能的原因之一切换当前的 goroutine:
-
当前执行的时间片已经结束:进程将使用schedtick(每次调度器调用时递增)来跟踪当前 goroutine 执行了多长时间,一旦达到一定的时间限制,当前 goroutine 将被放回运行队列,下一个 goroutine 将被选中执行。
-
执行完成:简而言之,goroutine 已经执行完所有指令。在这种情况下,它不会被放回运行队列。
-
等待系统调用:在某些情况下,goroutine 可能需要进行系统调用,结果会导致 goroutine 被阻塞。鉴于我们有一些处理器,阻塞这样一个昂贵的资源是没有意义的。好消息是,在 Go 中,处理器不需要等待系统调用;相反,它可以离开等待的 M 和 G 组合,系统调用后会被全局调度器接管。与此同时,处理器可以从可用的机器中选择另一个 M,从其运行队列中选择另一个 goroutine,并开始执行。这可以通过以下图示进行解释:
前面的图解释了处理器 P1 在机器 M1 上运行 goroutine G1。现在 G1 将开始进行系统调用。这可以通过以下图示进行说明:
前面的图解释了处理器 P1 由于系统调用从机器 M1 和 goroutine G1 中分离。P1 选择一个新的机器 M5,并选择一个新的 goroutine G9 来执行:
在前面的图中,G1-M1 系统调用已经完成。现在 G1 被放回 P1 的运行队列,M1 被添加到空闲机器的集合中。
在本节的最后部分,我们将讨论调度器中实施的另一种策略,称为work-stealing。
假设处理器 P1 有 10 个 goroutines,P2 有 10 个 goroutines。然而,事实证明 P1 中的 goroutines 很快就完成了,现在 P1 的运行队列中没有 goroutines 了。如果 P1 空闲并等待全局调度器提供更多工作,那将是一场悲剧。通过工作窃取策略的帮助,P1 开始与其他处理器进行检查,如果另一个处理器的运行队列中有 goroutines,它将“窃取”其中一半并开始执行它们。这确保了我们最大程度地利用了程序的 CPU 使用率。让我们提出两个有趣的问题:
-
如果一个处理器意识到它无法再窃取任何任务怎么办?处理器会等待一小段时间,期望有新的 goroutines,如果没有创建,处理器就会被终止。
-
处理器能否窃取超过一半的运行队列?即使我们有很多处理器在工作,工作窃取策略也总是会窃取目标处理器运行队列的一半。
这可以用以下图示说明:
上图显示了两个处理器 P1 和 P2,在两台机器上执行各自运行队列中的一个 goroutine。假设当 P1 在运行时,处理器 P2 的任务已经完成。如下图所示:
处理器 P2 已经耗尽了它的运行队列,没有更多的 goroutines 可以执行。多亏了工作窃取策略,P2 已经“窃取”了 P1 运行队列中一半的 goroutines,并可以开始执行它们,如下图所示:
在使用 goroutines 时要注意的事项
到这个点,我们应该已经对 goroutines 和调度器的工作原理有了很好的理解。现在让我们来看看在使用 goroutines 时可能会让我们感到意外的一些事情。
单个 goroutine 使整个程序停止
我们知道 goroutines 在多个线程和多个核心上运行。那么当一个线程发生 panic 时会发生什么?下面是一个可以让我们模拟这种情况的例子。我们将创建许多类似的 goroutines,它们的唯一目的是取一个数字,并在从分母减去 10 后将其除以自身。这对大多数情况都有效,除了当数字是10
时。以下代码实现了所描述的功能:
package main
import (
"fmt"
"sync"
)
func simpleFunc(index int, wg *sync.WaitGroup) {
// This line should fail with Divide By Zero when index = 10
fmt.Println("Attempting x/(x-10) where x = ", index, " answer is : ", index/(index-10))
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(40)
for i := 0; i < 40; i += 1 {
go func(j int) {
simpleFunc(j, &wg)
}(i)
}
wg.Wait()
}
先前代码的输出可能如下所示:
Attempting x/(x-10) where x = 39 answer is : 1 Attempting x/(x-10) where x = 20 answer is : 2... Attempting x/(x-10) where x = 37 answer is : 1 Attempting x/(x-10) where x = 11 answer is : 11 panic: runtime error: integer divide by zerogoroutine 15 [running]:main.simpleFunc(0xa, 0xc42000e280) ...exit status 2
基本上,许多 goroutines 被放入运行队列中,并以随机顺序执行,它们的输出被打印到控制台。然而,一旦执行了索引==10 的 goroutine,它引发了一个 panic,该 panic 没有被函数处理,导致整个程序停止并以状态码2
退出。这表明即使一个未被处理的错误或 panic 也会使整个程序停止!
然而,如果因为我们遇到了一个我们本来可以优雅处理的 panic 而导致程序崩溃是没有意义的。Go 允许我们使用一个名为recover
的适当命名的函数从 panic 中恢复。让我们看看如何在先前的代码示例中使用recover
:
package main
import (
"fmt"
"sync"
)
func simpleFunc(index int, wg *sync.WaitGroup) {
// functions with defer keyword are executed at the end of the function
// regardless of whether the function was executed successfully or not.
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from", r)
}
}()
// We have changed the order of when wg.Done is called because
// we should call upon wg.Done even if the following line fails.
// Whether a defer function exists or not is dependent on whether it is registered
// before or after the failing line of code.
defer wg.Done()
// This line should fail with Divide By Zero when index = 10
fmt.Println("Attempting x/(x-10) where x = ", index, " answer is : ", index/(index-10))
}
func main() {
var wg sync.WaitGroup
wg.Add(40)
for i := 0; i < 40; i += 1 {
go func(j int) {
simpleFunc(j, &wg)
}(i)
}
wg.Wait()
}
先前代码的输出可能如下所示:
Attempting x/(x-10) where x = 39 answer is : 1 Attempting x/(x-10) where x = 14 answer is : 3 Recovered from runtime error: integer divide by zero Attempting x/(x-10) where x = 3 answer is : 0 ...Attempting x/(x-10) where x = 29 answer is : 1 Attempting x/(x-10) where x = 9 answer is : -9
Goroutines 是不可预测的
在本章中,我们首先看了 Go 如何使我们能够编写并发的代码,并在一定程度上实现并行。然后我们讨论了 Go 如何在机器和处理器上调度 goroutines。我们可能能够推断 goroutines 将如何分布在机器和处理器上,这反过来可能让我们编写非标准或 hacky 的 Go 代码。
考虑并行性部分的代码,我们试图模拟在听音乐的同时写几封电子邮件。以下是代码的输出,供快速参考:
Done writing mail #3\. 19:32:57
Listening... 19:32:57
Listening... 19:32:57
Done writing mail #1\. 19:32:57
Listening... 19:32:57
Listening... 19:32:57
Done writing mail #2\. 19:32:57
现在我们可以很容易地推断出至少有两个 P,其中一个被用于打印Listening...
的 goroutine,而另一个 P 则处理与写邮件相关的 goroutines。
这一切都很好,但考虑一种情况,即GOMAXPROCS
设置为1
,或者系统硬件能力较低,可能导致较少的机器。这可能导致 goroutine 打印Listening...
永远运行,永远不会将控制权交给其他 goroutines。实际上,Go 编译器应该检测到这种情况,并相应地计划 goroutines 的调度。然而,最好是规划我们的代码,这样我们就不必依赖 Go 的调度器及其当前的实现。
总结
Goroutines 是并发的,到一定程度上是并行的;然而,我们应该将它们视为并发。Goroutines 的执行顺序是不可预测的,我们不应该依赖它们按任何特定顺序执行。
我们还应该注意处理 goroutines 中的错误和恐慌,因为即使它们在并行执行,一个 goroutine 中的恐慌也会导致整个程序崩溃。最后,goroutines 可能会在系统调用上阻塞,但这不会阻塞程序的执行,也不会减慢整个程序的性能。
我们看了一些 Go 运行时调度器背后的设计概念,以了解为什么会发生所有这些。
也许你会想为什么我们在本章没有讨论通道。原因是,通过不依赖通道,我们能够以它们最基本的形式来看待 goroutines。这使我们能够更深入地了解 goroutines 的概念和实现。
在下一章中,我们将看一下通道以及它们如何进一步增强 goroutines 的功能。
第三章:通道和消息
在第二章中,理解 Goroutines,我们看到了 goroutines 的工作原理,如何以并发的方式使用它们,以及可能发生的一些常见错误。它们简单易用,但受限于它们只能生成其他 goroutines 并等待系统调用。实际上,goroutines 比前一章展示的更有能力,为了发挥它们的全部潜力,我们需要了解如何使用通道,这是本章的目标。在这里,我们将讨论以下主题:
-
控制并行性
-
通道和数据通信
-
通道的类型
-
关闭和复用通道
控制并行性
我们知道,生成的 goroutines 将尽快开始执行,并以同时的方式执行。然而,当这些 goroutines 需要在一个具有较低限制的共同源上工作时,就会存在固有的风险。这可能导致共同源明显减慢或在某些情况下甚至失败。正如你可能猜到的那样,这在计算机科学领域并不是一个新问题,有许多处理它的方法。正如我们将在整个章节中看到的,Go 提供了一些机制来以简单直观的方式控制并行性。让我们从一个模拟负担共同源问题的例子开始,然后继续解决它。
想象一个收银员需要处理订单,但一天只能处理 10 个订单。让我们看看如何将其作为一个程序来呈现:
// cashier.go
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
// ordersProcessed & cashier are declared in main function
// so that cashier has access to shared state variable 'ordersProcessed'.
// If we were to declare the variable inside the 'cashier' function,
// then it's value would be set to zero with every function call.
ordersProcessed := 0
cashier := func(orderNum int) {
if ordersProcessed < 10 {
// Cashier is ready to serve!
fmt.Println("Processing order", orderNum)
ordersProcessed++
} else {
// Cashier has reached the max capacity of processing orders.
fmt.Println("I am tired! I want to take rest!", orderNum)
}
wg.Done()
}
for i := 0; i < 30; i++ {
// Note that instead of wg.Add(60), we are instead adding 1
// per each loop iteration. Both are valid ways to add to WaitGroup as long as we can ensure the right number of calls.
wg.Add(1)
go func(orderNum int) {
// Making an order
cashier(orderNum)
}(i)
}
wg.Wait()
}
程序的可能输出如下:
Processing order 29
Processing order 22
Processing order 23
Processing order 13
Processing order 24
Processing order 25
Processing order 21
Processing order 26
Processing order 0
Processing order 27
Processing order 14
I am tired! I want to take rest! 28
I am tired! I want to take rest! 1
I am tired! I want to take rest! 7
I am tired! I want to take rest! 8
I am tired! I want to take rest! 2
I am tired! I want to take rest! 15
...
前面的输出显示了一个收银员在接受 10 个订单后不堪重负。然而,值得注意的是,如果你多次运行前面的代码,你可能会得到不同的输出。例如,在某些运行中,所有 30 个订单可能会被处理!
这是因为所谓的竞争条件。数据竞争(或竞争条件)发生在多个参与者(在我们的情况下是 goroutines)试图访问和修改一个共享状态时,这会导致 goroutines 的读写不正确。
我们可以尝试以两种方式解决这个问题:
-
增加订单处理限制
-
增加收银员的数量
增加限制只有在一定程度上是可行的,超过这个限制将会开始降低系统的性能,或者在收银员的情况下,工作既不高效也不 100%准确。相反,通过增加收银员的数量,我们可以开始连续处理更多订单,而不改变限制。有两种方法:
-
没有通道的分布式工作
-
使用通道的分布式工作
没有通道的分布式工作
为了在收银员之间平均分配工作,我们需要预先知道订单的数量,并确保每个收银员接收的工作都在他/她的限制范围内。这不是最实际的解决方案,因为在现实世界的情况下,我们需要跟踪每个收银员处理了多少订单,并将剩余的订单转给其他收银员。然而,在我们寻找正确解决方法之前,让我们花时间更好地理解无控制并行性的问题,并尝试解决它。以下代码尝试以天真的方式解决它,这应该为我们提供一个良好的开始:
// wochan.go
package main
import (
"fmt"
"sync"
)
func createCashier(cashierID int, wg *sync.WaitGroup) func(int) {
ordersProcessed := 0
return func(orderNum int) {
if ordersProcessed < 10 {
// Cashier is ready to serve!
//fmt.Println("Cashier ", cashierID, "Processing order", orderNum, "Orders Processed", ordersProcessed)
fmt.Println(cashierID, "->", ordersProcessed)
ordersProcessed++
} else {
// Cashier has reached the max capacity of processing orders.
fmt.Println("Cashier ", cashierID, "I am tired! I want to take rest!", orderNum)
}
wg.Done()
}
}
func main() {
cashierIndex := 0
var wg sync.WaitGroup
// cashier{1,2,3}
cashiers := []func(int){}
for i := 1; i <= 3; i++ {
cashiers = append(cashiers, createCashier(i, &wg))
}
for i := 0; i < 30; i++ {
wg.Add(1)
cashierIndex = cashierIndex % 3
func(cashier func(int), i int) {
// Making an order
go cashier(i)
}(cashiers[cashierIndex], i)
cashierIndex++
}
wg.Wait()
}
以下是可能的一个输出:
Cashier 2 Processing order 7
Cashier 1 Processing order 6
Cashier 3 Processing order 8
Cashier 3 Processing order 29
Cashier 1 Processing order 9
Cashier 3 Processing order 2
Cashier 2 Processing order 10
Cashier 1 Processing order 3
...
我们将 30 个可用订单分配给收银员1
、2
和3
,所有订单都成功处理,没有人抱怨累了。但是,请注意,使这项工作需要我们付出很多努力。我们必须创建一个函数生成器来创建收银员,通过cashierIndex
跟踪要使用哪个收银员等等。最糟糕的部分是前面的代码是不正确的!从逻辑上看,它可能看起来是在做我们想要的事情;但是,请注意,我们正在生成多个 goroutine,它们正在处理具有共享状态ordersProcessed
的变量!这就是我们之前讨论的数据竞争。好消息是我们可以在wochan.go
中以两种方式检测到它:
- 在
createCashier
函数中,用fmt.Println(cashierID, "->", ordersProcessed)
替换fmt.Println("Cashier ", cashierID, "Processing order", orderNum)
。以下是一个可能的输出:
3 -> 0
3 -> 1
1 -> 0
...
2 -> 3
3 -> 1 # Cashier 3 sees ordersProcessed as 1 but three lines above, Cashier 3
was at ordersProcessed == 4!
3 -> 5
1 -> 4
1 -> 4 # Cashier 1 sees ordersProcessed == 4 twice.
2 -> 4
2 -> 4 # Cashier 2 sees ordersProcessed == 4 twice.
...
- 前面的观点证明了代码是不正确的;然而,我们不得不猜测代码中可能存在的问题,然后进行验证。Go 为我们提供了工具来检测数据竞争,这样我们就不必担心这类问题。我们只需使用
-race
标志测试、运行、构建或安装包(在运行的情况下是文件)。让我们在我们的程序上运行它并查看输出:
$ go run -race wochan.go
Cashier 1 Processing order 0
Cashier 2 Processing order 1
==================
WARNING: DATA RACE
Cashier 3 Processing order 2
Read at 0x00c4200721a0 by goroutine 10:
main.createCashier.func1()
wochan.go:11 +0x73
Previous write at 0x00c4200721a0 by goroutine 7:
main.createCashier.func1()
wochan.go:14 +0x2a7
Goroutine 10 (running) created at:
main.main.func1()
wochan.go:40 +0x4a
main.main()
wochan.go:41 +0x26e
Goroutine 7 (finished) created at:
main.main.func1()
wochan.go:40 +0x4a
main.main()
wochan.go:41 +0x26e
==================
Cashier 2 Processing order 4
Cashier 3 Processing order 5
==================
WARNING: DATA RACE
Read at 0x00c420072168 by goroutine 9:
main.createCashier.func1()
wochan.go:11 +0x73
Previous write at 0x00c420072168 by goroutine 6:
main.createCashier.func1()
wochan.go:14 +0x2a7
Goroutine 9 (running) created at:
main.main.func1()
wochan.go:40 +0x4a
main.main()
wochan.go:41 +0x26e
Goroutine 6 (finished) created at:
main.main.func1()
wochan.go:40 +0x4a
main.main()
wochan.go:41 +0x26e
==================
Cashier 1 Processing order 3
Cashier 1 Processing order 6
Cashier 2 Processing order 7
Cashier 3 Processing order 8
...
Found 2 data race(s)
exit status 66
如图所示,-race
标志帮助我们检测数据竞争。
这是否意味着当我们有共享状态时我们无法分配我们的任务?当然可以!但是我们需要使用 Go 提供的机制来实现这一目的:
-
互斥锁、信号量和锁
-
通道
互斥锁是一种互斥锁,它为我们提供了一种同步机制,允许只有一个 goroutine 在任何给定时间访问特定的代码或共享状态。正如已经说明的,对于同步问题,我们可以使用互斥锁或通道,Go 建议使用正确的构造来解决正确的问题。然而,在实践中,使用通道为我们提供了更高级的抽象和更大的灵活性,尽管互斥锁也有其用途。因此,在本章和本书中,我们将使用通道。
使用通道进行分布式工作
现在我们对三件事情很确定:我们想要正确地将订单分配给收银员,我们想要确保每个收银员处理正确数量的订单,我们想要使用通道来解决这个问题。在解决使用通道解决收银员问题之前,让我们先看一下通道的基本语法和用法。
什么是通道?
通道是一种通信机制,允许我们在 goroutine 之间传递数据。它是 Go 中的内置数据类型。数据可以使用原始数据类型之一传递,或者我们可以使用结构创建自己的复杂数据类型。
以下是一个简单的示例,演示如何使用通道:
// simchan.go
package main
import "fmt"
// helloChan waits on a channel until it gets some data and then prints the value.
func helloChan(ch <- chan string) {
val := <- ch
fmt.Println("Hello, ", val)
}
func main() {
// Creating a channel
ch := make(chan string)
// A Goroutine that receives data from a channel
go helloChan(ch)
// Sending data to a channel.
ch <- "Bob"
}
如果我们运行前面的代码,它将打印以下输出:
Hello, Bob
使用通道的基本模式可以通过以下步骤来解释:
-
创建通道以接受要处理的数据。
-
启动等待通道数据的 goroutine。
-
然后,我们可以使用
main
函数或其他 goroutine 将数据传递到通道中。 -
监听通道的 goroutine 可以接受数据并处理它们。
使用通道的优势在于多个 goroutine 可以在同一个通道上等待并同时执行任务。
使用 goroutine 解决收银员问题
在尝试解决问题之前,让我们首先制定我们想要实现的目标:
-
创建一个接受所有订单的通道
orderChannel
。 -
启动所需数量的收银员 goroutine,从
orderChannel
接受有限数量的订单。 -
开始将所有订单放入
orderChannel
。
让我们看一个可能的解决方案,试图使用前面的步骤解决收银员问题:
// wichan.go
package main
import (
"fmt"
"sync"
)
func cashier(cashierID int, orderChannel <-chan int, wg *sync.WaitGroup) {
// Process orders upto limit.
for ordersProcessed := 0; ordersProcessed < 10; ordersProcessed++ {
// Retrieve order from orderChannel
orderNum := <-orderChannel
// Cashier is ready to serve!
fmt.Println("Cashier ", cashierID, "Processing order", orderNum, "Orders Processed", ordersProcessed)
wg.Done()
}
}
func main() {
var wg sync.WaitGroup
wg.Add(30)
ordersChannel := make(chan int)
for i := 0; i < 3; i++ {
// Start the three cashiers
func(i int) {
go cashier(i, ordersChannel, &wg)
}(i)
}
// Start adding orders to be processed.
for i := 0; i < 30; i++ {
ordersChannel <- i
}
wg.Wait()
}
通过使用-race
标志运行前面的代码,我们可以看到代码在没有任何数据竞争的情况下运行:
$ go run -race wichan.go
Cashier 2 Processing order 2 Orders Processed 0
Cashier 2 Processing order 3 Orders Processed 1
Cashier 0 Processing order 0 Orders Processed 0
Cashier 1 Processing order 1 Orders Processed 0
...
Cashier 0 Processing order 27 Orders Processed 9
代码非常简单,易于并行化,并且在不引起任何数据竞争的情况下运行良好。
通道和数据通信
Go 是一种静态类型的语言,这意味着给定的通道只能发送或接收单一数据类型的数据。在 Go 的术语中,这被称为通道的元素类型。Go 通道将接受任何有效的 Go 数据类型,包括函数。以下是一个接受和调用函数的简单程序的示例:
// elems.go
package main
import "fmt"
func main() {
// Let's create three simple functions that take an int argument
fcn1 := func(i int) {
fmt.Println("fcn1", i)
}
fcn2 := func(i int) {
fmt.Println("fcn2", i*2)
}
fcn3 := func(i int) {
fmt.Println("fcn3", i*3)
}
ch := make(chan func(int)) // Channel that sends & receives functions that take an int argument
done := make(chan bool) // A Channel whose element type is a boolean value.
// Launch a goroutine to work with the channels ch & done.
go func() {
// We accept all incoming functions on Channel ch and call the functions with value 10\.
for fcn := range ch {
fcn(10)
}
// Once the loop terminates, we print Exiting and send true to done Channel.
fmt.Println("Exiting")
done <- true
}()
// Sending functions to channel ch
ch <- fcn1
ch <- fcn2
ch <- fcn3
// Close the channel once we are done sending it data.
close(ch)
// Wait on the launched goroutine to end.
<-done
}
前面的代码的输出如下:
fcn1 10
fcn2 20
fcn3 30
Exiting
在前面的代码示例中,我们说通道ch
的元素类型为func(int)
,通道done
的元素类型为bool
。代码中还有很多有趣的细节,但我们将在接下来的部分讨论它们。
消息和事件
到目前为止,我们一直在使用术语数据来指代从通道发送和接收的值。虽然到目前为止这可能很容易理解,但 Go 使用两个特定的术语来描述通过通道进行通信的数据类型。它们被称为消息和事件。在代码方面它们是相同的,但这些术语用于帮助我们理解被发送的数据的类型。简而言之:
-
消息通常是我们希望 goroutine 处理并在需要时对其进行操作的值。
-
事件用于表示某个事件已发生。接收到的实际值可能并不像接收值的行为那样重要。请注意,尽管我们使用术语事件,它们仍然是一种消息类型。
在前面的代码示例中,发送到ch
的值是消息,而发送到done
的值是事件。需要注意的重要一点是,事件通道的元素类型往往是struct{}{}
、bool
或int
。
现在我们了解了通道元素类型、消息和事件是什么,让我们来看看不同类型的通道。
通道的类型
Go 为我们提供了三种主要的通道类型变体。它们可以被广泛地分类为:
-
无缓冲
-
缓冲
-
单向(只发送和只接收类型的通道)
无缓冲通道
这是 Go 中可用的基本通道类型。使用起来非常简单——我们将数据发送到通道,然后在另一端接收数据。有趣的部分是,任何在无缓冲通道上操作的 goroutine 都将被阻塞,直到发送方和接收方的 goroutine 都可用。例如,考虑以下代码片段:
ch := make(chan int)
go func() {ch <- 100} // Send 100 into channel.
Channel: send100
go func() {val := <- ch} // Goroutine waiting on channel.
Channel: recv1
go func() {val := <- ch} // Another goroutine waiting on channel.
Channel: recv2
我们有一个元素类型为int
的通道ch
。我们启动了三个 goroutine;一个将消息100
发送到通道(send100
),另外两个 goroutine(recv1
和recv2
)在通道上等待。send100
被阻塞,直到recv1
或recv2
中的任一个开始监听通道以接收消息。如果我们假设recv2
接收了由send100
发送到通道的消息,那么recv1
将等待,直到在通道上发送另一条消息。如果前面的四行是通道上的唯一通信,那么recv1
将等待直到程序结束,然后将被 Go 运行时突然终止。
缓冲通道
考虑这样一种情况,我们能够向通道发送的消息比接收消息的 goroutine 处理的消息多。如果我们使用无缓冲通道,程序将显著减慢,因为我们必须等待每条消息被处理后才能放入另一条消息。如果通道能够存储这些额外的消息或“缓冲”消息,那将是理想的。这正是缓冲通道所做的。它维护一个消息队列,goroutine 将以自己的速度消耗它。然而,即使缓冲通道也有限制容量;我们需要在通道创建时定义队列的容量。
那么,我们如何使用带缓冲的通道呢?从语法上讲,它与使用无缓冲通道是相同的。带缓冲通道的行为可以解释如下:
-
如果带缓冲通道为空:在通道上接收消息将被阻塞,直到通过通道发送消息
-
如果带缓冲通道已满:在通道上发送消息将被阻塞,直到至少从通道接收到一条消息,从而为新消息腾出空间放在通道的缓冲区或队列中
-
如果带缓冲通道部分填充,即既不满也不空:在通道上发送或接收消息都不会被阻塞,通信是瞬时的
通过带缓冲通道进行通信
单向缓冲
消息可以从通道发送和接收。然而,当 goroutine 使用通道进行通信时,它们通常只用于单一目的:要么从通道发送,要么接收。Go 允许我们指定 goroutine 使用的通道是用于发送还是接收消息。它通过单向通道的帮助来实现这一点。一旦通道被标识为单向,我们就不能对其执行其他操作。这意味着单向发送通道不能用于接收消息,单向接收通道不能用于发送消息。任何尝试这样做的行为都将被 Go 编译器识别为编译时错误。
以下是正确使用单向通道的示例:
// unichans.go
package main
import (
"fmt"
"sync"
)
func recv(ch <-chan int, wg *sync.WaitGroup) {
fmt.Println("Receiving", <-ch)
wg.Done()
}
func send(ch chan<- int, wg *sync.WaitGroup) {
fmt.Println("Sending...")
ch <- 100
fmt.Println("Sent")
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
ch := make(chan int)
go recv(ch, &wg)
go send(ch, &wg)
wg.Wait()
}
预期输出将如下所示:
Sending...
Receiving 100 # (or) Sent
Sent # (or) Receiving 100
现在,让我们尝试在接收通道上发送消息并看看会发生什么。我们只会在前面的示例中看到更改的函数:
// unichans2.go
// ...
// Changed function
func recv(ch <-chan int, wg *sync.WaitGroup) {
fmt.Println("Receiving", <-ch)
fmt.Println("Trying to send") // signalling that we are going to send over channel.
ch <- 13 // Sending over channel
wg.Done()
}
现在,如果我们尝试运行或构建更新后的程序,我们将会得到以下错误:
$ go run unichans.go
# command-line-arguments
unichans.go:11: invalid operation: ch <- 13 (send to receive-only type <-chan int)
那么,如果我们使用带缓冲的通道,程序会如何行为?由于未填充的通道不会阻塞,send
协程将消息发送到通道,然后继续执行。recv
协程在开始执行时从通道中读取,然后打印它:
// buffchan.go
package main
import (
"fmt"
"sync"
)
func recv(ch <-chan int, wg *sync.WaitGroup) {
fmt.Println("Receiving", <-ch)
wg.Done()
}
func send(ch chan<- int, wg *sync.WaitGroup) {
fmt.Println("Sending...")
ch <- 100
fmt.Println("Sent")
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
// Using a buffered channel.
ch := make(chan int, 10)
go recv(ch, &wg)
go send(ch, &wg)
wg.Wait()
}
输出将如下所示:
Sending...
Sent
Receiving 100
关闭通道
在前面的部分中,我们已经看过三种类型的通道以及如何创建它们。在本节中,让我们看看如何关闭通道以及这可能会影响在这些通道上发送和接收消息。当我们不再想在通道上发送任何消息时,我们关闭通道。通道关闭后的行为对于每种类型的通道都是不同的。让我们深入了解一下:
-
无缓冲关闭通道:发送消息将导致恐慌,接收消息将立即产生通道元素类型的零值。
-
带缓冲关闭通道:发送消息将导致恐慌,但在通道的队列中首先产生所有值。一旦队列耗尽,通道将开始产生通道元素类型的零值。
以下是一个阐述前两点的程序:
// closed.go
package main
import "fmt"
type msg struct {
ID int
value string
}
func handleIntChan(intChan <-chan int, done chan<- int) {
// Even though there are only 4 elements being sent via channel, we retrieve 6 values.
for i := 0; i < 6; i++ {
fmt.Println(<-intChan)
}
done <- 0
}
func handleMsgChan(msgChan <-chan msg, done chan<- int) {
// We retrieve 6 values of element type struct 'msg'.
// Given that there are only 4 values in the buffered channel,
// the rest should be zero value of struct 'msg'.
for i := 0; i < 6; i++ {
fmt.Println(fmt.Sprintf("%#v", <-msgChan))
}
done <- 0
}
func main() {
intChan := make(chan int)
done := make(chan int)
go func() {
intChan <- 9
intChan <- 2
intChan <- 3
intChan <- 7
close(intChan)
}()
go handleIntChan(intChan, done)
msgChan := make(chan msg, 5)
go func() {
for i := 1; i < 5; i++ {
msgChan <- msg{
ID: i,
value: fmt.Sprintf("VALUE-%v", i),
}
}
close(msgChan)
}()
go handleMsgChan(msgChan, done)
// We wait on the two channel handler goroutines to complete.
<-done
<-done
// Since intChan is closed, this will cause a panic to occur.
intChan <- 100
}
程序的一个可能输出如下:
9
2
3
7
0
0
main.msg{ID:1, value:"VALUE-1"}
main.msg{ID:2, value:"VALUE-2"}
main.msg{ID:3, value:"VALUE-3"}
main.msg{ID:4, value:"VALUE-4"}
main.msg{ID:0, value:""}
main.msg{ID:0, value:""}
panic: send on closed channel
goroutine 1 [running]:
main.main()
closed.go:58 +0x194
Process finished with exit code 2
最后,以下是一些有关关闭通道和已关闭通道的进一步有用的要点:
-
无法确定通道是否已关闭。我们能做的最好的事情是检查我们是否能够成功地从通道中检索到消息。我们知道检索通道的默认语法是
msg := <- ch
。然而,还有一种检索的变体:msg, ok := <-ch
。第二个参数告诉我们检索是否成功。如果通道关闭,ok
将为false
。这可以用来告诉通道何时已关闭。 -
msg, ok := <-ch
是在迭代通道时的常见模式。因此,Go 允许我们对通道进行range
。当通道关闭时,range
循环结束。 -
关闭已关闭的通道、空通道或只接收通道将导致恐慌。只有双向通道或只发送通道可以关闭。
-
关闭通道并不是强制性的,对于垃圾收集器(GC)也是无关紧要的。如果 GC 确定通道不可达,无论通道是打开的还是关闭的,通道都将被垃圾回收。
多路复用通道
多路复用描述了我们使用单一资源来对多个信号或操作进行操作的方法。这种方法在电信和计算机网络中被广泛使用。我们可能会发现自己处于这样一种情况:我们有多种类型的任务需要执行。但是,它们只能在互斥状态下执行,或者它们需要在共享资源上工作。为此,我们使用 Go 中称为通道多路复用的模式。在深入讨论如何实际多路复用通道之前,让我们尝试自己实现它。
假设我们有一组通道,并且我们希望在数据发送到通道时立即对其进行操作。以下是我们希望这样做的一种天真的方法:
// naiveMultiplexing.go
package main
import "fmt"
func main() {
channels := 5{
make(chan int),
make(chan int),
make(chan int),
make(chan int),
make(chan int),
}
go func() {
// Starting to wait on channels
for _, chX := range channels {
fmt.Println("Receiving from", <- chX)
}
}()
for i := 1; i < 6; i++ {
fmt.Println("Sending on channel:", i)
channels[i] <- 1
}
}
前面程序的输出如下:
Sending on channel: 1
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/home/entux/Documents/Code/GO-WORKSPACE/src/distributed-go/ch3/naiveSwitch.go:23 +0x2b1
goroutine 5 [chan receive]:
main.main.func1(0xc4200160c0, 0xc420016120, 0xc420016180, 0xc4200161e0, 0xc420016240)
GO-WORKSPACE/src/distributed-go/ch3/naiveSwitch.go:17 +0xba
created by main.main
GO-WORKSPACE/src/distributed-go/ch3/naiveSwitch.go:19 +0x18b
在 goroutine 中的循环中,第一个通道从未被等待,这导致了 goroutine 中的死锁。多路复用帮助我们在多个通道上等待,而不会在任何通道上阻塞,同时在通道上有消息时对其进行操作。
在多路复用通道时,有一些重要的要点需要记住:
- 语法:
select {
case <- ch1:
// Statements to execute if ch1 receives a message
case val := <- ch2:
// Save message received from ch2 into a variable and
execute statements for ch2
}
-
在执行
select
时,可能会有多个case
准备好接收消息。在这种情况下,select
不会执行所有case
,而是随机选择一个执行,然后退出select
语句。 -
然而,如果我们希望在
select
语句的case
中对发送到所有通道的消息做出反应,前面的观点可能会受到限制。然后我们可以将select
语句放在for
循环中,它将确保处理所有消息。 -
尽管
for
循环将处理发送到所有通道的消息,但循环仍会被阻塞,直到有消息可用。可能存在我们不希望阻塞循环迭代,而是执行一些“默认”操作的情况。这可以通过select
语句中的default
case 来实现。 -
基于前面两点的更新语法是:
for {
select {
case <- ch1:
// Statements to execute if ch1 receives a message
case val := <- ch2:
// Save message received from ch2 into a variable and
execute statements for ch2
default:
// Statements to execute if none of the channels has yet
received a message.
}
}
- 对于缓冲通道,接收消息的顺序不是保证的。
以下是在不被任何通道阻塞的情况下对所有所需通道进行多路复用的正确方法,并继续处理发送的所有消息:
// multiplexing.go
package main
import (
"fmt"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan string)
ch3 := make(chan int, 3)
done := make(chan bool)
completed := make(chan bool)
ch3 <- 1
ch3 <- 2
ch3 <- 3
go func() {
for {
select {
case <-ch1:
fmt.Println("Received data from ch1")
case val := <-ch2:
fmt.Println(val)
case c := <-ch3:
fmt.Println(c)
case <-done:
fmt.Println("exiting...")
completed <- true
return
}
}
}()
ch1 <- 100
ch2 <- "ch2 msg"
// Uncomment us to avoid leaking the 'select' goroutine!
//close(done)
//<-completed
}
以下是前面程序的输出:
1
Received data from ch1
2
3
不幸的是,该程序存在一个缺陷:它泄漏了处理select
的 goroutine。这也在main
函数末尾附近的注释中指出。当我们有一个正在运行但无法直接访问的 goroutine 时,通常会发生这种情况。即使 goroutine 的引用未被存储,GC 也不会对其进行垃圾回收。因此,我们需要一种机制来停止并从这样的 goroutine 返回。通常,这可以通过创建一个专门用于从 goroutine 返回的通道来实现。
在前面的代码中,我们通过done
通道发送信号。如果我们取消注释这些行然后运行程序,输出将如下:
1
2
3
Received data from ch1
ch2 msg
exiting...
总结
在本章中,我们探讨了控制并行性的原因,并对涉及共享状态的任务的复杂性有了更深入的了解。我们以一个超负荷的收银员的例子作为一个需要解决的编程问题,并通过通道进行实验,并进一步探讨了不同类型的通道以及使用它们涉及的微妙之处。例如,我们看到关闭的缓冲通道和非缓冲通道都会在我们尝试向它们发送消息时引发恐慌,并且从它们接收消息会根据通道是缓冲的以及通道是空的还是满的而导致不同的结果。我们还看到了如何在不阻塞任何通道的情况下等待多个通道上的消息的方法。
在后面的章节中,从第五章 介绍 Goophr 到第八章 部署 Goophr,我们将开发一个分布式网络应用。这需要我们具备基本的知识,如何使用 HTTP 协议与网络服务器进行交互,而不是使用网络浏览器。这些知识不仅在与我们的应用程序交互时会派上用场,而且在作为开发人员与标准网络交互时也会派上用场。这将是下一章第四章 RESTful 网络 的主题,我们将看看我们将使用的工具和协议来与我们的网络应用程序进行交互。
第四章:RESTful Web
在之前的章节中,我们看了 Go 语言中最重要的两个组件——goroutines 和 channels。在接下来的章节中,我们将使用 Go 构建一个分布式应用程序,了解如何为互联网或者在我们的情况下是 Web 编写应用程序非常重要。在本章中,我们将介绍使用 REST 网络协议构建 Web 应用程序的一种特定方式。我们还将学习如何与基于 REST 的 Web 应用程序进行交互。我们将按以下方式进行介绍:
-
对 HTTP 和会话的简要介绍
-
构建 REST 服务器的基础知识
-
设计一个简单的 REST 服务器
-
与 REST 服务器交互的工具
HTTP 和会话
在本节中,我们将简要介绍 HTTP 协议及其随时间的演变。还讨论服务器如何使用 HTTP 会话跟踪用户状态。当我们尝试理解 REST 协议的工作原理时,这些知识将会派上用场。
HTTP 的简要历史
为了更好地理解 REST 协议的优势,让我们先来了解一下 REST 网络协议出现之前互联网的使用方式。1990 年代的互联网主要用于存储和共享使用HTTP(超文本传输协议)标记的文档。对于本章来说,HTTP 可以总结如下:
-
HTTP 是一个网络通信协议,以 HTTP 请求开始,以 HTTP 响应结束。
-
早期的 HTTP 响应由纯文本文档组成,但很快 HTML 格式开始流行,因为它允许更多样式化的文档。
-
Web 浏览器带来了互联网的新时代:仅仅显示不同字体权重的文本文档已经不够了。CSS 和 JavaScript 开始出现,使这些文档可以定制化和更加交互。所有这些进步导致了我们现在所说的web。
-
可以使用 URL 和 HTTP 方法与 Web 服务器进行交互。有九种 HTTP 方法,但是在本书的目的中,我们只对其中的五种感兴趣:
-
GET
:在发送简单的 HTTP 请求时使用 -
POST
:当我们想在发送 HTTP 请求时包含有价值的信息时使用 -
PUT
,PATCH
和DELETE
:从技术上讲,它们与POST
方法相同,尽管在功能上有所不同
我们将在下一节重新讨论这些 HTTP 方法,并对它们进行更详细的探讨。
HTTP 会话
HTTP 协议本身是无状态的;也就是说,它不知道谁在访问网页,谁可以向页面发送 POST 请求等等。在这个时期(1990 年代)的大多数 HTTP 服务器中,它们可以被视为文件服务器;也就是说,它们通过互联网提供静态文件。然而,现代的网络体验更加广泛。想象一下访问 Gmail 或 Facebook,网站知道我们是谁,我们看到的是为我们动态生成的定制内容。它们保持我们正在阅读的文章或正在撰写的邮件的“状态”。如果我们关闭浏览器一段时间后返回网站,它可以让我们回到我们离开的地方。鉴于 HTTP 协议和 HTTP 服务器是无状态的,这些网站如何跟踪所有这些内容并将它们链接回正确的用户呢?答案是 HTTP 会话。
当我们从浏览器登录网站时,我们提供凭据来识别自己。服务器回复的响应也包括一个标记,这个标记将在不久的将来用来识别我们。这个标记可以是会话 ID、cookie、认证头等形式。Web 服务器维护这些标记和相应的用户 ID 的表。在我们登录网站后,浏览器总是在每个请求中的头部发送相应的标记给服务器。因此,Web 服务器能够跟踪每个用户并向任何给定的用户显示正确的内容。服务器是如何做到这一点的呢?它在服务器端维护所有的状态信息!
REST 协议
即使在 20 世纪 90 年代,计算机和互联网技术仍然迅速发展,而 Web 浏览器也在同时不断进化。这意味着 Web 服务器本身可以开始将一些工作转移到 Web 客户端;也就是说,Web 浏览器。慢慢地,这开始引导开发人员尝试不同的软件架构来开发 Web 应用程序。到 2010 年,REST 协议成为设计现代 Web 应用程序的最普遍方式。
REST(表述状态转移协议)首次由Roy Fielding在他的开创性论文中描述,题为基于网络的软件架构的体系结构风格和设计(www.ics.uci.edu/~fielding/pubs/dissertation/fielding_dissertation.pdf
)。这种设计 Web 应用程序的方式有许多优点。它是实用的,CPU 使用效率高,网络负载小,对于不断增加的互联网流量更具扩展性等。以下是使用 REST 软件架构的一些属性和好处。
服务器和客户端架构
在HTTP 会话部分,我们描述了一个大部分工作都由服务器完成,浏览器负责将用户输入传递给服务器,解析服务器返回的 HTML 文档,并在浏览器中呈现给用户。REST 允许我们将应用程序分成服务器和客户端。服务器(后端)负责执行业务逻辑,客户端(前端)负责将用户交互传递给服务器。这可能听起来并没有太多改变;然而,REST 架构的其余属性将更加明显。
标准数据格式
REST 围绕着使用标准数据格式在后端和前端之间通信状态和数据。这导致了后端和前端的解耦。这意味着我们不再局限于只使用 Web 浏览器与服务器通信,这反过来意味着我们的服务器现在能够与 Web 应用程序、命令行应用程序等进行交互。REST 允许我们使用任何类型的数据格式进行通信,尽管 JSON 格式已经成为 REST 协议通信的通用语言。
资源
由于我们的前端和后端是分开的,我们需要在两者之间通信状态和数据。在前端,我们需要显示我们提供的服务的所有可用实体。这些实体被称为资源。
考虑一个提供 REST 接口(REST API)的服务器,它在我们的个人图书馆中有一本书的列表。在这种情况下,书籍列表是资源,我们可以在特定的端点从后端请求关于每本书的信息。对于我们的例子,端点可以是<URL>/api/books
。/api
前缀通常在 REST 应用程序中使用,表示我们正在与后端 URL 交互。资源通常可以被认为是数据的集合,就像数据库表的行。
重用 HTTP 协议
我们在前一小节资源中定义了端点,但是我们如何与它们交互呢?REST 是建立在 HTTP 协议之上的,并且它使用 HTTP 方法或在 REST 的情况下使用动词来与服务器交互。让我们以前面的例子/api/books
为例,来了解它是如何使用的。
GET
REST 使用GET
动词来检索特定资源类型的项目。鉴于我们有很多项目,可以检索特定资源项目以及检索所有可用的资源项目。通常通过提供项目的 id 来检索特定资源项目。以下显示了用于检索的两种GET
形式:
-
/api/books
:返回图书馆中所有书籍的列表 -
/api/books/<id>
:返回图书馆中特定书籍的信息
POST
REST 使用POST
动词来创建特定资源类型的新项目。资源创建可能需要额外的信息,这些信息在POST
请求的正文中提供。作为正文的一部分提供的信息必须是 REST 服务器可以处理的数据格式。对/api/books
进行 POST 表示我们想要向图书馆的书籍列表中添加一本新书。
PUT 和 PATCH
这些采用/api/books/<id>
的形式。这些方法仅适用于已经存在的资源。它们将使用请求的正文更新给定资源的数据或新状态。PUT
期望提供资源的新状态,包括未更改的字段。PATCH
可以被认为是PUT
的更轻松版本,因为我们不需要提供完整的新状态,而只需要更新的字段。
DELETE
REST 使用DELETE
动词来删除特定的资源项目。它采用/api/resource/<id>
的形式。它根据<id>
删除特定的资源。REST 支持删除给定资源类型的所有项目,尽管这没有意义,因为现在用户可能会意外删除资源类型的所有项目。出于这个原因和许多其他原因,没有服务器实际实现这个功能。
可升级的组件
考虑到我们需要对 UI 进行更改,而这不会影响服务器逻辑的情况。如果网站没有根据客户端和服务器架构进行拆分,我们将不得不升级整个网站,这将是一项非常耗时的任务。由于前端和后端的拆分,我们可以只对所需的系统进行更改和升级。因此,我们可以确保最小的服务中断。
REST 服务器的基础知识
现在我们了解了 REST 应用程序应该如何行为,让我们来构建一个吧!我们将首先构建一个简单的 Web 服务器,然后通过描述设计决策和 API 定义来设计图书 REST 服务器,最后根据设计构建 REST 服务器。
一个简单的 Web 服务器
Go 为我们提供了一个内置的用于构建 Web 服务器的库,net/http
。对于我们想要在服务器上创建的每个端点,我们必须做两件事:
-
为端点创建一个处理程序函数,接受两个参数,一个用于写入响应,另一个用于处理传入的请求。
-
使用
net/http.HandleFunc
注册端点。
以下是一个简单的 Web 服务器,它接受所有传入的请求,将它们记录到控制台,然后返回Hello, World!
消息。
// helloServer.go
package main
import (
"fmt"
"log"
"net/http"
)
func helloWorldHandler(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("Received request [%s] for path: [%s]", r.Method, r.URL.Path)
log.Println(msg)
response := fmt.Sprintf("Hello, World! at Path: %s", r.URL.Path)
fmt.Fprintf(w, response)
}
func main() {
http.HandleFunc("/", helloWorldHandler) // Catch all Path
log.Println("Starting server at port :8080...")
http.ListenAndServe(":8080", nil)
}
在浏览器中请求 URL 时,以下是一些示例请求和响应:
http://localhost:8080/ --> Hello, World! at Path: /
http://localhost:8080/asdf htt--> Hello, World! at Path: /asdf
http://localhost:8080/some-path/123 --> Hello, World! at Path: /some-path/123
以下是服务器的输出:
2017/10/03 13:35:46 Starting server at port :8080...
2017/10/03 13:36:01 Received request [GET] for path: [/]
2017/10/03 13:37:22 Received request [GET] for path: [/asdf]
2017/10/03 13:37:40 Received request [GET] for path: [/some-path/123]
请注意,即使我们提供了多个路径,它们都默认为/
路径。
设计 REST API
我们已经了解了 HTTP 背后的历史和 REST 协议的核心概念。我们构建了一个简单的 Web 服务器,以展示构建 REST 服务器所需的一些服务器端代码。现在是时候利用我们迄今为止学到的一切来设计和构建一个 REST 服务器了。
我们将首先定义我们的 REST API 的数据格式,然后创建一个符合我们定义的 REST API 规范的 Web 服务器。
数据格式
在这一部分,我们将描述书籍资源的格式,然后我们将开始定义每个 REST API 交互以及这些交互的预期结果。
书籍资源
以下是书籍资源的基本定义。它是一个 JSON 数组,格式为"<key>": "<value-type>"
,尽管应用中使用的实际实体将包含真实值:
{
"id": "string",
"title": "string",
"link": "string"
}
GET /api/books
这个 REST API 调用将检索书籍资源类型的所有项目的列表。在我们的示例中,响应的 JSON 格式包括书籍资源类型的数组。然而,这种返回格式并不是返回项目的唯一方式。另一种但更流行的格式包括一个带有"数据"键的 JSON 对象,其中包含实际结果和服务器可能希望在响应中发送的任何其他键。
现在让我们看一下我们在示例中将使用的简单格式:
// Request
GET "<URL>/api/books/"
// Response
[
{
"id": "1",
"title": "book1",
"link": "http://link-to-book-1.com"
},
{
"id": "2",
"title": "book2",
"link": "http://link-to-book-2.com"
}
]
GET /api/books/
这种GET
调用将基于提供的<id>
检索单个书籍资源项目。一般来说,响应的 JSON 对象将是定义的资源类型,尽管服务器可能决定根据服务的逻辑添加或删除某些字段。对于我们的 API,我们将返回我们资源类型中定义的所有字段。
让我们看一个例子,当我们尝试检索 id 为"1"
的书籍资源时:
// Request
GET "<URL>/api/books/1"
// Response
{
"id": "1",
"title": "book1",
"link": "http://link-to-book-1.com"
}
POST /api/books
这个 REST API 调用将创建一个新的书籍资源类型的项目。然而,为了创建一个新的项目,我们需要提供所有必要的数据。可能有不需要任何额外信息的POST
请求。但在我们的情况下,我们需要发送诸如title
和link
之类的信息作为请求的负载。
在这个例子中,我们想要创建一个标题为"book5"
,链接为"http://link-to-book5.com"
的书籍项目。请注意,由于我们的服务器已经有两个书籍资源类型的项目,新项目将以"3"
的 id 创建;这是根据我们服务器的实现。其他 REST 服务器可能会有不同的行为。
// Request
POST "<URL>/api/books"
// payload
{
"title": "book5",
"link": "http://link-to-book-5.com"
}
// response
{
"id": "3",
"title": "book5",
"link": "http://link-to-book-5.com"
}
PUT /api/books/
我们将在我们的 REST API 中使用PUT
来更新特定的资源类型。我们的 API 中定义的PUT
对接受不完整数据的负载非常严格,也就是说,它将拒绝不完整的负载。
在这个例子中,我们将修改新创建的书籍"3"
,并将其链接更改为指向"http://link-to-book-15.com"
:
// Request
PUT "<URL>/api/books/3"
// payload
{
"title": "book5",
"link": "http://link-to-book-15.com"
}
// response
{
"id": "3",
"title": "book5",
"link": "http://link-to-book-15.com"
}
DELETE /api/books/
这是用于删除特定书籍资源的 REST API 调用。这种请求不需要主体,只需要书籍 id 作为 URL 的一部分,如下一个例子所示。
在这个例子中,我们将删除书籍2
。请注意,我们不会在响应中返回任何内容;其他 REST 服务器可能会返回已删除的项目:
// Request
DELETE "<URL>/api/books/2"
// Response
[]
不成功的请求
我们可能会发送构造不良的请求、对不可用实体的请求或不完整的负载。对于所有这些情况,我们将发送相关的 HTTP 错误代码。根据服务器的实现,可能会返回单个错误代码。一些服务器返回标准的错误代码"404",以增加安全性,不让恶意用户尝试查找他们不拥有的资源类型的项目。
设计决策
我们已经定义了我们的 REST API,接下来我们想要实现服务器。在编写任何代码之前,制定我们希望服务器实现的目标非常重要。以下是服务器的一些规格:
-
我们需要提取
<id>
用于PUT
、DELETE
和单个资源GET
请求。 -
我们希望记录每个传入的请求,类似于
helloWorldHandler
。 -
复制这么多的工作是繁琐的,也是不好的编码实践。我们可以利用闭包和函数文字来为我们创建新的函数,这些函数将合并前两点的任务。
-
为了保持示例简单,我们将使用
map[string]bookResource
来存储所有书籍资源的状态。所有操作将在此映射上进行。在现实世界的服务器中,我们通常会使用数据库来存储这些资源。 -
Go 服务器可以处理并发请求,这意味着我们应该确保书籍资源的映射免受竞争条件的影响。
让我们看看基于我们设计的代码可能是什么样子。
书籍 API 的 REST 服务器
我们将程序分为以下部分:
$ tree
.
├── books-handler
│ ├── actions.go
│ ├── common.go
│ └── handler.go
└── main.go
1 directory, 5 files
现在让我们看看每个文件的源代码。
主要.go
main.go
源文件主要负责组装和运行 Web 服务器的代码。实际响应 HTTP 请求的逻辑分布在其他文件中:
// restServer/main.go
package main
import (
"fmt"
"log"
"net/http"
booksHandler "github.com/last-ent/distributed-go/chapter4/books-handler"
)
func main() {
// Get state (map) for books available on REST server.
books := booksHandler.GetBooks()
log.Println(fmt.Sprintf("%+v", books))
actionCh := make(chan booksHandler.Action)
// Start goroutine responsible for handling interaction with the books map
go booksHandler.StartBooksManager(books, actionCh)
http.HandleFunc("/api/books/", booksHandler.MakeHandler(booksHandler.BookHandler, "/api/books/", actionCh))
log.Println("Starting server at port 8080...")
http.ListenAndServe(":8080", nil)
}
books-handler/common.go
此源文件中的代码是通用逻辑,可能会在多个请求之间共享:
通常,最好的做法是识别与特定处理程序无关的逻辑,然后将其移入common.go
或类似的源文件,这样可以更容易找到它们并减少重复的代码。
// restServer/books-handler/common.go
package booksHandler
import (
"encoding/json"
"fmt"
"log"
"net/http"
)
// bookResource is used to hold all data needed to represent a Book resource in the books map.
type bookResource struct {
Id string 'json:"id"'
Title string 'json:"title"'
Link string 'json:"link"'
}
// requestPayload is used to parse request's Payload. We ignore Id field for simplicity.
type requestPayload struct {
Title string 'json:"title"'
Link string 'json:"link"'
}
// response struct consists of all the information required to create the correct HTTP response.
type response struct {
StatusCode int
Books []bookResource
}
// Action struct is used to send data to the goroutine managing the state (map) of books.
// RetChan allows us to send data back to the Handler function so that we can complete the HTTP request.
type Action struct {
Id string
Type string
Payload requestPayload
RetChan chan<- response
}
// GetBooks is used to get the initial state of books represented by a map.
func GetBooks() map[string]bookResource {
books := map[string]bookResource{}
for i := 1; i < 6; i++ {
id := fmt.Sprintf("%d", i)
books[id] = bookResource{
Id: id,
Title: fmt.Sprintf("Book-%s", id),
Link: fmt.Sprintf("http://link-to-book%s.com", id),
}
}
return books
}
// MakeHandler shows a common pattern used reduce duplicated code.
func MakeHandler(fn func(http.ResponseWriter, *http.Request, string, string, chan<- Action),
endpoint string, actionCh chan<- Action) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
method := r.Method
msg := fmt.Sprintf("Received request [%s] for path: [%s]", method, path)
log.Println(msg)
id := path[len(endpoint):]
log.Println("ID is ", id)
fn(w, r, id, method, actionCh)
}
}
// writeResponse uses the pattern similar to MakeHandler.
func writeResponse(w http.ResponseWriter, resp response) {
var err error
var serializedPayload []byte
if len(resp.Books) == 1 {
serializedPayload, err = json.Marshal(resp.Books[0])
} else {
serializedPayload, err = json.Marshal(resp.Books)
}
if err != nil {
writeError(w, http.StatusInternalServerError)
fmt.Println("Error while serializing payload: ", err)
} else {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
w.Write(serializedPayload)
}
}
// writeError allows us to return error message in JSON format.
func writeError(w http.ResponseWriter, statusCode int) {
jsonMsg := struct {
Msg string 'json:"msg"'
Code int 'json:"code"'
}{
Code: statusCode,
Msg: http.StatusText(statusCode),
}
if serializedPayload, err := json.Marshal(jsonMsg); err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
fmt.Println("Error while serializing payload: ", err)
} else {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(serializedPayload)
}
}
books-handler/actions.go
此源文件包含处理每个 HTTP 请求方法调用的函数:
// restServer/books-handler/actions.go
package booksHandler
import (
"net/http"
)
// actOn{GET, POST, DELETE, PUT} functions return Response based on specific Request type.
func actOnGET(books map[string]bookResource, act Action) {
// These initialized values cover the case:
// Request asked for an id that doesn't exist.
status := http.StatusNotFound
bookResult := []bookResource{}
if act.Id == "" {
// Request asked for all books.
status = http.StatusOK
for _, book := range books {
bookResult = append(bookResult, book)
}
} else if book, exists := books[act.Id]; exists {
// Request asked for a specific book and the id exists.
status = http.StatusOK
bookResult = []bookResource{book}
}
act.RetChan <- response{
StatusCode: status,
Books: bookResult,
}
}
func actOnDELETE(books map[string]bookResource, act Action) {
book, exists := books[act.Id]
delete(books, act.Id)
if !exists {
book = bookResource{}
}
// Return the deleted book if it exists else return an empty book.
act.RetChan <- response{
StatusCode: http.StatusOK,
Books: []bookResource{book},
}
}
func actOnPUT(books map[string]bookResource, act Action) {
// These initialized values cover the case:
// Request asked for an id that doesn't exist.
status := http.StatusNotFound
bookResult := []bookResource{}
// If the id exists, update its values with the values from the payload.
if book, exists := books[act.Id]; exists {
book.Link = act.Payload.Link
book.Title = act.Payload.Title
books[act.Id] = book
status = http.StatusOK
bookResult = []bookResource{books[act.Id]}
}
// Return status and updated resource.
act.RetChan <- response{
StatusCode: status,
Books: bookResult,
}
}
func actOnPOST(books map[string]bookResource, act Action, newID string) {
// Add the new book to 'books'.
books[newID] = bookResource{
Id: newID,
Link: act.Payload.Link,
Title: act.Payload.Title,
}
act.RetChan <- response{
StatusCode: http.StatusCreated,
Books: []bookResource{books[newID]},
}
}
books-handler/handler.go
handler.go
源文件包含处理和处理书籍请求所需的所有逻辑。请注意,除了包含处理 HTTP 请求的逻辑外,它还处理了服务器上书籍状态的维护:
// restServer/books-handler/handler.go
package booksHandler
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
)
// StartBooksManager starts a goroutine that changes the state of books (map).
// Primary reason to use a goroutine instead of directly manipulating the books map is to ensure
// that we do not have multiple requests changing books' state simultaneously.
func StartBooksManager(books map[string]bookResource, actionCh <-chan Action) {
newID := len(books)
for {
select {
case act := <-actionCh:
switch act.Type {
case "GET":
actOnGET(books, act)
case "POST":
newID++
newBookID := fmt.Sprintf("%d", newID)
actOnPOST(books, act, newBookID)
case "PUT":
actOnPUT(books, act)
case "DELETE":
actOnDELETE(books, act)
}
}
}
}
/* BookHandler is responsible for ensuring that we process only the valid HTTP Requests.
* GET -> id: Any
* POST -> id: No
* -> payload: Required
* PUT -> id: Any
* -> payload: Required
* DELETE -> id: Any
*/
func BookHandler(w http.ResponseWriter, r *http.Request, id string, method string, actionCh chan<- Action) {
// Ensure that id is set only for valid requests
isGet := method == "GET"
idIsSetForPost := method == "POST" && id != ""
isPutOrPost := method == "PUT" || method == "POST"
idIsSetForDelPut := (method == "DELETE" || method == "PUT") && id != ""
if !isGet && !(idIsSetForPost || idIsSetForDelPut || isPutOrPost) {
writeError(w, http.StatusMethodNotAllowed)
return
}
respCh := make(chan response)
act := Action{
Id: id,
Type: method,
RetChan: respCh,
}
// PUT & POST require a properly formed JSON payload
if isPutOrPost {
var reqPayload requestPayload
body, _ := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err := json.Unmarshal(body, &reqPayload); err != nil {
writeError(w, http.StatusBadRequest)
return
}
act.Payload = reqPayload
}
// We have all the data required to process the Request.
// Time to update the state of books.
actionCh <- act
// Wait for respCh to return data after updating the state of books.
// For all successful Actions, the HTTP status code will either be 200 or 201\.
// Any other status code means that there was an issue with the request.
var resp response
if resp = <-respCh; resp.StatusCode > http.StatusCreated {
writeError(w, resp.StatusCode)
return
}
// We should only log the delete resource and not send it back to user
if method == "DELETE" {
log.Println(fmt.Sprintf("Resource ID %s deleted: %+v", id, resp.Books))
resp = response{
StatusCode: http.StatusOK,
Books: []bookResource{},
}
}
writeResponse(w, resp)
}
尽管我们已经从头开始创建了一个 REST 服务器,但这并不是一个完整的 REST 服务器。为了使编写 REST 服务器成为可能,许多重要细节已被省略。但实际上,我们应该使用现有的库之一来帮助我们构建一个合适的 REST 服务器。
到目前为止一切顺利,但根据我们迄今为止看到的代码,我们如何与 REST 服务器以及基于该代码的服务器进行交互?让我们在下一节中看看这个问题。
如何进行 REST 调用
到目前为止,我们已经使用 Web 浏览器进行了 HTTP 请求。这适用于普通的 HTTP 服务器或对 REST 服务器进行简单的GET
请求。但是,浏览器将无法代表我们进行其他类型的 REST 调用。
大多数 Web 应用程序使用 JavaScript、Ajax 和其他前端技术与 REST 服务器进行交互。但是,我们不必创建一个完整的 Web 前端来与 REST 服务器进行交互;我们可以利用一些工具,还可以编写程序来代替我们进行 REST 调用。
cURL
cURL 是一个免费的命令行工具,用于在计算机网络上进行交互。它可以用于多种协议的通信,包括 HTTP、HTTPS、FTP、SCP 等。让我们对在前一节中创建的服务器进行 REST 调用。为了提高可读性,我们可以使用jq
库。
GET
现在让我们看看使用 cURL 命令进行 HTTP 请求。根据服务器的状态,进行GET
请求可能会有不同的输出:
$ # List all books on server
$ # Note that we use '-L' flag while using cURL.
$ # This takes care of any http redirections that might be required.
$ curl -L localhost:8080/api/books | jq # GET CALL
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 46 100 46 0 0 9721 0 --:--:-- --:--:-- --:--:-- 11500
100 311 100 311 0 0 59589 0 --:--:-- --:--:-- --:--:-- 59589
[
{
"id": "3",
"title": "Book-3",
"link": "http://link-to-book3.com"
},
{
"id": "4",
"title": "Book-4",
"link": "http://link-to-book4.com"
},
{
"id": "5",
"title": "Book-5",
"link": "http://link-to-book5.com"
},
{
"id": "1",
"title": "Book-1",
"link": "http://link-to-book1.com"
},
{
"id": "2",
"title": "Book-2",
"link": "http://link-to-book2.com"
}
]
$ curl localhost:8080/api/books/3 | jq # GET a single resource.
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 61 100 61 0 0 13255 0 --:--:-- --:--:-- --:--:-- 15250
{
"id": "3",
"title": "Book-3",
"link": "http://link-to-book3.com"
}
DELETE
假设我们有一个 id 为"2"
的书籍,我们可以使用 cURL 进行删除,如下所示:
$ # We can make other method calls by providing -X flag with method name in caps.
$ curl -LX DELETE localhost:8080/api/books/2 | jq # DELETE a resource.
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 2 100 2 0 0 337 0 --:--:-- --:--:-- --:--:-- 400
[]
$ curl -L localhost:8080/api/books | jq # GET all books after resource deletion.
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 46 100 46 0 0 21465 0 --:--:-- --:--:-- --:--:-- 46000
100 249 100 249 0 0 91008 0 --:--:-- --:--:-- --:--:-- 91008
[
{
"id": "5",
"title": "Book-5",
"link": "http://link-to-book5.com"
},
{
"id": "1",
"title": "Book-1",
"link": "http://link-to-book1.com"
},
{
"id": "3",
"title": "Book-3",
"link": "http://link-to-book3.com"
},
{
"id": "4",
"title": "Book-4",
"link": "http://link-to-book4.com"
}
]
PUT
让我们更新具有 id 为"4"
的现有书籍资源:
$ # We can use -d flag to provide payload in a Request
$ curl -H "Content-Type: application/json" -LX PUT -d '{"title": "New Book Title", "link": "New Link"}' localhost:8080/api/books/4 | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 100 100 53 100 47 13289 11785 --:--:-- --:--:-- --:--:-- 17666
{
"id": "4",
"title": "New Book Title",
"link": "New Link"
}
$ curl -L localhost:8080/api/books | jq # GET all books after updating a resource
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 46 100 46 0 0 9886 0 --:--:-- --:--:-- --:--:-- 11500
100 241 100 241 0 0 47024 0 --:--:-- --:--:-- --:--:-- 47024
[
{
"id": "1",
"title": "Book-1",
"link": "http://link-to-book1.com"
},
{
"id": "3",
"title": "Book-3",
"link": "http://link-to-book3.com"
},
{
"id": "4",
"title": "New Book Title",
"link": "New Link"
},
{
"id": "5",
"title": "Book-5",
"link": "http://link-to-book5.com"
}
]
POST
现在我们知道如何使用 cURL 向服务器发送有效负载,让我们创建一个新的书籍资源项:
$ curl -H "Content-Type: application/json" -LX POST -d '{"title":"Ultra New Book", "link": "Ultra New Link"}' localhost:8080/api/books/ | jq # POST ie., create a new resource.
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 111 100 59 100 52 99k 89655 --:--:-- --:--:-- --:--:-- 59000
{
"id": "6",
"title": "Ultra New Book",
"link": "Ultra New Link"
}
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 46 100 46 0 0 8234 0 --:--:-- --:--:-- --:--:-- 9200
100 301 100 301 0 0 46414 0 --:--:-- --:--:-- --:--:-- 46414
[
{
"id": "4",
"title": "New Book Title",
"link": "New Link"
},
{
"id": "5",
"title": "Book-5",
"link": "http://link-to-book5.com"
},
{
"id": "1",
"title": "Book-1",
"link": "http://link-to-book1.com"
},
{
"id": "6",
"title": "Ultra New Book",
"link": "Ultra New Link"
},
{
"id": "3",
"title": "Book-3",
"link": "http://link-to-book3.com"
}
]
以下是快速参考命令:
-
curl -L localhost:8080/api/books | jq # GET CALL
-
curl localhost:8080/api/books/3 | jq # 获取单个资源。
-
curl -LX DELETE localhost:8080/api/books/2 | jq # 删除一个资源。
-
curl -H "Content-Type: application/json" -LX PUT -d '{"title": "New Book Title", "link": "New Link"}' localhost:8080/api/books/4 | jq
-
curl -H "Content-Type: application/json" -LX POST -d '{"title":"Ultra New Book", "link": "Ultra New Link"}' localhost:8080/api/books/ | jq # POST 即创建一个新资源。
以下是服务器的控制台输出:
$ go run main.go
2017/10/09 21:07:50 map[5:{Id:5 Title:Book-5 Link:http://link-to-book5.com} 1:{Id:1 Title:Book-1 Link:http://link-to-book1.com} 2:{Id:2 Title:Book-2 Link:http://link-to-book2.com} 3:{Id:3 Title:Book-3 Link:http://link-to-book3.com} 4:{Id:4 Title:Book-4 Link:http://link-to-book4.com}]
2017/10/09 21:07:50 Starting server at port 8080...
2017/10/09 21:07:56 Received request [GET] for path: [/api/books/]
2017/10/09 21:07:56 ID is
2017/10/09 21:09:18 Received request [GET] for path: [/api/books/3]
2017/10/09 21:09:18 ID is 3
2017/10/09 21:11:38 Received request [DELETE] for path: [/api/books/2]
2017/10/09 21:11:38 ID is 2
2017/10/09 21:11:38 Resource ID 2 deleted: [{Id:2 Title:Book-2 Link:http://link-to-book2.com}]
2017/10/09 21:12:16 Received request [GET] for path: [/api/books/]
2017/10/09 21:12:16 ID is
2017/10/09 21:15:22 Received request [PUT] for path: [/api/books/4]
2017/10/09 21:15:22 ID is 4
2017/10/09 21:16:01 Received request [GET] for path: [/api/books/]
2017/10/09 21:16:01 ID is
2017/10/09 21:17:07 Received request [POST] for path: [/api/books/]
2017/10/09 21:17:07 ID is
2017/10/09 21:17:36 Received request [GET] for path: [/api/books/]
2017/10/09 21:17:36 ID is
需要牢记的一点是,即使我们使用重定向标志-L
,对于 POST 请求,请求体也不会被发送。我们需要确保将其发送到最终解析的端点。
这应该给我们一个如何使用 REST 客户端的基本概念。
Postman
现在让我们看一个可以用来进行 REST 调用的基于 GUI 的工具Postman(www.getpostman.com/
)。为了简洁起见,我们将看一个GET
和一个POST
调用。
以下屏幕截图说明了如何使用 Postman 进行GET
请求。请注意,Postman 允许我们以易于阅读的格式查看返回的 JSON:
GET /api/books
以下屏幕截图显示了如何进行POST
请求。请注意,我们可以很容易地提供一个 JSON 有效负载:
POST /api/books
希望前面的部分和这些屏幕截图足以让我们了解如何使用 Postman。
net/http
让我们看看如何从 Go 程序中以编程方式发送GET
和POST
:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
type bookResource struct {
Id string 'json:"id"'
Title string 'json:"title"'
Link string 'json:"link"'
}
func main() {
// GET
fmt.Println("Making GET call.")
// It is possible that we might have error while making an HTTP request
// due to too many redirects or HTTP protocol error. We should check for this eventuality.
resp, err := http.Get("http://localhost:8080/api/books")
if err != nil {
fmt.Println("Error while making GET call.", err)
return
}
fmt.Printf("%+v\n\n", resp)
// The response body is a data stream from the server we got the response back from.
// This data stream is not in a useable format yet.
// We need to read it from the server and convert it into a byte stream.
body, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
var books []bookResource
json.Unmarshal(body, &books)
fmt.Println(books)
fmt.Println("\n")
// POST
payload, _ := json.Marshal(bookResource{
Title: "New Book",
Link: "http://new-book.com",
})
fmt.Println("Making POST call.")
resp, err = http.Post(
"http://localhost:8080/api/books/",
"application/json",
bytes.NewBuffer(payload),
)
if err != nil {
fmt.Println(err)
}
fmt.Printf("%+v\n\n", resp)
body, _ = ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
var book bookResource
json.Unmarshal(body, &book)
fmt.Println(book)
fmt.Println("\n")
}
以下是运行程序时的控制台输出:
$ go run main.go
Making GET call.
&{Status:200 OK StatusCode:200 Proto:HTTP/1.1 ProtoMajor:1 ProtoMinor:1 Header:map[Content-Type:[application/json] Date:[Mon, 09 Oct 2017 20:07:43 GMT] Content-Length:[488]] Body:0xc4200f0040 ContentLength:488 TransferEncoding:[] Close:false Uncompressed:false Trailer:map[] Request:0xc42000a900 TLS:<nil>}
[{2 Book-2 http://link-to-book2.com} {3 Book-3 http://link-to-book3.com} {4 Book-4 http://link-to-book4.com} {5 Book-5 http://link-to-book5.com} {6 New Book http://new-book.com} {7 New Book http://new-book.com} {8 New Book http://new-book.com} {1 Book-1 http://link-to-book1.com}]
Making POST call.
&{Status:201 Created StatusCode:201 Proto:HTTP/1.1 ProtoMajor:1 ProtoMinor:1 Header:map[Content-Type:[application/json] Date:[Mon, 09 Oct 2017 20:07:43 GMT] Content-Length:[58]] Body:0xc4200f0140 ContentLength:58 TransferEncoding:[] Close:false Uncompressed:false Trailer:map[] Request:0xc4200fc100 TLS:<nil>}
{9 New Book http://new-book.com}
有关net/http
库的更多详细信息可以在golang.org/pkg/net/http/
找到。
总结
在本章中,我们讨论了 HTTP 和会话的简要历史。接下来,我们看了 REST 协议旨在解决的问题以及它们是如何引起关注的。然后,我们深入了解了 REST 协议是什么,如何设计基于它的应用程序,如何基于我们的设计构建 REST 服务器,最后我们看了使用 cURL、Postman 和 Go 程序与 REST 服务器交互的不同方式。您可以自由选择与 REST 服务器交互的方式。但是,在本书的其余部分,我们将看到使用 cURL 与 REST 服务器交互。
现在我们已经讨论了开发分布式和面向 Web 的应用程序所必需的所有重要主题。在下一章,第五章,介绍 Goophr,我们可以开始讨论分布式文档索引器在概念层面上是什么,以及如何设计它,规划数据通信等等。
第五章:介绍 Goophr
既然我们已经对 goroutines、通道、REST 和一些用于开发 Go 应用程序的工具有了扎实的了解,让我们利用这些知识来构建一个分布式 Web 应用程序。这个应用程序的目的将是索引和搜索文档。在本章中,我们将阐述这样一个应用程序的设计结构,并且我们还将看一下我们将在项目中使用的一些剩余主题和工具。
本章可以大致分为两个部分:
-
设计概述
-
项目结构
Goophr 是什么?
我们将构建一个应用程序来索引和搜索文档。这是我们每次使用 Google、Bing 或 DuckDuckGo 等搜索门户之一访问互联网时使用的功能。这也是一些网站借助搜索引擎提供的功能。
在接下来的几章中,我们将构建一个搜索引擎应用程序,从现有技术(如 Google、Solr 搜索引擎和 goroutines)中汲取灵感。我们的应用程序名称是对这三种技术的一种玩耍。
想象一下在任何搜索门户上搜索短语;在提交查询后,我们会得到一个包含来自我们搜索短语的术语的文本摘录的链接列表。很多时候,前几个链接往往是我们正在寻找的相关网页或文档。如何可能获得最相关文档的列表?Google 或其他搜索引擎实现这一点的方式非常复杂;他们有一个大型的计算机科学家团队不断调整搜索引擎。
我们不打算构建任何复杂的东西。通过拥有一个谦逊而实用的目标,我们可以创建一个最小但可用的搜索引擎。不过,首先让我们定义应用程序的目的和标准。
设计概述
既然我们已经简要描述了我们想要构建的应用程序以及构建它的原因,让我们来看看我们想要作为搜索引擎实现的功能列表:
-
它应该接受在 POST 请求中提供的文档链接并下载它们
-
它应该处理和索引下载的文档
-
它应该处理搜索查询,并以包含搜索词的摘录的文档列表作出响应
-
返回的文档列表应按文档中搜索词的出现次数较多的顺序排列
虽然我们列出了四个功能,但我们可以将应用程序分为两个主要组件:
-
Goophr 礼宾员:这是负责索引并返回搜索查询的文档列表的组件
-
Goophr 图书管理员:这是负责处理用户交互并与第一个组件交互的组件
这两个组件将作为两个 REST 服务器运行,并且所有交互都将遵循 REST 协议。因此,让我们为我们的组件定义 API 定义!在第四章中,RESTful Web,您注意到我们用来定义通过 REST 协议进行通信的各种 API 端点和数据定义的方法非常冗长和繁琐。如果我们有一种正式的方法来编写 API 定义,那不是更好吗?好消息是,随着 REST 协议的普及,有许多解决方案,其中一个解决方案是最广泛使用的行业标准——OpenAPI 格式。
OpenAPI 规范
OpenAPI 让我们以标准化的方式定义 RESTful API,并且可以在不受任何特定编程语言或框架的约束下进行定义。这为我们提供了一个强大的抽象,可以定义一个 API,该 API 的初始实现可以是 Java 或 Python 中的 RESTful 服务器;同时,我们也可以将代码库移植到 Go 中,服务的行为几乎不需要或只需要进行很少的更改。
让我们列出 OpenAPI 规范的一般结构,并使用它来重新定义第四章中描述的Books API
,RESTful Web。
如果我们看一下Books API
标题,我们可以定义以下元素来描述 API:
-
我们服务器的 URL
-
关于 API 意图的基本信息
-
我们 API 中可用的路径
-
API 中每个路径可用的方法
-
请求和响应的可能描述和示例有效载荷
-
请求和响应有效载荷的模式
考虑到这些要点,让我们来看看Books API
的 OpenAPI 规范:
# openapi/books.yaml
openapi: 3.0.0
servers:
- url: /api
info:
title: Books API
version: '1.0'
description: ;
API responsible for adding, reading and updating list of books.
paths:
/books:
get:
description: |
Get list of all books
responses:
'200':
description: |
Request successfully returned list of all books
content:
application/json:
schema:
$ref: '#/components/schemas/response'
/books/{id}:
get:
description: |
Get a particular books with ID 'id'
responses:
'200':
description: |
Request was successfully completed.
content:
application/json:
schema:
$ref: '#/components/schemas/document'
parameters:
- in: query
name: id
schema:
type: integer
description: Book ID of the book to get.
post:
description: |
Get a particular books with ID 'id'
responses:
'200':
description: |
Request was successfully completed.
content:
application/json:
schema:
$ref: '#/components/schemas/payload'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/document'
put:
description: |
Update the data of a Book with ID 'id' with the payload sent in the request body.
responses:
'200':
description: |
Request was successfully completed.
content:
application/json:
schema:
$ref: '#/components/schemas/payload'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/document'
delete:
description: |
Get a particular books with ID 'id'
responses:
'200':
description: |
Request was successfully completed.
parameters:
- in: query
name: id
schema:
type: integer
description: Book ID of the book to get.
components:
schemas:
response:
type: array
items:
$ref: '#/components/schemas/document'
document:
type: object
required:
- title
- link
properties:
id:
type: integer
description: Book ID
title:
type: string
description: Title of the book
link:
type: string
description: Link to the book
payload:
type: object
required:
- title
- link
properties:
title:
type: string
description: Title of the book
link:
type: string
description: Link to the book
Goophr Concierge API 定义
Goophr Concierge 是面向用户的组件,它有两个责任——索引新文档和返回查询结果。非正式地,我们可以定义 API 如下:
-
/api/feeder
:这是用户上传文档的 API 端点 -
如果有效载荷完整且正确,POST 请求将添加新文档
-
/api/query
:用户搜索针对此 API 端点查询的短语或术语 -
POST 请求包含带有搜索术语的有效载荷,并将返回文档列表
这个简单的 API 描述是为了我们的理解。现在让我们看看如何使用 OpenAPI 规范来制定它:
# openapi/concierge.yaml
openapi: 3.0.0
servers:
- url: /api
info:
title: Goophr Concierge API
version: '1.0'
description: >
API responsible for responding to user input and communicating with Goophr
Librarian.
paths:
/feeder:
post:
description: |
Register new document to be indexed.
responses:
'200':
description: |
Request was successfully completed.
content:
application/json:
schema:
$ref: '#/components/schemas/response'
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/response'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/document'
required: true
/query:
post:
description: |
Search query
responses:
'200':
description: |
Response consists of links to document
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/document'
requestBody:
content:
application/json:
schema:
type: array
items:
type: string
required: true
components:
schemas:
response:
type: object
properties:
code:
type: integer
description: Status code to send in response
msg:
type: string
description: Message to send in response
document:
type: object
required:
- title
- link
properties:
title:
type: string
description: Title of the document
link:
type: string
description: Link to the document
借助 API 描述,前面的 OpenAPI 定义应该是不言自明的。有关 OpenAPI 规范的详细信息可以在swagger.io/specification/
找到。我们可以使用 Swagger 提供的工具(editor.swagger.io/
)来更好地可视化表示我们的 API 定义。
以下是在 Swagger Editor 中查看的 Goophr Concierge OpenAPI 的屏幕截图:
在 Swagger Editor 上查看 OpenAPI
Goophr 图书管理员 API 定义
Goophr Librarian 实际上是一组文档索引的维护者,它的责任是向索引添加术语,并根据索引中可用的术语返回搜索术语的查询结果。
非正式地,我们可以定义 API 如下:
-
/api/index
:Goophr Concierge 调用此 API 端点以将术语添加到实际索引 -
POST 请求将术语添加到索引
-
/api/query
:Goophr Concierge 调用此端点来查询用户提交的搜索术语 -
POST 请求返回搜索术语的结果
以下是 Goophr 图书管理员的 OpenAPI 定义。
# openapi/librarian.yaml
openapi: 3.0.0
servers:
- url: /api
info:
title: Goophr Librarian API
version: '1.0'
description: |
API responsible for indexing & communicating with Goophr Concierge.
paths:
/index:
post:
description: |
Add terms to index.
responses:
'200':
description: |
Terms were successfully added to the index.
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/error'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/terms'
description: |
List of terms to be added to the index.
required: true
/query:
post:
description: |
Search for all terms in the payload.
responses:
'200':
description: |
Returns a list of all the terms along with their frequency,
documents the terms appear in and link to the said documents.
content:
application/json:
schema:
$ref: '#/components/schemas/results'
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/error'
parameters: []
components:
schemas:
error:
type: object
properties:
msg:
type: string
term:
type: object
required:
- title
- token
- doc_id
- line_index
- token_index
properties:
title:
description: |
Title of the document to which the term belongs.
type: string
token:
description: |
The term to be added to the index.
type: string
doc_id:
description: |
The unique hash for each document.
type: string
line_index:
description: |
Line index at which the term occurs in the document.
type: integer
token_index:
description: |
Position of the term in the document.
type: integer
terms:
type: object
properties:
code:
type: integer
data:
type: array
items:
$ref: '#/components/schemas/term'
results:
type: object
properties:
count:
type: integer
data:
type: array
items:
$ref: '#/components/schemas/result'
result:
type: object
properties:
doc_id:
type: string
score:
type: integer
这两个 API 规范描述了两个组件如何相互交互,以及用户如何与它们交互。但是,这并不是完整的图片,因为即使我们只显示了两个 API 定义,实际的实现将有三个 Librarian 实例!
用户通过与 Concierge 通过/api/feeder
和/api/query
进行交互。Concierge 可以通过/api/index
和/api/query
与三个 librarian 实例进一步交互。下图显示了应用程序在广义上的外观:
Goophr 应用程序的设计
考虑到当我们需要构建一个真正的 Web 应用程序,该应用程序将被多个用户使用;在这种情况下,我们希望有多个我们的服务实例运行,以便它们可以同时为所有用户提供服务。我们可能还将我们的应用程序拆分为多个 API,并且我们需要深入了解如何设计我们的系统来处理这样的分布式工作负载。因此,为了了解如何处理这样的系统,我们将使用三个 Librarian 实例。
项目结构
根据上图,我们已经设计了我们的应用程序,其中包括一个 Goophr Concierge 实例和三个 Goophr Librarian 实例。为了保持我们的代码可管理,我们将把源代码分成两个主要实体和一个根级别的docker-compose
文件:
-
Concierge
-
图书管理员
-
docker-compose.yaml
在第一章,Go 的开发环境中,我们讨论了如何创建和运行 docker 镜像。docker run ...
对于单个镜像效果很好,但当我们想要创建一个相互交互的 docker 镜像网络时,可能会变得复杂。为了保持设置简单,我们将使用docker-compose
(docs.docker.com/compose/overview/
)。简而言之,docker-compose
需要一个YAML(另一种标记语言)文件,其中包含具体信息,例如要给正在运行的 docker 镜像命名,要在哪些端口上运行它们,以及要使用哪个Dockerfile
来构建这些 docker 镜像。
以下是我们项目中将使用的docker-compose.yaml
文件:
version: '3'
services:
concierge:
build: concierge/.
ports:
- "6060:9000"
a_m_librarian:
build: librarian/.
ports:
- "7070:9000"
n_z_librarian:
build: librarian/.
ports:
- "8080:9000"
others_librarian:
build: librarian/.
ports:
- "9090:9000"
请注意,a_m_librarian
,n_z_librarian
和others_librarian
都是从由librarian/Dockerfile
定义的相同 docker 镜像构建的。这比使用原始的docker
命令启动和配置多个实例更容易。
这是我们将要开始的项目结构:
$ tree . ├── concierge │ ├── api │ │ ├── feeder.go │ │ └── query.go │ ├── common │ │ ├── helpers.go │ │ └── state.go │ ├── Dockerfile │ └── main.go ├── docker-compose.yaml └── librarian ├── api │ ├── index.go │ └── query.go ├── common │ ├── helpers.go │ └── state.go ├── Dockerfile └── main.go
尽管我们已经建立了一个精心设计的结构,但目前,唯一具有任何有用代码的文件是concierge/main.go
,concierge/Dockerfile
,librarian/main.go
和librarian/Dockerfile
(为了方便起见,从现在开始,我们将使用简写符号{concierge,librarian}
/{main.go,Dockerfile}
来表示这些文件。这种表示法受到 Bash 的启发。)
让我们来看一下main.go
和Dockerfile
。这两个文件对于两个组件来说几乎是相同的。为了简洁起见,我们将分别展示这两种文件,并展示它们的区别所在。
让我们从main.go
开始:
// {concierge,librarian}/main.go
package main
import "fmt"
func main() {
fmt.Println("Hello from Concierge!") // Or, Hello from Librarian!
}
现在让我们来看一下Dockerfile
:
# {concierge,librarian}/Dockerfile FROM golang:1.9.1 # In case of librarian, '/concierge' will be replaced with '/librarian' ADD . /go/src/github.com/last-ent/distributed-go/chapter5/goophr/concierge WORKDIR /go/src/github.com/last-ent/distributed-go/chapter5/goophr/concierge RUN go install github.com/last-ent/distributed-go/chapter5/goophr/concierge ENTRYPOINT /go/bin/concierge EXPOSE 9000
如果我们运行完整的代码库,我们应该会看到类似以下的输出:
$ docker-compose up --build
# ...
Creating goophr_a_m_librarian_1 ...
Creating goophr_concierge_1 ...
Creating goophr_m_z_librarian_1 ...
Creating goophr_others_librarian_1 ...
Creating goophr_a_m_librarian_1
Creating goophr_m_z_librarian_1
Creating goophr_others_librarian_1
Creating goophr_others_librarian_1 ... done
Attaching to goophr_a_m_librarian_1, goophr_m_z_librarian_1, goophr_concierge_1, goophr_others_librarian_1
a_m_librarian_1 | Hello from Librarian!
m_z_librarian_1 | Hello from Librarian!
others_librarian_1 | Hello from Librarian!
concierge_1 | Hello from Concierge!
goophr_a_m_librarian_1 exited with code 0
goophr_m_z_librarian_1 exited with code 0
goophr_concierge_1 exited with code 0
goophr_others_librarian_1 exited with code 0
摘要
在本章中,我们首先描述了我们将在接下来的三章中构建的应用程序。然后我们将应用程序分成了两个主要组件——Goophr Concierge 和 Goophr Librarian。接下来,我们看了一下我们将在应用程序中使用的项目结构。我们还讨论了 OpenAPI,这是描述 REST API 的行业标准,并用它来定义我们的 Concierge 和 Librarian 的 API。最后,我们看了一下如何使用docker-compose
运行我们的分布式应用程序。
在下一章中,我们将看一下 Goophr Concierge,它将与用户交互以上传文档,并响应用户的搜索查询。
第六章:Goophr Concierge
在前一章第五章中,介绍 Goophr,我们将我们的应用程序分成了两个组件:Concierge 和 Librarian。在本章中,我们将看一下 Concierge 的设计和实现。本章的主要部分如下:
-
深入了解文档馈送器和查询处理程序 API
-
解释 Concierge 的架构和逻辑流的图表
-
Concierge 的测试
重新审视 API 定义
让我们再次查看 Concierge 的 API 定义,并讨论定义对 API 和应用程序预期行为的传达:
# openapi/concierge.yaml
openapi: 3.0.0
servers:
- url: /api
info:
title: Goophr Concierge API
version: '1.0'
description: >
API responsible for responding to user input and communicating with Goophr
Librarian.
paths:
/feeder:
post:
description: |
Register new document to be indexed.
responses:
'200':
description: |
Request was successfully completed.
content:
application/json:
schema:
$ref: '#/components/schemas/response'
'400':
description: >
Request was not processed because payload was incomplete or incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/response'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/document'
required: true
/query:
post:
description: |
Search query
responses:
'200':
description: |
Response consists of links to document
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/document'
requestBody:
content:
application/json:
schema:
type: array
items:
type: string
required: true
components:
schemas:
response:
type: object
properties:
code:
type: integer
description: Status code to send in response
msg:
type: string
description: Message to send in response
document:
type: object
required:
- title
- link
properties:
title:
type: string
description: Title of the document
link:
type: string
description: Link to the document
根据 API 定义,我们可以说明如下:
-
所有与 Concierge 的通信都使用 JSON 格式进行。
-
Concierge 有两个端点,分别是
/api/feeder
和/api/query
-
/api/feeder
:这使用POST
方法添加新文档 -
/api/query
:这使用POST
方法接收搜索查询词,并返回与搜索词相关的文档列表
现在让我们详细看看每个端点。
文档馈送器 - REST API 端点
/api/feeder
的主要目的是接收要索引的文档,处理它们,并将处理后的数据转发给图书管理员以添加到索引中。这意味着我们需要准确处理文档。但是,“处理文档”是什么意思呢?
它可以定义为以下一系列连续的任务:
-
我们依赖有效载荷为我们提供标题和文档链接。我们下载链接的文档并在我们的索引中使用它。
-
文档可以被视为一个大的文本块,可能会有多个具有相同标题的文档。我们需要能够唯一标识每个文档,并且能够轻松地检索它们。
-
搜索查询的结果期望所提供的单词出现在文档中。这意味着我们需要从文档中提取所有单词,并跟踪单词在文档中的位置。
-
区分“HELLO”、“hello”和“HELLO!!!”有意义吗?在它们出现的文本上下文中,它们确实传达了不同的含义。但是,对于索引来说,这取决于我们想要使索引变得多么复杂和准确。对于我们的情况,我们保持实现简单,因此我们规范化单词,也就是说,我们将单词的所有这些变体视为单个单元/令牌。此外,我们不索引代词、冠词、介词等。
对于搜索引擎来说,代词、冠词等被称为停用词,通常在索引中被忽略。主要原因是,虽然它们为用户提供了有价值的信息,但它们往往对索引几乎没有相关性。
- 最后,我们想将所有这些令牌添加到由图书管理员维护的索引中。
在 Concierge 的源代码中,每个前述任务都由特定的函数处理。以下是显示每个任务的相关函数的列表:
-
任务 1:
api.FeedHandler
和api.docProcessor
-
任务 2:
api.docStore
和api.lineStore
-
任务 3 和任务 4:
api.indexProcessor
和common.SimplifyToken
-
任务 5:
api.indexAdder
查询处理程序 - REST API 端点
同样,如果我们考虑在/api/query
处理搜索查询的情况,我们应该能够从有效载荷中获取搜索词,从图书管理员的各个实例请求结果,处理它们,然后以搜索相关性的降序返回搜索结果给用户。但是,由于我们尚未实现图书管理员,我们将在第八章中稍后讨论此端点的实现,部署 Goophr,分布式搜索索引。
约定
Concierge 的源代码有很多组成部分。在没有任何先前理解的情况下直接跳入代码可能不是最好的方法。相反,我们将把前几节中定义的任务作为流程图呈现出来。然而,首先让我们简要看一下我们在图表和代码中使用的符号和命名约定。
代码约定
以下是 Concierge 中的实体:
-
有效负载(p):这代表接收到的用于向索引添加新文档的有效负载。
-
文档(d):这代表表示唯一文档的所有元数据。
-
行(l):这代表文档中单行的所有元数据。
-
标记(t):这代表文档中每个标记的所有元数据。
-
消息(xMsg):对于给定的实体x,它提供了用于识别唯一实体和返回唯一实体的回调通道的信息。
-
处理通道(xProcessCh):对于给定的实体x,该通道由xProcessor goroutine 使用来消耗和处理实体。
-
存储(或数据存储):Concierge 还负责存储和维护系统中所有文档和行的信息。
-
存储通道(xStoreCh):对于给定的实体x,该通道用于更新实体的存储。
-
获取通道(xGetCh或xGetAllCh):这些通道由存储使用,提供一种机制来使用回调通道检索实体。
-
done:这是一个特殊的通道,一旦关闭,将停止所有正在运行的 goroutines。我们应该小心关闭这个通道,而不是在上面发送消息,原因是发送消息只会向一个 goroutine 发出停止信号。相反,如果我们关闭通道,所有监听该通道的 goroutines 都将收到停止消息。
让我们看一些例子,以便我们对约定有完美的理解:
-
dStoreCh:这是用于向文档存储添加新文档的通道
-
dGetCh:这是从文档存储获取单个文档的通道
图表约定
接下来,让我们看一下我们将在图表中使用的符号:
现在,让我们通过逻辑流程图来可视化 Concierge 的逻辑。
逻辑流程图
我们可以将 Concierge 的逻辑分为五个主要部分。我们将解决每个单独部分所需的逻辑流程,然后最后将它们合并在一起,以获得我们试图实现的整体情况。
文档处理器
首先,我们想要接受发送到端点的有效负载并开始处理文档。假设api.FeedHandler
接受、验证并将有效负载发送到pProcessCh:
文档存储
让我们来考虑一下dStoreCh,这是用于添加和检索文档的通道:
索引处理器
除了添加到docstore
中,docProcessor
还将文档发送到indexProcessor
,后者负责存储文档中的行并将行转换为标记:
行存储
indexProcessor
将文档拆分为行,lineStore
负责存储它们,并在查询时返回它们:
indexProcessor
还将行拆分为标记,并将它们添加到iAddCh
通道。indexAdder
负责将这些标记添加到索引(图书管理员)中。
综合流程图
现在我们已经定义了每个单独部分,您可能已经注意到它们相互衔接,并且它们之间有一些共享的组件。现在让我们整合所有这些流程图:
这可能是一个很好的机会,让你自己尝试构建 Concierge。但是,请阅读以下三个设计要点,以完全了解系统。
队列工作者
在综合流程图中,您可能已经注意到我们运行了四个docProcessor
、indexProcessor
和indexAdder
的实例。这样做的原因是这些 goroutine 处理的任务是尴尬地并行的,也就是说,它们可以在没有副作用的情况下并行运行。这使我们能够并行处理文档,加快处理速度。
单个存储
相比之下,我们将docStore
和lineStore
作为单个实例运行,因为我们希望为这些存储保持一致的状态。
缓冲通道
对于我们代码中的几乎所有通道,我们将使用容量为 8 的缓冲通道。这样可以避免在docProcessors
忙碌时阻塞api.FeedHandler
端点。另外,由于队列工作者和单个存储,lStoreCh
和dStoreCh
的容量分别为 16。
Concierge 源代码
现在我们已经详细讨论了 Concierge 的设计,让我们根据这些设计要点实现 Concierge。我们将在第八章,部署 Goophr中讨论api/query.go
和 Dockerfile 的实现。让我们看看项目结构和源代码:
$ tree
.
└── goophr
└── concierge
├── api
│ ├── feeder.go
│ ├── feeder_test.go
│ └── query.go
├── common
│ ├── helpers.go
├── Dockerfile
└── main.go
4 directories, 6 files
现在让我们看看每个文件的源代码:
main.go:
package main
import (
"net/http"
"github.com/last-ent/distributed-go/chapter6/goophr/concierge/api"
"github.com/last-ent/distributed-go/chapter6/goophr/concierge/common"
)
func main() {
common.Log("Adding API handlers...")
http.HandleFunc("/api/feeder", api.FeedHandler)
common.Log("Starting feeder...")
api.StartFeederSystem()
common.Log("Starting Goophr Concierge server on port :8080...")
http.ListenAndServe(":8080", nil)
}
common/helpers.go:
package common
import (
"fmt"
"log"
"regexp"
"strings"
)
// Log is used for simple logging to console.
func Log(msg string) {
log.Println("INFO - ", msg)
}
// Warn is used to log warning messages to console.
func Warn(msg string) {
log.Println("---------------------------")
log.Println(fmt.Sprintf("WARN: %s", msg))
log.Println("---------------------------")
}
var punctuations = regexp.MustCompile('^\p{P}+|\p{P}+$')
// List of stop words that we want to ignore in our index.
var stopWords = []string{
"a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "aren't", "as", "at",
"be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "can't", "cannot", "could",
"couldn't", "did", "didn't", "do", "does", "doesn't", "doing", "don't", "down", "during", "each", "few", "for",
"from", "further", "had", "hadn't", "has", "hasn't", "have", "haven't", "having", "he", "he'd", "he'll", "he's",
"her", "here", "here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm",
"i've", "if", "in", "into", "is", "isn't", "it", "it's", "its", "itself", "let's", "me", "more", "most", "mustn't",
"my", "myself", "no", "nor", "not", "of", "off", "on", "once", "only", "or", "other", "ought", "our", "ours",
"ourselves", "out", "over", "own", "same", "shan't", "she", "she'd", "she'll", "she's", "should", "shouldn't",
"so", "some", "such", "than", "that", "that's", "the", "their", "theirs", "them", "themselves", "then", "there",
"there's", "these", "they", "they'd", "they'll", "they're", "they've", "this", "those", "through", "to", "too",
"under", "until", "up", "very", "was", "wasn't", "we", "we'd", "we'll", "we're", "we've", "were", "weren't", "what",
"what's", "when", "when's", "where", "where's", "which", "while", "who", "who's", "whom", "why", "why's", "with",
"won't", "would", "wouldn't", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves"}
// SimplifyToken is responsible to normalizing a string token and
// also checks whether the token should be indexed or not.
func SimplifyToken(token string) (string, bool) {
simpleToken := strings.ToLower(punctuations.ReplaceAllString(token, ""))
for _, stopWord := range stopWords {
if stopWord == simpleToken {
return "", false
}
}
return simpleToken, true
}
api/feeder.go:
package api
import (
"crypto/sha1"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/last-ent/distributed-go/chapter6/goophr/concierge/common"
)
type payload struct {
URL string 'json:"url"'
Title string 'json:"title"'
}
type document struct {
Doc string 'json:"-"'
Title string 'json:"title"'
DocID string 'json:"DocID"'
}
type token struct {
Line string 'json:"-"'
Token string 'json:"token"'
Title string 'json:"title"'
DocID string 'json:"doc_id"'
LIndex int 'json:"line_index"'
Index int 'json:"token_index"'
}
type dMsg struct {
DocID string
Ch chan document
}
type lMsg struct {
LIndex int
DocID string
Ch chan string
}
type lMeta struct {
LIndex int
DocID string
Line string
}
type dAllMsg struct {
Ch chan []document
}
// done signals all listening goroutines to stop.
var done chan bool
// dGetCh is used to retrieve a single document from store.
var dGetCh chan dMsg
// lGetCh is used to retrieve a single line from store.
var lGetCh chan lMsg
// lStoreCh is used to put a line into store.
var lStoreCh chan lMeta
// iAddCh is used to add token to index (Librarian).
var iAddCh chan token
// dStoreCh is used to put a document into store.
var dStoreCh chan document
// dProcessCh is used to process a document and convert it to tokens.
var dProcessCh chan document
// dGetAllCh is used to retrieve all documents in store.
var dGetAllCh chan dAllMsg
// pProcessCh is used to process the /feeder's payload and start the indexing process.
var pProcessCh chan payload
// StartFeederSystem initializes all channels and starts all goroutines.
// We are using a standard function instead of 'init()'
// because we don't want the channels & goroutines to be initialized during testing.
// Unless explicitly required by a particular test.
func StartFeederSystem() {
done = make(chan bool)
dGetCh = make(chan dMsg, 8)
dGetAllCh = make(chan dAllMsg)
iAddCh = make(chan token, 8)
pProcessCh = make(chan payload, 8)
dStoreCh = make(chan document, 8)
dProcessCh = make(chan document, 8)
lGetCh = make(chan lMsg)
lStoreCh = make(chan lMeta, 8)
for i := 0; i < 4; i++ {
go indexAdder(iAddCh, done)
go docProcessor(pProcessCh, dStoreCh, dProcessCh, done)
go indexProcessor(dProcessCh, lStoreCh, iAddCh, done)
}
go docStore(dStoreCh, dGetCh, dGetAllCh, done)
go lineStore(lStoreCh, lGetCh, done)
}
// indexAdder adds token to index (Librarian).
func indexAdder(ch chan token, done chan bool) {
for {
select {
case tok := <-ch:
fmt.Println("adding to librarian:", tok.Token)
case <-done:
common.Log("Exiting indexAdder.")
return
}
}
}
// lineStore maintains a catalog of all lines for all documents being indexed.
func lineStore(ch chan lMeta, callback chan lMsg, done chan bool) {
store := map[string]string{}
for {
select {
case line := <-ch:
id := fmt.Sprintf("%s-%d", line.DocID, line.LIndex)
store[id] = line.Line
case ch := <-callback:
line := ""
id := fmt.Sprintf("%s-%d", ch.DocID, ch.LIndex)
if l, exists := store[id]; exists {
line = l
}
ch.Ch <- line
case <-done:
common.Log("Exiting docStore.")
return
}
}
}
// indexProcessor is responsible for converting a document into tokens for indexing.
func indexProcessor(ch chan document, lStoreCh chan lMeta, iAddCh chan token, done chan bool) {
for {
select {
case doc := <-ch:
docLines := strings.Split(doc.Doc, "\n")
lin := 0
for _, line := range docLines {
if strings.TrimSpace(line) == "" {
continue
}
lStoreCh <- lMeta{
LIndex: lin,
Line: line,
DocID: doc.DocID,
}
index := 0
words := strings.Fields(line)
for _, word := range words {
if tok, valid := common.SimplifyToken(word); valid {
iAddCh <- token{
Token: tok,
LIndex: lin,
Line: line,
Index: index,
DocID: doc.DocID,
Title: doc.Title,
}
index++
}
}
lin++
}
case <-done:
common.Log("Exiting indexProcessor.")
return
}
}
}
// docStore maintains a catalog of all documents being indexed.
func docStore(add chan document, get chan dMsg, dGetAllCh chan dAllMsg, done chan bool) {
store := map[string]document{}
for {
select {
case doc := <-add:
store[doc.DocID] = doc
case m := <-get:
m.Ch <- store[m.DocID]
case ch := <-dGetAllCh:
docs := []document{}
for _, doc := range store {
docs = append(docs, doc)
}
ch.Ch <- docs
case <-done:
common.Log("Exiting docStore.")
return
}
}
}
// docProcessor processes new document payloads.
func docProcessor(in chan payload, dStoreCh chan document, dProcessCh chan document, done chan bool) {
for {
select {
case newDoc := <-in:
var err error
doc := ""
if doc, err = getFile(newDoc.URL); err != nil {
common.Warn(err.Error())
continue
}
titleID := getTitleHash(newDoc.Title)
msg := document{
Doc: doc,
DocID: titleID,
Title: newDoc.Title,
}
dStoreCh <- msg
dProcessCh <- msg
case <-done:
common.Log("Exiting docProcessor.")
return
}
}
}
// getTitleHash returns a new hash ID everytime it is called.
// Based on: https://gobyexample.com/sha1-hashes
func getTitleHash(title string) string {
hash := sha1.New()
title = strings.ToLower(title)
str := fmt.Sprintf("%s-%s", time.Now(), title)
hash.Write([]byte(str))
hByte := hash.Sum(nil)
return fmt.Sprintf("%x", hByte)
}
// getFile returns file content after retrieving it from URL.
func getFile(URL string) (string, error) {
var res *http.Response
var err error
if res, err = http.Get(URL); err != nil {
errMsg := fmt.Errorf("Unable to retrieve URL: %s.\nError: %s", URL, err)
return "", errMsg
}
if res.StatusCode > 200 {
errMsg := fmt.Errorf("Unable to retrieve URL: %s.\nStatus Code: %d", URL, res.StatusCode)
return "", errMsg
}
body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()
if err != nil {
errMsg := fmt.Errorf("Error while reading response: URL: %s.\nError: %s", URL, res.StatusCode, err.Error())
return "", errMsg
}
return string(body), nil
}
// FeedHandler start processing the payload which contains the file to index.
func FeedHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
ch := make(chan []document)
dGetAllCh <- dAllMsg{Ch: ch}
docs := <-ch
close(ch)
if serializedPayload, err := json.Marshal(docs); err == nil {
w.Write(serializedPayload)
} else {
common.Warn("Unable to serialize all docs: " + err.Error())
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte('{"code": 500, "msg": "Error occurred while trying to retrieve documents."}'))
}
return
} else if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}'))
return
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var newDoc payload
decoder.Decode(&newDoc)
pProcessCh <- newDoc
w.Write([]byte('{"code": 200, "msg": "Request is being processed."}'))
}
api/feeder_test.go:
package api
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
)
func TestGetTitleHash(t *testing.T) {
h1 := getTitleHash("A-Title")
h2 := getTitleHash("Diff Title")
hDup := getTitleHash("A-Title")
for _, tc := range []struct {
name string
hashes []string
expected bool
}{
{"Different Titles", []string{h1, h2}, false},
{"Duplicate Titles", []string{h1, hDup}, false},
{"Same hashes", []string{h2, h2}, true},
} {
t.Run(tc.name, func(t *testing.T) {
actual := tc.hashes[0] == tc.hashes[1]
if actual != tc.expected {
t.Error(actual, tc.expected, tc.hashes)
}
})
}
}
func TestGetFile(t *testing.T) {
doc := "Server returned text!"
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(doc))
}))
defer testServer.Close()
rDoc, err := getFile(testServer.URL)
if err != nil {
t.Error("Error while retrieving document", err)
}
if doc != rDoc {
t.Error(doc, "!=", rDoc)
}
}
func TestIndexProcessor(t *testing.T) {
ch1 := make(chan document, 1)
ch2 := make(chan lMeta, 1)
ch3 := make(chan token, 3)
done := make(chan bool)
go indexProcessor(ch1, ch2, ch3, done)
ch1 <- document{
DocID: "a-hash",
Title: "a-title",
Doc: "Golang Programming rocks!",
}
for i, tc := range []string{
"golang", "programming", "rocks",
} {
t.Run(fmt.Sprintf("Testing if '%s' is returned. at index: %d", tc, i), func(t *testing.T) {
tok := <-ch3
if tok.Token != tc {
t.Error(tok.Token, "!=", tc)
}
if tok.Index != i {
t.Error(tok.Index, "!=", i)
}
})
}
close(done)
}
运行测试
在api/feeder_test.go
中,我们有三个主要的测试用例场景:
-
测试是否为每个新文档生成了唯一的哈希值
-
测试发送到
/api/feeder
端点的有效负载是否返回预期的文档内容 -
测试以确保文档的索引工作正常
在运行测试后,以下是预期的输出:
$ go test -v ./...
? github.com/last-ent/distributed-go/chapter6/goophr/concierge [no test files]
=== RUN TestGetTitleHash
=== RUN TestGetTitleHash/Different_Titles
=== RUN TestGetTitleHash/Duplicate_Titles
=== RUN TestGetTitleHash/Same_hashes
--- PASS: TestGetTitleHash (0.00s)
--- PASS: TestGetTitleHash/Different_Titles (0.00s)
--- PASS: TestGetTitleHash/Duplicate_Titles (0.00s)
--- PASS: TestGetTitleHash/Same_hashes (0.00s)
=== RUN TestGetFile
--- PASS: TestGetFile (0.00s)
=== RUN TestIndexProcessor
=== RUN TestIndexProcessor/Testing_if_'golang'_is_returned._at_index:_1
=== RUN TestIndexProcessor/Testing_if_'programming'_is_returned._at_index:_2
=== RUN TestIndexProcessor/Testing_if_'rocks'_is_returned._at_index:_3
--- PASS: TestIndexProcessor (0.00s)
--- PASS: TestIndexProcessor/Testing_if_'golang'_is_returned._at_index:_1 (0.00s)
--- PASS: TestIndexProcessor/Testing_if_'programming'_is_returned._at_index:_2 (0.00s)
--- PASS: TestIndexProcessor/Testing_if_'rocks'_is_returned._at_index:_3 (0.00s)
PASS
ok github.com/last-ent/distributed-go/chapter6/goophr/concierge/api 0.004s
? github.com/last-ent/distributed-go/chapter6/goophr/concierge/common [no test files]
Concierge 服务器
让我们尝试将书籍《黑客:计算机革命的英雄》发布到 Concierge 端点/api/feeder
。我们需要在另一个终端窗口中运行 Concierge 服务器:
$ curl -X POST -d '{"title": "Hackers: Heroes of Computer Revolution", "url": "http://www.gutenberg.org/cache/epub/729/pg729.txt"}' http://localhost:8080/api/feeder | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 162 100 51 100 111 51 111 0:00:01 --:--:-- 0:00:01 54000
{
"code": 200,
"msg": "Request is being processed."
}
接下来,让我们看看服务器上会发生什么:
$ go run main.go
2017/11/18 21:05:57 INFO - Adding API handlers...
2017/11/18 21:05:57 INFO - Starting feeder...
2017/11/18 21:05:57 INFO - Starting Goophr Concierge server on port :8080...
// ...
adding to librarian: gutenberg-tm
adding to librarian: including
adding to librarian: make
adding to librarian: u.s
adding to librarian: project
adding to librarian: gutenberg
/...
摘要
在本章中,我们深入研究了 Concierge 的feeder
组件。我们设计了系统,并使用逻辑流程图来理解代码的各个部分是如何交互的。接下来,我们用测试和一个真实的例子来测试我们的代码。
在下一章,第七章,Goophr 图书管理员中,我们将深入探讨 Goophr 图书管理员的设计和实现。
第七章:Goophr 图书管理员
在第六章中,Goophr Concierge,我们构建了负责接受新文档并将其分解为索引中使用的标记的端点。然而,Concierge 的api.indexAdder
的当前实现在打印标记到控制台后返回。在本章中,我们将实现 Goophr 图书管理员,它可以与 Concierge 交互以接受标记,并响应标记搜索查询。
在本章中,我们将讨论以下主题:
-
标准索引模型
-
倒排索引模型
-
文档索引器
-
查询解析器 API
标准索引模型
考虑一本书中的索引。每本书都有自己的索引,按字母顺序列出所有单词,并显示它们在书中的位置。然而,如果我们想要跟踪单词在多本书中的出现,检查每本书的索引就相当低效。让我们看一个例子。
一个例子 - 具有单词索引的书籍
假设我们有三本书:Book 1
,Book 2
和Book 3
,它们各自的索引如下。每个单词旁边的数字表示单词出现在哪一页:
* Book 1 (Index)
- apple - 4, 10, 20
- cat - 10, 21, 22
- zebra - 15, 25, 63
* Book 2 (Index)
- banana - 14, 19, 66
- cake - 10, 37, 45
- zebra - 67, 100, 129
* Book 3 (Index)
- apple - 36, 55, 74
- cake - 1, 9, 77
- Whale - 11, 59, 79
让我们尝试从书的索引中找到三个词。一个天真的方法可能是选择每本书并扫描它,直到找到或错过这个词:
-
苹果
-
香蕉
-
鹦鹉
* Searching for 'apple'
- Scanning Book 1\. Result: Found.
- Scanning Book 2\. Result: Not Found.
- Scanning Book 3\. Result: Found.
* Searching for 'banana'
- Scanning Book 1\. Result: Not Found.
- Scanning Book 2\. Result: Found.
- Scanning Book 3\. Result: Not Found.
* Searching for 'parrot'
- Scanning Book 1\. Result: Not Found.
- Scanning Book 2\. Result: Not Found.
- Scanning Book 3\. Result: Not Found.
简而言之,对于每个术语,我们都要遍历每本书的索引并搜索这个词。我们对每个单词都进行了整个过程,包括鹦鹉
,而这个词并不存在于任何一本书中!起初,这可能在性能上看起来是可以接受的,但是考虑当我们需要查找超过一百万本书时,我们意识到这种方法是不切实际的。
倒排索引模型
根据前面的例子,我们可以陈述如下:
-
我们需要快速查找以确定一个词是否存在于我们的索引中
-
对于任何给定的单词,我们需要一种高效的方法来列出该单词可能出现在的所有书籍
通过使用倒排索引,我们可以实现这两个好处。标准索引的映射顺序是书籍 → **单词 → 出现(页码、行号等),如前面的例子所示。如果我们使用倒排索引,映射顺序变为单词 → **书籍 → **出现(页码、行号等)。
这个改变可能看起来并不重要,但它大大改善了查找。让我们用另一个例子来看一下。
一个例子 - 书中单词的倒排索引
让我们从之前的相同例子中获取数据,但现在根据倒排索引进行分类:
* apple
- Book 1 - 4, 10, 20
- Book 3 - 36, 55, 74
* banana
- Book 2 - 14, 19, 66
* cake
- Book 2 - 10, 37, 45
- Book 3 - 1, 9, 77
* cat
- Book 1 - 10, 21, 22
* whale
- Book 3 - 11, 59, 79
* zebra
- Book 1 - 15, 25, 63
- Book 2 - 67, 100, 129
有了这个设置,我们可以高效地回答以下问题:
-
一个词是否存在于索引中?
-
一个词存在于哪些书中?
-
给定书中一个词出现在哪些页面上?
让我们再次尝试从倒排索引中找到三个单词:
-
苹果
-
香蕉
-
鹦鹉
* Searching for 'apple'
- Scanning Inverted Index. Result: Found a list of books.
* Searching for 'banana'
- Scanning Inverted Index. Result: Found a list of books.
* Searching for 'parrot'
- Scanning Inverted Index. Result: Not Found.
总结一下,我们不是逐本书进行查找,而是对每个术语进行单次查找,确定术语是否存在,如果存在,则返回包含该术语的书籍列表,这是我们的最终目标。
排名
排名和搜索结果的相关性是一个有趣且复杂的话题。所有主要的搜索引擎都有一群专门的软件工程师和计算机科学家,他们花费大量时间和精力来确保他们的算法最准确。
对于 Goophr,我们将简化排名并将其限制为搜索词的频率。搜索词频率越高,排名越高。
重新审视 API 定义
让我们来审视图书管理员的 API 定义:
openapi: 3.0.0
servers:
- url: /api
info:
title: Goophr Librarian API
version: '1.0'
description: |
API responsible for indexing & communicating with Goophr Concierge.
paths:
/index:
post:
description: |
Add terms to index.
responses:
'200':
description: |
Terms were successfully added to the index.
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/error'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/terms'
description: |
List of terms to be added to the index.
required: true
/query:
post:
description: |
Search for all terms in the payload.
responses:
'200':
description: |
Returns a list of all the terms along with their frequency,
documents the terms appear in and link to the said documents.
content:
application/json:
schema:
$ref: '#/components/schemas/results'
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/error'
parameters: []
components:
schemas:
error:
type: object
properties:
msg:
type: string
term:
type: object
required:
- title
- token
- doc_id
- line_index
- token_index
properties:
title:
description: |
Title of the document to which the term belongs.
type: string
token:
description: |
The term to be added to the index.
type: string
doc_id:
description: |
The unique hash for each document.
type: string
line_index:
description: |
Line index at which the term occurs in the document.
type: integer
token_index:
description: |
Position of the term in the document.
type: integer
terms:
type: object
properties:
code:
type: integer
data:
type: array
items:
$ref: '#/components/schemas/term'
results:
type: object
properties:
count:
type: integer
data:
type: array
items:
$ref: '#/components/schemas/result'
result:
type: object
properties:
doc_id:
type: string
score:
type: integer
根据 API 定义,我们可以陈述如下:
-
所有通信都是通过 JSON 格式进行
-
图书管理员的两个端点是:
/api/index
和/api/query
-
/api/index
使用POST
方法向反向索引添加新的标记 -
/api/query
使用POST
方法接收搜索查询词,并返回索引包含的所有文档的列表
文档索引器 - REST API 端点
/api/index
的主要目的是接受 Concierge 的令牌并将其添加到索引中。让我们看看我们所说的“将其添加到索引”是什么意思。
文档索引可以定义为以下一系列连续的任务:
-
我们依赖有效负载提供我们存储令牌所需的所有元信息。
-
我们沿着倒排索引树向下,创建路径中尚未创建的任何节点,最后添加令牌详细信息。
查询解析器-REST API 端点
/api/query
的主要目的是在倒排索引中找到一组搜索词,并按相关性递减的顺序返回文档 ID 列表。让我们看看我们所说的“查询搜索词”和“相关性”是什么意思。
查询解析可以定义为以下一系列连续的任务:
-
对于每个搜索词,我们希望以倒排索引形式检索所有可用的书籍。
-
接下来,我们希望在简单的查找表(
map
)中存储每本书中所有单词的出现计数。 -
一旦我们有了一本书及其相应计数的映射,我们就可以将查找表转换为有序文档 ID 及其相应分数的数组。
代码约定
本章的代码非常简单直接,并且遵循与第六章相同的代码约定,Goophr Concierge。所以让我们直接进入代码。
Librarian 源代码
现在我们已经详细讨论了 Librarian 的设计,让我们看看项目结构和源代码:
$ tree . ├── api │ ├── index.go │ └── query.go ├── common │ ├── helpers.go ├── Dockerfile ├── main.go
两个目录和五个文件!
现在让我们看看每个文件的源代码。
main.go
源文件负责初始化路由,启动索引系统和启动 Web 服务器:
package main
import (
"net/http"
"github.com/last-ent/distributed-go/chapter7/goophr/librarian/api"
"github.com/last-ent/distributed-go/chapter7/goophr/librarian/common"
)
func main() {
common.Log("Adding API handlers...")
http.HandleFunc("/api/index", api.IndexHandler)
http.HandleFunc("/api/query", api.QueryHandler)
common.Log("Starting index...")
api.StartIndexSystem()
common.Log("Starting Goophr Librarian server on port :9090...")
http.ListenAndServe(":9090", nil)
}
common/helpers.go
源文件包含专门针对一个处理程序的代码。
package common
import (
"fmt"
"log"
)
func Log(msg string) {
log.Println("INFO - ", msg)
}
func Warn(msg string) {
log.Println("---------------------------")
log.Println(fmt.Sprintf("WARN: %s", msg))
log.Println("---------------------------")
}
api/index.go
包含代码以处理并向索引添加新项的源文件。
package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// tPayload is used to parse the JSON payload consisting of Token data.
type tPayload struct {
Token string 'json:"token"'
Title string 'json:"title"'
DocID string 'json:"doc_id"'
LIndex int 'json:"line_index"'
Index int 'json:"token_index"'
}
type tIndex struct {
Index int
LIndex int
}
func (ti *tIndex) String() string {
return fmt.Sprintf("i: %d, li: %d", ti.Index, ti.LIndex)
}
type tIndices []tIndex
// document - key in Indices represent Line Index.
type document struct {
Count int
DocID string
Title string
Indices map[int]tIndices
}
func (d *document) String() string {
str := fmt.Sprintf("%s (%s): %d\n", d.Title, d.DocID, d.Count)
var buffer bytes.Buffer
for lin, tis := range d.Indices {
var lBuffer bytes.Buffer
for _, ti := range tis {
lBuffer.WriteString(fmt.Sprintf("%s ", ti.String()))
}
buffer.WriteString(fmt.Sprintf("@%d -> %s\n", lin, lBuffer.String()))
}
return str + buffer.String()
}
// documentCatalog - key represents DocID.
type documentCatalog map[string]*document
func (dc *documentCatalog) String() string {
return fmt.Sprintf("%#v", dc)
}
// tCatalog - key in map represents Token.
type tCatalog map[string]documentCatalog
func (tc *tCatalog) String() string {
return fmt.Sprintf("%#v", tc)
}
type tcCallback struct {
Token string
Ch chan tcMsg
}
type tcMsg struct {
Token string
DC documentCatalog
}
// pProcessCh is used to process /index's payload and start process to add the token to catalog (tCatalog).
var pProcessCh chan tPayload
// tcGet is used to retrieve a token's catalog (documentCatalog).
var tcGet chan tcCallback
func StartIndexSystem() {
pProcessCh = make(chan tPayload, 100)
tcGet = make(chan tcCallback, 20)
go tIndexer(pProcessCh, tcGet)
}
// tIndexer maintains a catalog of all tokens along with where they occur within documents.
func tIndexer(ch chan tPayload, callback chan tcCallback) {
store := tCatalog{}
for {
select {
case msg := <-callback:
dc := store[msg.Token]
msg.Ch <- tcMsg{
DC: dc,
Token: msg.Token,
}
case pd := <-ch:
dc, exists := store[pd.Token]
if !exists {
dc = documentCatalog{}
store[pd.Token] = dc
}
doc, exists := dc[pd.DocID]
if !exists {
doc = &document{
DocID: pd.DocID,
Title: pd.Title,
Indices: map[int]tIndices{},
}
dc[pd.DocID] = doc
}
tin := tIndex{
Index: pd.Index,
LIndex: pd.LIndex,
}
doc.Indices[tin.LIndex] = append(doc.Indices[tin.LIndex], tin)
doc.Count++
}
}
}
func IndexHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}'))
return
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var tp tPayload
decoder.Decode(&tp)
log.Printf("Token received%#v\n", tp)
pProcessCh <- tp
w.Write([]byte('{"code": 200, "msg": "Tokens are being added to index."}'))
}
api/query.go
源文件包含负责根据搜索词返回排序结果的代码。
package api
import (
"encoding/json"
"net/http"
"sort"
"github.com/last-ent/distributed-go/chapter7/goophr/librarian/common"
)
type docResult struct {
DocID string 'json:"doc_id"'
Score int 'json:"doc_score"'
Indices tIndices 'json:"token_indices"'
}
type result struct {
Count int 'json:"count"'
Data []docResult 'json:"data"'
}
// getResults returns unsorted search results & a map of documents containing tokens.
func getResults(out chan tcMsg, count int) tCatalog {
tc := tCatalog{}
for i := 0; i < count; i++ {
dc := <-out
tc[dc.Token] = dc.DC
}
close(out)
return tc
}
func getFScores(docIDScore map[string]int) (map[int][]string, []int) {
// fScore maps frequency score to set of documents.
fScore := map[int][]string{}
fSorted := []int{}
for dID, score := range docIDScore {
fs := fScore[score]
fScore[score] = []string{}
}
fScore[score] = append(fs, dID)
fSorted = append(fSorted, score)
}
sort.Sort(sort.Reverse(sort.IntSlice(fSorted)))
return fScore, fSorted
}
func getDocMaps(tc tCatalog) (map[string]int, map[string]tIndices) {
// docIDScore maps DocIDs to occurences of all tokens.
// key: DocID.
// val: Sum of all occurences of tokens so far.
docIDScore := map[string]int{}
docIndices := map[string]tIndices{}
// for each token's catalog
for _, dc := range tc {
// for each document registered under the token
for dID, doc := range dc {
// add to docID score
var tokIndices tIndices
for _, tList := range doc.Indices {
tokIndices = append(tokIndices, tList...)
}
docIDScore[dID] += doc.Count
dti := docIndices[dID]
docIndices[dID] = append(dti, tokIndices...)
}
}
return docIDScore, docIndices
}
func sortResults(tc tCatalog) []docResult {
docIDScore, docIndices := getDocMaps(tc)
fScore, fSorted := getFScores(docIDScore)
results := []docResult{}
addedDocs := map[string]bool{}
for _, score := range fSorted {
for _, docID := range fScore[score] {
if _, exists := addedDocs[docID]; exists {
continue
}
results = append(results, docResult{
DocID: docID,
Score: score,
Indices: docIndices[docID],
})
addedDocs[docID] = false
}
}
return results
}
// getSearchResults returns a list of documents.
// They are listed in descending order of occurences.
func getSearchResults(sts []string) []docResult {
callback := make(chan tcMsg)
for _, st := range sts {
go func(term string) {
tcGet <- tcCallback{
Token: term,
Ch: callback,
}
}(st)
}
cts := getResults(callback, len(sts))
results := sortResults(cts)
return results
}
func QueryHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}'))
return
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var searchTerms []string
decoder.Decode(&searchTerms)
results := getSearchResults(searchTerms)
payload := result{
Count: len(results),
Data: results,
}
if serializedPayload, err := json.Marshal(payload); err == nil {
w.Header().Add("Content-Type", "application/json")
w.Write(serializedPayload)
} else {
common.Warn("Unable to serialize all docs: " + err.Error())
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte('{"code": 500, "msg": "Error occurred while trying to retrieve documents."}'))
}
}
测试 Librarian
为了测试 Librarian 是否按预期工作,我们需要测试两件事:
-
检查
/api/index
是否接受索引项。 -
检查
/api/query
是否返回正确的结果并且顺序符合预期。
我们可以使用一个单独的程序/脚本feeder.go
来测试第 1 点,使用简单的 cURL 命令来测试第 2 点。
使用/api/index 测试feeder.go
这是feeder.go
脚本,用于检查/api/index
是否接受索引项:
package main
import (
"bytes"
"encoding/json"
"io/ioutil"
"log"
"net/http"
)
type tPayload struct {
Token string 'json:"token"'
Title string 'json:"title"'
DocID string 'json:"doc_id"'
LIndex int 'json:"line_index"'
Index int 'json:"token_index"'
}
type msgS struct {
Code int 'json:"code"'
Msg string 'json:"msg"'
}
func main() {
// Searching for "apple" should return Book 1 at the top of search results.
// Searching for "cake" should return Book 3 at the top.
for bookX, terms := range map[string][]string{
"Book 1": []string{"apple", "apple", "cat", "zebra"},
"Book 2": []string{"banana", "cake", "zebra"},
"Book 3": []string{"apple", "cake", "cake", "whale"},
} {
for lin, term := range terms {
payload, _ := json.Marshal(tPayload{
Token: term,
Title: bookX + term,
DocID: bookX,
LIndex: lin,
})
resp, err := http.Post(
"http://localhost:9090/api/index",
"application/json",
bytes.NewBuffer(payload),
)
if err != nil {
panic(err)
}
body, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
var msg msgS
json.Unmarshal(body, &msg)
log.Println(msg)
}
}
}
运行feeder.go
(在另一个窗口中运行 Librarian)的输出如下:
$ go run feeder.go
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
前述程序的 Librarian 输出如下:
$ go run goophr/librarian/main.go
2018/01/04 12:53:25 INFO - Adding API handlers...
2018/01/04 12:53:25 INFO - Starting index...
2018/01/04 12:53:25 INFO - Starting Goophr Librarian server on port :9090...
2018/01/04 12:53:31 Token received api.tPayload{Token:"banana", Title:"Book 2banana", DocID:"Book 2", LIndex:0, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cake", Title:"Book 2cake", DocID:"Book 2", LIndex:1, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"zebra", Title:"Book 2zebra", DocID:"Book 2", LIndex:2, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"apple", Title:"Book 3apple", DocID:"Book 3", LIndex:0, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cake", Title:"Book 3cake", DocID:"Book 3", LIndex:1, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cake", Title:"Book 3cake", DocID:"Book 3", LIndex:2, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"whale", Title:"Book 3whale", DocID:"Book 3", LIndex:3, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"apple", Title:"Book 1apple", DocID:"Book 1", LIndex:0, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"apple", Title:"Book 1apple", DocID:"Book 1", LIndex:1, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cat", Title:"Book 1cat", DocID:"Book 1", LIndex:2, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"zebra", Title:"Book 1zebra", DocID:"Book 1", LIndex:3, Index:0}
测试/api/query
为了测试/api/query
,我们需要维护服务器的前置状态以进行有用的查询:
$ # Querying for "apple" $ curl -LX POST -d '["apple"]' localhost:9090/api/query | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 202 100 193 100 9 193 9 0:00:01 --:--:-- 0:00:01 40400 { "count": 2, "data": [ { "doc_id": "Book 1", "doc_score": 2, "token_indices": [ { "Index": 0, "LIndex": 0 }, { "Index": 0, "LIndex": 1 } ] }, { "doc_id": "Book 3", "doc_score": 1, "token_indices": [ { "Index": 0, "LIndex": 0 } ] } ] } $ # Querying for "cake"
$ curl -LX POST -d '["cake"]' localhost:9090/api/query | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 201 100 193 100 8 193 8 0:00:01 --:--:-- 0:00:01 33500 { "count": 2, "data": [ { "doc_id": "Book 3", "doc_score": 2, "token_indices": [ { "Index": 0, "LIndex": 1 }, { "Index": 0, "LIndex": 2 } ] }, { "doc_id": "Book 2", "doc_score": 1, "token_indices": [ { "Index": 0, "LIndex": 1 } ] } ] }
总结
在本章中,我们了解了倒排索引并为 Librarian 实现了高效的存储和查找搜索词。我们还使用脚本feeder.go
和 cURL 命令检查了我们的实现。
在下一章,第八章,部署 Goophr,我们将重写 Concierge 的api.indexAdder
,以便它可以开始将要索引的令牌发送给 Librarian。我们还将重新访问docker-compose.yaml
,以便我们可以运行完整的应用程序并将其用作分布式系统进行使用/测试。
第八章:部署 Goophr
在第六章中,Goophr Concierge和第七章中,Goophr Librarian,我们构建了 Goophr 的两个组件:Concierge 和 Librarian。我们花时间了解了每个组件设计背后的原理,以及它们如何预期一起工作。
在本章中,我们将通过实现以下目标来完成 Goophr 的构建:
-
更新
concierge/api/query.go
,以便 Concierge 可以查询多个 Librarian 实例的搜索词 -
更新
docker-compose.yaml
,以便我们可以轻松运行完整的 Goophr 系统 -
通过向索引添加文档并通过 REST API 查询索引来测试设置
更新 Goophr Concierge
为了使 Concierge 按照 Goophr 的设计完全功能,我们需要执行以下操作:
-
从多个 Librarian 请求搜索结果
-
对组合搜索结果进行排名
让我们详细讨论这些要点。
处理多个 Librarian
Goophr Librarian 的核心功能是更新索引并根据搜索词返回相关的DocID
。正如我们在实现 Librarian 的代码库时所看到的,我们需要更新索引,检索相关的DocID
,然后根据相关性对其进行排序,然后返回查询结果。涉及许多操作,并且在查找和更新时使用了许多映射。这些操作可能看起来微不足道。然而,随着查找表(映射)的大小增加,查找表上的操作性能将开始下降。为了避免性能下降,可以采取许多方法。
我们的主要目标是在 Go 的上下文中理解分布式系统,因此,我们将拆分 Librarian 以仅处理一定范围的索引。分区是数据库中使用的标准技术之一,其中数据库被分成多个分区。在我们的情况下,我们将运行三个 Librarian 实例,每个实例负责处理分配给每个分区的字符范围内的所有令牌的索引:
-
a_m_librarian
:负责以字符“A”到“M”开头的令牌的图书管理员 -
n_z_librarian
:负责以字符“N”到“Z”开头的令牌的图书管理员 -
others_librarian
:负责以数字开头的令牌的图书管理员
聚合搜索结果
下一步将是从多个 Librarian 实例聚合搜索词的结果,并将它们作为有效载荷返回给查询请求。这将要求我们执行以下操作:
-
获取所有可用图书管理员的 URL 列表
-
在接收到查询时从所有 Librarian 请求搜索结果
-
根据
DocID
聚合搜索结果 -
按相关性分数降序排序结果
-
根据 Swagger API 定义形成并返回 JSON 有效载荷
现在我们了解了拥有多个 Librarian 实例的原因,以及我们将如何根据这个新配置处理查询,我们可以将这些更改应用到concierge/api/query.go
中。
使用 docker-compose 进行编排
我们一直在我们系统的 localhost 上以硬编码的网络端口值运行 Librarian 和 Concierge 的服务器。到目前为止,我们还没有遇到任何问题。然而,当我们考虑到我们将运行三个 Librarian 实例,需要连接所有这些实例到 Concierge 并且能够轻松地启动和监视服务器时,我们意识到有很多移动部分。这可能导致在操作系统时出现不必要的错误。为了让我们的生活变得更轻松,我们可以依赖于docker-compose
,它将为我们处理所有这些复杂性。我们所要做的就是定义一个名为docker-compose.yaml
的配置 YAML 文件,其中包含以下信息:
-
确定我们想要一起运行的服务
-
在 YAML 文件中为每个服务定义的相应的 Dockerfile 或 Docker 镜像的位置或名称,以便我们可以为所有这些服务构建 Docker 镜像并将它们作为容器运行
-
要为每个正在运行的容器公开的端口
-
我们可能想要注入到我们的服务器实例中的任何其他环境变量
-
确保 Concierge 容器可以访问所有其他正在运行的容器
环境变量和 API 端口
我们提到我们将在docker-compose.yaml
中指定我们希望每个容器运行的端口。但是,我们还需要更新{concierge,librarian}/main.go
,以便它们可以在环境变量定义的端口上启动服务器。我们还需要更新concierge/query.go
,以便它可以访问由docker-compose
定义的 URL 和端口上的 Librarian 实例。
文件服务器
为了通过将文档加载到索引中快速测试我们的设置,以便能够查询系统并验证查询结果,我们还将包括一个简单的 HTTP 服务器,用于提供包含几个单词的文档。
Goophr 源代码
在前两章中,第六章 Goophr Concierge 和 第七章 Goophr Librarian,我们分别讨论了 Concierge 和 Librarian 的代码。为了使用docker-compose
运行完整的 Goophr 应用程序,我们需要将 Librarian 和 Concierge 的代码库合并为一个单一的代码库。代码库还将包括docker-compose.yaml
和文件服务器的代码。
在本章中,我们不会列出 Librarian 和 Concierge 中所有文件的代码,而只列出有更改的文件。让我们先看一下完整项目的结构:
$ tree -a
.
ε2;── goophr
├── concierge
│ ├── api
│ │ ├── feeder.go
│ │ ├── feeder_test.go
│ │ └── query.go
│ ├── common
│ │ └── helpers.go
│ ├── Dockerfile
│ └── main.go
├── docker-compose.yaml
├── .env
├── librarian
│ ├── api
│ │ ├── index.go
│ │ └── query.go
│ ├── common
│ │ └── helpers.go
│ ├── Dockerfile
│ └── main.go
└── simple-server
├── Dockerfile
└── main.go
8 directories, 15 files
librarian/main.go
我们希望允许 Librarian 根据传递给它的环境变量API_PORT
在自定义端口上启动:
package main
import (
"fmt"
"net/http"
"os"
"github.com/last-ent/distributed-go/chapter8/goophr/librarian/api"
"github.com/last-ent/distributed-go/chapter8/goophr/librarian/common"
)
func main() {
common.Log("Adding API handlers...")
http.HandleFunc("/api/index", api.IndexHandler)
http.HandleFunc("/api/query", api.QueryHandler)
common.Log("Starting index...")
api.StartIndexSystem()
port := fmt.Sprintf(":%s", os.Getenv("API_PORT"))
common.Log(fmt.Sprintf("Starting Goophr Librarian server on port %s...", port))
http.ListenAndServe(port, nil)
}
concierge/main.go
允许 Concierge 根据传递给它的环境变量API_PORT
在自定义端口上启动:
package main
import (
"fmt"
"net/http"
"os"
"github.com/last-ent/distributed-go/chapter8/goophr/concierge/api"
"github.com/last-ent/distributed-go/chapter8/goophr/concierge/common"
)
func main() {
common.Log("Adding API handlers...")
http.HandleFunc("/api/feeder", api.FeedHandler)
http.HandleFunc("/api/query", api.QueryHandler)
common.Log("Starting feeder...")
api.StartFeederSystem()
port := fmt.Sprintf(":%s", os.Getenv("API_PORT"))
common.Log(fmt.Sprintf("Starting Goophr Concierge server on port %s...", port))
http.ListenAndServe(port, nil)
}
concierge/api/query.go
查询所有可用的 Librarian 实例以检索搜索查询结果,按顺序对其进行排名,然后将结果发送回去:
package api
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"sort"
"github.com/last-ent/distributed-go/chapter8/goophr/concierge/common"
)
var librarianEndpoints = map[string]string{}
func init() {
librarianEndpoints["a-m"] = os.Getenv("LIB_A_M")
librarianEndpoints["n-z"] = os.Getenv("LIB_N_Z")
librarianEndpoints["*"] = os.Getenv("LIB_OTHERS")
}
type docs struct {
DocID string 'json:"doc_id"'
Score int 'json:"doc_score"'
}
type queryResult struct {
Count int 'json:"count"'
Data []docs 'json:"data"'
}
func queryLibrarian(endpoint string, stBytes io.Reader, ch chan<- queryResult) {
resp, err := http.Post(
endpoint+"/query",
"application/json",
stBytes,
)
if err != nil {
common.Warn(fmt.Sprintf("%s -> %+v", endpoint, err))
ch <- queryResult{}
return
}
body, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
var qr queryResult
json.Unmarshal(body, &qr)
log.Println(fmt.Sprintf("%s -> %#v", endpoint, qr))
ch <- qr
}
func getResultsMap(ch <-chan queryResult) map[string]int {
results := []docs{}
for range librarianEndpoints {
if result := <-ch; result.Count > 0 {
results = append(results, result.Data...)
}
}
resultsMap := map[string]int{}
for _, doc := range results {
docID := doc.DocID
score := doc.Score
if _, exists := resultsMap[docID]; !exists {
resultsMap[docID] = 0
}
resultsMap[docID] = resultsMap[docID] + score
}
return resultsMap
}
func QueryHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}'))
return
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var searchTerms []string
if err := decoder.Decode(&searchTerms); err != nil {
common.Warn("Unable to parse request." + err.Error())
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte('{"code": 400, "msg": "Unable to parse payload."}'))
return
}
st, err := json.Marshal(searchTerms)
if err != nil {
panic(err)
}
stBytes := bytes.NewBuffer(st)
resultsCh := make(chan queryResult)
for _, le := range librarianEndpoints {
func(endpoint string) {
go queryLibrarian(endpoint, stBytes, resultsCh)
}(le)
}
resultsMap := getResultsMap(resultsCh)
close(resultsCh)
sortedResults := sortResults(resultsMap)
payload, _ := json.Marshal(sortedResults)
w.Header().Add("Content-Type", "application/json")
w.Write(payload)
fmt.Printf("%#v\n", sortedResults)
}
func sortResults(rm map[string]int) []document {
scoreMap := map[int][]document{}
ch := make(chan document)
for docID, score := range rm {
if _, exists := scoreMap[score]; !exists {
scoreMap[score] = []document{}
}
dGetCh <- dMsg{
DocID: docID,
Ch: ch,
}
doc := <-ch
scoreMap[score] = append(scoreMap[score], doc)
}
close(ch)
scores := []int{}
for score := range scoreMap {
scores = append(scores, score)
}
sort.Sort(sort.Reverse(sort.IntSlice(scores)))
sortedResults := []document{}
for _, score := range scores {
resDocs := scoreMap[score]
sortedResults = append(sortedResults, resDocs...)
}
return sortedResults
}
simple-server/Dockerfile
让我们使用Dockerfile
来创建一个简单的文件服务器:
FROM golang:1.10
ADD . /go/src/littlefs
WORKDIR /go/src/littlefs
RUN go install littlefs
ENTRYPOINT /go/bin/littlefs
simple-server/main.go
让我们来看一个简单的程序,根据bookID
返回一组单词作为 HTTP 响应:
package main
import (
"log"
"net/http"
)
func reqHandler(w http.ResponseWriter, r *http.Request) {
books := map[string]string{
"book1": 'apple apple cat zebra',
"book2": 'banana cake zebra',
"book3": 'apple cake cake whale',
}
bookID := r.URL.Path[1:]
book, _ := books[bookID]
w.Write([]byte(book))
}
func main() {
log.Println("Starting File Server on Port :9876...")
http.HandleFunc("/", reqHandler)
http.ListenAndServe(":9876", nil)
}
docker-compose.yaml
该文件将允许我们从单个界面构建、运行、连接和停止我们的容器。
version: '3'
services:
a_m_librarian:
build: librarian/.
environment:
- API_PORT=${A_M_PORT}
ports:
- ${A_M_PORT}:${A_M_PORT}
n_z_librarian:
build: librarian/.
environment:
- API_PORT=${N_Z_PORT}
ports:
- ${N_Z_PORT}:${N_Z_PORT}
others_librarian:
build: librarian/.
environment:
- API_PORT=${OTHERS_PORT}
ports:
- ${OTHERS_PORT}:${OTHERS_PORT}
concierge:
build: concierge/.
environment:
- API_PORT=${CONCIERGE_PORT}
- LIB_A_M=http://a_m_librarian:${A_M_PORT}/api
- LIB_N_Z=http://n_z_librarian:${N_Z_PORT}/api
- LIB_OTHERS=http://others_librarian:${OTHERS_PORT}/api
ports:
- ${CONCIERGE_PORT}:${CONCIERGE_PORT}
links:
- a_m_librarian
- n_z_librarian
- others_librarian
- file_server
file_server:
build: simple-server/.
ports:
- ${SERVER_PORT}:${SERVER_PORT}
可以使用服务名称作为域名来引用链接的服务。
.env
.env
在docker-compose.yaml
中用于加载模板变量。它遵循<template-variable>=<value>
的格式:
CONCIERGE_PORT=9090
A_M_PORT=6060
N_Z_PORT=7070
OTHERS_PORT=8080
SERVER_PORT=9876
我们可以通过运行以下命令查看替换值后的docker-compose.yaml
:
$ pwd GO-WORKSPACE/src/github.com/last-ent/distributed-go/chapter8/goophr $ docker-compose config services: a_m_librarian: build: context: /home/entux/Documents/Code/GO-WORKSPACE/src/github.com/last-ent/distributed-go/chapter8/goophr/librarian environment: API_PORT: '6060' ports: - 6060:6060/tcp concierge: build: context: /home/entux/Documents/Code/GO-WORKSPACE/src/github.com/last-ent/distributed-go/chapter8/goophr/concierge environment: API_PORT: '9090' LIB_A_M: http://a_m_librarian:6060/api LIB_N_Z: http://n_z_librarian:7070/api LIB_OTHERS: http://others_librarian:8080/api links: - a_m_librarian - n_z_librarian - others_librarian - file_server ports: - 9090:9090/tcp file_server: build: context: /home/entux/Documents/Code/GO-WORKSPACE/src/github.com/last-ent/distributed-go/chapter8/goophr/simple-server ports: - 9876:9876/tcp n_z_librarian: build: context: /home/entux/Documents/Code/GO-WORKSPACE/src/github.com/last-ent/distributed-go/chapter8/goophr/librarian environment: API_PORT: '7070' ports: - 7070:7070/tcp others_librarian: build: context: /home/entux/Documents/Code/GO-WORKSPACE/src/github.com/last-ent/distributed-go/chapter8/goophr/librarian environment: API_PORT: '8080' ports: - 8080:8080/tcp version: '3.0'
使用 docker-compose 运行 Goophr
现在我们已经准备就绪,让我们启动完整的应用程序:
$ docker-compose up --build Building a_m_librarian ... Successfully built 31e0b1a7d3fc Building n_z_librarian ... Successfully built 31e0b1a7d3fc Building others_librarian ... Successfully built 31e0cdb1a7d3fc Building file_server ... Successfully built 244831d4b86a Building concierge ... Successfully built ba1167718d29 Starting goophr_a_m_librarian_1 ... Starting goophr_file_server_1 ... Starting goophr_a_m_librarian_1 Starting goophr_n_z_librarian_1 ... Starting goophr_others_librarian_1 ... Starting goophr_file_server_1 Starting goophr_n_z_librarian_1 Starting goophr_others_librarian_1 ... done Starting goophr_concierge_1 ... Starting goophr_concierge_1 ... done Attaching to goophr_a_m_librarian_1, goophr_n_z_librarian_1, goophr_file_server_1, goophr_others_librarian_1, goophr_concierge_1 a_m_librarian_1 | 2018/01/21 19:21:00 INFO - Adding API handlers... a_m_librarian_1 | 2018/01/21 19:21:00 INFO - Starting index... a_m_librarian_1 | 2018/01/21 19:21:00 INFO - Starting Goophr Librarian server on port :6060... n_z_librarian_1 | 2018/01/21 19:21:00 INFO - Adding API handlers... others_librarian_1 | 2018/01/21 19:21:01 INFO - Adding API handlers... others_librarian_1 | 2018/01/21 19:21:01 INFO - Starting index... others_librarian_1 | 2018/01/21 19:21:01 INFO - Starting Goophr Librarian server on port :8080... n_z_librarian_1 | 2018/01/21 19:21:00 INFO - Starting index... n_z_librarian_1 | 2018/01/21 19:21:00 INFO - Starting Goophr Librarian server on port :7070... file_server_1 | 2018/01/21 19:21:01 Starting File Server on Port :9876... concierge_1 | 2018/01/21 19:21:02 INFO - Adding API handlers... concierge_1 | 2018/01/21 19:21:02 INFO - Starting feeder... concierge_1 | 2018/01/21 19:21:02 INFO - Starting Goophr Concierge server on port :9090...
向 Goophr 添加文档
由于我们的文件服务器中有三个文档,我们可以使用以下curl
命令将它们添加到 Goophr 中:
$ curl -LX POST -d '{"url":"http://file_server:9876/book1","title":"Book 1"}' localhost:9090/api/feeder | jq && > curl -LX POST -d '{"url":"http://file_server:9876/book2","title":"Book 2"}' localhost:9090/api/feeder | jq && > curl -LX POST -d '{"url":"http://file_server:9876/book3","title":"Book 3"}' localhost:9090/api/feeder | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 107 100 51 100 56 51 56 0:00:01 --:--:-- 0:00:01 104k { "code": 200, "msg": "Request is being processed." } % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 107 100 51 100 56 51 56 0:00:01 --:--:-- 0:00:01 21400 { "code": 200, "msg": "Request is being processed." } % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 107 100 51 100 56 51 56 0:00:01 --:--:-- 0:00:01 21400 { "code": 200, "msg": "Request is being processed." }
以下是由docker-compose
看到的前述 cURL 请求的日志:
n_z_librarian_1 | 2018/01/21 19:29:23 Token received api.tPayload{Token:"zebra", Title:"Book 1", DocID:"6911b2295fd23c77fca7d739c00735b14cf80d3c", LIndex:0, Index:3} concierge_1 | adding to librarian: zebra concierge_1 | adding to librarian: apple concierge_1 | adding to librarian: apple concierge_1 | adding to librarian: cat concierge_1 | 2018/01/21 19:29:23 INFO - Request was posted to Librairan. Msg:{"code": 200, "msg": "Tokens are being added to index."} ... concierge_1 | 2018/01/21 19:29:23 INFO - Request was posted to Librairan. Msg:{"code": 200, "msg": "Tokens are being added to index."} a_m_librarian_1 | 2018/01/21 19:29:23 Token received api.tPayload{Token:"apple", Title:"Book 1", DocID:"6911b2295fd23c77fca7d739c00735b14cf80d3c", LIndex:0, Index:0} ... n_z_librarian_1 | 2018/01/21 19:29:23 Token received api.tPayload{Token:"zebra", Title:"Book 2", DocID:"fbf2b6c400680389459dff13283cb01dfe9be7d6", LIndex:0, Index:2} concierge_1 | adding to librarian: zebra concierge_1 | adding to librarian: banana concierge_1 | adding to librarian: cake ... concierge_1 | adding to librarian: whale concierge_1 | adding to librarian: apple concierge_1 | adding to librarian: cake concierge_1 | adding to librarian: cake ... concierge_1 | 2018/01/21 19:29:23 INFO - Request was posted to Librairan. Msg:{"code": 200, "msg": "Tokens are being added to index."}
使用 Goophr 搜索关键词
现在我们已经运行了完整的应用程序并且索引中有一些文档,让我们通过搜索一些关键词来测试它。以下是我们将要搜索的术语列表以及预期的顺序:
-
"apple" - book1 (score: 2), book 3 (score: 1)
-
"cake" - book 3 (score: 2), book 2 (score: 1)
-
"apple", "cake" - book 3 (score 3), book 1 (score: 2), book 2 (score: 1)
搜索 – "apple"
让我们使用 cURL 命令单独搜索"apple"
:
$ curl -LX POST -d '["apple"]' localhost:9090/api/query | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 124 100 115 100 9 115 9 0:00:01 --:--:-- 0:00:01 41333
[
{
"title": "Book 1",
"url": "http://file_server:9876/book1"
},
{
"title": "Book 3",
"url": "http://file_server:9876/book3"
}
]
当我们搜索"apple"
时,以下是docker-compose
的日志:
concierge_1 | 2018/01/21 20:27:11 http://n_z_librarian:7070/api -> api.queryResult{Count:0, Data:[]api.docs{}}
concierge_1 | 2018/01/21 20:27:11 http://a_m_librarian:6060/api -> api.queryResult{Count:2, Data:[]api.docs{api.docs{DocID:"7bded23abfac73630d247b6ad24370214fe1811c", Score:2}, api.docs{DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", Score:1}}}
concierge_1 | []api.document{api.document{Doc:"apple apple cat zebra", Title:"Book 1", DocID:"7bded23abfac73630d247b6ad24370214fe1811c", URL:"http://file_server:9876/book1"}, api.document{Doc:"apple cake cake whale", Title:"Book 3", DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", URL:"http://file_server:9876/book3"}}
concierge_1 | 2018/01/21 20:27:11 http://others_librarian:8080/api -> api.queryResult{Count:0, Data:[]api.docs{}}
搜索 – "cake"
让我们使用 cURL 命令单独搜索"cake"
:
$ curl -LX POST -d '["cake"]' localhost:9090/api/query | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 123 100 115 100 8 115 8 0:00:01 --:--:-- 0:00:01 61500
[
{
"title": "Book 3",
"url": "http://file_server:9876/book3"
},
{
"title": "Book 2",
"url": "http://file_server:9876/book2"
}
]
当我们搜索"cake"
时,以下是docker-compose
的日志:
concierge_1 | 2018/01/21 20:30:13 http://a_m_librarian:6060/api -> api.queryResult{Count:2, Data:[]api.docs{api.docs{DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", Score:2}, api.docs{DocID:"28582e23c02ed3f14f8b4bdae97f91106273c0fc", Score:1}}}
concierge_1 | 2018/01/21 20:30:13 ---------------------------
concierge_1 | 2018/01/21 20:30:13 WARN: http://others_librarian:8080/api -> Post http://others_librarian:8080/api/query: http: ContentLength=8 with Body length 0
concierge_1 | 2018/01/21 20:30:13 ---------------------------
concierge_1 | 2018/01/21 20:30:13 http://n_z_librarian:7070/api -> api.queryResult{Count:0, Data:[]api.docs{}}
concierge_1 | []api.document{api.document{Doc:"apple cake cake whale", Title:"Book 3", DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", URL:"http://file_server:9876/book3"}, api.document{Doc:"banana cake zebra", Title:"Book 2", DocID:"28582e23c02ed3f14f8b4bdae97f91106273c0fc", URL:"http://file_server:9876/book2"}}
搜索 – "apple", "cake"
让我们使用 cURL 命令一起搜索"apple"
和"cake"
:
$ curl -LX POST -d '["cake", "apple"]' localhost:9090/api/query | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 189 100 172 100 17 172 17 0:00:01 --:--:-- 0:00:01 27000
[
{
"title": "Book 3",
"url": "http://file_server:9876/book3"
},
{
"title": "Book 1",
"url": "http://file_server:9876/book1"
},
{
"title": "Book 2",
"url": "http://file_server:9876/book2"
}
]
当我们搜索"apple"
和"cake"
时,以下是docker-compose
日志:
concierge_1 | 2018/01/21 20:31:06 http://a_m_librarian:6060/api -> api.queryResult{Count:3, Data:[]api.docs{api.docs{DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", Score:3}, api.docs{DocID:"7bded23abfac73630d247b6ad24370214fe1811c", Score:2}, api.docs{DocID:"28582e23c02ed3f14f8b4bdae97f91106273c0fc", Score:1}}}
concierge_1 | 2018/01/21 20:31:06 http://n_z_librarian:7070/api -> api.queryResult{Count:0, Data:[]api.docs{}}
concierge_1 | 2018/01/21 20:31:06 ---------------------------
concierge_1 | 2018/01/21 20:31:06 WARN: http://others_librarian:8080/api -> Post http://others_librarian:8080/api/query: http: ContentLength=16 with Body length 0
concierge_1 | 2018/01/21 20:31:06 ---------------------------
concierge_1 | []api.document{api.document{Doc:"apple cake cake whale", Title:"Book 3", DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", URL:"http://file_server:9876/book3"}, api.document{Doc:"apple apple cat zebra", Title:"Book 1", DocID:"7bded23abfac73630d247b6ad24370214fe1811c", URL:"http://file_server:9876/book1"}, api.document{Doc:"banana cake zebra", Title:"Book 2", DocID:"28582e23c02ed3f14f8b4bdae97f91106273c0fc", URL:"http://file_server:9876/book2"}}
使用 docker-compose 的个人日志
我们还可以单独查看每个服务的日志。以下是礼宾的日志:
$ docker-compose logs concierge
Attaching to goophr_concierge_1
concierge_1 | 2018/01/21 19:18:30 INFO - Adding API handlers...
concierge_1 | 2018/01/21 19:18:30 INFO - Starting feeder...
concierge_1 | 2018/01/21 19:18:30 INFO - Starting Goophr Concierge server on port :9090...
concierge_1 | 2018/01/21 19:21:02 INFO - Adding API handlers...
concierge_1 | 2018/01/21 19:21:02 INFO - Starting feeder...
concierge_1 | 2018/01/21 19:21:02 INFO - Starting Goophr Concierge server on port :9090...
concierge_1 | adding to librarian: zebra
concierge_1 | adding to librarian: apple
concierge_1 | adding to librarian: apple
concierge_1 | adding to librarian: cat
concierge_1 | 2018/01/21 19:25:40 INFO - Request was posted to Librairan. Msg:{"code": 200, "msg": "Tokens are being added to index."}
concierge_1 | 2018/01/21 20:31:06 http://a_m_librarian:6060/api -> api.queryResult{Count:3, Data:[]api.docs{api.docs{DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", Score:3}, api.docs{DocID:"7bded23abfac73630d247b6ad24370214fe1811c", Score:2}, api.docs{DocID:"28582e23c02ed3f14f8b4bdae97f91106273c0fc", Score:1}}}
concierge_1 | 2018/01/21 20:31:06 http://n_z_librarian:7070/api -> api.queryResult{Count:0, Data:[]api.docs{}}
concierge_1 | 2018/01/21 20:31:06 ---------------------------
concierge_1 | 2018/01/21 20:31:06 WARN: http://others_librarian:8080/api -> Post http://others_librarian:8080/api/query: http: ContentLength=16 with Body length 0
concierge_1 | 2018/01/21 20:31:06 ---------------------------
concierge_1 | []api.document{api.document{Doc:"apple cake cake whale", Title:"Book 3", DocID:"3c9c56d31ccd51bc7ac0011020819ef38ccd74a4", URL:"http://file_server:9876/book3"}, api.document{Doc:"apple apple cat zebra", Title:"Book 1", DocID:"7bded23abfac73630d247b6ad24370214fe1811c", URL:"http://file_server:9876/book1"}, api.document{Doc:"banana cake zebra", Title:"Book 2", DocID:"28582e23c02ed3f14f8b4bdae97f91106273c0fc", URL:"[`file_server:9876/book2`](http://file_server:9876/book2)"}}
Web 服务器上的授权
我们的搜索应用程序信任每个传入的请求。然而,有时限制访问可能是正确的方式。如果对每个传入请求都能够接受和识别来自某些用户的请求,那将是可取的。这可以通过授权令牌(auth tokens)来实现。授权令牌是在标头中发送的秘密代码/短语,用于密钥Authorization。
授权和认证令牌是深奥而重要的话题。在本节中不可能涵盖主题的复杂性。相反,我们将构建一个简单的服务器,该服务器将利用认证令牌来接受或拒绝请求。让我们看看源代码。
secure/secure.go
secure.go
显示了简单服务器的逻辑。它已分为四个函数:
-
requestHandler
函数用于响应传入的 HTTP 请求。 -
isAuthorized
函数用于检查传入请求是否经过授权。 -
getAuthorizedUser
函数用于检查令牌是否有关联用户。如果令牌没有关联用户,则认为令牌无效。 -
main
函数用于启动服务器。
现在让我们看看代码:
// secure/secure.go
package main
import (
"fmt"
"log"
"net/http"
"strings"
)
var authTokens = map[string]string{
"AUTH-TOKEN-1": "User 1",
"AUTH-TOKEN-2": "User 2",
}
// getAuthorizedUser tries to retrieve user for the given token.
func getAuthorizedUser(token string) (string, error) {
var err error
user, valid := authTokens[token]
if !valid {
err = fmt.Errorf("Auth token '%s' does not exist.", token)
}
return user, err
}
// isAuthorized checks request to ensure that it has Authorization header
// with defined value: "Bearer AUTH-TOKEN"
func isAuthorized(r *http.Request) bool {
rawToken := r.Header["Authorization"]
if len(rawToken) != 1 {
return false
}
authToken := strings.Split(rawToken[0], " ")
if !(len(authToken) == 2 && authToken[0] == "Bearer") {
return false
}
user, err := getAuthorizedUser(authToken[1])
if err != nil {
log.Printf("Error: %s", err)
return false
}
log.Printf("Successful request made by '%s'", user)
return true
}
var success = []byte("Received authorized request.")
var failure = []byte("Received unauthorized request.")
func requestHandler(w http.ResponseWriter, r *http.Request) {
if isAuthorized(r) {
w.Write(success)
} else {
w.WriteHeader(http.StatusUnauthorized)
w.Write(failure)
}
}
func main() {
http.HandleFunc("/", requestHandler)
fmt.Println("Starting server @ http://localhost:8080")
http.ListenAndServe(":8080", nil)
}
secure/secure_test.go
接下来,我们将尝试使用单元测试测试我们在secure.go
中编写的逻辑。一个好的做法是测试每个函数的所有可能的成功和失败情况。测试名称解释了测试的意图,所以让我们看看代码:
// secure/secure_test.go
package main
import (
"net/http"
"net/http/httptest"
"testing"
)
func TestIsAuthorizedSuccess(t *testing.T) {
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Error("Unable to create request")
}
req.Header["Authorization"] = []string{"Bearer AUTH-TOKEN-1"}
if isAuthorized(req) {
t.Log("Request with correct Auth token was correctly processed.")
} else {
t.Error("Request with correct Auth token failed.")
}
}
func TestIsAuthorizedFailTokenType(t *testing.T) {
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Error("Unable to create request")
}
req.Header["Authorization"] = []string{"Token AUTH-TOKEN-1"}
if isAuthorized(req) {
t.Error("Request with incorrect Auth token type was successfully processed.")
} else {
t.Log("Request with incorrect Auth token type failed as expected.")
}
}
func TestIsAuthorizedFailToken(t *testing.T) {
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Error("Unable to create request")
}
req.Header["Authorization"] = []string{"Token WRONG-AUTH-TOKEN"}
if isAuthorized(req) {
t.Error("Request with incorrect Auth token was successfully processed.")
} else {
t.Log("Request with incorrect Auth token failed as expected.")
}
}
func TestRequestHandlerFailToken(t *testing.T) {
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Error("Unable to create request")
}
req.Header["Authorization"] = []string{"Token WRONG-AUTH-TOKEN"}
// http.ResponseWriter it is an interface hence we use
// httptest.NewRecorder which implements the interface http.ResponseWriter
rr := httptest.NewRecorder()
requestHandler(rr, req)
if rr.Code == 401 {
t.Log("Request with incorrect Auth token failed as expected.")
} else {
t.Error("Request with incorrect Auth token was successfully processed.")
}
}
func TestGetAuthorizedUser(t *testing.T) {
if user, err := getAuthorizedUser("AUTH-TOKEN-2"); err != nil {
t.Errorf("Couldn't find User 2\. Error: %s", err)
} else if user != "User 2" {
t.Errorf("Found incorrect user: %s", user)
} else {
t.Log("Found User 2.")
}
}
func TestGetAuthorizedUserFail(t *testing.T) {
if user, err := getAuthorizedUser("WRONG-AUTH-TOKEN"); err == nil {
t.Errorf("Found user for invalid token!. User: %s", user)
} else if err.Error() != "Auth token 'WRONG-AUTH-TOKEN' does not exist." {
t.Errorf("Error message does not match.")
} else {
t.Log("Got expected error message for invalid auth token")
}
}
测试结果
最后,让我们运行测试,看看它们是否产生了预期的结果:
$ go test -v ./... === RUN TestIsAuthorizedSuccess 2018/02/19 00:08:06 Successful request made by 'User 1' --- PASS: TestIsAuthorizedSuccess (0.00s) secure_test.go:18: Request with correct Auth token was correctly processed. === RUN TestIsAuthorizedFailTokenType --- PASS: TestIsAuthorizedFailTokenType (0.00s) secure_test.go:35: Request with incorrect Auth token type failed as expected. === RUN TestIsAuthorizedFailToken --- PASS: TestIsAuthorizedFailToken (0.00s) secure_test.go:50: Request with incorrect Auth token failed as expected. === RUN TestRequestHandlerFailToken --- PASS: TestRequestHandlerFailToken (0.00s) secure_test.go:68: Request with incorrect Auth token failed as expected. === RUN TestGetAuthorizedUser --- PASS: TestGetAuthorizedUser (0.00s) secure_test.go:80: Found User 2\. === RUN TestGetAuthorizedUserFail --- PASS: TestGetAuthorizedUserFail (0.00s) secure_test.go:90: Got expected error message for invalid auth token PASS ok chapter8/secure 0.003s
总结
在本章中,我们首先尝试理解为什么需要运行多个 Goophr 图书管理员实例。接下来,我们看了如何实现更新的concierge/api/query.go
,以便它可以与多个图书管理员实例一起工作。然后,我们研究了使用docker-compose
编排应用程序可能是一个好主意的原因,以及使其工作的各种因素。我们还更新了图书管理员和礼宾代码库,以便它们可以与docker-compose
无缝工作。最后,我们使用一些小文档测试了完整的应用程序,并推理了预期结果的顺序。
我们能够使用docker-compose
在本地机器上编排运行完整的 Goophr 应用程序所需的所有服务器。然而,在互联网上设计一个能够承受大量用户流量的弹性 Web 应用程序的架构可能会非常具有挑战性。第九章,Web 规模架构的基础试图通过提供一些关于在 Web 设计时需要考虑的基本知识来解决这个问题。
第九章:Web 规模架构的基础
第五章,介绍 Goophr,第六章,Goophr Concierge,和第七章,Goophr Librarian,是关于从基本概念到运行各个组件并验证它们按预期工作的分布式搜索索引系统的设计和实现。在第八章,部署 Goophr,我们使用docker-compose将各个组件连接起来,以便我们可以以简单可靠的方式启动和连接所有组件。在过去的四章中,我们取得了相当大的进展,但你可能已经注意到我们在单台机器上运行了所有东西,很可能是我们的笔记本电脑或台式机。
理想情况下,我们应该尝试准备我们的分布式系统在大量用户负载下可靠工作,并将其暴露在 Web 上供一般使用。然而,现实情况是,我们将不得不对我们当前的系统进行大量升级,以使其足够可靠和有弹性,能够在真实世界的流量下工作。
在本章中,我们将讨论在尝试为 Web 设计时应该牢记的各种因素。我们将关注以下内容:
-
扩展 Web 应用程序
-
单体应用程序与微服务
-
部署选项
扩展 Web 应用程序
在本章中,我们将不讨论 Goophr,而是一个简单的用于博客的 Web 应用程序,以便我们可以专注于为 Web 扩展它。这样的应用程序可能包括运行数据库和博客服务器的单个服务器实例。
扩展 Web 应用程序是一个复杂的主题,我们将花费大量时间来讨论这个主题。正如我们将在本节中看到的,有多种方式可以扩展系统:
-
整体扩展系统
-
拆分系统并扩展各个组件
-
选择特定的解决方案以更好地扩展系统
让我们从最基本的设置开始,即单个服务器实例。
单个服务器实例
单服务器设置通常包括:
-
用于提供网页并处理服务器端逻辑的 Web 服务器
-
用于保存与博客相关的所有用户数据(博客文章、用户登录详细信息等)的数据库
以下图显示了这样一个服务器的外观:
该图显示了一个简单的设置,用户与博客服务器进行交互,博客服务器将在内部与数据库进行交互。这种在同一实例上设置数据库和博客服务器将仅在一定数量的用户上是高效和响应的。
当系统开始变慢或存储空间开始填满时,我们可以将我们的应用程序(数据库和博客服务器)重新部署到具有更多存储空间、RAM 和 CPU 功率的不同服务器实例上;这被称为垂直扩展。正如你可能怀疑的那样,这可能是耗时和不便的升级服务器的方式。如果我们能尽可能地推迟这次升级,那不是更好吗?
需要考虑的一个重要问题是,问题可能是由以下任何组合因素导致的:
-
由于数据库或博客服务器而导致内存不足
-
由于 Web 服务器或数据库需要更多 CPU 周期而导致性能下降
-
由于数据库的存储空间不足
为了解决上述任何因素,扩展完整应用程序并不是处理问题的最佳方式,因为我们在本可以用更少的资源解决问题的地方花费了很多钱!那么我们应该如何设计我们的系统,以便以正确的方式解决正确的问题呢?
为 Web 和数据库分层
如果我们考虑前面提到的三个问题,我们可以通过一两种方式解决每个问题。让我们首先看看它们:
问题#1:内存不足
解决方案:
-
由于数据库:为数据库增加 RAM
-
由于博客服务器:为博客服务器增加 RAM
问题#2:性能下降
解决方案:
-
由于数据库:增加数据库的 CPU 功率
-
由于博客服务器:增加博客服务器的 CPU 功率
问题#3:存储空间不足
解决方案:
- 由于数据库:增加数据库的存储空间
使用此列表,我们可以根据我们面临的特定问题随时升级我们的系统。然而,我们首先需要正确识别导致问题的组件。因此,即使在我们开始垂直扩展我们的应用程序之前,我们也应该像图中所示将我们的数据库与 Web 服务器分开。
具有数据库和博客服务器在单独的服务器实例上的新设置将使我们能够监视哪个组件存在问题,并且仅垂直扩展该特定组件。我们应该能够使用这种新设置为更大的用户流量提供服务。
然而,随着服务器负载的增加,我们可能会遇到其他问题。例如,如果我们的博客服务器变得无响应会发生什么?我们将无法继续提供博客文章,也没有人能够在博客文章上发表评论。这是没有人愿意面对的情况。如果我们能够在博客服务器宕机时继续提供流量,那不是很好吗?
多个服务器实例
使用单个服务器实例为我们的博客服务器或任何应用程序(业务逻辑)服务器提供大量用户流量是危险的,因为我们实质上正在创建一个单点故障。避免这种情况的最合乎逻辑和最简单的方法是复制我们的博客服务器实例以处理传入的用户流量。将单个服务器扩展到多个实例的这种方法称为横向扩展。然而,这带来了一个问题:我们如何可靠地在博客服务器的各个实例之间分发流量?为此,我们使用负载均衡器。
负载均衡器
负载均衡器是一种 HTTP 服务器,负责根据开发人员定义的规则将流量(路由)分发到各种 Web 服务器。总的来说,负载均衡器是一个非常快速和专业的应用程序。在 Web 服务器中尝试实现类似的逻辑可能不是最佳选择,因为您的 Web 服务器可用资源必须在处理业务逻辑的请求和需要路由的请求之间进行分配。此外,负载均衡器为我们提供了许多开箱即用的功能,例如:
-
负载均衡算法:以下是一些负载均衡的算法。
-
随机:在服务器之间随机分发。
-
轮询:在服务器之间均匀顺序地分发。
-
不对称负载:以一定比例在服务器之间分发。例如,对于 100 个请求,将 80 个发送到 A 服务器,20 个发送到 B 服务器。
-
最少连接:将新请求发送到具有最少活动连接数的服务器(不对称负载也可以与最少连接集成)。
-
会话持久性:想象一个电子商务网站,用户已将商品添加到购物车中,购物车中的商品信息存储在 A 服务器上。然而,当用户想要完成购买时,请求被发送到另一台服务器 B!这对用户来说是一个问题,因为与他的购物车相关的所有详细信息都在 A 服务器上。负载均衡器可以确保将这些请求重定向到相关的服务器。
-
HTTP 压缩:负载均衡器还可以使用
gzip
压缩传出响应,以便向用户发送更少的数据。这往往会极大地改善用户体验。 -
HTTP 缓存:对于提供 REST API 内容的站点,许多文件可以被缓存,因为它们不经常更改,并且缓存的内容可以更快地传递。
根据使用的负载均衡器,它们可以提供比上述列出的更多功能。这应该让人了解负载均衡器的能力。
以下图显示了负载均衡器和多个服务器如何一起工作:
用户的请求到达负载均衡器,然后将请求路由到博客服务器的多个实例之一。然而,请注意,即使现在我们仍然在使用相同的数据库进行读写操作。
多可用区域
在前一节中,我们谈到了单点故障以及为什么有多个应用服务器实例是一件好事。我们可以进一步扩展这个概念;如果我们所有的服务器都在一个位置,由于某种重大故障或故障,所有的服务器都宕机了怎么办?我们将无法为任何用户流量提供服务。
我们可以看到,将我们的服务器放在一个位置也会造成单点故障。解决这个问题的方法是在多个位置提供应用服务器实例。然后下一个问题是:我们如何决定部署服务器的位置?我们应该将服务器部署到单个国家内的多个位置,还是应该将它们部署到多个国家?我们可以使用云计算术语重新表达问题如下。
我们需要决定是否要将我们的服务器部署到多个区域或多个区域,或者两者兼而有之。
重要的一点要注意的是,部署到多个区域可能会导致网络延迟,我们可能希望先部署到多个地区。然而,在我们部署到多个地区和区域之前,我们需要确保两个事实:
-
我们的网站有大量流量,我们的单服务器设置已经无法处理
-
我们有相当多的用户来自另一个国家,将服务器部署在他们附近的区域可能是一个好主意
一旦我们考虑了这些因素并决定部署到额外的区域和区域,我们的博客系统整体可能看起来像这样:
数据库
我们一直在扩展应用程序/博客服务器,并看到了如何垂直和水平扩展服务器,以及如何为整个系统的高可用性和性能因素化多个区域和区域。
您可能已经注意到在所有先前的设计中,我们仍然依赖单个数据库实例。到现在为止,您可能已经意识到,任何服务/服务器的单个实例都可能成为单点故障,并可能使系统完全停滞。
棘手的部分是,我们不能像为应用服务器那样简单地运行多个数据库实例的策略。我们之所以能够为应用服务器使用这种策略,是因为应用服务器负责业务逻辑,它自身维护的状态很少是临时的,而所有重要的信息都被推送到数据库中,这构成了真相的唯一来源,也是讽刺的是,单点故障的唯一来源。在我们深入探讨数据库扩展的复杂性和随之而来的挑战之前,让我们首先看一下需要解决的一个重要主题。
SQL 与 NoSQL
对于初学者来说,数据库有两种类型:
-
关系型数据库:这些使用 SQL(略有变化)来查询数据库
-
NoSQL 数据库:这些可以存储非结构化数据并使用特定的数据库查询语言
关系数据库已经存在很长时间了,人们已经付出了大量的努力来优化它们的性能,并使它们尽可能健壮。然而,可靠性和性能要求我们计划和组织我们的数据到定义良好的表和关系中。我们的数据受限于数据库表的模式。每当我们需要向我们的表中添加更多字段/列时,我们将不得不将表迁移到新的模式,并且这将要求我们创建迁移脚本来处理添加新字段,并且还要提供条件和数据来填充已存在的表中的新创建字段。
NoSQL 数据库往往具有更自由的结构。我们不需要为我们的表定义模式,因为数据存储为单行/文档。我们可以将任何模式的数据插入单个表中,然后对其进行查询。鉴于数据不受模式规则的限制,我们可能会将错误或格式不正确的数据插入到我们的数据库中。这意味着我们将不得不确保我们检索到正确的数据,并且还必须采取预防措施,以确保不同模式的数据不会使程序崩溃。
我们应该使用哪种类型的数据库?
起初,人们可能会倾向于选择 NoSQL,因为这样我们就不需要担心构造我们的数据和连接查询。然而,重要的是要意识到,我们将不再以 SQL 形式编写这些查询,而是将所有数据检索到用户空间,即程序中,然后在程序中编写手动连接查询。
相反,如果我们依赖关系数据库,我们可以确保更小的存储空间,更高效的连接查询,以及具有定义良好模式的数据。所有关系数据库和一些 NoSQL 数据库都提供索引,这也有助于优化更快的搜索查询。然而,使用表和连接的关系数据库的一个主要缺点是,随着数据的增长,连接可能会变得更慢。到这个时候,您将清楚地知道您的数据的哪些部分可以利用 NoSQL 解决方案,并且您将开始在 SQL 和 NoSQL 系统的组合中维护您的数据。
简而言之,从关系数据库开始,一旦表中有大量数据且无法进行进一步的数据库调优,那么考虑将确实需要 NoSQL 数据存储的表移动过去。
数据库复制
既然我们已经确定了为什么选择使用关系数据库,让我们转向下一个问题:我们如何确保我们的数据库不会成为单点故障?
让我们首先考虑如果数据库失败会有什么后果:
-
我们无法向数据库中写入新数据
-
我们无法从数据库中读取
在这两种后果中,后者更为关键。考虑我们的博客应用,虽然能够写新的博客文章很重要,但我们网站上绝大多数的用户将是读者。这是大多数日常用户界面应用的常态。因此,我们应该尽量确保我们总是能够从数据库中读取数据,即使我们不再能够向其中写入新数据。
数据库复制和冗余性试图解决这些问题,通常解决方案作为数据库或插件的一部分包含在其中。在本节中,我们将讨论用于数据库复制的三种策略:
-
主-副本复制
-
主-主复制
-
故障转移集群复制
主-副本复制
这是最直接的复制方法。可以解释如下:
- 我们采用数据库集群:
数据库集群
- 将其中一个指定为主数据库,其余数据库为副本:
DB-3 被指定为主数据库
- 所有写入都是在主数据库上执行的:
主数据库上执行三次写入
- 所有读取都是从副本执行的:
从副本执行的读取
- 主数据库确保所有副本都具有最新状态,即主数据库的状态:
主数据库将所有副本更新为最新更新
- 主数据库故障仍允许从副本数据库读取,但不允许写入:
主数据库故障;只读取,不写入
主-主复制
您可能已经注意到主-副本设置存在两个问题:
-
主数据库被广泛用于数据库写入,因此处于持续压力之下
-
副本解决了读取的问题,但写入的单点故障仍然存在
主-主复制尝试通过使每个数据库成为主数据库来解决这些问题。可以解释如下:
- 我们采用数据库集群:
数据库集群
- 我们将每个数据库指定为主数据库:
所有数据库都被指定为主数据库
- 可以从任何主数据库执行读取:
在主数据库上执行读取
- 可以在任何主数据库上执行写入:
写入 DB-1 和 DB-3
- 每个主数据库都使用写入更新其他主数据库:
数据库状态在主数据库之间同步
- 因此,状态在所有数据库中保持一致:
DB-1 故障,成功读取和写入
这种策略似乎运行良好,但它有自己的局限性和挑战;主要的问题是解决写入之间的冲突。这里有一个简单的例子。
我们有两个主-主数据库DB-1和DB-2,并且两者都具有数据库系统的最新状态:
DB-1 和 DB-2 的最新状态
我们有两个同时进行的写操作,因此我们将“Bob”发送到DB-1,将“Alice”发送到DB-2.
将“Bob”写入 DB-1,将“Alice”写入 DB-2
现在,两个数据库都已将数据写入其表,它们需要使用自己的最新状态更新另一个主数据库:
DB 同步之前的状态
这将导致冲突,因为在两个表中,ID# 3分别填充了DB-1的Bob和DB-2的Alice:
在更新 DB-1 和 DB-2 状态时发生冲突,因为 ID# 3 已经被填充。
实际上,主-主策略将具有内置机制来处理这类问题,但它们可能会导致性能损失或其他挑战。这是一个复杂的主题,我们必须决定在使用主-主复制时值得做出哪些权衡。
故障转移集群复制
主-副本复制允许我们在潜在风险的情况下对读取和写入进行简单设置,无法写入主数据库。主-主复制允许我们在其中一个主数据库故障时能够读取和写入数据库。然而,要在所有主数据库之间保持一致状态的复杂性和可能的性能损失可能意味着它并不是在所有情况下的理想选择。
故障转移集群复制试图采取中间立场,提供两种复制策略的功能。可以解释如下:
-
我们采用数据库集群。
-
根据使用的主选择策略,将数据库分配为主数据库,这可能因数据库而异。
-
其余数据库被分配为副本。
-
主服务器负责将副本更新为数据库的最新状态。
-
如果主服务器因某种原因失败,将选择将剩余的数据库之一指定为新的主数据库。
那么我们应该使用哪种复制策略?最好从最简单的开始,也就是主-副本策略,因为这将非常轻松地满足大部分最初的需求。现在让我们看看如果我们使用主-副本策略进行数据库复制,我们的应用程序会是什么样子:
具有主-副本数据库设置的应用程序
单体架构与微服务
大多数新项目最初都是单一的代码库,所有组件通过直接函数调用相互交互。然而,随着用户流量和代码库的增加,我们将开始面临代码库的问题。以下是可能的原因:
-
您的代码库正在不断增长,这意味着任何新开发人员理解完整系统将需要更长的时间。
-
添加新功能将需要更长时间,因为我们必须确保更改不会破坏任何其他组件。
-
由于以下原因,为每个新功能重新部署代码可能会变得繁琐:
-
部署失败和/或
-
重新部署的组件出现了意外的错误,导致程序崩溃和/或
-
由于测试数量较多,构建过程可能需要更长时间
-
将完整应用程序扩展以支持 CPU 密集型组件
微服务通过将应用程序的主要组件拆分为单独的较小的应用程序/服务来解决这个问题。这是否意味着我们应该从一开始就将我们的应用程序拆分成微服务,以便我们不会面临这个问题?这是一种可能的处理方式。然而,这种方法也有一定的缺点:
-
移动部件过多:将每个组件分成自己的服务意味着我们必须监视和维护每个组件的服务器。
-
增加的复杂性:微服务增加了失败的可能原因。单体架构中的故障可能仅限于服务器宕机或代码执行问题。然而,对于微服务,我们必须:
-
识别哪个组件的服务器宕机或
-
如果一个组件失败,识别失败的组件,然后进一步调查失败是否是由于:
-
故障代码或
-
由于一个依赖组件的失败
-
整个系统更难调试:前面描述的增加的复杂性使得调试完整系统变得更加困难。
既然我们已经看到了微服务和单体架构的一些优缺点,哪一个更好呢?答案现在应该是相当明显的:
-
小到中等规模的代码库受益于单体架构提供的简单性
-
大型代码库受益于微服务架构提供的细粒度控制
这意味着我们应该设计我们的单体代码库,预期它最终可能会增长到非常庞大的规模,然后我们将不得不将其重构为微服务。为了尽可能轻松地将代码库重构为微服务,我们应该尽早确定可能的组件,并使用中介者设计模式实现它们与代码的其他部分之间的交互。
中介者设计模式
中介者充当代码中各个组件之间的中间人,这导致各个组件之间的耦合非常松散。这使我们可以对代码进行最小的更改,因为我们只需要更改中介者与被提取为自己的微服务的组件之间的交互。
让我们举个例子。我们有一个由 Codebase A 定义的单体应用。它由五个组件组成——Component 1 到 Component 5。我们意识到 Component 1 和 Component 2 依赖于与 Component 5 交互,而 Component 2 和 Component 3 依赖于 Component 4。如果 Component 1 和 Component 2 直接调用 Component 5,同样 Component 2 和 Component 4 直接调用 Component 4,那么我们将创建紧密耦合的组件。
如果我们引入一个函数,该函数从调用组件接收输入并调用必要的组件作为代理,并且所有数据都使用明确定义的结构传递,那么我们就引入了中介者设计模式。这可以在下图中看到:
通过中介者连接的代码库中的组件
现在,如果出现需要将其中一个组件分离成自己独立的微服务的情况,我们只需要改变代理函数的实现。在我们的例子中,Component 5
被分离成了自己独立的微服务,并且我们已经改变了代理函数 mediator 1 的实现,以使用 HTTP 和 JSON 与 Component 5 进行通信,而不是通过函数调用和结构体进行通信。如下图所示:
组件分离成微服务和中介者实现的更改
部署选项
我们已经研究了各种扩展应用程序的策略、不同类型的数据库、如何构建我们的代码,最后是如何使用中介者模式来实现从单体应用到微服务的过渡。然而,我们还没有讨论我们将在哪里部署所述的 Web 应用程序和数据库。让我们简要地看一下部署的情况。
直到 2000 年代初,大多数服务器都部署在由编写软件的公司拥有的硬件上。会有专门的基础设施和团队来处理这个软件工程的关键部分。这在很大程度上是数据中心的主题。
然而,在 2000 年代,公司开始意识到数据中心可以被抽象化,因为大多数开发人员对处理这些问题并不感兴趣。这使得软件的开发和部署变得更加便宜和快速,特别是对于 Web 应用程序。现在,开发人员不再购买数据中心的硬件和空间,而是可以通过 SSH 访问服务器实例。在这方面最著名的公司之一是亚马逊公司。这使他们的业务扩展到了电子商务之外。
这些服务也引发了一个问题:开发人员是否需要安装和维护诸如数据库、负载均衡器或其他类似服务的通用应用程序?事实是,并非所有开发人员或公司都希望参与维护这些服务。这导致了对现成应用实例的需求,这些实例将由销售这些应用作为服务的公司进行维护。
有许多最初作为软件公司开始并维护自己数据中心的公司——例如亚马逊、谷歌和微软等等——他们现在为一般消费者提供了一系列这样的服务。
多个实例的可维护性
提到的服务的可用性显著改善了我们的生活,但在维护跨多个服务器实例运行的大量应用程序时涉及了许多复杂性。例如:
-
如何更新服务器实例而不使整个服务停机?这可以用更少的工作量完成吗?
-
有没有一种可靠的方法可以轻松地扩展我们的应用程序(纵向和横向)?
考虑到所有现代部署都使用容器,我们可以利用容器编排软件来帮助解决可维护性问题。Kubernetes(kubernetes.io/
)和 Mesos(mesos.apache.org/
)是两种解决方案的例子。
总结
在本章中,我们以一个简单的博客应用为例,展示了如何扩展以满足不断增长的用户流量的需求。我们还研究了扩展数据库涉及的复杂性和策略。
然后,我们简要介绍了如何设计我们的代码库以及我们可能需要考虑的权衡。最后,我们看了一种将代码库从单体架构轻松重构为微服务的方法。
标签:http,--,分布式计算,go,api,Go,main,我们 From: https://www.cnblogs.com/apachecn/p/18172882