首页 > 其他分享 >单体应用提高性能及处理高并发-异步处理与消息队列

单体应用提高性能及处理高并发-异步处理与消息队列

时间:2024-08-10 12:53:09浏览次数:17  
标签:异步 String 处理 队列 CompletableFuture import public

        在单体应用中,应对高并发和提升性能是开发者常面对的挑战。异步处理与消息队列是两个有效的手段,可以帮助开发者将耗时操作与主线程分离,减少阻塞,提高系统的响应速度和吞吐量。

1. 异步处理

异步处理允许应用程序在执行耗时操作时不阻塞主线程。这对于提高系统性能和并发处理能力至关重要。异步编程模型在不同编程语言中都有实现,比如Java的CompletableFuture和Python的async/await。

1.1 Java中的异步编程:CompletableFuture

CompletableFuture是Java 8中引入的一种支持异步编程的工具类。它允许将多个异步任务链式组合,并在任务完成后进行处理。

        实例1:并发调用多个API

        在某个单体应用中,可能需要并发调用多个外部服务,例如支付处理、库存查询等。同步调用这些服务会阻塞主线程,而异步调用则可以显著减少整体响应时间。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncApiCaller {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> paymentFuture = CompletableFuture.supplyAsync(() -> {
            return processPayment();
        });

        CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> {
            return checkStock();
        });

        CompletableFuture<Void> allOf = CompletableFuture.allOf(paymentFuture, stockFuture);

        allOf.thenRun(() -> {
            try {
                String paymentResult = paymentFuture.get();
                String stockResult = stockFuture.get();
                System.out.println("Payment Result: " + paymentResult);
                System.out.println("Stock Result: " + stockResult);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

        allOf.get();  // 阻塞,等待所有任务完成
    }

    private static String processPayment() {
        // 模拟支付处理
        try { Thread.sleep(3000); } catch (InterruptedException e) { }
        return "Payment Successful";
    }

    private static String checkStock() {
        // 模拟库存检查
        try { Thread.sleep(2000); } catch (InterruptedException e) { }
        return "Stock Available";
    }
}

        在这个示例中,CompletableFuture.supplyAsync用于异步调用支付处理和库存检查服务。CompletableFuture.allOf确保所有任务完成后,结果会被统一处理。这种方式避免了阻塞主线程,从而提高了系统的响应速度。

        实例2:异步文件上传处理

        在很多Web应用中,文件上传是一个常见功能,尤其是在需要处理大文件的场景中。同步处理文件上传通常会占用服务器大量资源,并导致用户体验下降。通过异步处理,文件上传过程可以不阻塞主线程,从而提高系统的并发处理能力。

        假设我们有一个简单的文件上传服务,用户可以上传图片文件。我们希望在用户上传文件后立即返回响应,文件的保存和处理在后台异步进行。

import java.io.File;
import java.io.FileOutputStream;
import java.util.concurrent.CompletableFuture;

public class FileUploadService {

    public CompletableFuture<String> uploadFileAsync(byte[] fileData, String fileName) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟文件保存
                File file = new File("/uploads/" + fileName);
                try (FileOutputStream fos = new FileOutputStream(file)) {
                    fos.write(fileData);
                }
                System.out.println("File saved: " + fileName);
                return "File uploaded successfully";
            } catch (Exception e) {
                e.printStackTrace();
                return "File upload failed";
            }
        });
    }
    
    public static void main(String[] args) throws Exception {
        FileUploadService service = new FileUploadService();
        byte[] mockFileData = new byte[1024];  // 模拟文件数据
        String fileName = "image.png";
        
        // 异步上传文件
        CompletableFuture<String> uploadFuture = service.uploadFileAsync(mockFileData, fileName);
        
        // 立即返回响应
        System.out.println("Upload initiated...");
        
        // 等待上传完成
        String result = uploadFuture.get();
        System.out.println("Upload result: " + result);
    }
}

        在这个示例中,uploadFileAsync方法异步保存文件数据,并立即返回给用户一个“上传已启动”的消息,用户无需等待文件保存完成。这种方式大幅提高了文件上传的处理效率,尤其是在大文件上传的场景中。

