本文主要介绍 BookKeeper 的 API,文中所使用到的软件版本:Java 1.8.0_341、BookKeeper 4.16.5。
1、引入依赖
<dependency> <groupId>org.apache.bookkeeper</groupId> <artifactId>bookkeeper-server</artifactId> <version>4.16.5</version> </dependency>
2、Ledger API
2.1、传统 API
2.1.1、创建客户端
public void createBookKeeper() throws Exception { ClientConfiguration configuration = new ClientConfiguration(); configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机 //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群 configuration.setAddEntryTimeout(2000); bookKeeper = new BookKeeper(configuration); }
2.1.2、创建 ledger 并添加 entry
//LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes()); LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes()); log.info("ledgerId={}", ledgerHandle.getId()); for (int i = 0; i < 10; i++) { long id = ledgerHandle.addEntry(("abc" + i).getBytes()); log.info("id={}", id); } ledgerHandle.close();
创建 ledger 时可以指定存储该 ledger 的节点个数、副本个数、几个副本 ack 表示写入成功。
2.1.3、从 ledger 中读取 entry
public void readLedger() throws Exception { LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes()); Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed()); while (ledgerEntrys.hasMoreElements()) { LedgerEntry ledgerEntry = ledgerEntrys.nextElement(); log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry())); } ledgerHandle.close(); }
2.1.4、删除 ledger
public void deleteLedger() throws Exception { bookKeeper.deleteLedger(0L); }
2.1.5、完整代码
package com.abc.demo.bk; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ClientConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Enumeration; @Slf4j public class LedgerCase { private BookKeeper bookKeeper; @Before public void createBookKeeper() throws Exception { ClientConfiguration configuration = new ClientConfiguration(); //configuration.setZkServers("10.49.196.33:2181"); configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers"); //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers"); configuration.setAddEntryTimeout(2000); bookKeeper = new BookKeeper(configuration); } @After public void closeBookKeeper() throws Exception { bookKeeper.close(); } @Test public void createLedger() throws Exception { //LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes()); LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes()); log.info("ledgerId={}", ledgerHandle.getId()); for (int i = 0; i < 10; i++) { long id = ledgerHandle.addEntry(("abc" + i).getBytes()); log.info("id={}", id); } ledgerHandle.close(); } @Test public void readLedger() throws Exception { LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes()); Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed()); while (ledgerEntrys.hasMoreElements()) { LedgerEntry ledgerEntry = ledgerEntrys.nextElement(); log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry())); } ledgerHandle.close(); } @Test public void deleteLedger() throws Exception { bookKeeper.deleteLedger(0L); } }LedgerCase.java
2.2、新 API
自 4.6 版本开始,BookKeeper 提供了一个新的客户端 API,利用了Java 8 的 CompletableFuture 功能。引入了 WriteHandle、WriteAdvHandle 和 ReadHandle 来替换通用的 LedgerHandle。新的 API 在 org.apache.bookkeeper.client.api 包中,应该只使用该包中定义的接口。4.6 版本的新 API 仍然是实验性的,并可能在后续的次要版本中进行更改。
2.2.1、创建客户端
public void createBookKeeper() throws Exception { ClientConfiguration configuration = new ClientConfiguration(); configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机 //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群 configuration.setAddEntryTimeout(2000); bookKeeper = BookKeeper .newBuilder(configuration) .build(); }
2.2.2、创建 ledger 并添加 entry
public void write() throws Exception { WriteHandle writeHandle = bookKeeper.newCreateLedgerOp() .withDigestType(DigestType.MAC) .withPassword("123".getBytes()) .withEnsembleSize(3) .withWriteQuorumSize(3) .withAckQuorumSize(2) .execute() .get(); log.info("ledgerId={}", writeHandle.getId()); for (int i = 0; i < 10; i++) { long id = writeHandle.append(("bcd" + i).getBytes()); log.info("id={}", id); } writeHandle.close(); }
2.2.3、从 ledger 中读取 entry
public void read() throws Exception { ReadHandle readHandle = bookKeeper.newOpenLedgerOp() .withLedgerId(1L) .withDigestType(DigestType.MAC) .withPassword("123".getBytes()) .execute() .get(); LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed()); Iterator<LedgerEntry> iterator = ledgerEntries.iterator(); while (iterator.hasNext()) { LedgerEntry ledgerEntry = iterator.next(); log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes())); } readHandle.close(); }
2.2.4、删除 ledger
public void delete() throws Exception { bookKeeper.newDeleteLedgerOp() .withLedgerId(101L) .execute() .get(); }
2.2.5、完整代码
package com.abc.demo.bk; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.*; import org.apache.bookkeeper.conf.ClientConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Iterator; @Slf4j public class LedgerNewCase { private BookKeeper bookKeeper; @Before public void createBookKeeper() throws Exception { ClientConfiguration configuration = new ClientConfiguration(); //configuration.setZkServers("10.49.196.33:2181"); //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers"); configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers"); configuration.setAddEntryTimeout(2000); bookKeeper = BookKeeper .newBuilder(configuration) .build(); } @After public void closeBookKeeper() throws Exception { bookKeeper.close(); } @Test public void write() throws Exception { WriteHandle writeHandle = bookKeeper.newCreateLedgerOp() .withDigestType(DigestType.MAC) .withPassword("123".getBytes()) .withEnsembleSize(3) .withWriteQuorumSize(3) .withAckQuorumSize(2) .execute() .get(); log.info("ledgerId={}", writeHandle.getId()); for (int i = 0; i < 10; i++) { long id = writeHandle.append(("bcd" + i).getBytes()); log.info("id={}", id); } writeHandle.close(); } @Test public void read() throws Exception { ReadHandle readHandle = bookKeeper.newOpenLedgerOp() .withLedgerId(1L) .withDigestType(DigestType.MAC) .withPassword("123".getBytes()) .execute() .get(); LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed()); Iterator<LedgerEntry> iterator = ledgerEntries.iterator(); while (iterator.hasNext()) { LedgerEntry ledgerEntry = iterator.next(); log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes())); } readHandle.close(); } @Test public void delete() throws Exception { bookKeeper.newDeleteLedgerOp() .withLedgerId(101L) .execute() .get(); } }LedgerNewCase.java
3、Advanced Ledger API
在 4.5.0 版本中,BookKeeper 引入了一些高级 API 用于高级功能。高级 API 和普通 API 主要区别在写 entry,读 entry 是一致的。
3.1、LedgerHandleAdv
LedgerHandleAdv 是 LedgerHandle 的高级扩展,在创建时可以指定 LedgerId,在添加 entry 时需要传入 entryId。
public void ledgerHandleAdv() throws Exception { ClientConfiguration configuration = new ClientConfiguration(); //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers"); configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers"); configuration.setAddEntryTimeout(2000); org.apache.bookkeeper.client.BookKeeper bookKeeper = new org.apache.bookkeeper.client.BookKeeper(configuration); //LedgerHandleAdv ledgerHandle = (LedgerHandleAdv) bookKeeper.createLedgerAdv(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes()); LedgerHandleAdv ledgerHandleAdv = (LedgerHandleAdv) bookKeeper.createLedgerAdv(100L, 3, 2, 2, org.apache.bookkeeper.client.BookKeeper.DigestType.MAC, "123".getBytes(), Collections.emptyMap()); log.info("ledgerId={}", ledgerHandleAdv.getId()); for (int i = 0; i < 10; i++) { long id = ledgerHandleAdv.addEntry(i, ("abc" + i).getBytes()); //entry id 需从 0 开始 log.info("id={}", id); } ledgerHandleAdv.close(); bookKeeper.close(); }
3.2、4.6 版本新 API 的 LedgerHandleAdv
public void writeAdvHandle() throws Exception { ClientConfiguration configuration = new ClientConfiguration(); //configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers"); configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers"); org.apache.bookkeeper.client.api.BookKeeper bookKeeper = org.apache.bookkeeper.client.api.BookKeeper .newBuilder(configuration) .build(); WriteAdvHandle writeAdvHandle = bookKeeper.newCreateLedgerOp() .withDigestType(DigestType.MAC) .withPassword("123".getBytes()) .withEnsembleSize(3) .withWriteQuorumSize(3) .withAckQuorumSize(2) .makeAdv() //CreateBuilder 转为 CreateAdvBuilder .withLedgerId(101L) .execute() .get(); log.info("ledgerId={}", writeAdvHandle.getId()); for (int i = 0; i < 10; i++) { long id = writeAdvHandle.write(i, ("bcd" + i).getBytes()); log.info("id={}", id); } writeAdvHandle.close(); bookKeeper.close(); }
参考:
https://bookkeeper.apache.org/docs/api/overview