首页 > 编程语言 >python pandas dataframe读取超大数据集

python pandas dataframe读取超大数据集

时间:2023-01-19 20:04:56浏览次数:101  
标签:python dataframe df dask time csv data pandas

python pandas dataframe读取超大数据集_数据集

前言

最近在搞一个根因分析相关的项目,内部用到一个原因模拟器,自动生成各种问题可能导致的告警现象, 算是大数据的边缘,一提到大数据,数据量就大了, 项目大概需要模拟3000+个根源节点,连边关系大概16000+,然后随机游走生成1600k条可能的告警现象。 准备用这1600k的告警数据进行深度学习。接下来问题就出现了, 这些数据准备先放在pandas的dataframe数据结构内, 然后遇到效率问题 (处理慢)和 空间问题(数据量过大oom),表现为:数据存储慢,数据加载到内存困难。 这里就记录下处理超大数据集用到的方法,以供大家参考。

废话一下: 如果是小数据集无需考虑这些, 直接放在csv 或者dataframe 然后处理就行, 没有任何阻碍。

python pandas dataframe读取超大数据集_速度块_02


首先是效率问题

简单总结下用到的方法:

一般数据读取,加载和保存在现有的dataframe上没有太好的解决办法,但是可以充分利用现在服务器的高性能多核的特性(利用所有IDLE CPU内核),当然这里有有点也有缺点,需要看哪种方式更适合数据处理的需求:

  • A) Pandas Dataframe
  • B) Pandas Dataframe + 使用pandas chunksize, engine, iterator and memory_map 参数节省内存
  • C) Dask Dataframe
  • D) Datatable Library
  • E) Modin-Dask/Ray Library
  • F) 其他并行处理库 : swifter, pandaral-lel, dispy, multiprocessing, joblib and many more.
  • G) 使用各种数据格式保存和加载数据

除此之外,还有很多使用二进制格式存储数据,降低数据内存消耗,提升处理速度在 G章节中. 包括:

  • csv (default)
  • feather
  • hdf
  • msgpack
  • parquet
  • pickle
  • jay
  • numpy array(.npy format) - for numerical data



展示原始数据加载

In [1]:

# import the library
import gc
import numpy as np
import pandas as pd
import os
import time
print(f'numpy version: {np.__version__}')
print(f'pandas version: {pd.__version__}')
numpy version: 1.18.5
pandas version: 1.1.3

这里演示 ​nfl_big_data_bowl_2021​ 数据集 (~2.2 Gb in size).

In [2]:

%%time
path = "/kaggle/input/nfl-big-data-bowl-2021/"
# I am using a function to avoid any kind of additional unnecassary variable - helps in RAM saving
# As this creates a new scope for the intermediate variables
# and removes them automatically when the interpreter exits the function.
# Note: I am using 50e5 rows because of memory limitation on Kaggle platform
def load_data():
weekly_data = [csv for csv in os.listdir(path) if csv[:5] == "week1"]
# selecting data for all the weeks having "1" in week name and using 20e5 rows due to the memory limitation of Kaggle notebook.
# As only 16 gigs is allowed to use.
dataframe = pd.DataFrame()
for files in weekly_data:
df = pd.read_csv(filepath_or_buffer = "/kaggle/input/nfl-big-data-bowl-2021/%s"%files, nrows=3000000)
dataframe = pd.concat([dataframe,df])
return dataframe[:]

dataframe = load_data()
CPU times: user 46.3 s, sys: 17.5 s, total: 1min 3s
Wall time: 1min 7s


A) 保存和加载数据的一般方法-使用Pandas CSV

  • 保存为CSV格式

In [3]:

%%time
# Saving data in CSV Format
dataframe.to_csv("/kaggle/working/csv_data.csv",index = False)
CPU times: user 3min 2s, sys: 2.5 s, total: 3min 5s
Wall time: 3min 5s
  • 读取CSV数据

In [4]:

%%time
# Reading data in CSV Format
csv_data = pd.read_csv("/kaggle/working/csv_data.csv")
CPU times: user 28.4 s, sys: 3.07 s, total: 31.5 s
Wall time: 31.5 s

In [5]:

del dataframe #we won't use this dataframe so i am deleting those to free up some space.
del csv_data
gc.collect() # collect garbage value from the memory

Out[5]:

20

数据保存和数据加载比较耗时。接下来就是优化


B) 使用 pandas 的chunksize, engine, iterator and memory_map 参数

以下参数证明有助于提高性能。. 可以尝试.

  • chunksize: 有时,在加载/读取数据时,由于系统的内存限制(有限的RAM),我们可能会遇到MemoryError。好消息是我们可以在熊猫身上把更大的数据文件读成更小的块。Pandas read_csv有一个内置参数“chunksize”,它以块的形式读取数据。这意味着,在任何单一时间内,为了适应本地内存,要读入数据帧的行数。使用chunksize将导致用于迭代的TextFileReader对象。我们可以对每个块执行操作,并将每个块连接起来,形成如下所示的数据帧,以适应本地内存。
  • engine: “c”和“python”引擎。这是一个可选参数。这些名称指示了编写解析器的语言。Pandas尽可能使用C解析器(指定为engine=' C '),但如果指定了C不支持的选项,则可能退回到Python。