1.2 Python中的异步编程:async/await

        Python的asyncio库提供了异步编程的支持,通过asyncawait关键字,开发者可以实现非阻塞的I/O操作。

实例:异步读取多个文件

import asyncio

async def read_file(file_name):
    print(f'Start reading {file_name}')
    await asyncio.sleep(2)  # 模拟I/O操作
    print(f'Finished reading {file_name}')
    return f'Content of {file_name}'

async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [read_file(file) for file in files]
    results = await asyncio.gather(*tasks)
    print(f'Read results: {results}')

asyncio.run(main())

        在这个示例中,asyncio.gather并发执行多个文件读取操作。await关键字确保在等待I/O操作完成时不会阻塞其他任务的执行。这种方式适用于I/O密集型任务,如文件读取、网络请求等。

2. 消息队列

        消息队列是一种用于异步通信的机制,生产者将消息放入队列,消费者从队列中读取并处理消息。消息队列能够解耦应用程序中的不同模块,并使得长时间运行的任务能够异步处理,避免阻塞主线程。

2.1 RabbitMQ

        RabbitMQ是一个流行的消息队列系统,它支持多种消息传递协议,并具有良好的扩展性和可靠性。

实例:订单处理系统

        在一个电商平台中,用户下单后,需要进行一系列操作,如库存检查、支付处理、物流通知等。这些操作通常较为耗时,通过RabbitMQ可以将这些任务异步处理。

生产者代码:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class OrderProducer {
    private final static String QUEUE_NAME = "orderQueue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String orderMessage = "Order ID: 12345, Product: Laptop, Quantity: 1";
            channel.basicPublish("", QUEUE_NAME, null, orderMessage.getBytes());
            System.out.println(" [x] Sent '" + orderMessage + "'");
        }
    }
}

消费者代码:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class OrderConsumer {
    private final static String QUEUE_NAME = "orderQueue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages.");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                processOrder(message);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }

    private static void processOrder(String order) {
        // 模拟订单处理逻辑
        System.out.println(" [x] Processing order: " + order);
        try {
            Thread.sleep(5000);  // 模拟长时间订单处理
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(" [x] Order processed: " + order);
    }
}

        在这个实例中,生产者(OrderProducer)将订单消息放入RabbitMQ队列,消费者(OrderConsumer)从队列中读取消息并异步处理订单。这样,用户下单后可以立即返回响应,而订单处理则在后台异步完成。

2.2 Kafka

        Kafka是一个分布式流处理平台,具有高吞吐量、低延迟的特点,非常适合处理实时数据流和日志收集。

实例:用户行为日志收集

        在一个Web应用中,用户的行为数据(如登录、点击、搜索)需要被实时收集和分析。通过Kafka,可以将这些行为数据从多个服务异步发送到日志处理系统。

生产者代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class UserActivityProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String logMessage = "User ID: 789, Action: LOGIN, Timestamp: 2024-08-10 12:00:00";
        producer.send(new ProducerRecord<>("userActivityTopic", logMessage));

        System.out.println("Log message sent: " + logMessage);

        producer.close();
    }
}

消费者代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Collections;
import java.util.Properties;

public class UserActivityConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "userActivityGroup");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("userActivityTopic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Log message received: %s%n", record.value());
                processUserActivity(record.value());
            }
        }
    }

    private static void processUserActivity(String logMessage) {
        // 处理用户行为日志的业务逻辑
        System.out.println("Processing log message: " + logMessage);
    }
}

        在这个实例中,生产者(UserActivityProducer)将用户行为日志发送到Kafka主题中,消费者(UserActivityConsumer)从主题中消费日志数据并进行处理。Kafka的分布式特性确保了在高并发场景下,日志数据能够被高效地收集和处理。

3. 总结

        异步处理和消息队列是单体应用优化性能和处理高并发的重要技术。通过异步编程,可以避免长时间的I/O操作阻塞主线程,从而提高系统的响应速度;而消息队列能够解耦系统中的不同组件,将耗时操作异步化处理,减少对主线程的压力。结合实际项目场景,合理运用这些技术,可以显著提升单体应用的性能和并发处理能力。

