首页 > 其他分享 >折腾 Quickwit,Rust 编写的分布式搜索引擎 - 从不同的来源摄取数据

折腾 Quickwit,Rust 编写的分布式搜索引擎 - 从不同的来源摄取数据

时间:2024-09-02 12:53:53浏览次数:12  
标签:index name quickwit -- Quickwit source config Rust 分布式

折腾 Quickwit,Rust 编写的分布式搜索引擎 - 从不同的来源摄取数据_hive

摄取 API

在这节教程中,我们将介绍如何使用 Ingest API 向 Quickwit 发送数据。

要跟随这节教程,您需要有一个本地的 Quickwit 实例正在运行。

要启动它,请在终端中运行 ./quickwit run

创建索引

首先,我们创建一个无模式的索引。

# Create the index config file.
cat << EOF > stackoverflow-schemaless-config.yaml
version: 0.7
index_id: stackoverflow-schemaless
doc_mapping:
  mode: dynamic
indexing_settings:
  commit_timeout_secs: 30
EOF
# Use the CLI to create the index...
./quickwit index create --index-config stackoverflow-schemaless-config.yaml
# Or with cURL.
curl -XPOST -H 'Content-Type: application/yaml' 'http://localhost:7280/api/v1/indexes' --data-binary @stackoverflow-schemaless-config.yaml

摄取数据

让我们先下载 StackOverflow 数据集的一个样本。

# Download the first 10_000 Stackoverflow posts articles.
curl -O https://quickwit-datasets-public.s3.amazonaws.com/stackoverflow.posts.transformed-10000.json

您可以使用命令行界面或 cURL 来发送数据。命令行界面对于发送几 GB 的数据更为方便,因为当 Ingest 队列已满时,Quickwit 可能会返回 429 响应。在这种情况下,Quickwit 命令行界面将自动重试发送。

# Ingest the first 10_000 Stackoverflow posts articles with the CLI...
./quickwit index ingest --index stackoverflow-schemaless --input-path stackoverflow.posts.transformed-10000.json --force

# OR with cURL.
curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/stackoverflow-schemaless/ingest?commit=force' --data-binary @stackoverflow.posts.transformed-10000.json

执行搜索查询

现在您可以对索引进行搜索了。

curl 'http://localhost:7280/api/v1/stackoverflow-schemaless/search?query=body:python'

清理源(可选)

curl -XDELETE 'http://localhost:7280/api/v1/indexes/stackoverflow-schemaless'

至此完成了教程。现在您可以继续阅读下一教程。

本地文件

在这节教程中,我们将介绍如何使用 Quickwit 命令行界面来索引本地文件。

要跟随这节教程,您需要有Quickwit 二进制文件

创建索引

首先,我们创建一个无模式的索引。我们需要仅为了创建索引而启动 Quickwit 服务器,因此我们将启动它并在之后关闭它。

启动 Quickwit server。

./quickwit run

在另一个终端中创建索引。

# Create the index config file.
cat << EOF > stackoverflow-schemaless-config.yaml
version: 0.7
index_id: stackoverflow-schemaless
doc_mapping:
  mode: dynamic
indexing_settings:
  commit_timeout_secs: 30
EOF

./quickwit index create --index-config stackoverflow-schemaless-config.yaml

现在您可以通过在第一个终端中按下 Ctrl+C 来关闭服务器。

摄取文件

要发送文件,只需执行以下命令:

./quickwit tool local-ingest --index stackoverflow-schemaless --input-path stackoverflow.posts.transformed-10000.json

几秒钟后,您应该能看到以下输出:

❯ Ingesting documents locally...

---------------------------------------------------
 Connectivity checklist
 ✔ metastore
 ✔ storage
 ✔ _ingest-cli-source

 Num docs   10000 Parse errs     0 PublSplits   1 Input size     6MB Thrghput  3.34MB/s Time 00:00:02
 Num docs   10000 Parse errs     0 PublSplits   1 Input size     6MB Thrghput  2.23MB/s Time 00:00:03
 Num docs   10000 Parse errs     0 PublSplits   1 Input size     6MB Thrghput  1.67MB/s Time 00:00:04

Indexed 10,000 documents in 4s.
Now, you can query the index with the following command:
quickwit index search --index stackoverflow-schemaless --config ./config/quickwit.yaml --query "my query"
Clearing local cache directory...
✔ Local cache directory cleared.
✔ Documents successfully indexed.

支持像 s3://mybucket/mykey.json 这样的对象存储 URI 作为 --input-path,前提是您的环境配置了适当的权限。

