首页 > 其他分享 >BookKeeper 介绍(3)--API

BookKeeper 介绍(3)--API

时间:2024-06-23 10:20:24浏览次数:20  
标签:-- bookKeeper public 2181 API configuration BookKeeper 10.49

本文主要介绍 BookKeeper 的 API,文中所使用到的软件版本:Java 1.8.0_341、BookKeeper 4.16.5。

 1、引入依赖

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server</artifactId>
    <version>4.16.5</version>
</dependency>

2、Ledger API

2.1、传统 API

2.1.1、创建客户端

public void createBookKeeper() throws Exception {
    ClientConfiguration configuration = new ClientConfiguration();
    configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机
    //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群
    configuration.setAddEntryTimeout(2000);
    bookKeeper = new BookKeeper(configuration);
}

2.1.2、创建 ledger 并添加 entry

//LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());
LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes());
log.info("ledgerId={}", ledgerHandle.getId());
for (int i = 0; i < 10; i++) {
    long id = ledgerHandle.addEntry(("abc" + i).getBytes());
    log.info("id={}", id);
}

ledgerHandle.close();

创建 ledger 时可以指定存储该 ledger 的节点个数、副本个数、几个副本 ack 表示写入成功。

2.1.3、从 ledger 中读取 entry

public void readLedger() throws Exception {
    LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes());
    Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());
    while (ledgerEntrys.hasMoreElements()) {
        LedgerEntry ledgerEntry = ledgerEntrys.nextElement();
        log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry()));
    }

    ledgerHandle.close();
}

2.1.4、删除 ledger

public void deleteLedger() throws Exception {
    bookKeeper.deleteLedger(0L);
}

2.1.5、完整代码

package com.abc.demo.bk;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Enumeration;


@Slf4j
public class LedgerCase {
    private BookKeeper bookKeeper;

    @Before
    public void createBookKeeper() throws Exception {
        ClientConfiguration configuration = new ClientConfiguration();
        //configuration.setZkServers("10.49.196.33:2181");
        configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");
        //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");
        configuration.setAddEntryTimeout(2000);
        bookKeeper = new BookKeeper(configuration);
    }

    @After
    public void closeBookKeeper() throws Exception {
        bookKeeper.close();
    }

    @Test
    public void createLedger() throws Exception {
        //LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());
        LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes());
        log.info("ledgerId={}", ledgerHandle.getId());
        for (int i = 0; i < 10; i++) {
            long id = ledgerHandle.addEntry(("abc" + i).getBytes());
            log.info("id={}", id);
        }

        ledgerHandle.close();
    }

    @Test
    public void readLedger() throws Exception {
        LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes());
        Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());
        while (ledgerEntrys.hasMoreElements()) {
            LedgerEntry ledgerEntry = ledgerEntrys.nextElement();
            log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry()));
        }

        ledgerHandle.close();
    }

    @Test
    public void deleteLedger() throws Exception {
        bookKeeper.deleteLedger(0L);
    }
}
LedgerCase.java

2.2、新 API

自 4.6 版本开始,BookKeeper 提供了一个新的客户端 API,利用了Java 8 的 CompletableFuture 功能。引入了 WriteHandle、WriteAdvHandle 和 ReadHandle 来替换通用的 LedgerHandle。新的 API 在 org.apache.bookkeeper.client.api 包中,应该只使用该包中定义的接口。4.6 版本的新 API 仍然是实验性的,并可能在后续的次要版本中进行更改。

2.2.1、创建客户端

public void createBookKeeper() throws Exception {
    ClientConfiguration configuration = new ClientConfiguration();
    configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机
    //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群
    configuration.setAddEntryTimeout(2000);
    bookKeeper = BookKeeper
            .newBuilder(configuration)
            .build();
}

2.2.2、创建 ledger 并添加 entry

public void write() throws Exception {
    WriteHandle writeHandle = bookKeeper.newCreateLedgerOp()
            .withDigestType(DigestType.MAC)
            .withPassword("123".getBytes())
            .withEnsembleSize(3)
            .withWriteQuorumSize(3)
            .withAckQuorumSize(2)
            .execute()
            .get();
    log.info("ledgerId={}", writeHandle.getId());
    for (int i = 0; i < 10; i++) {
        long id = writeHandle.append(("bcd" + i).getBytes());
        log.info("id={}", id);
    }

    writeHandle.close();
}

2.2.3、从 ledger 中读取 entry