主要目的是在我们可以使用C引擎时使用C引擎,因为C引擎更快。更多信息: ​​https://stackoverflow.com/questions/52774459/engines-in-python-pandas-read-csv​

  • iterator: 随着时间产生一系列值(TextfileReader对象),而不是一次计算它们。节省内存。
  • memory_map: 将文件对象直接映射到内存中,并直接从那里访问数据。

In [6]:

%%time
# let's define a function to avoid creating unnecessary variable
def load_data_chunk(filepath, chunksize =500000, iterator = True, engine = 'c', memory_map = True):
chunk_df = [x for x in pd.read_csv(filepath_or_buffer = filepath,nrows=3000000, chunksize=chunksize, iterator=iterator, engine=engine, memory_map= memory_map)]
chunk_data = pd.concat(chunk_df,sort=False)
return chunk_data

chunk_data = load_data_chunk(filepath= "/kaggle/working/csv_data.csv")
CPU times: user 9.41 s, sys: 456 ms, total: 9.87 s
Wall time: 9.87 s

通过这种方式,我们可以消除MemoryError,但它有一个代价。有时候执行时间会比一般方法多一点。但事实证明,它比默认的pandas更快。read_csv方法。

In [7]:

del chunk_data # we won't use this dataframe so i am deleting those to free up some space.
gc.collect() # collect garbage value from the memory

Out[7]:

20


C) 使用Dask DataFrame

简单来说,dask基本上是一个并行计算库,可以在不将整个csv加载到内存的情况下分析csv数据,并通过利用多核cpu(空闲cpu内核)和从磁盘有效地流数据在单机上执行更快的计算。

Dask DataFrame由许多较小的Pandas DataFrame组成,沿着索引分割。这些Pandas dataframe可能存在于磁盘上,用于在一台机器上进行大于内存的计算,也可能存在于集群中的许多不同机器上。

dask或dask dataframe最好的部分是可以使用这个库,就像使用pandas库一样,因为dusk API是建立在pandas API之上的。

可以在以下情况下使用Dask库::

  • 操纵大型数据集,即使这些数据集不适合内存
  • 通过使用多核来加速长时间计算
  • 使用标准Pandas操作(如分组、连接和时间序列计算)对大型数据集进行分布式计算

注:数据转换不能利用并行计算的好处。

In [8]:

# !pip install dask[complete] --> for complete installation
!pip install dask # installing dask
# import dask dataframe
import dask.dataframe as dd
Requirement already satisfied: dask in /opt/conda/lib/python3.7/site-packages (2.30.0)
Requirement already satisfied: pyyaml in /opt/conda/lib/python3.7/site-packages (from dask) (5.3.1)

注意:从现在开始,将使用Dask库代替Pandas库进行演示。

正如已经提到的,Dask API是Pandas API的一个子集,所以可以像在Pandas中一样使用相同的方法。现在将使用Dask库复制数据保存和加载/读取(就像上面使用Pandas一样)。

准备数据:用“dd”代替“pd”。除此之外,下面的一切都是一样的。
In [9]:

%%time
# Reading data in CSV Format using Dask Dataframe
dask_csv_data = dd.read_csv(urlpath= "/kaggle/working/csv_data.csv")
dask_csv_data.head(3)
CPU times: user 1.59 s, sys: 191 ms, total: 1.78 s
Wall time: 1.79 s

Out[9]:

time

x

y

s

a

dis

o

dir

event

nflId

displayName

jerseyNumber

position

frameId

team

gameId

playId

playDirection

route

0

2018-11-16T01:24:15.799Z

77.97

18.61

0.0

0.0

0.0

109.88

289.98

None

497236.0

Jimmy Graham

80.0

TE

1

away

2018111500

90

right

CROSS

1

2018-11-16T01:24:15.799Z

79.41

23.71

0.0

0.0

0.0

90.31

159.68

None

2506363.0

Aaron Rodgers

12.0

QB

1

away

2018111500

90

right

NaN

2

2018-11-16T01:24:15.799Z

85.05

22.71

0.0

0.0

0.0

288.53

141.92

None

2532966.0

Bobby Wagner

54.0

MLB

1

home

2018111500

90

right

NaN

哇!,可以看到上面的方法使用Dask Dataframe是非常非常快的。可以应用一些快速的数据操作方法,如下所示:

In [10]:

dask_csv_data.columns

Out[10]:

Index(['time', 'x', 'y', 's', 'a', 'dis', 'o', 'dir', 'event', 'nflId',
'displayName', 'jerseyNumber', 'position', 'frameId', 'team', 'gameId',
'playId', 'playDirection', 'route'],
dtype='object')

In [11]:

dask_csv_data.shape

Out[11]:

(Delayed('int-d7b776a2-ca6d-4108-8ddb-841882ea67ec'), 19)

正如在上面所看到的,无法获得数据框架中的总行数。Dask延迟函数修饰了你的函数,使它们可以懒惰地运行。它不是立即执行函数,而是延迟执行,将函数及其参数放入任务图中。

因此,可以通过调用.compute()方法或dask.compute(…)函数将任何dask集合转换为具体值。这个函数将阻塞,直到计算完成,直接从延迟dask集合到本地内存中的具体值。

In [12]:

del dask_csv_data
gc.collect()

Out[12]:

75