清理源(可选)

就这样!现在您可以清理创建的源。您可以通过运行以下命令来完成:

./quickwit run

在另一个终端中:

./quickwit index delete --index-id stackoverflow-schemaless

至此完成了教程。现在您可以继续阅读下一教程。

Kafka

在这节教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kafka 摄取数据。首先,我们将创建一个索引并配置 Kafka 源。然后,我们将创建一个 Kafka 主题并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。

前提条件

要完成这篇教程,您需要以下条件:

  • 正在运行的 Kafka 集群(参见 Kafka 快速入门
  • 本地安装的 Quickwit 指南

创建索引

首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:

#
# Index config file for gh-archive dataset.
#
version: 0.7

index_id: gh-archive

doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at

indexing_settings:
  commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 gh-archive 索引:

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

创建并填充 Kafka topic

现在,我们创建一个 Kafka 主题并将一些事件加载到其中。

# Create a topic named `gh-archive` with 3 partitions.
bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz

# Load the events into Kafka topic.
gunzip -c 2022-05-12*.json.gz | \
bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092

创建 Kafka 源

这篇教程假设 Kafka 集群在默认端口(9092)上本地可用。
如果情况并非如此,请相应地更新 bootstrap.servers 参数。

#
# Kafka source config file.
#
version: 0.8
source_id: kafka-source
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: localhost:9092

运行这些命令来下载源配置文件并创建源。

# Download Kafka source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kafka-source.yaml

如果您遇到以下错误:

Command failed: Topic `gh-archive` has no partitions.

这意味着 Kafka 主题 gh-archive 在前一步骤中未正确创建。

启动索引和搜索服务

最后,执行此命令以服务器模式启动 Quickwit。

# Launch Quickwit services.
./quickwit run

在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kafka 主题,并开始从组成主题的分区流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。

您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:

# Display some general information about the index.
./quickwit index describe --index gh-archive

一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:

curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'

也可以通过 Quickwit 用户界面 访问这些结果。

  • http://localhost:7280/ui/search?query=org.login%3Akubernetes+AND+repo.name%3Akubernetes&index_id=gh-archive&max_hits=10

我们还可以按类型对这些事件进行分组并计数它们:

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
  "query":"org.login:kubernetes AND repo.name:kubernetes",
  "max_hits":0,
  "aggs":{
    "count_by_event_type":{
      "terms":{
        "field":"type"
      }
    }
  }
}'

安全的 Kafka 连接(可选)

Quickwit 的 Kafka 源支持 SSL 和 SASL 身份验证。这对于从外部 Kafka 服务消费数据特别有用。

证书和密钥文件必须存在于所有 Quickwit 节点上,以便创建 Kafka 源并成功运行索引管道。

SSL 配置

version: 0.8
source_id: kafka-source-ssl
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: your-kafka-broker.com
    security.protocol: SSL
    ssl.ca.location: /path/to/ca.pem
    ssl.certificate.location: /path/to/service.cert
    ssl.key.location: /path/to/service.key

SASL 配置

version: 0.8
source_id: kafka-source-sasl
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: your-kafka-broker.com
    ssl.ca.location: /path/to/ca.pem
    security.protocol: SASL_SSL
    sasl.mechanisms: SCRAM-SHA-256
    sasl.username: your_sasl_username
    sasl.password: your_sasl_password

如果您遇到以下错误:

Client creation error: ssl.ca.location failed: error:05880002:x509 certificate routines::system lib

通常意味着 CA 证书的路径不正确。请相应地更新 ssl.ca.location 参数。

清理源(可选)

让我们删除为这篇教程创建的文件和源。

# Delete Kafka topic.
bin/kafka-topics.sh --delete --topic gh-archive --bootstrap-server localhost:9092

# Delete index.
./quickwit index delete --index gh-archive

# Delete source config.
rm kafka-source.yaml

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。

Pulsar

在这节教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Pulsar 摄取数据。首先,我们将创建一个索引并配置 Pulsar 源。然后,我们将创建一个 Pulsar topic 并将一些事件从 Stack Overflow 数据集 加载到其中。最后,我们将执行一些搜索。

前提条件

要完成这篇教程,您需要以下条件:

Quickwit 设置

下载 Quickwit 并启动一个服务器。然后打开一个新的终端,使用同一个二进制文件执行 CLI 命令。

./quickwit run

测试集群是否正在运行:

./quickwit index list

Pulsar 设置

Local