public void read() throws Exception {
    ReadHandle readHandle = bookKeeper.newOpenLedgerOp()
            .withLedgerId(1L)
            .withDigestType(DigestType.MAC)
            .withPassword("123".getBytes())
            .execute()
            .get();

    LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed());
    Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
    while (iterator.hasNext()) {
        LedgerEntry ledgerEntry = iterator.next();
        log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes()));
    }

    readHandle.close();
}

2.2.4、删除 ledger

public void delete() throws Exception {
   bookKeeper.newDeleteLedgerOp()
           .withLedgerId(101L)
           .execute()
           .get();
}

2.2.5、完整代码

package com.abc.demo.bk;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.*;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Iterator;


@Slf4j
public class LedgerNewCase {
    private BookKeeper bookKeeper;

    @Before
    public void createBookKeeper() throws Exception {
        ClientConfiguration configuration = new ClientConfiguration();
        //configuration.setZkServers("10.49.196.33:2181");
        //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");
        configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");
        configuration.setAddEntryTimeout(2000);
        bookKeeper = BookKeeper
                .newBuilder(configuration)
                .build();
    }

    @After
    public void closeBookKeeper() throws Exception {
        bookKeeper.close();
    }

    @Test
    public void write() throws Exception {
        WriteHandle writeHandle = bookKeeper.newCreateLedgerOp()
                .withDigestType(DigestType.MAC)
                .withPassword("123".getBytes())
                .withEnsembleSize(3)
                .withWriteQuorumSize(3)
                .withAckQuorumSize(2)
                .execute()
                .get();
        log.info("ledgerId={}", writeHandle.getId());
        for (int i = 0; i < 10; i++) {
            long id = writeHandle.append(("bcd" + i).getBytes());
            log.info("id={}", id);
        }

        writeHandle.close();
    }

    @Test
    public void read() throws Exception {
        ReadHandle readHandle = bookKeeper.newOpenLedgerOp()
                .withLedgerId(1L)
                .withDigestType(DigestType.MAC)
                .withPassword("123".getBytes())
                .execute()
                .get();

        LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed());
        Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
        while (iterator.hasNext()) {
            LedgerEntry ledgerEntry = iterator.next();
            log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes()));
        }

        readHandle.close();
    }

    @Test
    public void delete() throws Exception {
       bookKeeper.newDeleteLedgerOp()
               .withLedgerId(101L)
               .execute()
               .get();
    }
}
LedgerNewCase.java

3、Advanced Ledger API

在 4.5.0 版本中,BookKeeper 引入了一些高级 API 用于高级功能。高级 API 和普通 API 主要区别在写 entry,读 entry 是一致的。

3.1、LedgerHandleAdv

LedgerHandleAdv 是 LedgerHandle 的高级扩展,在创建时可以指定 LedgerId,在添加 entry 时需要传入 entryId。

public void ledgerHandleAdv() throws Exception {
    ClientConfiguration configuration = new ClientConfiguration();
    //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");
    configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");
    configuration.setAddEntryTimeout(2000);
    org.apache.bookkeeper.client.BookKeeper bookKeeper = new org.apache.bookkeeper.client.BookKeeper(configuration);

    //LedgerHandleAdv ledgerHandle = (LedgerHandleAdv) bookKeeper.createLedgerAdv(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());
    LedgerHandleAdv ledgerHandleAdv = (LedgerHandleAdv) bookKeeper.createLedgerAdv(100L, 3, 2, 2, org.apache.bookkeeper.client.BookKeeper.DigestType.MAC, "123".getBytes(), Collections.emptyMap());

    log.info("ledgerId={}", ledgerHandleAdv.getId());
    for (int i = 0; i < 10; i++) {
        long id = ledgerHandleAdv.addEntry(i, ("abc" + i).getBytes()); //entry id 需从 0 开始
        log.info("id={}", id);
    }

    ledgerHandleAdv.close();
    bookKeeper.close();
}

3.2、4.6 版本新 API 的 LedgerHandleAdv

public void writeAdvHandle() throws Exception {
    ClientConfiguration configuration = new ClientConfiguration();
    //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");
    configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");

    org.apache.bookkeeper.client.api.BookKeeper bookKeeper = org.apache.bookkeeper.client.api.BookKeeper
            .newBuilder(configuration)
            .build();
    WriteAdvHandle writeAdvHandle = bookKeeper.newCreateLedgerOp()
            .withDigestType(DigestType.MAC)
            .withPassword("123".getBytes())
            .withEnsembleSize(3)
            .withWriteQuorumSize(3)
            .withAckQuorumSize(2)
            .makeAdv() //CreateBuilder 转为 CreateAdvBuilder
            .withLedgerId(101L)
            .execute()
            .get();
    log.info("ledgerId={}", writeAdvHandle.getId());
    for (int i = 0; i < 10; i++) {
        long id = writeAdvHandle.write(i, ("bcd" + i).getBytes());
        log.info("id={}", id);
    }

    writeAdvHandle.close();
    bookKeeper.close();
}

 

 