标签:异步,String,处理,队列,CompletableFuture,import,public
From: https://blog.csdn.net/chenkun_321/article/details/141088866

相关文章

  • 单体应用提高性能和处理高并发-资源管理连接池
    在单体应用中,资源管理是提高性能和处理高并发的重要方面。资源管理通常涉及到数据库连接、线程管理等,这些资源如果管理不当,会导致系统性能下降甚至崩溃。连接池和线程池是两个常用的资源管理手段,能够显著提高资源的复用性,减少频繁创建和销毁资源的开销。1.数据库连接池数据......
  • React深度解析三:高级用法HOC、Hooks、异步组件
    本文分文三部分:HOC高阶组件higherordercomponentHooks16.8版本后新增的钩子API异步组件使用lazy和suspense两个api实现组件代码打包分割和异步加载一、HOC高阶组件1、定义高阶组件不是组件而是函数,是react中用于复用组件逻辑的高级技巧,HOC本身不是react一部分,是基于......
  • Android ndk string处理
    1.AndroidNDKNDK开发过程中常用的库定义在android-ndk-r25c/toolchains/llvm/prebuilt/linux-x86_64/sysroot/usr/lib/aarch64-linux-android如libc++_shared.solibc++_static.alibstdc++.a库ndk工具链下载:./bin/sdkmanager--install"ndk;25.0.8775105"2.链接问......
  • 回溯函数(算法)杂谈 -----可主动控制撤回逻辑处理的递归函数
    概述回溯,对接触了算法的人而言,并不陌生,其实严谨地说回溯函数就是递归函数,只不过在使用上,人们将它的一些细节抽离出来,进而演化出了所谓的回溯,在算法导论中,与其相关的被称为“回溯搜索算法”。回溯本质是递归的副产物,只要有递归调用就会有回溯。回溯法也经常和二叉树或N叉树......
  • grep文本处理工具
    bash的特性: 变量 快捷键 命令别名 命令行展开:{},~ 管道 输入、输出重定向 编程 命令行补全 路径补全 1、文本处理命令   字符统计wc:wordcount,wc[options]FILE       -l:只显示行数      -w:只显示单词 ......
  • vit的图像预处理过程
    在VisionTransformer(ViT)中,图像的预处理过程主要包括将图像转换为适合Transformer模型输入的格式。以下是从原始图像到模型输入所进行的主要操作步骤:1.图像尺寸调整(Resize)将输入图像调整为固定大小,通常是正方形(例如,224x224像素)。这是为了统一所有输入图像的尺寸,使得后......
  • 信息学奥赛一本通 1128 图像模糊处理
    1128:图像模糊处理时间限制:1000ms      内存限制:65536KB提交数:69990   通过数: 30350【题目描述】给定n行m列的图像各像素点的灰度值,要求用如下方法对其进行模糊化处理:1.四周最外侧的像素点灰度值不变;2.中间各像素点新灰度值为该像素点及其上下左......
  • 面对员工的抱怨,该怎么办?教你4个方法,定能妥善处理!
    面对员工的抱怨,该怎么办?教你4个方法,定能妥善处理!一:给员工一个反映意见的平台在这个平台上,员工可以畅所欲言,作为管理者,不能只是允许员工去歌颂企业,而不允许员工提出一些批评和建议。每个管理者都希望在批评员工的时候,不管对错都先接受;同样的,作为管理者,面对员工的抱怨或批评更......
  • Element学习(axios异步加载数据、案例操作)(5)
    1、这次学习的是上次还未完成好的恶element案例,对列表数据的异步加载,并渲染展示。——>axios来发送异步请求(1)(2)在vue当中安装axios(注意在当前的项目目录,并且安装完之后再将项目重启一下)(3)这里我用到数据的url地址为:https://mock.apifox.cn/m1/3128855-0-default/emp/list......
  • 异步FIFO设计
    AsynchronousFIFODesign总结来自CliffordE.Cummings论文《SimulationandSynthesisTechniquesforAsynchronousFIFODesign》一、设计难点使用格雷码计数时空和满的判断。同步FIFO读写时钟相同,而异步FIFO读写来自不同两个读写时钟,需要考虑跨时钟域设计。二、设......