Preface
I have recently been working on a root-cause-analysis project. It uses an internal cause simulator that automatically generates the alarm patterns various faults could produce, so it sits at the edge of "big data" - and once big data is involved, the volume gets large: the project needs to simulate roughly 3,000+ root-cause nodes with about 16,000+ edges between them, and random walks then generate about 1,600k possible alarm records, which are meant to be fed into deep learning. That is where the trouble started: the data was first placed in a pandas DataFrame, and I immediately hit both an efficiency problem (slow processing) and a space problem (the data was too large and caused OOM), showing up as slow data saving and difficulty loading the data into memory. This post records the methods I used to handle such an oversized dataset, for your reference.
A quick aside: if your dataset is small, none of this is necessary - just keep it in a CSV or a DataFrame and process it directly; there is no obstacle at all.
First, the efficiency problem
A quick summary of the methods used:
For plain data reading, loading and saving there is no great fix on top of the DataFrame itself, but we can make full use of today's high-performance multi-core servers (i.e. use all idle CPU cores). Each option below has advantages and drawbacks, so see which one better fits your data-processing needs:
- A) Pandas Dataframe
- B) Pandas DataFrame + the chunksize, engine, iterator and memory_map parameters to save memory
- C) Dask Dataframe
- D) Datatable Library
- E) Modin-Dask/Ray Library
- F) Other parallel-processing libraries: swifter, pandarallel, dispy, multiprocessing, joblib and many more.
- G) Saving and loading data in various file formats
In addition, section G covers many binary formats for storing data that reduce memory consumption and speed up processing, including:
- csv (default)
- feather
- hdf
- msgpack
- parquet
- pickle
- jay
- numpy array(.npy format) - for numerical data
Loading the raw data
In [1]:
# import the library
import gc
import numpy as np
import pandas as pd
import os
import time
print(f'numpy version: {np.__version__}')
print(f'pandas version: {pd.__version__}')
numpy version: 1.18.5
pandas version: 1.1.3
The demo uses the nfl_big_data_bowl_2021 dataset (~2.2 GB in size).
In [2]:
%%time
path = "/kaggle/input/nfl-big-data-bowl-2021/"
# I am using a function to avoid any kind of additional unnecessary variable - helps in RAM saving
# As this creates a new scope for the intermediate variables
# and removes them automatically when the interpreter exits the function.
# Note: I cap each file at 3,000,000 rows because of the memory limitation on the Kaggle platform
def load_data():
    weekly_data = [csv for csv in os.listdir(path) if csv[:5] == "week1"]
    # selecting the files whose names start with "week1" (week1, week10-week17) and capping the rows,
    # due to the memory limitation of the Kaggle notebook (only 16 gigs is allowed).
    dataframe = pd.DataFrame()
    for files in weekly_data:
        df = pd.read_csv(filepath_or_buffer = "/kaggle/input/nfl-big-data-bowl-2021/%s"%files, nrows=3000000)
        dataframe = pd.concat([dataframe,df])
    return dataframe[:]
dataframe = load_data()
CPU times: user 46.3 s, sys: 17.5 s, total: 1min 3s
Wall time: 1min 7s
A) The usual way to save and load data - pandas CSV
- Save in CSV format
In [3]:
%%time
# Saving data in CSV Format
dataframe.to_csv("/kaggle/working/csv_data.csv",index = False)
CPU times: user 3min 2s, sys: 2.5 s, total: 3min 5s
Wall time: 3min 5s
- Read the CSV data
In [4]:
%%time
# Reading data in CSV Format
csv_data = pd.read_csv("/kaggle/working/csv_data.csv")
CPU times: user 28.4 s, sys: 3.07 s, total: 31.5 s
Wall time: 31.5 s
In [5]:
del dataframe #we won't use this dataframe so i am deleting those to free up some space.
del csv_data
gc.collect() # collect garbage value from the memory
Out[5]:
20
Both saving and loading the data are quite time-consuming. Next comes the optimisation.
B) Using pandas' chunksize, engine, iterator and memory_map parameters
The following parameters have proven helpful for performance; they are worth trying.
- chunksize: sometimes we hit a MemoryError while loading/reading data because of the system's memory limit (limited RAM). The good news is that pandas can read a large data file in smaller chunks. read_csv has a built-in parameter, chunksize, which reads the data chunk by chunk - i.e. it controls how many rows are read into a DataFrame at any one time so that they fit in local memory. Using chunksize yields a TextFileReader object to iterate over; we can operate on each chunk and concatenate the chunks into a DataFrame that fits in local memory, as shown below.
- engine: the "c" and "python" engines. This optional parameter names the language the parser is written in. Pandas uses the C parser whenever possible (engine='c'), but it may fall back to Python if an option is specified that C does not support.
The main point is to use the C engine whenever we can, because it is faster. More info: https://stackoverflow.com/questions/52774459/engines-in-python-pandas-read-csv
- iterator: yields a sequence of values over time (a TextFileReader object) instead of computing them all at once, which saves memory.
- memory_map: maps the file object directly into memory and accesses the data straight from there.
In [6]:
%%time
# let's define a function to avoid creating unnecessary variable
def load_data_chunk(filepath, chunksize=500000, iterator=True, engine='c', memory_map=True):
    chunk_df = [x for x in pd.read_csv(filepath_or_buffer=filepath, nrows=3000000, chunksize=chunksize, iterator=iterator, engine=engine, memory_map=memory_map)]
    chunk_data = pd.concat(chunk_df, sort=False)
    return chunk_data
chunk_data = load_data_chunk(filepath= "/kaggle/working/csv_data.csv")
CPU times: user 9.41 s, sys: 456 ms, total: 9.87 s
Wall time: 9.87 s
This way we can avoid the MemoryError, but it comes at a cost: sometimes the execution time is a bit longer than the plain approach. Here, though, it turned out to be faster than the default pd.read_csv call.
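As a concrete illustration of "operating on each chunk", here is a minimal hedged sketch (not part of the original notebook run) that keeps only a small running aggregate in memory instead of concatenating all chunks; the "s" (speed) column comes from the dataset shown above:
# Hedged sketch: aggregate chunk by chunk so only one chunk is in memory at a time
total_rows = 0
speed_sum = 0.0
for chunk in pd.read_csv("/kaggle/working/csv_data.csv", chunksize=500000, engine="c", memory_map=True):
    total_rows += len(chunk)
    speed_sum += chunk["s"].sum()
print(f"rows: {total_rows}, mean speed: {speed_sum / total_rows:.3f}")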
In [7]:
del chunk_data # we won't use this dataframe so i am deleting those to free up some space.
gc.collect() # collect garbage value from the memory
Out[7]:
20
C) Using a Dask DataFrame
In short, Dask is a parallel-computing library that can analyse CSV data without loading the whole file into memory, and it computes faster on a single machine by using multiple (idle) CPU cores and streaming data efficiently from disk.
A Dask DataFrame is made up of many smaller pandas DataFrames, split along the index. These pandas DataFrames may live on disk for larger-than-memory computation on a single machine, or on many different machines in a cluster.
The best part of Dask DataFrame is that you can use the library much as you use pandas, because the Dask API is built on top of the pandas API.
Dask is a good fit when you want to:
- manipulate large datasets, even when they do not fit in memory
- speed up long computations by using many cores
- run distributed computation on large datasets with standard pandas operations such as groupby, joins and time-series calculations
Note: converting data between formats does not benefit from parallel computation.
In [8]:
# !pip install dask[complete] --> for complete installation
!pip install dask # installing dask
# import dask dataframe
import dask.dataframe as dd
Requirement already satisfied: dask in /opt/conda/lib/python3.7/site-packages (2.30.0)
Requirement already satisfied: pyyaml in /opt/conda/lib/python3.7/site-packages (from dask) (5.3.1)
Note: from here on, the demo uses the Dask library instead of pandas.
As already mentioned, the Dask API is a subset of the pandas API, so you can use the same methods as in pandas. We now replicate the data saving and loading/reading above (done earlier with pandas) using the Dask library.
Preparing the data: simply use "dd" instead of "pd"; everything else below stays the same.
In [9]:
%%time
# Reading data in CSV Format using Dask Dataframe
dask_csv_data = dd.read_csv(urlpath= "/kaggle/working/csv_data.csv")
dask_csv_data.head(3)
CPU times: user 1.59 s, sys: 191 ms, total: 1.78 s
Wall time: 1.79 s
Out[9]:
|   | time | x | y | s | a | dis | o | dir | event | nflId | displayName | jerseyNumber | position | frameId | team | gameId | playId | playDirection | route |
|---|------|---|---|---|---|-----|---|-----|-------|-------|-------------|--------------|----------|---------|------|--------|--------|---------------|-------|
| 0 | 2018-11-16T01:24:15.799Z | 77.97 | 18.61 | 0.0 | 0.0 | 0.0 | 109.88 | 289.98 | None | 497236.0 | Jimmy Graham | 80.0 | TE | 1 | away | 2018111500 | 90 | right | CROSS |
| 1 | 2018-11-16T01:24:15.799Z | 79.41 | 23.71 | 0.0 | 0.0 | 0.0 | 90.31 | 159.68 | None | 2506363.0 | Aaron Rodgers | 12.0 | QB | 1 | away | 2018111500 | 90 | right | NaN |
| 2 | 2018-11-16T01:24:15.799Z | 85.05 | 22.71 | 0.0 | 0.0 | 0.0 | 288.53 | 141.92 | None | 2532966.0 | Bobby Wagner | 54.0 | MLB | 1 | home | 2018111500 | 90 | right | NaN |
Wow! As you can see, reading this way with a Dask DataFrame is very, very fast. We can then apply a few quick data-inspection methods, as shown below:
In [10]:
dask_csv_data.columns
Out[10]:
Index(['time', 'x', 'y', 's', 'a', 'dis', 'o', 'dir', 'event', 'nflId',
'displayName', 'jerseyNumber', 'position', 'frameId', 'team', 'gameId',
'playId', 'playDirection', 'route'],
dtype='object')
In [11]:
dask_csv_data.shape
Out[11]:
(Delayed('int-d7b776a2-ca6d-4108-8ddb-841882ea67ec'), 19)
As you can see above, we cannot directly get the total number of rows of the DataFrame. Dask's delayed functions decorate your functions so that they operate lazily: rather than executing a function immediately, execution is deferred and the function and its arguments are placed into a task graph.
We can therefore turn any Dask collection into a concrete value by calling the .compute() method or the dask.compute(...) function. This call blocks until the computation finishes, going straight from the lazy Dask collection to a concrete value in local memory.
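For example, a minimal hedged sketch of this lazy model (not part of the original notebook run; the column names come from the dataset above) - the groupby only builds a task graph, and nothing executes until .compute() is called:
# Hedged sketch: lazy aggregation, materialised with .compute()
lazy_counts = dd.read_csv("/kaggle/working/csv_data.csv", blocksize=64e6).groupby("team")["playId"].count()
counts = lazy_counts.compute()  # runs the task graph in parallel and returns a pandas Series
print(counts)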
In [12]:
del dask_csv_data
gc.collect()
Out[12]:
75
In [13]:
%%time
# Reading data in CSV Format using Dask Dataframe
dask_csv_data = dd.read_csv(urlpath= "/kaggle/working/csv_data.csv", blocksize= 64e6).compute() # 64MB chunks
dask_csv_data.head(3)
CPU times: user 40.4 s, sys: 4.14 s, total: 44.5 s
Wall time: 19.7 s
Out[13]:
|   | time | x | y | s | a | dis | o | dir | event | nflId | displayName | jerseyNumber | position | frameId | team | gameId | playId | playDirection | route |
|---|------|---|---|---|---|-----|---|-----|-------|-------|-------------|--------------|----------|---------|------|--------|--------|---------------|-------|
| 0 | 2018-11-16T01:24:15.799Z | 77.97 | 18.61 | 0.0 | 0.0 | 0.0 | 109.88 | 289.98 | None | 497236.0 | Jimmy Graham | 80.0 | TE | 1 | away | 2018111500 | 90 | right | CROSS |
| 1 | 2018-11-16T01:24:15.799Z | 79.41 | 23.71 | 0.0 | 0.0 | 0.0 | 90.31 | 159.68 | None | 2506363.0 | Aaron Rodgers | 12.0 | QB | 1 | away | 2018111500 | 90 | right | NaN |
| 2 | 2018-11-16T01:24:15.799Z | 85.05 | 22.71 | 0.0 | 0.0 | 0.0 | 288.53 | 141.92 | None | 2532966.0 | Bobby Wagner | 54.0 | MLB | 1 | home | 2018111500 | 90 | right | NaN |
In [14]:
%%time
# Storing data in CSV Format - using dask
dask_csv_data.to_csv("/kaggle/working/dask_data.csv",index = False)
CPU times: user 3min 9s, sys: 2.04 s, total: 3min 11s
Wall time: 3min 10s
In [15]:
del dask_csv_data # free up some space.
gc.collect() # collect garbage value from the memory
Out[15]:
40
For the approach above we can conclude that saving the data takes time, but reading it back is comparatively fast. You can try different datasets and check the results yourself.
D) Using the datatable library
datatable offers fast and convenient parsing of text (CSV) files. I learned about this library only recently and it has been very helpful for data analysis - credit to its author.
datatable is inspired by R's data.table and improves on data accessibility. It is even faster than pandas. The datatable parser:
- automatically detects separators, headers, column types, quoting rules, etc.
- reads from files, URLs, shell commands, raw text, archives and globs
- reads files with multiple threads for maximum speed
- shows a progress indicator while reading large files
In [16]:
!pip install datatable # install datatable
import datatable as dt # import datatable
print(f'Version: {dt.__version__}')
Collecting datatable
Downloading datatable-0.11.0-cp37-cp37m-manylinux2010_x86_64.whl (83.9 MB)
|████████████████████████████████| 83.9 MB 128 kB/s eta 0:00:01
Installing collected packages: datatable
Successfully installed datatable-0.11.0
Version: 0.11.0
The fread() function creates a Frame; it is both powerful and extremely fast. It can automatically detect the parse parameters for the majority of text files, load data from .zip archives or URLs, read Excel files, and much more.
In [17]:
%%time
frame = dt.fread("csv_data.csv")
CPU times: user 14.7 s, sys: 2.33 s, total: 17 s
Wall time: 4.28 s
In [18]:
frame.head(2)
Out[18]:
|   | time | x | y | s | a | dis | o | dir | event | nflId | … | team | gameId | playId | playDirection | route |
|---|------|---|---|---|---|-----|---|-----|-------|-------|---|------|--------|--------|---------------|-------|
| 0 | 2018-11-16T01:24:15.799Z | 77.97 | 18.61 | 0 | 0 | 0 | 109.88 | 289.98 | None | 497236 | … | away | 2018111500 | 90 | right | CROSS |
| 1 | 2018-11-16T01:24:15.799Z | 79.41 | 23.71 | 0 | 0 | 0 | 90.31 | 159.68 | None | 2.50636e+06 | … | away | 2018111500 | 90 | right | |
2 rows × 19 columns
Unbelievably fast! We can easily convert an existing Frame to a numpy array with to_numpy(), or to a pandas DataFrame with to_pandas(), and then perform data manipulation just as we would in pandas.
Very fast and simple, right?
In [19]:
# %%time
# pandas_df = frame.to_pandas()
# pandas_df = dt.fread("csv_data.csv").to_pandas() # we can combine both the above steps
# pandas_df.head(2)
We can also save a Frame to disk in the binary .jay format and then open it practically instantly, regardless of the data size.
In [20]:
%%time
# save data to .jay format
frame.to_jay("data_table.jay")
CPU times: user 528 ms, sys: 3.13 s, total: 3.65 s
Wall time: 3.61 s
In [21]:
%%time
# read the .jay data format
dt.open("data_table.jay").head(2) # we would get Frame
# -- we can easily covert frames to pandas dataframe directly
# jay_data = dt.open("data_table.jay").to_pandas()
CPU times: user 1.58 ms, sys: 40 µs, total: 1.62 ms
Wall time: 1.63 ms
/opt/conda/lib/python3.7/site-packages/datatable/__init__.py:163: FutureWarning: Function dt.open() is deprecated since 0.10.0, and will be removed in version 1.0.
Please use dt.fread(file), or dt.Frame(file) instead
category=FutureWarning)
Out[21]:
|   | time | x | y | s | a | dis | o | dir | event | nflId | … | team | gameId | playId | playDirection | route |
|---|------|---|---|---|---|-----|---|-----|-------|-------|---|------|--------|--------|---------------|-------|
| 0 | 2018-11-16T01:24:15.799Z | 77.97 | 18.61 | 0 | 0 | 0 | 109.88 | 289.98 | None | 497236 | … | away | 2018111500 | 90 | right | CROSS |
| 1 | 2018-11-16T01:24:15.799Z | 79.41 | 23.71 | 0 | 0 | 0 | 90.31 | 159.68 | None | 2.50636e+06 | … | away | 2018111500 | 90 | right | |
2 rows × 19 columns
In [22]:
#del pandas_df# free up some space.
del frame
gc.collect() # collect garbage value from the memory
Out[22]:
40
From the results above we can conclude that datatable really is fast - we get the result we want within seconds.
E) Using the Modin (Dask/Ray) library
Modin is my personal favourite; I use it all the time for DataFrames in Jupyter notebooks. While a process is running (data manipulation or anything else), the system usually works on a single core while the others sit idle - so why not put the idle cores to use?
The main idea is that Modin really does use all the idle CPU cores, and that makes computation faster.
- Modin is an early-stage DataFrame library that wraps pandas and transparently distributes the data and computation, speeding up your pandas workflows with a one-line code change.
- Users do not need to know how many cores their system has, nor specify how to distribute the data.
- In fact, users can keep working with their existing pandas notebooks while experiencing a significant speed-up from Modin, even on a single machine. Only the import statement needs to change; once it has, you can use Modin just as you would use pandas, because the API is identical to pandas.
Modin uses Ray or Dask to provide an effortless way to speed up your pandas notebooks, scripts and libraries.
- Ray is a fast, simple framework for building and running distributed applications. Ray uses Apache Arrow for efficient data handling and provides task and actor abstractions for distributed computing.
- Dask provides advanced parallelism for analytics, enabling performance at scale. Dask is more focused on the data-science space, providing higher-level APIs that in turn offer partial replacements for pandas, NumPy and scikit-learn, on top of its low-level scheduling and cluster-management framework.
- The main difference between Dask and Ray is that Dask uses a centralised scheduler to share work across multiple cores, whereas Ray uses distributed, bottom-up scheduling.
You can read more about the differences via this link.
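As a rough illustration of Ray's task abstraction described above, here is a minimal hedged sketch (not part of the original notebook; it assumes ray is installed):
import ray
ray.init()                      # start a local Ray runtime
@ray.remote
def square(x):                  # an ordinary function turned into a remote task
    return x * x
futures = [square.remote(i) for i in range(8)]   # returns immediately with futures
print(ray.get(futures))                          # blocks until all tasks finish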
Note: Ray does not support Windows, so modin[ray] and modin[all] cannot be installed there.
- pip install modin[all] # install all of the above
- pip install modin[ray] # install Modin dependencies and Ray, to run on Ray
In [23]:
!pip install modin[dask] # Install Modin dependencies and Dask to run on Dask
# Initialize all cores- MODIN-DASK Engine
# import os - Uncomment this if in use.Already imported in top cell.
os.environ["MODIN_ENGINE"] = "dask"
from distributed import Client
client = Client()
import modin.pandas as mpd
Collecting modin[dask]
Downloading modin-0.8.2-py3-none-manylinux1_x86_64.whl (533 kB)
|████████████████████████████████| 533 kB 1.3 MB/s eta 0:00:01
Collecting pandas==1.1.4
Downloading pandas-1.1.4-cp37-cp37m-manylinux1_x86_64.whl (9.5 MB)
|████████████████████████████████| 9.5 MB 12.6 MB/s eta 0:00:01
Requirement already satisfied: packaging in /opt/conda/lib/python3.7/site-packages (from modin[dask]) (20.1)
Requirement already satisfied: distributed<=2.19.0,>=2.12.0; extra == "dask" in /opt/conda/lib/python3.7/site-packages (from modin[dask]) (2.14.0)
Collecting dask<=2.19.0,>=2.12.0; extra == "dask"
Downloading dask-2.19.0-py3-none-any.whl (824 kB)
|████████████████████████████████| 824 kB 18.8 MB/s eta 0:00:01
Requirement already satisfied: python-dateutil>=2.7.3 in /opt/conda/lib/python3.7/site-packages (from pandas==1.1.4->modin[dask]) (2.8.1)
Requirement already satisfied: numpy>=1.15.4 in /opt/conda/lib/python3.7/site-packages (from pandas==1.1.4->modin[dask]) (1.18.5)
Requirement already satisfied: pytz>=2017.2 in /opt/conda/lib/python3.7/site-packages (from pandas==1.1.4->modin[dask]) (2019.3)
Requirement already satisfied: six in /opt/conda/lib/python3.7/site-packages (from packaging->modin[dask]) (1.14.0)
Requirement already satisfied: pyparsing>=2.0.2 in /opt/conda/lib/python3.7/site-packages (from packaging->modin[dask]) (2.4.7)
Requirement already satisfied: msgpack>=0.6.0 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.0.0)
Requirement already satisfied: zict>=0.1.3 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (2.0.0)
Requirement already satisfied: pyyaml in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (5.3.1)
Requirement already satisfied: click>=6.6 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (7.1.1)
Requirement already satisfied: psutil>=5.0 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (5.7.0)
Requirement already satisfied: setuptools in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (46.1.3.post20200325)
Requirement already satisfied: tblib>=1.6.0 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.6.0)
Requirement already satisfied: cloudpickle>=0.2.2 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.3.0)
Requirement already satisfied: sortedcontainers!=2.0.0,!=2.0.1 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (2.1.0)
Requirement already satisfied: toolz>=0.8.2 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (0.10.0)
Requirement already satisfied: tornado>=5; python_version < "3.8" in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (5.0.2)
Requirement already satisfied: heapdict in /opt/conda/lib/python3.7/site-packages (from zict>=0.1.3->distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.0.1)
Installing collected packages: pandas, dask, modin
Attempting uninstall: pandas
Found existing installation: pandas 1.1.3
Uninstalling pandas-1.1.3:
Successfully uninstalled pandas-1.1.3
Attempting uninstall: dask
Found existing installation: dask 2.30.0
Uninstalling dask-2.30.0:
Successfully uninstalled dask-2.30.0
ERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.
We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.
dask-xgboost 0.1.11 requires xgboost<=0.90, but you'll have xgboost 1.2.1 which is incompatible.
Successfully installed dask-2.19.0 modin-0.8.2 pandas-1.1.4
/opt/conda/lib/python3.7/site-packages/distributed/client.py:1079: VersionMismatchWarning: Mismatched versions found
dask
+-----------------------+---------+
| | version |
+-----------------------+---------+
| client | 2.30.0 |
| scheduler | 2.30.0 |
| tcp://127.0.0.1:32795 | 2.19.0 |
| tcp://127.0.0.1:34019 | 2.19.0 |
| tcp://127.0.0.1:36555 | 2.19.0 |
| tcp://127.0.0.1:39631 | 2.19.0 |
+-----------------------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
UserWarning: The pandas version installed 1.1.3 does not match the supported pandas version in Modin 1.1.4. This may cause undesired side effects!
UserWarning: The Dask Engine for Modin is experimental.
So instead of pandas' pd we simply use mpd, i.e. modin.pandas; every other method and format stays the same as in pandas. Unlike a Dask DataFrame (see section C above), where extra methods such as .compute() are needed for various operations, with Modin we do not need to worry about any special methods or formats - we just use it like the pandas library, because Modin's API is the same as the pandas API.
In [24]:
%%time
# using default pandas method to prepare data
dataframe = load_data() # function load_data defined above
CPU times: user 51.6 s, sys: 15.9 s, total: 1min 7s
Wall time: 1min 5s
You can see the result of the default pd approach above; note the CPU utilisation and the time taken. Below we run the same function using modin.pandas.
In [40]:
%%time
# changing the default pandas(pd) method by modin.pandas(mpd)
path = "/kaggle/input/nfl-big-data-bowl-2021/"
# I am using a function to avoid any kind of additional unnecessary variable - helps in RAM saving
# As this creates a new scope for the intermediate variables
# and removes them automatically when the interpreter exits the function.
def load_data_modin():
    weekly_data = [csv for csv in os.listdir(path) if csv[:5] == "week1"]
    # selecting the files whose names start with "week1", due to the memory limitation of the Kaggle notebook.
    # As only 16 gigs is allowed to use.
    dataframe = mpd.DataFrame()
    for files in weekly_data:
        df = mpd.read_csv(filepath_or_buffer = "/kaggle/input/nfl-big-data-bowl-2021/%s"%files)
        dataframe = mpd.concat([dataframe,df])
    return dataframe
dataframe_mpd = load_data_modin()
UserWarning: Distributing <class 'NoneType'> object. This may take some time.
CPU times: user 2.65 s, sys: 1.33 s, total: 3.98 s
Wall time: 15 s
See the run time? Hurray - we got the result very quickly.
Now look at the result of the same operation using modin.pandas (mpd), and note the CPU utilisation and the time. Modin uses all CPU cores to produce results faster; you can watch the per-core CPU usage in your task manager to cross-check that it is really working.
Conclusion: from the results above, i.e. the time taken (roughly a quarter of the plain approach) and the CPU utilisation, we can conclude that the Modin library works very fast.
Saving and reading CSV data with modin.pandas
In [26]:
%%time
# Storing merged data in CSV Format
dataframe_mpd.to_csv("/kaggle/working/modin_csv_data.csv",index = False)
UserWarning: `DataFrame.to_csv` defaulting to pandas implementation.
To request implementation, send an email to feature_requests@modin.org.
UserWarning: Distributing <class 'pandas.core.frame.DataFrame'> object. This may take some time.
CPU times: user 4min 4s, sys: 22.5 s, total: 4min 26s
Wall time: 4min 46s
In [27]:
%%time
# Reading data in CSV Format
csv_data = mpd.read_csv("/kaggle/working/csv_data.csv")
CPU times: user 1.99 s, sys: 1.37 s, total: 3.36 s
Wall time: 18.6 s
In [28]:
del csv_data
gc.collect()
Out[28]:
302
F) Other parallel-processing libraries
Try some other parallel-processing libraries and see whether they help; a few usage sketches follow the list below.
- SWIFTER: swifter is a package that tries to apply any function to a pandas DataFrame or Series object in the fastest available way. It integrates with pandas objects, so we can only use it on pandas objects such as DataFrames or Series. Link
- Pandarallel: the idea of Pandaral·lel is to distribute your pandas computation over all available CPUs to get a significant speed-up. Limitation: Pandarallel works only with pandas, and on a Windows PC only when the Python session (python, ipython, jupyter notebook, jupyter lab) is started from the Windows Subsystem for Linux (WSL). Link.
- Ipyparallel: another multiprocessing and task-distribution system, specifically for executing Jupyter notebook code in parallel across a cluster. You can prefix any Python statement with %px to parallelise it automatically. Link.
- Dispy: dispy is a general-purpose, comprehensive and easy-to-use framework for creating and using compute clusters to run computations in parallel across multiple processors on a single machine (SMP), or across many machines in a cluster, grid or cloud. It lets you distribute whole Python programs, or individual functions, across a cluster of machines for parallel execution. It uses platform-native mechanisms for network communication to stay fast and efficient, so Linux, macOS and Windows machines all work equally well.
- Multiprocessing: the multiprocessing package can be used to process input files in parallel and speed things up.
- Joblib: Joblib provides a simple helper class to write parallel for-loops using multiprocessing. The core idea is to write the code to be executed as a generator expression and convert it to parallel computing. Joblib essentially uses its Parallel class to run tasks in parallel on the CPUs.
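Below is a hedged, illustrative sketch of how the pandas-oriented options above are typically called (not part of the original notebook; it assumes swifter, pandarallel and joblib are installed, and the DataFrame and helper function are made up for illustration):
# Hedged sketches only - adapt the function and column names to your own data
import pandas as pd
import swifter                          # registers the .swifter accessor on pandas objects
from pandarallel import pandarallel
from joblib import Parallel, delayed

def square(v):
    return v * v

df = pd.DataFrame({"x": range(1_000_000)})

# swifter: applies a function to a pandas object in the fastest available way
df["x_sq"] = df["x"].swifter.apply(square)

# pandarallel: distributes the apply over all available CPUs
pandarallel.initialize()
df["x_sq2"] = df["x"].parallel_apply(square)

# joblib: a parallel for-loop written as a generator expression
results = Parallel(n_jobs=-1)(delayed(square)(v) for v in range(10))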
G) Saving and loading data in various file formats
The methods above read/load the data in CSV format and improve data manipulation/computation, greatly reducing the time spent waiting for data to load into memory. Beyond that, it is always good practice to save datasets in other formats that enable faster operations and consume less memory. There are many binary formats for storing data on disk, and many of them are supported by pandas.
The data can be converted to the following formats:
- csv (default)
- feather
- hdf
- msgpack
- parquet
- pickle
- jay
- numpy array(.npy format) - for numerical data
Let's get to it.
G1) CSV Format
CSV is a comma-separated values file that stores data in a tabular format. It has been the default, standard text file format from the very beginning, and most data is available in it.
A. Saving and loading CSV with pandas
In [29]:
%%time
# We would be using pandas library
save = {} # creating a dict obj to store the data saving time
read = {} # creating a dict obj to store the data reading time
# load data
# dataframe_pandas = load_data() and change to csv
# pandas "dataframe" is already defined above so we would be using this here.
# save the data
t1 = time.time() #initial time
dataframe.to_csv("df_csv.csv", index = False) # save df in .csv format
save["csv_pd"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_csv = pd.read_csv("df_csv.csv")
read["csv_pd"] = time.time()-t2 # append the data to "read" dict
del dataframe
CPU times: user 4min 1s, sys: 13.6 s, total: 4min 14s
Wall time: 4min 2s
B. Saving and loading CSV with modin.pandas
In [30]:
%%time
# We would be using modin.pandas library
# load data
# dataframe_pandas = load_data_modin() and change to csv
# pandas "dataframe_mpd" is already defined above so we would be using that here.
#save the data
t1 = time.time() #initial time
dataframe_mpd.to_csv("df_csv.csv", index = False) # save dataframe in .csv format
save["csv_mpd"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_csv_mpd = mpd.read_csv("df_csv.csv")
read["csv_mpd"] = time.time()-t2 # append the data to "read" dict
del dataframe_mpd
del df_csv_mpd # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
UserWarning: `DataFrame.to_csv` defaulting to pandas implementation.
CPU times: user 4min 4s, sys: 28.6 s, total: 4min 33s
Wall time: 5min 4s
Out[30]:
476
G2) Feather Format
Feather (for Python and R) is a fast, lightweight, easy-to-use binary file format for storing data frames. Feather provides binary columnar serialization for data frames, designed to make reading and writing data frames efficient. It uses the Apache Arrow columnar memory specification to represent binary data on disk. It has a few specific design goals:
- Lightweight, minimal API: make pushing data frames in and out of memory as simple as possible
- Language agnostic: Feather files are the same whether written in Python or R; other languages can read and write Feather files too
- High read and write performance: whenever possible, Feather operations should be bound by local disk performance
Link.
In [31]:
%%time
#save the data
t1 = time.time() #initial time
df_csv.to_feather("df_csv") # save df in feather format
save["feather_pd"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_feather = pd.read_feather("df_csv", use_threads = True)
read["feather_pd"] = time.time()-t2 # append the data to "read" dict
del df_feather # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
CPU times: user 8.64 s, sys: 3.63 s, total: 12.3 s
Wall time: 11.9 s
Out[31]:
113
G3) hdf Format
Hierarchical Data Format (HDF) is a set of file formats (HDF4, HDF5) designed to store and organise large amounts of data. It supports an unlimited variety of data types and is designed for flexible, efficient I/O and for high-volume, complex data, allowing both access-time and storage-space optimisation. One drawback I noticed: while reading, it initially consumes a lot of memory before settling back to normal, which can cause a MemoryError or crash the kernel if there is not enough memory.
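One hedged workaround sketch (not from the original notebook): if the file is written in HDF's "table" format, pandas can read it back in chunks, which avoids loading everything at once:
# Hedged sketch: format="table" enables chunked reads via the chunksize parameter
df_csv.to_hdf("df_csv_table.h5", key="hdf", mode="w", format="table")
for chunk in pd.read_hdf("df_csv_table.h5", key="hdf", chunksize=500000):
    pass  # process each chunk here instead of holding the whole file in memory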
In [32]:
%%time
#save the data
t1 = time.time() #initial time
df_csv.to_hdf("df_csv.h5", key='hdf', mode='w') # save df in hdf format
save["hdf_pd"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_hdf = pd.read_hdf("df_csv.h5", key = 'hdf')
read["hdf_pd"] = time.time()-t2 # append the data to "read" dict
del df_hdf # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
PerformanceWarning:
your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block2_values] [items->Index(['time', 'event', 'displayName', 'position', 'team', 'playDirection',
'route'],
dtype='object')]
CPU times: user 20.9 s, sys: 11 s, total: 32 s
Wall time: 34.2 s
Out[32]:
0
G4) Parquet Format
Parquet is an open-source file format available to any project in the Hadoop ecosystem. Apache Parquet is designed as an efficient, high-performance flat columnar storage format, in contrast to row-based files such as CSV or TSV.
Parquet uses the record shredding and assembly algorithm, which is superior to simply flattening nested namespaces. Parquet is optimised to handle complex data in bulk and offers several options for efficient data compression and encoding. This approach is particularly good for queries that need to read only certain columns from a large table: Parquet reads just the required columns, which greatly reduces IO.
Link
In [33]:
%%time
#save the data
t1 = time.time() #initial time
df_csv.to_parquet("df_csv.parquet.gzip", compression='gzip') # save df in parquet format
save["parquet_pd"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_parquet = pd.read_parquet("df_csv.parquet.gzip")
read["parquet_pd"] = time.time()-t2 # append the data to "read" dict
del df_parquet # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
CPU times: user 30.7 s, sys: 3.38 s, total: 34.1 s
Wall time: 35.6 s
Out[33]:
2
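Since Parquet only has to touch the requested columns, here is a small hedged sketch (not part of the original notebook; the column names come from the dataset above) that reads a column subset back from the file just written:
# Hedged sketch: read only a few columns - Parquet's columnar layout makes this cheap
subset = pd.read_parquet("df_csv.parquet.gzip", columns=["gameId", "playId", "x", "y"])
print(subset.shape)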
G5) Pickle Format
Data can also be saved in pickle format, which is used for serialising and deserialising objects. Converting any kind of Python object (list, dict, etc.) into a byte stream (0s and 1s) is called pickling, serialisation, flattening or marshalling. The byte stream produced by pickling can be turned back into an object by a process called unpickling.
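A tiny hedged sketch of that round trip on an ordinary Python object (the dictionary content is purely illustrative):
import pickle
blob = pickle.dumps({"node": "root-cause-01", "alerts": [1, 2, 3]})  # pickling: object -> byte stream
restored = pickle.loads(blob)                                        # unpickling: byte stream -> object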
In [34]:
%%time
#save the data
t1 = time.time() #initial time
df_csv.to_pickle("df_csv.pkl") # save df in pkl format
save["pickle_pd"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_pickle = pd.read_pickle("df_csv.pkl")
read["pickle_pd"] = time.time()-t2 # append the data to "read" dict
del df_pickle # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
CPU times: user 12.4 s, sys: 5.39 s, total: 17.8 s
Wall time: 16.9 s
Out[34]:
24
G6) Msgpack Format
MessagePack is an efficient binary serialization format. It lets you exchange data between many languages, like JSON, but it is faster and smaller: small integers are encoded in a single byte, and typical short strings need only one extra byte in addition to the string itself. The msgpack package provides CPython bindings (for Python) for reading and writing MessagePack data.
We will use the pandas-msgpack library.
The pandas_msgpack module provides an interface from pandas to the msgpack library. It is a lightweight, portable binary format, similar to binary JSON, that is very space-efficient and offers good performance for both writing (serialisation) and reading (deserialisation).
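As a quick hedged illustration of the raw format (independent of pandas-msgpack and not part of the original notebook; it assumes the msgpack package is installed):
import msgpack
packed = msgpack.packb({"a": 1, "b": [1, 2, 3]})  # serialise to a compact byte string
obj = msgpack.unpackb(packed)                     # deserialise back to a Python object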
In [ ]:
# Note: pandas-msgpack is not working in kaggle notebook -- facing some installation error
# but for me it does work in local machine.
# For the time being, i am leaving the code as it is.
!pip install pandas-msgpack -U
# to avoid error we need specific pandas version
!pip install pandas-compat
# !pip uninstall --yes pandas
!pip install --upgrade pandas
from pandas_msgpack import to_msgpack, read_msgpack
In [ ]:
%%time
#save the data
t1 = time.time() #initial time
to_msgpack('msgpack_dataframe.msg', df_csv)
save["msgpack"] = time.time()-t1 # append the data to "save" dict
#load the saved data
t2 = time.time()
df_msgpack = read_msgpack('msgpack_dataframe.msg')
read["msgpack"] = time.time()-t2 # append the data to "read" dict
del df_msgpack # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
G7) Jay Format
datatable has already been discussed above (see section D for more details). It is powerful and extremely fast, and its .jay format can be used for faster data operations.
In [35]:
%%time
print(f'Version: {dt.__version__}') #datatable is already installed above
# save to .jay format
t1 = time.time() #initial time
frame = dt.fread("df_csv.csv") #store data in datatable frame
frame.to_jay("data_table.jay") # change to .jay format
save["jay"] = time.time()-t1 # append the data to "save" dict
t2 = time.time()
jay_data = dt.open("data_table.jay")
read["jay"] = time.time()-t2 # append the data to "save" dict
# to covert to pandas dataframe directly
# jay_data = dt.open("data_table.jay").to_pandas() # because of limited memory - Commenting.
del frame
del jay_data # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
Version: 0.11.0
CPU times: user 16.4 s, sys: 3.1 s, total: 19.5 s
Wall time: 15.9 s
FutureWarning: Function dt.open() is deprecated since 0.10.0, and will be removed in version 1.0.
Please use dt.fread(file), or dt.Frame(file) instead
Out[35]:
20
G8) NumPy Format - for numerical datasets
Whenever you work with numerical data, it is always good practice to use the NumPy file format, because it is fast and easy to handle. An .npy file contains an array saved in the NumPy (NPY) file format and stores all the information needed to reconstruct the array on any machine, including the dtype and shape. Link
In [36]:
#select dataframe having numerical dtype (int/float)
numerical_df = df_csv.select_dtypes(include=['int64', 'float64'])
print(f"numerical_df memory info: {numerical_df.memory_usage().sum()/1024**2} MBs.")
numerical_df memory info: 871.2803955078125 MBs.
In [37]:
%%time
# save numerical data in .npy format
# np.save('data.npy', dataframe)
with open("data.npy", "wb") as file:
    np.save(file, numerical_df)
CPU times: user 353 ms, sys: 1.89 s, total: 2.25 s
Wall time: 2.24 s
In [38]:
%%time
# read numerical data in .npy format
# data_array = np.load('data.npy')
# data = pd.DataFrame(data_array)
with open("data.npy", "rb") as file:
    data = np.load(file)
    df = pd.DataFrame(data)
CPU times: user 57.1 ms, sys: 886 ms, total: 943 ms
Wall time: 910 ms
As you can see from the results above, the NPY file format is extremely fast for reading numerical data.
In [39]:
result_df = pd.DataFrame({'SaveTime(in sec)':pd.Series(save),'ReadTime(in sec)':pd.Series(read)})
result_df
Out[39]:
|            | SaveTime (in sec) | ReadTime (in sec) |
|------------|-------------------|-------------------|
| csv_pd     | 210.807918        | 31.154217         |
| csv_mpd    | 284.705031        | 19.505428         |
| feather_pd | 8.463451          | 2.954259          |
| hdf_pd     | 17.917461         | 15.515713         |
| parquet_pd | 26.297971         | 8.836816          |
| pickle_pd  | 9.718161          | 6.464217          |
| jay        | 15.559961         | 0.001492          |
Conclusion
If your server has several CPUs, often only one of them is fully dedicated to your computation - so why not use the idle CPUs? One thing is clear from the experiments above: the libraries that enable parallel processing really do improve performance and save time. Beyond that, combine them with the other methods/libraries as needed, because no single approach can ever fit every scenario; each has its own strengths and weaknesses. Try different datasets and see which of the methods above benefit you. There are also plenty of binary formats for storing data that consume less memory and help tasks run faster, so we should take advantage of them.
I hope the methods above help you as much as they helped me.
Reference: https://www.gairuo.com/p/pandas-large-data
Reference: https://www.gairuo.com/p/python-ray
Reference: https://www.gairuo.com/p/python-modin
Reference: https://www.gairuo.com/p/python-dask
Reference: https://www.kaggle.com/code/vmanav/read-large-datasets-in-seconds/notebook
Tags: python, dataframe, df, dask, time, csv, data, pandas  From: https://blog.51cto.com/welcomeweb/6020131