In [13]:

%%time
# Reading data in CSV Format using Dask Dataframe
dask_csv_data = dd.read_csv(urlpath= "/kaggle/working/csv_data.csv", blocksize= 64e6).compute() # 64MB chunks
dask_csv_data.head(3)
CPU times: user 40.4 s, sys: 4.14 s, total: 44.5 s
Wall time: 19.7 s

Out[13]:

time

x

y

s


a

dis

o

dir

event

nflId

displayName

jerseyNumber

position

frameId

team

gameId

playId

playDirection

route

0

2018-11-16T01:24:15.799Z

77.97

18.61

0.0

0.0

0.0

109.88

289.98

None

497236.0

Jimmy Graham

80.0

TE

1

away

2018111500

90

right

CROSS

1

2018-11-16T01:24:15.799Z

79.41

23.71

0.0

0.0

0.0

90.31

159.68

None

2506363.0

Aaron Rodgers

12.0

QB

1

away

2018111500

90

right

NaN

2

2018-11-16T01:24:15.799Z

85.05

22.71

0.0

0.0

0.0

288.53

141.92

None

2532966.0

Bobby Wagner

54.0

MLB

1

home

2018111500

90

right

NaN

In [14]:

%%time
# Storing data in CSV Format - using dask
dask_csv_data.to_csv("/kaggle/working/dask_data.csv",index = False)
CPU times: user 3min 9s, sys: 2.04 s, total: 3min 11s
Wall time: 3min 10s

In [15]:

del dask_csv_data # free up some space.
gc.collect() # collect garbage value from the memory

Out[15]:

40

对于上述方法,可以得出结论,数据保存需要时间,但数据读取相对较快。您可以使用不同的数据集并检查结果。


D) 使用Datatable Library

datatable提供了快速和方便的文本(csv)文件解析。最近了解了这个库,它对进行数据分析很有帮助!得益于作者 ​​author​​.

数据表的灵感来自R的数据。表,增强了数据的可访问性。它比pandas还快。
数据表解析器

  • 自动检测分隔符,标题,列类型,引用规则等。
  • 读取文件,URL, shell,原始文本,档案,glob
  • 提供最大速度的多线程文件读取
  • 在读取大文件时包含一个进度指示器

In [16]:

!pip install datatable # install datatable 
import datatable as dt # import datatable
print(f'Version: {dt.__version__}')
Collecting datatable
Downloading datatable-0.11.0-cp37-cp37m-manylinux2010_x86_64.whl (83.9 MB)
|████████████████████████████████| 83.9 MB 128 kB/s eta 0:00:01
Installing collected packages: datatable
Successfully installed datatable-0.11.0
Version: 0.11.0

使用fread()函数创建一个帧,它既强大又非常快。它可以自动检测大多数文本文件的解析参数,从.zip存档或url加载数据,读取Excel文件,等等。

In [17]:

%%time
frame = dt.fread("csv_data.csv")
CPU times: user 14.7 s, sys: 2.33 s, total: 17 s
Wall time: 4.28 s

In [18]:

frame.head(2)

Out[18]:

time

x

y

s

a

dis

o

dir

event

nflId

team

gameId

playId

playDirection

route

▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪

▪▪▪▪

▪▪▪▪

▪▪▪▪

▪▪▪▪

0

2018-11-16T01:24:15.799Z

77.97

18.61

0

0

0

109.88

289.98

None

497236

away

2018111500

90

right

CROSS

1

2018-11-16T01:24:15.799Z

79.41

23.71

0

0

0

90.31

159.68

None

2.50636e+06

away

2018111500

90

right

2 rows × 19 columns


真是快得难以置信!可以轻松地将现有的Frame转换为numpy数组,分别使用to_numpyto_pandas()方法将pandas DataFrame转换为numpy数组,并像在pandas中那样执行数据操作。

非常快速简单,对吧?


In [19]:

# %%time
# pandas_df = frame.to_pandas()
# pandas_df = dt.fread("csv_data.csv").to_pandas() # we can combine both the above steps
# pandas_df.head(2)

还可以将帧保存为磁盘上的.jay(二进制)格式,然后立即打开它,而不管数据大小。
In [20]:

%%time
# save data to .jay format
frame.to_jay("data_table.jay")
CPU times: user 528 ms, sys: 3.13 s, total: 3.65 s
Wall time: 3.61 s

In [21]:

%%time
#read the .jay data foramt
dt.open("data_table.jay").head(2) # we would get Frame
# -- we can easily covert frames to pandas dataframe directly
# jay_data = dt.open("data_table.jay").to_pandas()
CPU times: user 1.58 ms, sys: 40 µs, total: 1.62 ms
Wall time: 1.63 ms
/opt/conda/lib/python3.7/site-packages/datatable/__init__.py:163: FutureWarning: Function dt.open() is deprecated since 0.10.0, and will be removed in version 1.0.
Please use dt.fread(file), or dt.Frame(file) instead
category=FutureWarning)

Out[21]:

time

x

y

s

a

dis

o

dir

event

nflId

team

gameId

playId

playDirection

route

▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪

▪▪▪▪▪▪▪▪

▪▪▪▪

▪▪▪▪

▪▪▪▪

▪▪▪▪

▪▪▪▪

0

2018-11-16T01:24:15.799Z

