Getting Started with Pulsar in Practice (5): Working with Pulsar from Java

Date: 2024-11-17 08:58:15

This article shows how to work with Pulsar from Java. Software versions used: Java 17.0.7 (for the Pulsar service), Java 1.8.0_341 (for the client), Pulsar 3.3.0, and pulsar-client 3.3.0.

1. Add the dependencies

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-admin</artifactId>
    <version>3.3.0</version>
</dependency>

2. Initialize PulsarClient and PulsarAdmin

PulsarClient:

@Before
public void before() throws PulsarClientException {
    client = PulsarClient.builder()
            .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
            .build();
}

PulsarAdmin:

@Before
public void before() throws PulsarClientException {
    admin = PulsarAdmin.builder()
            .serviceHttpUrl("http://10.49.196.30:8080,10.49.196.31:8080,10.49.196.32:8080")
            .build();
}

3. Consumer

3.1. Synchronous consumption

@Test
public void sync() throws PulsarClientException {
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic") // topic
            .subscriptionName("my-subscription") // subscription name
            .subscriptionType(SubscriptionType.Shared) // subscription type
            .subscribe();

    while (true) {
        Message<String> message = consumer.receive();
        try {
            log.info("topicName={},value={}", message.getTopicName(), message.getValue());
            consumer.acknowledge(message);
        } catch (Exception e) {
            consumer.negativeAcknowledge(message);
        }
    }
}

3.2. Asynchronous consumption

@Test
public void async() throws InterruptedException {
    client.newConsumer(Schema.STRING)
            .topic("my-topic2")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribeAsync()
            .thenAccept(this::receiveAsync);

    Thread.sleep(1000 * 500);
}

private void receiveAsync(Consumer<String> consumer) {
    consumer.receiveAsync().thenAccept(message -> {
        try {
            log.info("messageId={},value={}", message.getMessageId(), message.getValue());
            consumer.acknowledge(message);
        } catch (Exception e) {
            consumer.negativeAcknowledge(message);
        }
        receiveAsync(consumer);
    });
}

3.3. Using a MessageListener

@Test
public void messageListener() throws Exception {
    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
            .listenerThreads(3)
            .build();
    MessageListener<String> messageListener = (consumer, msg) -> {
        try {
            log.info("messageId={},value={}", msg.getMessageId(), msg.getValue());
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    };

    client.newConsumer(Schema.STRING)
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener(messageListener)
            .subscribe();
    Thread.sleep(1000 * 500);
}

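3.4. Retry letter topic

/**
 * Retry letter topic.
 * Messages that fail processing are sent to the retry letter topic; once the maximum number of retries is reached, they are moved to the dead letter topic.
 */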
@Test
public void retryTopic() throws Exception {
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic-r")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .enableRetry(true)
            .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(5)
                    .build())
            .subscribe();
    while (true) {
        Message<String> message = consumer.receive();
        try {
            log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
            //TODO: business logic that may throw an exception
            //throw new RuntimeException();
            consumer.acknowledge(message);
        } catch (Exception e) {
            log.error("", e);
            consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
        }
    }
}

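3.5. Dead letter topic

/**
 * Dead letter topic.
 * A message can fail processing in three ways: ack timeout, negative acknowledgement, or the retry letter topic.
 * After the maximum number of redeliveries, the message is moved to the dead letter topic.
 */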
@Test
public void deadTopic() throws Exception {
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic-d")
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(5)
                    .initialSubscriptionName("init-sub")
                    .build())
            .subscribe();
    while (true) {
        Message<String> message = consumer.receive();
        try {
            log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
            //TODO: business logic that may throw an exception
            //throw new RuntimeException();
            consumer.acknowledge(message);
        } catch (Exception e) {
            log.error("", e);
            consumer.negativeAcknowledge(message);
        }
    }
}

/**
 * Subscribe to the dead letter topic and process its messages.
 */
@Test
public void consumerDeadTopic() throws Exception {
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
            .topic("my-topic-d-my-subscription-DLQ")
            .subscriptionName("init-sub")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    while (true) {
        Message<String> message = consumer.receive();
        try {
            log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
        } catch (Exception e) {
            log.error("", e);
        }
        consumer.acknowledge(message);
    }
}
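
The topic name `my-topic-d-my-subscription-DLQ` used above follows the default naming scheme of the Pulsar client: the dead letter topic is named `<topic>-<subscription>-DLQ`, and the retry letter topic `<topic>-<subscription>-RETRY`. The following is a minimal, stdlib-only sketch of that convention. The class `DeadLetterNames` and its methods are hypothetical helpers written for illustration and are not part of the Pulsar API; both names can also be set explicitly on `DeadLetterPolicy`, in which case this convention does not apply.

```java
// Hypothetical helper (not part of the Pulsar API): builds the default
// dead letter / retry letter topic names from a topic and a subscription.
final class DeadLetterNames {
    private DeadLetterNames() {
    }

    /** Default dead letter topic name: <topic>-<subscription>-DLQ. */
    static String deadLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    /** Default retry letter topic name: <topic>-<subscription>-RETRY. */
    static String retryLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-RETRY";
    }

    public static void main(String[] args) {
        // Same topic and subscription as in deadTopic() and retryTopic().
        System.out.println(deadLetterTopic("my-topic-d", "my-subscription"));
        System.out.println(retryLetterTopic("my-topic-r", "my-subscription"));
    }
}
```

With the values from `deadTopic()`, the helper yields exactly the topic that `consumerDeadTopic()` subscribes to.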

3.6. Complete code

package com.abc.demo.pulsar;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

@Slf4j
public class ConsumerCase {
    private PulsarClient client;

    @Before
    public void before() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
                .build();
    }

    @Test
    public void sync() throws PulsarClientException {
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-topic") // topic
                .subscriptionName("my-subscription") // subscription name
                .subscriptionType(SubscriptionType.Shared) // subscription type
                .subscribe();

        while (true) {
            Message<String> message = consumer.receive();
            try {
                log.info("topicName={},value={}", message.getTopicName(), message.getValue());
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }

    @Test
    public void async() throws InterruptedException {
        client.newConsumer(Schema.STRING)
                .topic("my-topic2")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribeAsync()
                .thenAccept(this::receiveAsync);

        Thread.sleep(1000 * 500);
    }

    private void receiveAsync(Consumer<String> consumer) {
        consumer.receiveAsync().thenAccept(message -> {
            try {
                log.info("messageId={},value={}", message.getMessageId(), message.getValue());
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
            receiveAsync(consumer);
        });
    }

    @Test
    public void messageListener() throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
                .listenerThreads(3)
                .build();
        MessageListener<String> messageListener = (consumer, msg) -> {
            try {
                log.info("messageId={},value={}", msg.getMessageId(), msg.getValue());
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        };

        client.newConsumer(Schema.STRING)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener(messageListener)
                .subscribe();
        Thread.sleep(1000 * 500);
    }

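    /**
     * Retry letter topic.
     * Messages that fail processing are sent to the retry letter topic; once the maximum number of retries is reached, they are moved to the dead letter topic.
     */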
    @Test
    public void retryTopic() throws Exception {
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-topic-r")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(5)
                        .build())
                .subscribe();
        while (true) {
            Message<String> message = consumer.receive();
            try {
                log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
                //TODO: business logic that may throw an exception
                //throw new RuntimeException();
                consumer.acknowledge(message);
            } catch (Exception e) {
                log.error("", e);
                consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
            }
        }
    }

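    /**
     * Dead letter topic.
     * A message can fail processing in three ways: ack timeout, negative acknowledgement, or the retry letter topic.
     * After the maximum number of redeliveries, the message is moved to the dead letter topic.
     */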
    @Test
    public void deadTopic() throws Exception {
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-topic-d")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(5)
                        .initialSubscriptionName("init-sub")
                        .build())
                .subscribe();
        while (true) {
            Message<String> message = consumer.receive();
            try {
                log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
                //TODO: business logic that may throw an exception
                //throw new RuntimeException();
                consumer.acknowledge(message);
            } catch (Exception e) {
                log.error("", e);
                consumer.negativeAcknowledge(message);
            }
        }
    }

    /**
     * Subscribe to the dead letter topic and process its messages.
     */
    @Test
    public void consumerDeadTopic() throws Exception {
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-topic-d-my-subscription-DLQ")
                .subscriptionName("init-sub")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        while (true) {
            Message<String> message = consumer.receive();
            try {
                log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
            } catch (Exception e) {
                log.error("", e);
            }
            consumer.acknowledge(message);
        }
    }
}
ConsumerCase.java

4、Reader

@Test
public void reader() throws Exception {
    //MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode("CBcQBTAA"));
    MessageId messageId = MessageId.earliest;
    Reader<String> reader = client.newReader(Schema.STRING)
            .topic("my-topic")
            .startMessageId(messageId)
            .create();

    while (true) {
        Message<String> msg = reader.readNext();
        log.info("messageId={},messageIdBase64={},value={}", msg.getMessageId(), Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()), msg.getValue());
    }
}
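
The reader logs each message ID both in its normal form and as Base64 text, and the commented-out line shows how such text can be turned back into bytes for `MessageId.fromByteArray`. The sketch below isolates just that Base64 round trip using only the JDK, so it runs without a broker. The class `MessageIdBase64` is a hypothetical helper; the sample string `CBcQBTAA` is the one from the snippet above, and the sketch makes no assumption about what the bytes mean.

```java
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper: converts serialized message ID bytes to Base64 text and back.
final class MessageIdBase64 {
    private MessageIdBase64() {
    }

    /** Encodes raw bytes (e.g. the result of MessageId#toByteArray) as Base64 text. */
    static String encode(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    /** Decodes Base64 text back into the raw bytes. */
    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    public static void main(String[] args) {
        byte[] raw = decode("CBcQBTAA");
        System.out.println("bytes=" + Arrays.toString(raw));
        System.out.println("base64=" + encode(raw));
    }
}
```

Storing the Base64 form (for example in a database or a log line) is one way to remember a position and later resume reading from it via `startMessageId`.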

Complete code:

package com.abc.demo.pulsar;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.junit.Before;
import org.junit.Test;

import java.util.Base64;

@Slf4j
public class ReaderCase {
    private PulsarClient client;

    @Before
    public void before() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
                .build();
    }

    @Test
    public void reader() throws Exception {
        //MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode("CBcQBTAA"));
        MessageId messageId = MessageId.earliest;
        Reader<String> reader = client.newReader(Schema.STRING)
                .topic("my-topic")
                .startMessageId(messageId)
                .create();

        while (true) {
            Message<String> msg = reader.readNext();
            log.info("messageId={},messageIdBase64={},value={}", msg.getMessageId(), Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()), msg.getValue());
        }
    }
}
ReaderCase.java

5. Producer

5.1. Synchronous send

@Test
public void sync() throws PulsarClientException {
    Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("my-topic-d")
            .create();
    for (int i = 0; i < 3; i++) {
        MessageId messageId = producer.send(("hello" + i));
        log.info("messageId={}", messageId);
    }
    producer.close();
}

5.2. Asynchronous send

@Test
public void async() throws InterruptedException {
    client.newProducer(Schema.STRING)
            .topic("my-topic2")
            .createAsync()
            .thenAccept(producer -> {
                for (int i = 0; i < 10; i++) {
                    producer.sendAsync("hello" + i).thenAccept(messageId -> {
                                log.info("messageId={}", messageId);
                    });
                }
            });
    Thread.sleep(1000 * 5);
}
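
The test above waits a fixed five seconds for the asynchronous sends to finish. Because `sendAsync` returns a `CompletableFuture`, an alternative is to collect the futures and wait until all of them have completed. The sketch below shows that pattern with the JDK only: `fakeSendAsync` is a hypothetical stand-in for `producer.sendAsync(...)` and returns a made-up ID string, so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of "collect the futures, then wait for all of them" instead of Thread.sleep.
final class AwaitAsyncSends {
    private AwaitAsyncSends() {
    }

    /** Hypothetical stand-in for producer.sendAsync(payload); completes with a fake ID. */
    static CompletableFuture<String> fakeSendAsync(String payload) {
        return CompletableFuture.supplyAsync(() -> "id-of-" + payload);
    }

    /** Starts count sends, blocks until every one has completed, returns the IDs in send order. */
    static List<String> sendAllAndWait(int count) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pending.add(fakeSendAsync("hello" + i));
        }
        // allOf completes once every future in the array has completed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

        List<String> ids = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            ids.add(future.join()); // already completed, returns immediately
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(sendAllAndWait(10));
    }
}
```

Waiting on the futures returns as soon as the last send completes and surfaces a failed send as an exception, whereas a fixed sleep does neither.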

5.3. Configuring a message in detail

@Test
public void configMessage() throws PulsarClientException {
    Producer<byte[]> producer = client.newProducer()
            .topic("my-topic")
            .create();
    MessageId messageId = producer.newMessage(Schema.STRING)
            .key("my-key") // message key
            .eventTime(System.currentTimeMillis()) // event time
            .sequenceId(123) // sequence ID
            .deliverAfter(1, TimeUnit.MINUTES) // delayed delivery
            .property("my-key", "my-value") // custom property
            .value("content")
            .send();
    log.info("messageId={}", messageId);
    producer.close();
}

5.4. Complete code

package com.abc.demo.pulsar;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;


@Slf4j
public class ProducerCase {
    private PulsarClient client;

    @Before
    public void before() throws PulsarClientException {
        client = PulsarClient.builder()
                //.serviceUrl("pulsar://10.49.196.33:6650")
                .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
                .build();
    }

    @After
    public void after() throws PulsarClientException {
        client.close();
    }

    @Test
    public void sync() throws PulsarClientException {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic-d")
                .create();
        for (int i = 0; i < 3; i++) {
            MessageId messageId = producer.send(("hello" + i));
            log.info("messageId={}", messageId);
        }
        producer.close();
    }

    @Test
    public void async() throws InterruptedException {
        client.newProducer(Schema.STRING)
                .topic("my-topic2")
                .createAsync()
                .thenAccept(producer -> {
                    for (int i = 0; i < 10; i++) {
                        producer.sendAsync("hello" + i).thenAccept(messageId -> {
                                    log.info("messageId={}", messageId);
                        });
                    }
                });
        Thread.sleep(1000 * 5);
    }

    @Test
    public void configMessage() throws PulsarClientException {
        Producer<byte[]> producer = client.newProducer()
                .topic("my-topic")
                .create();
        MessageId messageId = producer.newMessage(Schema.STRING)
                .key("my-key") // message key
                .eventTime(System.currentTimeMillis()) // event time
                .sequenceId(123) // sequence ID
                .deliverAfter(1, TimeUnit.MINUTES) // delayed delivery
                .property("my-key", "my-value") // custom property
                .value("content")
                .send();
        log.info("messageId={}", messageId);
        producer.close();
    }
}
ProducerCase.java

6. Admin

6.1. Brokers

6.1.1. List active brokers

admin.brokers().getActiveBrokers(clusterName);

6.1.2. List the namespaces owned by a broker

admin.brokers().getOwnedNamespaces(cluster, brokerUrl);

6.1.3. Get the dynamic configuration names

admin.brokers().getDynamicConfigurationNames();

6.1.4. Update a dynamic configuration

admin.brokers().updateDynamicConfiguration(configName, configValue);

6.1.5. Get the dynamic configurations that have been updated

admin.brokers().getAllDynamicConfigurations();

6.1.6. Get the leader broker

admin.brokers().getLeaderBroker();

6.2. Clusters

6.2.1. Get a cluster's configuration

admin.clusters().getCluster(clusterName);

6.2.2. List clusters

admin.clusters().getClusters();

6.3. Tenants

6.3.1. List tenants

admin.tenants().getTenants();

6.3.2. Create a tenant

admin.tenants().createTenant(tenantName, tenantInfo);

6.3.3. Get a tenant's configuration

admin.tenants().getTenantInfo(tenantName);

6.3.4. Delete a tenant

admin.tenants().deleteTenant(tenantName);

6.3.5. Update a tenant

admin.tenants().updateTenant(tenantName, tenantInfo);

6.4. Namespaces

6.4.1. Create a namespace

admin.namespaces().createNamespace(namespace);

6.4.2. Get a namespace's policies

admin.namespaces().getPolicies(namespace);

6.4.3. List namespaces

admin.namespaces().getNamespaces(tenant);

6.4.4. Delete a namespace

admin.namespaces().deleteNamespace(namespace);

6.5. Topics

6.5.1. Peek at a subscription's pending messages (without consuming them)

admin.topics().peekMessages(topic, subName, numMessages);

6.5.2. Get a message by its message ID

admin.topics().getMessageById(topic, ledgerId, entryId);

6.5.3. Find a message by its position relative to the earliest or latest message

admin.topics().examineMessage(topic, "latest", 1);

6.5.4. Get a message ID by timestamp

admin.topics().getMessageIdByTimestamp(topic, timestamp);

6.5.5. Skip a number of unconsumed messages in a subscription

admin.topics().skipMessages(topic, subName, numMessages);

6.5.6. Skip all unconsumed messages in a subscription

admin.topics().skipAllMessages(topic, subName);

6.5.7. Reset the cursor by time

admin.topics().resetCursor(topic, subName, timestamp);

6.5.8. Look up the broker that owns a topic

admin.lookups().lookupTopic(topic);

6.5.9. Look up the brokers that own a partitioned topic

admin.lookups().lookupPartitionedTopic(topic);

6.5.10. Get a topic's subscriptions

admin.topics().getSubscriptions(topic);

6.5.11. Get the ID of the latest message

admin.topics().getLastMessageId(topic);

6.5.12. Create a non-partitioned topic

admin.topics().createNonPartitionedTopic(topicName);

6.5.13. Delete a non-partitioned topic

admin.topics().delete(topic);

6.5.14. List non-partitioned topics

admin.topics().getList(namespace);

6.5.15. Get the stats of a non-partitioned topic

admin.topics().getStats(topic, false /* is precise backlog */);

6.5.16. Get the internal stats of a non-partitioned topic

admin.topics().getInternalStats(topic);

6.5.17. Create a partitioned topic

admin.topics().createPartitionedTopic(topicName, numPartitions);

6.5.18. Get the metadata of a partitioned topic

admin.topics().getPartitionedTopicMetadata(topicName);

6.5.19. Update the number of partitions of a partitioned topic (it can only be increased)

admin.topics().updatePartitionedTopic(topic, numPartitions);

6.5.20. Delete a partitioned topic

admin.topics().deletePartitionedTopic(topic);

6.5.21. List partitioned topics

admin.topics().getPartitionedTopicList(namespace);

6.5.22. Get the stats of a partitioned topic

admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);

6.5.23. Get the internal stats of a partitioned topic

admin.topics().getPartitionedInternalStats(topic);

6.5.24. Create a subscription

admin.topics().createSubscription(topic, subscriptionName, MessageId.latest);

6.5.25. Get subscriptions

admin.topics().getSubscriptions(topic);

6.5.26. Delete a subscription

admin.topics().deleteSubscription(topic, subscriptionName);
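
The consumer and producer examples refer to topics by short names such as `my-topic`, while the admin examples in the complete code use fully qualified names such as `persistent://public/default/my-topic`. Pulsar treats a short name as a persistent topic in the `public/default` namespace, and a name of the form `tenant/namespace/topic` as a persistent topic in that namespace. The sketch below reproduces this expansion with the JDK only; `TopicNames.toFullName` is a hypothetical helper for illustration, not a Pulsar API, and it handles only these two short forms.

```java
// Hypothetical helper (not part of the Pulsar API): expands a short topic name
// into the fully qualified form used in the admin examples.
final class TopicNames {
    private TopicNames() {
    }

    static String toFullName(String topic) {
        if (topic.contains("://")) {
            return topic; // already fully qualified
        }
        String[] parts = topic.split("/");
        if (parts.length == 1) {
            // "my-topic" -> default tenant "public" and namespace "default"
            return "persistent://public/default/" + parts[0];
        }
        if (parts.length == 3) {
            // "tenant/namespace/topic" -> persistent domain
            return "persistent://" + topic;
        }
        throw new IllegalArgumentException("Unsupported short topic name: " + topic);
    }

    public static void main(String[] args) {
        System.out.println(toFullName("my-topic"));
        System.out.println(toFullName("public/test-ns/my-topic"));
    }
}
```

This is why a consumer on `my-topic` and an admin call on `persistent://public/default/my-topic` refer to the same topic.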

6.6. Complete code

package com.abc.demo.pulsar;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.HashSet;

@Slf4j
public class AdminCase {
    private PulsarAdmin admin;

    @Before
    public void before() throws PulsarClientException {
        admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://10.49.196.30:8080,10.49.196.31:8080,10.49.196.32:8080")
                .build();
    }

    @After
    public void after() {
        admin.close();
    }

    @Test
    public void broker() throws PulsarAdminException {
        log.info("getActiveBrokers={}", admin.brokers().getActiveBrokers());
        log.info("getOwnedNamespaces={}", admin.brokers().getOwnedNamespaces("pulsar-cluster-1", "app1:8080"));
        log.info("getDynamicConfigurationNames={}", admin.brokers().getDynamicConfigurationNames());
        admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "true");
        log.info("getAllDynamicConfigurations={}", admin.brokers().getAllDynamicConfigurations());
        log.info("getLeaderBroker={}", admin.brokers().getLeaderBroker());
    }

    @Test
    public void clusters() throws PulsarAdminException {
        log.info("getCluster={}", admin.clusters().getCluster("pulsar-cluster-1"));
        log.info("getClusters={}", admin.clusters().getClusters());
    }
    
    @Test
    public void tenants() throws PulsarAdminException {
        log.info("getTenants={}", admin.tenants().getTenants());
        TenantInfoImpl tenantInfo = new TenantInfoImpl();
        tenantInfo.setAdminRoles(new HashSet<>());
        tenantInfo.setAllowedClusters(Collections.singleton("pulsar-cluster-1"));
        admin.tenants().createTenant("test-tenant", tenantInfo);
        log.info("getTenantInfo={}", admin.tenants().getTenantInfo("test-tenant"));
        admin.tenants().updateTenant("test-tenant", tenantInfo);
        admin.tenants().deleteTenant("test-tenant");
    }

    @Test
    public void namespaces() throws PulsarAdminException {
        admin.namespaces().createNamespace("public/test-ns");
        log.info("getPolicies={}", admin.namespaces().getPolicies("public/default"));
        log.info("getNamespaces={}", admin.namespaces().getNamespaces("public"));
        admin.namespaces().deleteNamespace("public/test-ns");
    }
    
    @Test
    public void topics() throws PulsarAdminException {
        log.info("peekMessages={}", admin.topics().peekMessages("persistent://public/default/my-topic", "my-subscription", 3));
        Message<byte[]> message = admin.topics().getMessageById("persistent://public/default/my-topic", 171, 16);
        log.info("getMessageById={}", new String(message.getData()));
        message = admin.topics().examineMessage("persistent://public/default/my-topic", "latest", 1);
        log.info("examineMessage={}", new String(message.getData()));
        log.info("getMessageIdByTimestamp={}", admin.topics().getMessageIdByTimestamp("persistent://public/default/my-topic", System.currentTimeMillis()));
        admin.topics().skipMessages("persistent://public/default/my-topic", "my-subscription", 1);
        admin.topics().skipAllMessages("persistent://public/default/my-topic", "my-subscription");
        admin.topics().resetCursor("persistent://public/default/my-topic", "my-subscription", System.currentTimeMillis() - 1000 * 60 * 15);
        log.info("lookupTopic={}", admin.lookups().lookupTopic("persistent://public/default/my-topic"));
        log.info("lookupPartitionedTopic={}", admin.lookups().lookupPartitionedTopic("persistent://public/default/my-topic2"));
        log.info("getSubscriptions={}", admin.topics().getSubscriptions("persistent://public/default/my-topic"));
        log.info("getLastMessageId={}", admin.topics().getLastMessageId("persistent://public/default/my-topic"));

        admin.topics().createNonPartitionedTopic("persistent://public/default/test-topic");
        admin.topics().delete("persistent://public/default/test-topic");
        log.info("getList={}", admin.topics().getList("public/default"));
        log.info("getStats={}", admin.topics().getStats("persistent://public/default/my-topic", false));
        log.info("getInternalStats={}", admin.topics().getInternalStats("persistent://public/default/my-topic"));

        admin.topics().createPartitionedTopic("persistent://public/default/test-topic-p", 2);
        log.info("getPartitionedTopicMetadata={}", admin.topics().getPartitionedTopicMetadata("persistent://public/default/test-topic-p"));
        admin.topics().updatePartitionedTopic("persistent://public/default/test-topic-p", 3);
        admin.topics().deletePartitionedTopic("persistent://public/default/test-topic-p");
        log.info("getPartitionedStats={}", admin.topics().getPartitionedStats("persistent://public/default/my-topic2", false));
        log.info("getPartitionedInternalStats={}", admin.topics().getPartitionedInternalStats("persistent://public/default/my-topic2"));

        admin.topics().createSubscription("persistent://public/default/my-topic", "test-subscription", MessageId.latest);
        log.info("getSubscriptions={}", admin.topics().getSubscriptions("persistent://public/default/my-topic"));
        admin.topics().deleteSubscription("persistent://public/default/my-topic", "test-subscription");
    }
}
AdminCase.java

References:
https://pulsar.apache.org/docs/3.3.x/client-libraries-java/
https://pulsar.apache.org/docs/3.3.x/admin-api-overview/

From: https://www.cnblogs.com/wuyongyin/p/18336575
