概念
在计算机科学中,有两种不同类型的程序:IO 密集型和 CPU 密集型。这两种程序的主要差别在于它们在执行任务时瓶颈所在的地方。
- IO 密集型:这类程序主要通过读写磁盘文件、网络通信等外部设备来完成任务,因此它们大多数时间都在等待外部设备的响应。这些程序在处理等待时间方面效率较低,但对于存储和传输数据方面效率较高。
- CPU 密集型:这类程序主要通过使用CPU完成任务,因此它们大多数时间都在计算。这些程序在处理计算任务方面效率较高,但对于读写磁盘文件、网络通信等外部设备的响应方面效率较低。
需要注意的是,大多数程序都是一种折衷的方式,即同时存在 IO 密集型和 CPU 密集型的特征。因此,在设计程序时需要适当考虑两者的平衡,以确保程序的效率和可用性。
如何设计 IO 密集型程序?
在设计 IO 密集型程序时,需要考虑如何提高程序的并发性和线程安全性,以最大化利用外部设备的性能。下面是一些建议:
- 使用线程池:可以使用线程池技术来提高程序的并发性。线程池可以重复利用线程,从而减少线程创建和销毁的开销。
- 采用异步 I/O:异步 I/O 技术可以有效提高程序的线程安全性。当程序读写外部设备时,异步 I/O 会立即返回,而不是等待操作完成,从而使得线程可以继续执行其他任务。
- 使用缓存:缓存可以大大提高程序的性能,尤其是在处理频繁读写的数据时。通过将数据读入内存缓存,可以减少对磁盘的访问,从而提高程序的速度。
- 使用预读:预读可以提前读取数据,从而减少后续读取操作的时间。当程序读取大量数据时,可以考虑使用预读技术。
下面是一个使用 Java 实现的简单 IO 密集型程序的例子:
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class IoIntensiveProgram {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(10);
File file = new File("data.txt");
List<String> lines = Files.readAllLines(Paths.get(file.getAbsolutePath()), StandardCharsets.UTF_8);
for (String line : lines) {
executorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
processLine(line);
return null;
}
});
}
executorService.shutdown();
}
private static void processLine(String line) throws IOException {
// ...
}
}
如果把上面的例子改成 CPU 密集型程序,只需要把 processLine
函数内部改成大量的计算操作,就可以得到一个 CPU 密集型程序。
下面是一个使用 Java 实现的简单 CPU 密集型程序的例子:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CpuIntensiveProgram {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10000; i++) {
executorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
processData();
return null;
}
});
}
executorService.shutdown();
}
private static void processData() {
double result = 0;
for (int i = 0; i < 1000000; i++) {
result += Math.sin(i);
}
}
}
当然,上面只是一个简单的例子,实际上你可以根据你的需求来实现 CPU 密集型程序。
总的来说,如果你的程序更多地依赖于 I/O 操作,那么应该使用 IO 密集型程序;如果你的程序更多地依赖于 CPU 计算,那么应该使用 CPU 密集型程序。希望以上内容能帮助你更好地设计你的程序。
下面分享一些笔者最近工作中实际遇到的问题与经验。
起因
笔者最近的一项工作是对大批数据进行分析并将分析后的数据生成离线图表文件。主要存在以下问题和难点:
- 数据包含近四百辆车过去近三年的历史数据,总的数据量很大,总计有30多G的Excel
- 由于单个Excel数据量有限,所以每辆车的数据拆分成多个Excel,分析前需要先将数据进行整合
- 具体分析要求相关技术人员已经给出,但是仍涉及一些计算,例如求电压差,计算电池SOC等指标
- 每辆车需要进行多维度分析,因此最终需要生成多个离线图表文件
- 数据可能存在一些缺失值和异常值,需要进行判断和筛选过滤
- 数据的体量和时间跨度都很大,分析人员要求可以按时间范围筛选和查看数据
- 时间要求比较紧急,因为需要分析事故原因和追查隐患,必须尽快完成
首先在技术选型上笔者主要是python
+ plotly
+ numpy
+ pandas
。plotly
可以生成动态的离线图表文件,pandas
是python
生态最常用的数据分析工具。
最开始采取的是最朴素的方案,直接遍历每辆车的数据,挨个生成文件。但是发现每辆车的处理时间都超过10分钟有的甚至需要半小时,时间上不可接受。最终的方案就是采用多线程同时处理多个文件。但如何确定最佳的线程数其实也踩了很多坑。
线程数如何设计?
如何针对不同类型的程序确定线程数网上有很多介绍,概括来说就是:
- 对于CPU密集型程序,线程数公式:
,+1是为了预防有个线程被阻塞,cpu可以调用其他线程。
- 对于IO密集型程序,线程数的确定有两种说法:
最佳线程数 = (1/CPU利用率) = 1 + (I/O耗时/CPU耗时);(一般为2*CPU核心数)
最佳线程数 = CPU核心数/(1-阻塞系数),阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
从实际应用来看,线程数的多少没有确定公式可遵循,具体得看设备的硬件情况和程序要求。下面两张图分别是线程数=2*CPU数和线程数 = CPU数/(1-阻塞系数)的系统监控图。当线程数较小时,没有充分发挥系统能力,当然这不会有什么问题,只是可能耗费较多时间。但是当线程数过多时,系统内存占用急速上升,资源不够导致程序奔溃。
因此,线程数的确定并不是一个固定的数值,而是需要根据系统的实际情况进行调整的。因此,最好的方法是通过测试和实验来确定线程数
- 对于 CPU 密集型程序,线程数的确定主要考虑 CPU 核心数和 CPU 使用率。一般情况下,线程数可以等于 CPU 核心数,这样可以充分利用 CPU 的性能。当然,线程数也可以大于 CPU 核心数,但这样会增加 CPU 上下文切换的代价,从而影响程序的性能。因此,通常需要在实际测试的基础上确定线程数。
- 对于 IO 密集型程序,线程数的确定主要考虑两个因素:I/O 设备的数量和最大并发数。一般情况下,线程数可以等于 I/O 设备的数量或者是 I/O 设备最大并发数,这取决于应用程序的需求和系统的能力。
主要代码
整合Excel
import multiprocessing
import threading
import os
import glob
import numpy as np
import pandas as pd
cpu_num = multiprocessing.cpu_count()
block_coef = 0.9
thread_num = int(np.ceil(cpu_num/(1-block_coef)))
def merge(files: list = [], is_old=True):
if files is None or files == []:
return
file_name = os.path.basename(os.path.dirname(files[0]))
if is_old:
file_name = file_name + '-1.csv'
else:
file_name = file_name + '-2.csv'
df = pd.DataFrame()
for f in files:
print(f)
tmp = pd.read_excel(f)
df = pd.concat([df, tmp])
df.to_csv('./output/'+file_name, index=None, encoding='utf-8')
def resolve(batch: list = []):
"""
每个线程负责处理一批数据
"""
for b in batch:
merge(b)
if __name__ == '__main__':
dirs = glob.glob('./raw_data/*')
file_list = []
for dir in dirs:
file_list.append([os.path.join(dir, i) for i in os.listdir(dir)])
file_list = sorted(file_list)
batch_size = int(len(dirs) / thread_num)
split = int(np.ceil(len(dirs) / batch_size))
for i in range(split):
threading.Thread(target=resolve,
args=(file_list[i * batch_size:(i + 1) * batch_size],)).start()
压差计算
df = util.get_dataset(car_id)
# 解析时间格式字段
df["上报时间"] = pd.to_datetime(df["上报时间"])
df['压差'] = df['电池单体电压最高值'] - df['电池单体电压最低值']
区间频率统计
统计不同区间内的数据量
import pandas as pd
import numpy as np
pd.set_option('display.max_rows', None)
df = pd.read_csv(r"xxx.csv")
df["上报时间"] = pd.to_datetime(df["上报时间"])
df.set_index(df["上报时间"], inplace=True)
df['压差'] = (df['电池单体电压最高值'] - df['电池单体电压最低值'])*1000
bins = [-1e-10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 120, 140, 160, 180, 200, np.inf]
result = pd.cut(df['压差'], bins=bins,right=True)
# 全量数据的区间统计
result.value_counts()
# 按月区间统计
d = pd.cut(data['压差'], bins=bins,right=True).groupby([data.index.year, data.index.month]).value_counts()
d.index.names = ['year', 'month', 'group']
d = pd.DataFrame(d)
# 查询2022年6月的数据情况
d.loc[2022].loc[6]['压差']
其他
绘制图表,控制显示或者保存
import os
import plotly
import plotly.graph_objects as go
def plot_fig(data, x="上报时间", y=None, y2=None, title='', mode='lines', to_html=False, carid=''):
fig = go.Figure()
# fig = px.line(data, x=x, y=y)
if type(y) == str:
fig.add_trace(go.Scatter(x=data[x], y=data[y], name=y, mode=mode))
if y == '压差':
fig.update_layout(yaxis=dict(range=[data[y].min(), 0.5]))
else:
for col in y:
fig.add_trace(go.Scatter(x=data[x], y=data[col], name=col, mode=mode))
if y2 is not None:
# 设置双y轴
fig.add_trace(go.Scatter(x=data[x], y=data[y2], xaxis="x", yaxis="y2", name=y2))
fig.update_layout(yaxis2=dict(anchor='x', overlaying='y', side='right'))
fig.update_xaxes(
rangeslider_visible=True,
rangeselector=dict(
buttons=list(
[
dict(count=1, label="1月", step="month", stepmode="backward"),
dict(count=6, label="半年", step="month", stepmode="backward"),
dict(count=1, label="今年", step="year", stepmode="todate"),
dict(count=1, label="1年", step="year", stepmode="backward"),
dict(step="all"),
]
)
),
type="date",
tickformat='%Y-%m-%d %H:%M:%S'
)
fig.update_layout(width=1500, height=600, title=carid + "-" + title)
if to_html:
parent_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'output', carid)
print(parent_dir)
if not os.path.exists(parent_dir):
os.mkdir(parent_dir)
filename = parent_dir + '/' + title + '.html'
# plotly.io.write_html(fig, filename)
# 关闭自动用浏览器打开,真坑爹,浏览器莫名其妙打开
plotly.offline.plot(fig, filename=filename, auto_open=False)
else:
fig.show()
标签:df,程序,线程,IO,import,密集型,CPU
From: https://blog.51cto.com/u_12966357/8173781