77.97

18.61

0

0

0

109.88

289.98

None

497236

away

2018111500

90

right

CROSS

1

2018-11-16T01:24:15.799Z

79.41

23.71

0

0

0

90.31

159.68

None

2.50636e+06

away

2018111500

90

right

2 rows × 19 columns

In [22]:

#del pandas_df# free up some space.
del frame
gc.collect() # collect garbage value from the memory

Out[22]:

40

从上面的结果,可以得出结论,datatable 真的很快。几秒钟内就能得到想要的结果。



E) 使用MODIN-DASK/RAY Library

​Modin Library​​ 是个人最喜欢的,经常使用jupyter笔记本上的数据表。在进程执行期间(无论是数据操作还是任何类型的操作),系统通常使用单个核心/cpu,其他都处于空闲状态。为什么不利用空闲内核呢?

主要的想法是,它确实利用了所有空闲的CPU核心,这有助于更快地计算。

  • Modin是一个早期的DataFrame库,它包装pandas并透明地分发数据和计算,通过一行代码更改加速您的pandas工作流程。
  • 用户不需要知道他们的系统有多少核,也不需要指定如何分配数据。
  • 事实上,用户可以继续使用以前的pandas notebooks ,同时体验Modin的显著加速,即使是在一台机器上。只需要修改import语句。一旦更改了import语句,就可以像使用pandas一样使用Modin了,因为API与pandas是相同的。

Modin使用Ray或Dask提供了一种轻松的方式来加快您的熊猫笔记本、脚本和库。

  • Ray是一个快速、简单的框架,用于构建和运行分布式应用程序。Ray利用Apache Arrow进行有效的数据处理,并为分布式计算提供任务和参与者抽象。
  • Dask为分析提供了先进的并行性,实现了大规模的性能。Dask更专注于数据科学领域,提供更高级别的api,除了低级调度和集群管理框架之外,这些api反过来为Pandas、NumPy和scikit-learn提供部分替代。


  • Dask和Ray之间的主要区别是*Dask使用集中式调度器来跨多个核共享工作,而Ray使用分布式自底向上的调度。

你可以通过这个链接了解更多的差异。

注意:Ray不支持Windows,因此无法安装modin[Ray]或modin[all]。

  • pip install modin[all]  #安装以上所有软件
  • pip install modin[ray]  #安装modin依赖项和ray运行在ray上

In [23]:

!pip install modin[dask] # Install Modin dependencies and Dask to run on Dask