wget https://archive.apache.org/dist/pulsar/pulsar-2.11.0/apache-pulsar-2.11.0-bin.tar.gz
tar xvfz apache-pulsar-2.11.0-bin.tar.gz
cd apache-pulsar-2.11.0
bin/pulsar standalone

Docker

docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:2.11.0 bin/pulsar standalone

请参阅 官方文档 的详细信息。

准备 Quickwit

首先,我们创建一个新的索引。以下是与 Stack Overflow 帖子模式对应的索引配置和文档映射:

#
# Index config file for Stack Overflow dataset.
#
version: 0.7

index_id: stackoverflow

doc_mapping:
  field_mappings:
    - name: user
      type: text
      fast: true
      tokenizer: raw
    - name: tags
      type: array<text>
      fast: true
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: title
      type: text
      tokenizer: default
      record: position
      stored: true
    - name: body
      type: text
      tokenizer: default
      record: position
      stored: true
    - name: questionId
      type: u64
    - name: answerId
      type: u64
    - name: acceptedAnswerId
      type: u64
    - name: creationDate
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: creationDate

search_settings:
  default_search_fields: [title, body]

indexing_settings:
  commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 stackoverflow 索引。

# Download stackoverflow index config.
wget -O stackoverflow.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/stackoverflow/index-config.yaml

# Create index.
./quickwit index create --index-config stackoverflow.yaml

创建 Pulsar 源

Pulsar 源只需要定义主题列表和实例地址。

#
# Pulsar source config file.
#
version: 0.7
source_id: pulsar-source
source_type: pulsar
params:
  topics:
    - stackoverflow
  address: pulsar://localhost:6650

运行这些命令来下载源配置文件并创建源。

# Download Pulsar source config.
wget -O stackoverflow-pulsar-source.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/stackoverflow/pulsar-source.yaml

# Create source.
./quickwit source create --index stackoverflow --source-config stackoverflow-pulsar-source.yaml

一旦创建了 Pulsar 源,Quickwit 控制平面将请求索引器启动一个新的索引管道。您可以在索引器上看到类似下面的日志:

INFO spawn_pipeline{index=stackoverflow gen=0}:pulsar-consumer{subscription_name="quickwit-stackoverflow-pulsar-source" params=PulsarSourceParams { topics: ["stackoverflow"], address: "pulsar://localhost:6650", consumer_name: "quickwit", authentication: None } current_positions={}}: quickwit_indexing::source::pulsar_source: Seeking to last checkpoint positions. positions={}

创建并填充 Pulsar topic

我们将使用 Pulsar 的默认租户/命名空间 public/default。为了填充主题,我们将使用一个 Python 脚本:

import json
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('public/default/stackoverflow')

with open('stackoverflow.posts.transformed-10000.json', encoding='utf8') as file:
   for i, line in enumerate(file):
       producer.send(line.encode('utf-8'))
       if i % 100 == 0:
           print(f"{i}/10000 messages sent.", i)

client.close()

安装本地 Python 客户端,更多详情请参阅 文档页面

# Download the first 10_000 Stackoverflow posts articles.
curl -O https://quickwit-datasets-public.s3.amazonaws.com/stackoverflow.posts.transformed-10000.json

# Install pulsar python client.
# Requires a python version < 3.11
pip3 install 'pulsar-client==2.10.1'
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/stackoverflow/send_messages_to_pulsar.py
python3 send_messages_to_pulsar.py

开始搜索!

您可以运行此命令来检查索引的属性并查看当前发布的分片和文档数量:

# Display some general information about the index.
./quickwit index describe --index stackoverflow

您将特别注意到发布的文档数量。

现在您可以执行一些查询了。

curl 'http://localhost:7280/api/v1/stackoverflow/search?query=search+AND+engine'

如果您的 Quickwit 服务器是本地的,您可以通过 Quickwit UI 访问结果,网址为 localhost:7280。

  • http://localhost:7280/ui/search?query=&index_id=stackoverflow&max_hits=10

清理源(可选)

让我们删除为这篇教程创建的文件和源。

# Delete quickwit index.
./quickwit index delete --index stackoverflow --yes
# Delete Pulsar topic.
bin/pulsar-admin topics delete stackoverflow

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。

Kinesis

在这节教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kinesis 摄取数据。首先,我们将创建一个索引并配置 Kinesis 源。然后,我们将创建一个 Kinesis 流并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。

在这篇教程中使用 Amazon Kinesis 服务会产生一些费用。

前提条件

要完成这篇教程,您需要以下条件:

  • 本地安装的 Quickwit 指南

