I wrote the following code:
import dask.array as da
from distributed import Client
remote_server = 'localhost:8786'
client = Client(remote_server)
rows, cols = 4123, 4123
chunk_rows, chunk_cols = 1024, 1024
matrix1 = da.random.random(size=(rows, cols), chunks=(chunk_rows, chunk_cols))
matrix2 = da.random.random(size=(rows, cols), chunks=(chunk_rows, chunk_cols))
print(matrix1.compute_chunk_sizes())
print(matrix1.compute_chunk_sizes())
sz = matrix1.shape
dim0 = sz[0]
dim1 = sz[1]
chunk_remain = dim1 % chunk_rows
lack_rows = chunk_rows - chunk_remain
lack_cols = lack_rows
# Compute the padding needed so the array splits evenly into square chunks
lack_mat0 = da.zeros(shape=[lack_cols, dim1], chunks=(chunk_rows, chunk_cols))
lack_mat1 = da.zeros(shape=[dim0 + lack_cols, lack_rows], chunks=(chunk_rows, chunk_cols))
# Combine generated components
new_arr0 = da.append(matrix1, lack_mat0, axis=0)
new_arr1 = da.append(new_arr0, lack_mat1, axis=1)
new_matrix1 = new_arr1
sz = matrix2.shape
dim0 = sz[0]
dim1 = sz[1]
chunk_remain = dim0 % chunk_rows
lack_rows = chunk_rows - chunk_remain
lack_cols = lack_rows
# Compute the padding needed so the array splits evenly into square chunks
lack_mat0 = da.zeros(shape=[lack_cols, dim1], chunks=(chunk_rows, chunk_cols))
lack_mat1 = da.zeros(shape=[dim0 + lack_cols, lack_rows], chunks=(chunk_rows, chunk_cols))
# Combine generated components
new_arr0 = da.append(matrix2, lack_mat0, axis=0)
new_arr1 = da.append(new_arr0, lack_mat1, axis=1)
new_matrix2 = new_arr1
# Put ones on the diagonal of the padded part (identity block)
for i in range(rows, new_matrix1.shape[1]):
    new_matrix1[i, i] = 1.0
    new_matrix2[i, i] = 1.0
# Reorganize chunks
new_new_matrix1 = new_matrix1.rechunk((chunk_rows, chunk_cols))
new_new_matrix2 = new_matrix1.rechunk((chunk_rows, chunk_cols))
result_graph = da.linalg.solve(new_new_matrix1, new_new_matrix2)
print(result_graph.compute_chunk_sizes())
result_future = client.compute(result_graph)
result_result = client.gather(result_future)
print(result_result)
I run the script:
python solve-test-104.py
and I get the following error:
2024-07-22 18:28:03,886 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7aa31b0cdd20>
0. 134840950195584
>.
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
RecursionError: maximum recursion depth exceeded while pickling an object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
RecursionError: maximum recursion depth exceeded while pickling an object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
RecursionError: maximum recursion depth exceeded while pickling an object
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1249, in dump
raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
RecursionError: maximum recursion depth exceeded while pickling an object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
RecursionError: maximum recursion depth exceeded while pickling an object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
RecursionError: maximum recursion depth exceeded while pickling an object
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
frames[0] = pickle.dumps(
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1249, in dump
raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/gorn/Projects/Tohoku-U/solvetest/solve-test-104.py", line 68, in <module>
print(result_graph.compute_chunk_sizes())
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/dask/array/core.py", line 1500, in compute_chunk_sizes
tuple(int(chunk) for chunk in chunks) for chunks in compute(tuple(c))[0]
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/dask/base.py", line 662, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/gorn/Projects/Tohoku-U/solvetest/venv/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 392, in serialize
raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7aa31b0cdd20>\n 0. 134840950195584\n>')
The error message "Could not pickle object as excessively deep recursion required" often arises when dealing with Dask and distributed computing, especially when using client.compute on large Dask arrays or complex Dask graphs. This error signals that the object you are trying to send to the Dask workers (in this case, the result_graph) is too deeply nested or contains circular references that exceed Python's recursion limit.
Let's break down the potential causes and solutions:
Understanding the Issue:
- Dask Graphs and Serialization: When you use client.compute, Dask needs to send the computation graph (result_graph in your case) to the workers. This graph describes how to compute the result from the input arrays. Dask uses serialization (like pickle or cloudpickle) to transfer this graph.
- Recursion Limit: Python has a built-in recursion limit to prevent stack overflows from infinite recursion. When serializing a deeply nested object, the serialization process itself can hit this limit; a tiny standalone illustration follows below.
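To make that second point concrete, here is a small, self-contained illustration that has nothing to do with Dask itself: pickling a container nested more deeply than the recursion limit fails with the same RecursionError you see in the traceback.

import pickle
import sys

# Build a list nested deeper than the current recursion limit.
nested = []
current = nested
for _ in range(sys.getrecursionlimit()):
    current.append([])
    current = current[0]

try:
    pickle.dumps(nested)
except RecursionError as err:
    # The same class of failure occurs when the Dask client pickles an
    # excessively deep task graph.
    print("RecursionError:", err)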
Potential Solutions:
- Simplify Your Graph: Complex Dask graphs, particularly those created by iterative or recursive operations, can easily become too deep. Analyze your code:
  - Avoid Unnecessary Operations: Check for any redundant da.append, rechunk, or array manipulations that might be contributing to graph complexity. Can you achieve the same outcome with fewer steps?
  - Break It Down: If possible, split your computation into smaller, independent stages. Compute intermediate results, persist them if necessary (result.persist()), and then use these persisted results in subsequent stages. This can significantly reduce the graph depth; see the persist sketch after this list.
- Increase Recursion Limit (Use with Caution): You can increase Python's recursion limit using sys.setrecursionlimit(limit). However, this is generally discouraged as a primary solution. A high recursion limit can lead to stack overflows and crashes if the underlying problem is not addressed; a short example follows below.
- Use dask.delayed for Custom Functions: If you are applying custom functions within your Dask graph, consider wrapping them with dask.delayed. This can help isolate complex computations and make them more serialization-friendly; a minimal sketch follows below.
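A minimal, hedged sketch of the "Break It Down" suggestion, using illustrative sizes and a throwaway local cluster (with your setup you would pass 'localhost:8786' to Client as in the question). The idea is to persist the inputs first, so the solve graph is built on already-materialized chunks rather than on their full construction history.

import dask.array as da
from distributed import Client

client = Client()  # or Client('localhost:8786') to use your existing scheduler

a = da.random.random((4096, 4096), chunks=(1024, 1024))
b = da.random.random((4096, 4096), chunks=(1024, 1024))

# Stage 1: materialize the inputs on the workers. After persist(), the
# arrays point at concrete chunks instead of dragging along every
# padding/append step that produced them.
a = a.persist()
b = b.persist()

# Stage 2: build the solve on top of the persisted arrays; this graph is
# much shallower and therefore easier to serialize.
x = da.linalg.solve(a, b)
print(x[:2, :2].compute())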
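If you do resort to raising the recursion limit (a workaround, not a fix), it must happen in the client process before the graph is submitted. A minimal sketch, with an arbitrary illustrative value:

import sys

print(sys.getrecursionlimit())  # CPython's default is typically 1000
sys.setrecursionlimit(10000)    # illustrative value only; tune to your graph
# ... then build the arrays and call client.compute(result_graph) as before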
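And a small self-contained sketch of the dask.delayed suggestion; pad_block is a hypothetical helper used purely for illustration. Wrapping it with delayed ships it to a worker as a single opaque task instead of expanding its internals into many graph nodes.

import numpy as np
from dask import delayed

@delayed
def pad_block(block, pad):
    # Zero-pad a NumPy block on its bottom and right edges.
    return np.pad(block, ((0, pad), (0, pad)))

padded = pad_block(np.ones((3, 3)), 1)
print(padded.compute())  # a single task, regardless of what pad_block does inside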
Debugging Tips:
- Visualize Your Graph: Use result_graph.visualize(filename='graph.pdf') to generate a visual representation of your Dask graph. This can help you identify areas of complexity and potential optimizations.
- Inspect Chunk Sizes: Ensure your chunk sizes are appropriate for your data and computational resources. Very small chunk sizes can lead to excessively large graphs. A quick inspection sketch follows below.
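As a hedged companion to these tips, the following lines report the block layout and graph size of a Dask array without computing it (x here is just a stand-in for result_graph); visualize() additionally needs the optional graphviz dependency.

import dask.array as da

# Stand-in for result_graph; any Dask array works the same way.
x = da.random.random((4096, 4096), chunks=(1024, 1024))

print(x.numblocks)                 # how many chunks along each axis
print(x.chunks)                    # chunk sizes, known without computing anything
print(len(x.__dask_graph__()))     # total number of tasks in the underlying graph
x.visualize(filename='graph.pdf')  # requires the optional graphviz dependency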
Code Improvements (Without Changing Logic):
import dask.array as da
from distributed import Client

# ... (Your code for connecting to the client and defining matrix dimensions)

# Pad a matrix up to the next multiple of the chunk size
def pad_matrix(matrix, chunk_rows, chunk_cols):
    rows, cols = matrix.shape
    lack_rows = chunk_rows - (rows % chunk_rows) if rows % chunk_rows else 0
    lack_cols = chunk_cols - (cols % chunk_cols) if cols % chunk_cols else 0
    lack_mat0 = da.zeros(shape=(lack_rows, cols), chunks=(chunk_rows, chunk_cols))
    lack_mat1 = da.zeros(shape=(rows + lack_rows, lack_cols), chunks=(chunk_rows, chunk_cols))
    new_arr = da.append(matrix, lack_mat0, axis=0)
    new_arr = da.append(new_arr, lack_mat1, axis=1)
    return new_arr

# Pad matrices
new_matrix1 = pad_matrix(matrix1, chunk_rows, chunk_cols)
new_matrix2 = pad_matrix(matrix2, chunk_rows, chunk_cols)

# Set the diagonal of the padded block to 1 (no need to iterate element by element)
pad_diag = da.concatenate([da.zeros(rows), da.ones(new_matrix1.shape[0] - rows)])
new_matrix1 = new_matrix1 + da.diag(pad_diag)
new_matrix2 = new_matrix2 + da.diag(pad_diag)

# Rechunk (consider whether this is truly necessary)
new_matrix1 = new_matrix1.rechunk((chunk_rows, chunk_cols))
new_matrix2 = new_matrix2.rechunk((chunk_rows, chunk_cols))

result_graph = da.linalg.solve(new_matrix1, new_matrix2)

# ... (Compute and gather results)
Remember: The best approach is to prevent excessively complex graphs in the first place. By carefully structuring your Dask computations, you can often avoid serialization issues and improve performance.