# Initialize all cores- MODIN-DASK Engine
# import os - Uncomment this if in use.Already imported in top cell.
os.environ["MODIN_ENGINE"] = "dask"
from distributed import Client
client = Client()
import modin.pandas as mpd
Collecting modin[dask]
Downloading modin-0.8.2-py3-none-manylinux1_x86_64.whl (533 kB)
|████████████████████████████████| 533 kB 1.3 MB/s eta 0:00:01
Collecting pandas==1.1.4
Downloading pandas-1.1.4-cp37-cp37m-manylinux1_x86_64.whl (9.5 MB)
|████████████████████████████████| 9.5 MB 12.6 MB/s eta 0:00:01
Requirement already satisfied: packaging in /opt/conda/lib/python3.7/site-packages (from modin[dask]) (20.1)
Requirement already satisfied: distributed<=2.19.0,>=2.12.0; extra == "dask" in /opt/conda/lib/python3.7/site-packages (from modin[dask]) (2.14.0)
Collecting dask<=2.19.0,>=2.12.0; extra == "dask"
Downloading dask-2.19.0-py3-none-any.whl (824 kB)
|████████████████████████████████| 824 kB 18.8 MB/s eta 0:00:01
Requirement already satisfied: python-dateutil>=2.7.3 in /opt/conda/lib/python3.7/site-packages (from pandas==1.1.4->modin[dask]) (2.8.1)
Requirement already satisfied: numpy>=1.15.4 in /opt/conda/lib/python3.7/site-packages (from pandas==1.1.4->modin[dask]) (1.18.5)
Requirement already satisfied: pytz>=2017.2 in /opt/conda/lib/python3.7/site-packages (from pandas==1.1.4->modin[dask]) (2019.3)
Requirement already satisfied: six in /opt/conda/lib/python3.7/site-packages (from packaging->modin[dask]) (1.14.0)
Requirement already satisfied: pyparsing>=2.0.2 in /opt/conda/lib/python3.7/site-packages (from packaging->modin[dask]) (2.4.7)
Requirement already satisfied: msgpack>=0.6.0 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.0.0)
Requirement already satisfied: zict>=0.1.3 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (2.0.0)
Requirement already satisfied: pyyaml in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (5.3.1)
Requirement already satisfied: click>=6.6 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (7.1.1)
Requirement already satisfied: psutil>=5.0 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (5.7.0)
Requirement already satisfied: setuptools in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (46.1.3.post20200325)
Requirement already satisfied: tblib>=1.6.0 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.6.0)
Requirement already satisfied: cloudpickle>=0.2.2 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.3.0)
Requirement already satisfied: sortedcontainers!=2.0.0,!=2.0.1 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (2.1.0)
Requirement already satisfied: toolz>=0.8.2 in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (0.10.0)
Requirement already satisfied: tornado>=5; python_version < "3.8" in /opt/conda/lib/python3.7/site-packages (from distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (5.0.2)
Requirement already satisfied: heapdict in /opt/conda/lib/python3.7/site-packages (from zict>=0.1.3->distributed<=2.19.0,>=2.12.0; extra == "dask"->modin[dask]) (1.0.1)
Installing collected packages: pandas, dask, modin
Attempting uninstall: pandas
Found existing installation: pandas 1.1.3
Uninstalling pandas-1.1.3:
Successfully uninstalled pandas-1.1.3
Attempting uninstall: dask
Found existing installation: dask 2.30.0
Uninstalling dask-2.30.0:
Successfully uninstalled dask-2.30.0
ERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.

We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.

dask-xgboost 0.1.11 requires xgboost<=0.90, but you'll have xgboost 1.2.1 which is incompatible.
Successfully installed dask-2.19.0 modin-0.8.2 pandas-1.1.4
/opt/conda/lib/python3.7/site-packages/distributed/client.py:1079: VersionMismatchWarning: Mismatched versions found

dask
+-----------------------+---------+
| | version |
+-----------------------+---------+
| client | 2.30.0 |
| scheduler | 2.30.0 |
| tcp://127.0.0.1:32795 | 2.19.0 |
| tcp://127.0.0.1:34019 | 2.19.0 |
| tcp://127.0.0.1:36555 | 2.19.0 |
| tcp://127.0.0.1:39631 | 2.19.0 |
+-----------------------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
UserWarning: The pandas version installed 1.1.3 does not match the supported pandas version in Modin 1.1.4. This may cause undesired side effects!
UserWarning: The Dask Engine for Modin is experimental.

因此,而不是pandas pd,我们只需要使用mpd,即modin。pandas 和其他格式/方法将与熊猫相同。不像在dask dataframe(参考上面的C节),有一些其他的方法(如.compute()和其他),需要进行各种操作或数据操作,但在这里,在modin我们不需要关心任何方法/格式,只是使用它像pandas 库的modin api是相同的pandas  api。

In [24]:

%%time
# using default pandas method to prepare data
dataframe = load_data() # function load_data defined above
CPU times: user 51.6 s, sys: 15.9 s, total: 1min 7s
Wall time: 1min 5s

python pandas dataframe读取超大数据集_速度块_03

您可以看到上述默认pd方法的结果。注意CPU利用率和所花费的时间。下面我们将使用MODIN.PANDAS执行相同的函数。

In [40]:

%%time
# changing the default pandas(pd) method by modin.pandas(mpd)
path = "/kaggle/input/nfl-big-data-bowl-2021/"
# I am using a function to avoid any kind of additional unnecassary variable - helps in RAM saving
# As this creates a new scope for the intermediate variables
# and removes them automatically when the interpreter exits the function.
def load_data_modin():
weekly_data = [csv for csv in os.listdir(path) if csv[:5] == "week1"]
# selecting data for all the weeks having "1" in week name due to the memory limitation of Kaggle notebook.
# As only 16 gigs is allowed to use.
dataframe = mpd.DataFrame()
for files in weekly_data:
df = mpd.read_csv(filepath_or_buffer = "/kaggle/input/nfl-big-data-bowl-2021/%s"%files)
dataframe = mpd.concat([dataframe,df])
return dataframe

dataframe_mpd = load_data_modin()
UserWarning: Distributing <class 'NoneType'> object. This may take some time.
CPU times: user 2.65 s, sys: 1.33 s, total: 3.98 s
Wall time: 15 s

看到运行时间了吗?好哇!很快就得到了结果。

python pandas dataframe读取超大数据集_速度块_04

现在可以看到上述操作的结果,即使用modin.pandas(mpd)。请注意CPU利用率和时间。Modin使用所有的CPU内核以获得更快的结果。您可以在任务管理器中检查本地CPU核心性能,以交叉检查它是否工作。

​结论:从上面的结果,即所花费的时间(大约是一般方法的1/4)和CPU利用率,我们可以得出结论,modin库的工作速度非常快。

​使用modin.pandas保存和读取CSV数据

In [26]:

%%time
# Storing merged data in CSV Format
dataframe_mpd.to_csv("/kaggle/working/modin_csv_data.csv",index = False)
UserWarning: `DataFrame.to_csv` defaulting to pandas implementation.
To request implementation, send an email to feature_requests@modin.org.
UserWarning: Distributing <class 'pandas.core.frame.DataFrame'> object. This may take some time.
CPU times: user 4min 4s, sys: 22.5 s, total: 4min 26s
Wall time: 4min 46s

In [27]:

%%time
# Reading data in CSV Format
csv_data = mpd.read_csv("/kaggle/working/csv_data.csv")
CPU times: user 1.99 s, sys: 1.37 s, total: 3.36 s
Wall time: 18.6 s

In [28]:

del csv_data
gc.collect()

Out[28]:

302



F) 其他并行处理库