参考:
https://bookkeeper.apache.org/docs/api/overview

标签:--,bookKeeper,public,2181,API,configuration,BookKeeper,10.49
From: https://www.cnblogs.com/wuyongyin/p/18163128

相关文章

  • WPF频繁更新UI卡顿问题
    我的WPF程序,需要连接PLC、CCD、RFID、扫码枪、控制卡所以我写了InitHardware();privatevoidInitHardware(){vartasks=newTask[]{//后台线程长连接,不取消令牌Task.Factory.StartNew(()=>InitConnPLC(),CancellationToken.None,Ta......
  • 24-06-19
    说下原生jdbc操作数据库流程?Class.forName()加载数据库连接驱动DriverManager.getConnection()获取数据连接对象根据SQL获取sql会话对象,有两种方式Statement,PreparedStatement执行SQL处理结果集,执行SQL前如果有参数值就设置参数值setXXX()关闭结果集,关闭会话,关闭连接为什......
  • 机器学习各个算法的优缺点!(上篇) 建议收藏。
      下篇地址:机器学习各个算法的优缺点!(下篇)建议收藏。-CSDN博客.......纯干货..........回归正则化算法集成算法决策树算法支持向量机降维算法聚类算法贝叶斯算法人工神经网络深度学习感兴趣的朋友可以点赞、转发起来,让更多的朋......
  • 行业发展:冷却超级计算机和量子计算机
    每天一篇行业发展资讯,让大家更及时了解外面的世界。更多资讯,请关注B站【莱歌数字】,有视频教程~~超级计算机每秒可以执行数十亿次计算(千兆兆次运算),也能应对影响笔记本电脑和手持设备的同样问题——过热。冷却超级计算机通常是通过设施级的空调来实现的。一些系统还通过热交换......
  • 密码、手机等隐私信息的保存方式
    将用户密码以密文形式存储进数据库是保护用户信息安全的重要措施。以下是一些常见的密码加密存储方案:哈希加密:使用哈希函数(如SHA-256,SHA-1,MD5等)将密码转换为固定长度的哈希值。SHA-256是目前较为安全的哈希算法,它生成的哈希值长度为64个字符,极大地提高了安全性。需要注......
  • 647. 回文子串(leetcode)
    647.回文子串(leetcode)题目描述给你一个字符串s,请你统计并返回这个字符串中回文子串的数目。回文字符串是正着读和倒过来读一样的字符串。子字符串是字符串中的由连续字符组成的一个序列。示例1输入:s=“abc”输出:3解释:三个回文子串:“a”,“b”,“c”......
  • Lightroom Classic 2023 for Mac(摄影后期图像编辑工具) v12.4版
    lightingClassic是Adobe公司推出的一款图像处理软件,是数字摄影后期制作的重要工具之一。与其他图像处理软件相比,LightroomClassic具有以下特点:LightroomClassic2023forMac(摄影后期图像编辑工具)软件地址高效的图像管理:LightroomClassic提供了强大的图像管理功能,可以......
  • Navicat Premium for Mac(多协议数据库管理工具) 16.3.4版
    NavicatPremium16是一款功能强大的跨平台数据库管理工具,支持多种数据库类型,如MySQL、MariaDB、Oracle、SQLite、PostgreSQL等等。它提供了丰富的数据库管理功能和工具,可以帮助开发人员和数据库管理员快速地创建、管理和维护数据库。NavicatPremiumforMac(多协议数据库管......
  • Redis Desktop Manager for Mac(Redis桌面管理工具) v2022.5.0中文版
    RedisDesktopManagerforMac是Mac平台上一款非常实用的Redis可视化工具。RDM支持SSL/TLS加密,SSH隧道,基于SSH隧道的TLS,为您提供了一个易于使用的GUI,可以访问您的Redis数据库并执行一些基本操作:将键视为树,CRUD键,通过shell执行命令。RedisDesktopManagerforMac(Redis桌......
  • 66Uptime – 网站服务器 & Cronjob 监控工具 v35.0.0扩展中文版安装
    66Uptime是一款自托管、易于使用、轻量级且高性能的网站服务器和Cronjob监控工具。以其丰富的功能和便捷的管理方式,为用户提供了全方位的网站服务器和Cronjob监控解决方案:主要功能:监控网站服务器和Cronjob的运行状态,确保它们持续稳定运行。提供从多个位置检查显示器的功......