jq 用于重塑事件成为可通过 Amazon Kinesis API 发送的记录。

创建索引

首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:

#
# Index config file for gh-archive dataset.
#
version: 0.7

index_id: gh-archive

doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at

indexing_settings:
  commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 gh-archive 索引。

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

创建并填充 Kinesis 流

现在,我们创建一个 Kinesis 流并将一些事件加载到其中。

这一步可能相当慢,具体取决于可用带宽。当前命令通过仅取从 GH Archive 下载的每个文件的前 10,000 行来限制要发送的数据量。如果您有足够的带宽,可以移除它来发送整套文件。您也可以通过增加分片的数量和/或 parallel 启动的任务数量 (-j 选项) 来加快速度。

# Create a stream named `gh-archive` with 3 shards.
aws kinesis create-stream --stream-name gh-archive --shard-count 8

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..12}.json.gz

# Load the events into Kinesis stream
gunzip -c 2022-05-12*.json.gz | \
head -n 10000 | \
parallel --gnu -j8 -N 500 --pipe \
'jq --slurp -c "{\"Records\": [.[] | {\"Data\": (. | tostring), \"PartitionKey\": .id }], \"StreamName\": \"gh-archive\"}" > records-{%}.json && \
aws kinesis put-records --cli-input-json file://records-{%}.json --cli-binary-format raw-in-base64-out >> out.log'

创建 Kinesis 源

#
# Kinesis source config file.
#
version: 0.7
source_id: kinesis-source
source_type: kinesis
params:
  stream_name: gh-archive

运行这些命令来下载源配置文件并创建源。

# Download Kinesis source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kinesis-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kinesis-source.yaml

如果此命令出现以下错误消息而失败:

Command failed: Stream gh-archive under account XXXXXXXXX not found.

Caused by:
    0: Stream gh-archive under account XXXXXXXX not found.
    1: Stream gh-archive under account XXXXXXXX not found.

这意味着 Kinesis 流在前一步骤中未正确创建。

启动索引和搜索服务

最后,执行此命令以服务器模式启动 Quickwit。

# Launch Quickwit services.
./quickwit run

在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kinesis 流,并开始从组成流的分片流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。

您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:

# Display some general information about the index.
./quickwit index describe --index gh-archive

也可以通过 Quickwit 用户界面 获取索引信息。

  • http://localhost:7280/ui/indexes/gh-archive

一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:

curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'

也可以通过 用户界面 访问这些结果。

  • http://localhost:7280/ui/search?query=org.login%3Akubernetes+AND+repo.name%3Akubernetes&index_id=gh-archive&max_hits=10

我们还可以按类型对这些事件进行分组并计数它们:

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
  "query":"org.login:kubernetes AND repo.name:kubernetes",
  "max_hits":0,
  "aggs":{
    "count_by_event_type":{
      "terms":{
        "field":"type"
      }
    }
  }
}'

清理源(可选)

让我们删除为这篇教程创建的文件和源。

# Delete Kinesis stream.
aws kinesis delete-stream --stream-name gh-archive

# Delete index.
./quickwit index delete --index gh-archive

# Delete source config.
rm kinesis-source.yaml

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。

具有 SQS 通知的 S3

在这篇教程中,我们介绍如何设置 Quickwit 以从 S3 摄取数据,其中桶通知事件通过 SQS 流式传输。我们首先使用 Terraform 创建 AWS 源(S3 桶、SQS 队列、通知)。然后配置 Quickwit 索引和文件源。最后,我们将一些数据发送到源桶并验证其是否被正确索引。

AWS 源

完整的 Terraform 脚本可以从 这里 下载。

首先,创建接收源数据文件(NDJSON 格式)的桶:

resource "aws_s3_bucket" "file_source" {
  bucket_prefix = "qw-tuto-source-bucket"
}

然后设置 SQS 队列,当文件添加到桶时,队列将承载通知。队列配置了一个策略,允许源桶向其写入 S3 通知消息。同时创建一个死信队列 (DLQ),用于接收文件源无法处理的消息(例如损坏的文件)。消息在经过 5 次索引尝试后会被移动到 DLQ。

locals {
  sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
}

data "aws_iam_policy_document" "sqs_notification" {
  statement {
    effect = "Allow"

    principals {
      type        = "*"
      identifiers = ["*"]
    }

    actions   = ["sqs:SendMessage"]
    resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [aws_s3_bucket.file_source.arn]
    }
  }
}