尝试其他一些并行处理库,看看它们是否有帮助。

  • ​SWIFTER​​​: SWIFTER是一个包,它试图以最快的可用方法有效地将任何函数应用到Pandas数据帧或系列对象。它与Pandas对象集成在一起,因此我们只能对Pandas对象(如Data Frame或Series)使用这个包。 ​​​链接​
  • ​Pandaral-lel​​​: Pandaral·lel的想法是将您的pandas计算分布在计算机上所有可用的cpu上,以获得显著的速度提升。Pandaral-lel的限制:Pandaral-lel仅适用于pandas和windows pc,只有当Python会话(Python, ipython, jupyter notebook, jupyter lab)从 ​​Windows Subsystem for Linux​​​ (WSL).  ​​链接​​.
  • ​Ipyparallel​​​: 这是另一个多处理和任务分配系统,专门用于跨集群并行执行Jupyter笔记本代码。可以在任何Python语句中使用前缀%px来自动并行化它。 ​​链接​​.
  • ​Dispy​​: Dispy是一个通用的,全面的,易于使用的框架,用于创建和使用计算集群,在单个机器(SMP)的多个处理器上并行执行计算,在集群,网格或云中的许多机器中。它允许将整个Python程序或单独的函数分布到机器集群中进行并行执行。它使用平台原生机制进行网络通信,以保持速度和效率,因此Linux、MacOS和Windows机器工作得同样好。
  • ​Multiprocessing​​: 可以使用多处理包来并行处理输入文件,以加快处理速度。
  • ​Joblib​​: Joblib提供了一个简单的helper类来使用多处理编写并行for循环。其核心思想是编写要作为生成器表达式执行的代码,并将其转换为并行计算。Joblib基本上使用“Parallel”类在CPU上执行并行任务。


G) 保存和加载数据使用各种数据格式

上述方法采用csv格式读取/加载数据,增强了数据操作/计算能力,在很大程度上减少了数据加载到内存的等待时间。除此之外,将数据集保存为各种数据格式始终是最佳实践,这有助于更快的数据操作和消耗更少的内存。有许多二进制格式可以在磁盘上存储数据,其中许多都是pandas支持的。 

可以将数据转换为以下数据格式:

  • csv (default)
  • feather
  • hdf
  • msgpack
  • parquet
  • pickle
  • jay
  • numpy array(.npy format) - for numerical data

搞起


G1) csv format

CSV是一个以逗号分隔的值文件,允许数据以表格格式保存。从一开始就使用的默认标准文本文件格式,大多数数据都以这种格式可用。

A.保存和加载CSV使用PANDAS

In [29]:

%%time
# We would be using pandas library
save = {} # creating an dict obj to store the data saving time
read = {} # creating an dict obj to save the data reading time

# load data
# dataframe_pandas = load_data() and change to csv
# pandas "dataframe" is already defined above so we would be using this here.

# save the data
t1 = time.time() #initial time
dataframe.to_csv("df_csv.csv", index = False) # save df in .csv format
save["csv_pd"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_csv = pd.read_csv("df_csv.csv")
read["csv_pd"] = time.time()-t2 # append the data to "read" dict

del dataframe
CPU times: user 4min 1s, sys: 13.6 s, total: 4min 14s
Wall time: 4min 2s

B. 使用Modin PANDAS保存和加载CSV

In [30]:

%%time
# We would be using modin.pandas library
# load data
# dataframe_pandas = load_data_modin() and change to csv
# pandas "dataframe_mpd" is already defined above so we would be using that here.

#save the data
t1 = time.time() #initial time
dataframe_mpd.to_csv("df_csv.csv", index = False) # save dataframe in .csv format
save["csv_mpd"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_csv_mpd = mpd.read_csv("df_csv.csv")
read["csv_mpd"] = time.time()-t2 # append the data to "read" dict

del dataframe_mpd
del df_csv_mpd # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
UserWarning: `DataFrame.to_csv` defaulting to pandas implementation.
CPU times: user 4min 4s, sys: 28.6 s, total: 4min 33s
Wall time: 5min 4s

Out[30]:

476

G2) Feather Format

Feather %20和%20R.)是一种快速、轻量级、易于使用的二进制文件格式,用于存储数据帧。“Feather”为数据帧提供了二进制柱状序列化,旨在有效地读取和写入数据帧。它使用Apache Arrow柱状内存规范来表示磁盘上的二进制数据。它有几个具体的设计目标:

  • 轻量级,最小的API:尽可能简单地将数据帧放入和取出内存
  • 语言不可知:无论是用Python还是R代码编写的Feather文件都是一样的。其他语言也可以读写Feather文件。
  • 高读写性能。只要可能,Feather操作应该与本地磁盘性能绑定

 ​​链接​​.

In [31]:

%%time

#save the data
t1 = time.time() #initial time
df_csv.to_feather("df_csv") # save df in feather format
save["feather_pd"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_feather = pd.read_feather("df_csv", use_threads = True)
read["feather_pd"] = time.time()-t2 # append the data to "read" dict

del df_feather # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
CPU times: user 8.64 s, sys: 3.63 s, total: 12.3 s
Wall time: 11.9 s

Out[31]:

113

G3) hdf Format

​Hierarchical Data Format (HDF)​​ 是一组文件格式(HDF4, HDF5),用于存储和组织大量数据。它支持无限多种数据类型,专为灵活高效的I/O和大容量复杂数据而设计。它允许访问时间和存储空间优化。有一个缺点,注意到,在读取数据时,它最初消耗了大量的内存,然后重置到正常状态,这可能会导致MemoryError或可以打破内核,如果没有足够的内存。

In [32]:

%%time

#save the data
t1 = time.time() #initial time
df_csv.to_hdf("df_csv.h5", key='hdf', mode='w') # save df in hdf format
save["hdf_pd"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_hdf = pd.read_hdf("df_csv.h5", key = 'hdf')
read["hdf_pd"] = time.time()-t2 # append the data to "read" dict

del df_hdf # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
PerformanceWarning: 
your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block2_values] [items->Index(['time', 'event', 'displayName', 'position', 'team', 'playDirection',
'route'],
dtype='object')]
CPU times: user 20.9 s, sys: 11 s, total: 32 s
Wall time: 34.2 s

Out[32]:

0

G4) Parquet Format

​Parquet​​ 是一种开源文件格式,适用于Hadoop生态系统中的任何项目。Apache Parquet是为高效、高性能的平柱式数据存储格式而设计的,与基于行的文件(如CSV或TSV文件)相比。

Parquet使用记录分解和组装算法,这优于简单的嵌套名称空间的扁平化。Parquet经过优化,可大量处理复杂数据,并具有高效数据压缩和编码类型的不同方法。这种方法特别适合那些需要从大型表中读取某些列的查询。Parquet只能读取所需的列,因此极大地减少了IO。

 ​​链接​

In [33]:

%%time

#save the data
t1 = time.time() #initial time
df_csv.to_parquet("df_csv.parquet.gzip", compression='gzip') # save df in parquet format
save["parquet_pd"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_parquet = pd.read_parquet("df_csv.parquet.gzip")
read["parquet_pd"] = time.time()-t2 # append the data to "read" dict

del df_parquet # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
CPU times: user 30.7 s, sys: 3.38 s, total: 34.1 s
Wall time: 35.6 s

Out[33]:

2

G5) Pickle Format

可以将数据文件保存为pickle格式,该格式用于序列化和反序列化对象。将任何类型的python对象(list, dict等)转换为字节流(0和1)的过程称为pickle或序列化或扁平化或编组。我们可以通过一个称为unpickling的过程将字节流(通过pickle生成)转换回对象。

In [34]:

%%time

#save the data
t1 = time.time() #initial time
df_csv.to_pickle("df_csv.pkl") # save df in pkl format
save["pickle_pd"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_pickle = pd.read_pickle("df_csv.pkl")
read["pickle_pd"] = time.time()-t2 # append the data to "read" dict

del df_pickle # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
CPU times: user 12.4 s, sys: 5.39 s, total: 17.8 s
Wall time: 16.9 s

Out[34]:

24

G6) Msgpack Format

​MessagePack​​是一种有效的二进制序列化格式。它允许您在JSON等多种语言之间交换数据。但是它更快更小。小整数被编码成一个字节,而典型的短字符串除了字符串本身外只需要一个额外的字节。这个包提供了用于读写MessagePack数据的CPython绑定(适用于python)。

将使用 pandas-msgpack 库.

pandas_msgpack模块提供了从pandas到msgpack库的接口。这是一种轻量级的可移植二进制格式,类似于二进制JSON,非常节省空间,并且在写入(序列化)和读取(反序列化)方面都提供了良好的性能。

In [ ]:

# Note: pandas-msgpack is not working in kaggle notebook -- facing some installation error
# but for me it does work in local machine.
# For the time being, i am leaving the code as it is.
!pip install pandas-msgpack -U
# to avoid error we need specific pandas version
!pip install pandas-compat
# !pip uninstall --yes pandas
!pip install --upgrade pandas
from pandas_msgpack import to_msgpack, read_msgpack

In [ ]:

%%time

#save the data
t1 = time.time() #initial time
to_msgpack('msgpack_dataframe.msg', df_csv)
save["msgpack"] = time.time()-t1 # append the data to "save" dict

#load the saved data
t2 = time.time()
df_msgpack = read_msgpack('msgpack_dataframe.msg')
read["msgpack"] = time.time()-t2 # append the data to "read" dict

del df_msgpack # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory

G7) Jay Format

正如已经讨论了上面的数据表(参考上面的D节了解更多信息)。数据表功能强大,速度极快。我们可以使用.jay格式进行更快的数据操作。

In [35]:

%%time
print(f'Version: {dt.__version__}') #datatable is already installed above

# save to .jay format
t1 = time.time() #initial time
frame = dt.fread("df_csv.csv") #store data in datatable frame
frame.to_jay("data_table.jay") # change to .jay format
save["jay"] = time.time()-t1 # append the data to "save" dict

t2 = time.time()
jay_data = dt.open("data_table.jay")
read["jay"] = time.time()-t2 # append the data to "save" dict
# to covert to pandas dataframe directly
# jay_data = dt.open("data_table.jay").to_pandas() # because of limited memory - Commenting.


del frame
del jay_data # delete the csv data to free memory space
gc.collect() # collect garbage value (if any) from the memory
Version: 0.11.0
CPU times: user 16.4 s, sys: 3.1 s, total: 19.5 s
Wall time: 15.9 s
FutureWarning: Function dt.open() is deprecated since 0.10.0, and will be removed in version 1.0.
Please use dt.fread(file), or dt.Frame(file) instead

Out[35]:

20

G7) Numpy Format- For numerical dataset