resource "aws_sqs_queue" "s3_events_deadletter" {
  name = "${locals.sqs_notification_queue_name}-deadletter"
}

resource "aws_sqs_queue" "s3_events" {
  name   = local.sqs_notification_queue_name
  policy = data.aws_iam_policy_document.sqs_notification.json

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
    maxReceiveCount     = 5
  })
}

resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
  queue_url = aws_sqs_queue.s3_events_deadletter.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    sourceQueueArns   = [aws_sqs_queue.s3_events.arn]
  })
}

配置桶通知,每当源桶中创建新文件时,都会向 SQS 写入消息:

resource "aws_s3_bucket_notification" "bucket_notification" {
  bucket = aws_s3_bucket.file_source.id

  queue {
    queue_arn = aws_sqs_queue.s3_events.arn
    events    = ["s3:ObjectCreated:*"]
  }
}

只支持 s3:ObjectCreated:* 类型的事件。
其他类型(例如 ObjectRemoved)会被确认,并且会记录警告日志。

源需要能够访问通知队列和源桶。以下策略文档包含了源所需的最小权限:

data "aws_iam_policy_document" "quickwit_node" {
  statement {
    effect = "Allow"
    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:ChangeMessageVisibility",
      "sqs:GetQueueAttributes",
    ]
    resources = [aws_sqs_queue.s3_events.arn]
  }
  statement {
    effect    = "Allow"
    actions   = ["s3:GetObject"]
    resources = ["${aws_s3_bucket.file_source.arn}/*"]
  }
}

创建 IAM 用户和凭证,以便将其与本地 Quickwit 实例关联:

resource "aws_iam_user" "quickwit_node" {
  name = "quickwit-filesource-tutorial"
  path = "/system/"
}

resource "aws_iam_user_policy" "quickwit_node" {
  name   = "quickwit-filesource-tutorial"
  user   = aws_iam_user.quickwit_node.name
  policy = data.aws_iam_policy_document.quickwit_node.json
}

resource "aws_iam_access_key" "quickwit_node" {
  user = aws_iam_user.quickwit_node.name
}

我们不建议在生产环境中使用 IAM 用户凭证运行 Quickwit 节点。
这只是为了简化教程设置。在 EC2/ECS 上运行时,应将策略文档附加到 IAM 角色上。

下载 完整的 Terraform 脚本,并使用 terraform initterraform apply 部署它。成功执行后,将列出配置 Quickwit 所需的输出。您可以使用以下命令显示敏感输出(密钥 ID 和密钥)的值:

terraform output quickwit_node_access_key_id
terraform output quickwit_node_secret_access_key

运行 Quickwit

本地安装 Quickwit,然后在安装目录中,使用必要的访问权限运行 Quickwit,将 <quickwit_node_access_key_id><quickwit_node_secret_access_key> 替换为匹配的 Terraform 输出值:

AWS_ACCESS_KEY_ID=<quickwit_node_access_key_id> \
AWS_SECRET_ACCESS_KEY=<quickwit_node_secret_access_key> \
AWS_REGION=us-east-1 \
./quickwit run

配置索引和源

在另一个终端中,在 Quickwit 安装目录中,创建一个索引:

cat << EOF > tutorial-sqs-file-index.yaml
version: 0.7
index_id: tutorial-sqs-file
doc_mapping:
  mode: dynamic
indexing_settings:
  commit_timeout_secs: 30
EOF

./quickwit index create --index-config tutorial-sqs-file-index.yaml

<notification_queue_url> 替换为相应的 Terraform 输出值,为该索引创建一个文件源:

cat << EOF > tutorial-sqs-file-source.yaml
version: 0.8
source_id: sqs-filesource
source_type: file
num_pipelines: 2
params:
  notifications:
    - type: sqs
      queue_url: <notification_queue_url>
      message_type: s3_notification
EOF

./quickwit source create --index tutorial-sqs-file --source-config tutorial-sqs-file-source.yaml

num_pipeline 配置控制了多少个消费者将并行地从队列中轮询。根据您想要为此源分配的索引器计算资源选择数字。一般而言,每 2 个核心配置 1 个管道。

摄取数据

我们现在可以通过将文件上传到 S3 来向 Quickwit 发送数据。如果您已安装 AWS CLI,运行以下命令,将 <source_bucket_name> 替换为关联的 Terraform 输出:

curl https://quickwit-datasets-public.s3.amazonaws.com/hdfs-logs-multitenants-10000.json | \
    aws s3 cp - s3://<source_bucket_name>/hdfs-logs-multitenants-10000.json