无论何时处理数字数据,使用Numpy文件格式总是一个很好的做法,因为它们是快速和容易处理的。它包含一个以NumPy (NPY)文件格式保存的数组。NPY文件存储了在任何计算机上重建数组所需的所有信息,包括dtype和形状信息。 ​​链接​

In [36]:

#select dataframe having numerical dtype (int/float)
numerical_df = df_csv.select_dtypes(include=['int64', 'float64'])
print(f"numerical_df memory info: {numerical_df.memory_usage().sum()/1024**2} MBs.")
numerical_df memory info: 871.2803955078125 MBs.

In [37]:

%%time
# save numerical data in .npy format
# np.save('data.npy', dataframe)
with open("data.npy", "wb") as file:
np.save(file, numerical_df)
CPU times: user 353 ms, sys: 1.89 s, total: 2.25 s
Wall time: 2.24 s

In [38]:

%%time
# read numerical data in .npy format
# data_array = np.load('data.npy')
# data = pd.DataFrame(data_array)
with open("data.npy", "rb") as file:
data = np.load(file)
df = pd.DataFrame(data)
CPU times: user 57.1 ms, sys: 886 ms, total: 943 ms
Wall time: 910 ms

从上面的结果中可以看到,NPY文件格式对于读取数字数据来说是非常快的。

In [39]:

result_df = pd.DataFrame({'SaveTime(in sec)':pd.Series(save),'ReadTime(in sec)':pd.Series(read)})
result_df

Out[39]:

SaveTime(in sec)

ReadTime(in sec)

csv_pd

210.807918

31.154217

csv_mpd

284.705031

19.505428

feather_pd

8.463451

2.954259

hdf_pd

17.917461

15.515713

parquet_pd

26.297971

8.836816

pickle_pd

9.718161

6.464217

jay

15.559961

0.001492

结论

如果你的服务器有几个cpu,只有其中一个是完全专用于你的计算。为什么我们不使用空闲cpu呢?从上面的实验中,有一件事是清楚的,可以利用库来帮助并行处理。它们无疑提高了性能并节省了时间。除此之外,还可以使用其他方法/库/方法,因为单一的方法/库/方法永远不可能适用于所有场景。每种方法都有自己的缺点和优点。你可以尝试不同的数据集,看看上面的方法是否对你有益。有很多二进制格式来存储数据,这些格式消耗的内存空间更少,有助于更快地执行任务,因此我们必须利用各种数据格式的好处。

我希望上面的方法能帮助到你,就像它们帮助到我一样。

参考链接:​​https://www.gairuo.com/p/pandas-large-data​

参考链接:​​https://www.gairuo.com/p/python-ray​

参考链接:​​https://www.gairuo.com/p/python-modin​

参考链接:​​https://www.gairuo.com/p/python-dask​

参考链接:​​​​​https://www.kaggle.com/code/vmanav/read-large-datasets-in-seconds/notebook​​​

标签:python,dataframe,df,dask,time,csv,data,pandas
From: https://blog.51cto.com/welcomeweb/6020131

相关文章

  • Python3.9+torch1.7.1+cuda11.0+cudnn8.0+Anaconda3安装
    前言想要安装pytorch,至少得先安装Anaconda、python!!!必要的不想用cpu要用gpu的还需要cuda11.0+cudnn11.0!!!一、安装python3.9二、安装Anaconda3三、安装cuda11.0+cudnn8.0四、......
  • python 工匠规范
    最近看了《python工匠》这本关于python的书,虽然只看了第一章关于变量与注释的内容,也有些突如其来的想法。一直学习如何使用python去做一些功能,忽略了它本身的一些特性和......
  • python __getitem__用法
    classA():def__init__(self):self.name={"key":"aaa"}def__getitem__(self,item):returnself.name.get(item)obj=A()print(obj["key"])......
  • python编码
    str类型字符串  byte类型字符串1、两者互相转换通过 str类型通过encode转换成bytes类型bytes类型通过decode转换成str类型2、两者以一种编码方式进行......
  • 多进程 multiprocessing in Python
    Simpleexampleimporttimeimportmultiprocessingstart=time.perf_counter()defdo_something(): print('sleeping1second...') time.sleep(1) print('Done......
  • python操作mysql基础
    importpymysqlconfig={'host':'127.0.0.1','port':3306,'user':'root','password':'root','database':'sys','cursorclass':......
  • Python3.9安装
    一、安装python3.9链接:https://pan.baidu.com/s/1mDkgKt2KSoMrKVxesb76Pg?pwd=ma4n提取码:ma4n--来自百度网盘超级会员V4的分享下载python3.9,然后安装到D盘都是......
  • 多线程threading in Python
    SimpleExample1importtimeimportthreadingstart=time.perf_counter()defdo_something(): print('sleeping1second...') time.sleep(1) print('Doneslee......
  • Python - requests 使用记录
    requests使用简单方法记录importrequestsfromfake_useragentimportUserAgentua=UserAgent()headers={'User-Agent':ua.random#伪装}#......
  • 我的Python程序太慢了。如何加快速度?
    如果你的Python程序太慢,你可以按照下面给出的提示和技巧-抽象化避免过度抽象,尤其是在微小函数或方法的形式下。抽象往往会产生间接性,并迫使解释器工作更多。如果间接寻......