如果您不想使用 AWS CLI,您也可以下载文件并通过 AWS 控制台手动将其上传到源桶。

等待大约 1 分钟,数据应该会出现在索引中:

./quickwit index describe --index tutorial-sqs-file

清理源

这节教程中实例化的 AWS 源不会产生固定成本,但我们仍然建议您完成后删除它们。在包含 Terraform 脚本的目录中,运行 terraform destroy

更多

1. Binance 如何使用 Quickwit 构建 100PB 日志服务(Quickwit 博客)



标签:index,name,quickwit,--,Quickwit,source,config,Rust,分布式
From: https://blog.51cto.com/u_15168528/11896869

相关文章

  • 折腾 Quickwit,Rust 编写的分布式搜索引擎 - 可观测性之日志管理
    Quickwit从底层构建,旨在高效地索引非结构化数据,并在云存储上轻松搜索这些数据。此外,Quickwit开箱即支持OpenTelemetrygRPC和HTTP(仅protobuf)协议,并提供了一个RESTAPI,可以接收任何JSON格式的日志。这让Quickwit成为了日志的理想选择!.https://quickwit.io/docs/guides/......
  • 折腾 Quickwit,Rust 编写的分布式搜索引擎 - 可观测性之分布式追踪
    概述分布式追踪是一种跟踪应用程序请求流经不同服务(如前端、后端、数据库等)的过程。它是一个强大的工具,可以帮助您了解应用程序的工作原理并调试性能问题。Quickwit是一个用于索引和搜索非结构化数据的云原生引擎,这使其非常适合用作追踪数据的后端。此外,Quickwit本地支持OpenTel......
  • 文章标题:Java中的分布式缓存策略:从原理到实现的深度解析
    在现代分布式系统中,缓存是提高系统性能和响应速度的关键组件之一。尤其是在Java开发中,分布式缓存不仅可以大幅降低数据库的负载,还能显著提高数据访问的速度。本篇博客将详细解析Java中的分布式缓存策略,从基本原理到实际实现,带你全面了解分布式缓存的奥秘。一、为什么需要分布......
  • 分布式概念及选举算法
    概念  由很多自主的计算机组成。很容易地把运行在不同计算机上的不同应用程序集成到单个系统中。清晰的记录接口。轻松的扩展。分布式类型:分布式计算系统、分布式信息系统(数据处理)互斥    集中式算法      每个程序在需要访问临界资源时,先给协调......
  • 探索Java中的分布式任务调度:从理论到实践
    引言在现代企业级应用中,定时任务调度是一项至关重要的功能。无论是数据备份、日志清理还是批处理任务,都离不开任务调度系统。随着系统的规模和复杂度的增加,传统的单机任务调度已经无法满足需求。因此,分布式任务调度应运而生。本篇博文将详细介绍Java中的分布式任务调度,从基本......
  • 又一个Rust练手项目-wssh(SSH over Websocket Client)
    原文地址https://blog.fanscore.cn/a/61/1.wssh1.1开发背景公司内部的发布系统提供一个连接到k8spod的web终端,可以在网页中连接到k8spod内。实现原理大概为通过websocket协议代理了k8spodssh,然后在前端通过xterm.js+websocket实现了web终端的效果。但是每次需要进pod内......
  • 【学习】VirusTotal威胁研究发布详细指南
    https://mp.weixin.qq.com/s/psge60vK8t3gYj1PatDLvw......
  • Git 基本工作(最先进的分布式版本控制系统)
    1、Git,首先,在https://github.com注册一个新用户。2、下载安装Git,http://git-scm.com/downloads3、或者用淘宝镜像:https://registry.npmmirror.com/binary.html?path=git-for-windows4、Git配置:打开gitbash命令窗口,执行:其中""的内容为个人内容gitconfig--globaluser.......
  • Java分布式系统设计:CAP定理与BASE理论
    Java分布式系统设计:CAP定理与BASE理论大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在构建分布式系统时,CAP定理和BASE理论是两个核心概念,它们帮助开发者理解分布式系统的特性和设计原则。本文将深入探讨这两个理论,并提供在Java中实现分布式系统时的......
  • 分布式缓存
    高并发环境下缓存的重要性在高并发环境下,例如淘宝双11秒杀活动,几分钟内上亿用户涌入平台,短时间内产生的海量请求如果直接涌向数据库,将会对数据库产生巨大的压力。由于磁盘I/O的速度远低于内存访问速度,如果不加以控制,数据库将不堪重负,进而导致服务中断。为了避免这种情况,通......