我已经尝试解决这个问题有一段时间了,但我似乎找不到解决方案。
因此,正如标题所示,我试图通过函数在逻辑应用程序中运行长时间运行的操作。我有一个 python 代码,可以比较 2 个 excel 文件并进行一些转换。它工作正常,但是,Excel 文件包含近 20k 行(它是动态的,将来会添加更多行),因此需要一段时间才能完成。实际的 Python 代码运行大约 8 分钟,但当我将相同的代码转换为 Azure Function 时,它在本地运行大约 11 分钟。这显然会达到逻辑应用程序的超时限制。在做一些研究时,我发现使用 HTTP webhook 是最简单可行的解决方法。
我找到的示例博客 https://medium.com/@jeffhollan/calling-long-running-functions-from-logic- apps-6d7ba5044701
这个想法是创建 2 个函数。其中之一是 Webhook 将调用、传递callbackUri 并立即返回 202 响应。第二个将执行长时间运行的操作,并在完成后调用回调uri。
所有示例实现都使用 C# 或其他语言,没有 python 示例。我最终使用了线程。
import logging
import azure.functions as func
import time
import requests
import threading
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.function_name("http_trigger1 ")
@app.route("http_trigger1 ")
def http_trigger1 (req: func.HttpRequest) -> func.HttpResponse:
logging.info('Webhook request from Logic Apps received.')
callback_url = req.params.get('callback_url')
data = req.params.get('data')
if not callback_url:
try:
req_body = req.get_json()
except ValueError:
pass
else:
callback_url = req_body.get('callback_url')
data = req_body.get('data')
if not callback_url:
return func.HttpResponse(
"Enter a valid callback URL",
status_code=400
)
try:
# Immediately return 202 Accepted
threading.Thread(target=process_and_callback, args=(callback_url, data)).start()
return func.HttpResponse(
"Accepted for processing",
status_code=202
)
except Exception as e:
logging.error(f'An error occurred: {str(e)}')
return func.HttpResponse(
"Error occurred while invoking callback",
status_code=500
)
def process_and_callback(callback_url: str, data: str) -> None:
try:
time.sleep(720)
callback_data = {
"Subject": data
}
# Make callback request
response = requests.post(callback_url, json=callback_data)
response.raise_for_status()
logging.info(f'Callback successful with status code: {response.status_code}')
except Exception as e:
logging.error(f'An error occurred while invoking callback: {str(e)}')
我有 12 分钟的睡眠来模拟长时间运行的动作。这似乎有效,并且确实绕过了逻辑应用程序中的超时限制。然而,问题在于,在我用实际代码替换睡眠之后:
- 逻辑应用程序间歇性地返回以下错误: 请求的实体太大 请求失败,状态代码为“RequestEntityTooLarge”。
我的请求正文由callbackUri、file1、file2(均为base64编码)组成
- 通常,一旦收到此错误,我只需“重新提交”它,它就会继续然而,运行时,它被卡住了(或者看起来是这样)。我还没有看到它完成,但它处于运行状态超过 1 小时 30 分钟(而本地只需要 11 分钟)
我希望有人可以帮助我解决这个问题。如果需要,很乐意分享更多信息。
编辑: 我还应该注意到,我使用虚拟文件(行数较少的文件)对此进行了测试,并使用上述方法仅运行并执行了 3 分钟。仅在使用实际文件时才会出现此问题。
可以理解在尝试使用 Python Azure 函数处理长时间运行的任务时遇到的问题。 使用 HTTP webhook 来处理超时的方法是正确的,但遇到的错误表明其中存在一些优化点。 我们来分析一下情况并找到解决方案:
1. “请求实体过大”错误:
-
此错误明确表明发送到 webhook 函数的数据量超出了允许的限制。Azure 函数对请求大小有特定限制(根据不同的触发器,几 MB 不等)。 使用 base64 编码大型 Excel 文件会导致大小显着增加,从而可能导致此问题。
-
解决方案: 不要通过请求正文传递整个 Excel 文件,而应考虑将它们上传到 Azure Blob 存储等服务,并将 Blob URL 传递给函数。这不仅可以解决大小限制问题,还可以提高性能。
2. 函数执行时间过长:
-
虽然 webhook 方法解决了逻辑应用超时问题,但提到即使使用实际代码替换睡眠后,函数的运行时间也比本地时间长得多。 这表明可能存在性能瓶颈。
-
解决方案:
-
优化 Python 代码:
仔细检查的代码是否存在任何潜在的瓶颈。分析 Excel 文件比较和转换逻辑以获得最佳性能。考虑使用专用库(如
pandas
或openpyxl
)来提高效率。 - Azure 函数计划: 确保的 Azure 函数计划具有足够的资源来处理工作负载。 如果使用的是“消费计划”,则在代码执行过程中启动时可能会出现延迟,这称为“冷启动”。 考虑使用“专用计划”或“高级计划”来确保持续可用性和更好的性能。
- 异步操作: 如果可能,将长时间运行的操作分解为更小的异步任务。 这使可以利用 Azure 函数的可扩展性,并有可能减少总体处理时间。
-
优化 Python 代码:
仔细检查的代码是否存在任何潜在的瓶颈。分析 Excel 文件比较和转换逻辑以获得最佳性能。考虑使用专用库(如
修订后的代码实现(概念性):
import logging
import azure.functions as func
import time
import requests
import threading
from azure.storage.blob import BlobServiceClient # 添加 Azure Blob 存储库
# ...(现有的代码)...
def http_trigger1(req: func.HttpRequest) -> func.HttpResponse:
# ...(解析 callback_url)...
# 从请求中获取 Blob URL(假设已上传文件)
file1_blob_url = req.params.get('file1_blob_url')
file2_blob_url = req.params.get('file2_blob_url')
if not all([file1_blob_url, file2_blob_url]):
return func.HttpResponse(
"缺少文件 URL",
status_code=400
)
try:
threading.Thread(target=process_and_callback, args=(callback_url, file1_blob_url, file2_blob_url)).start()
return func.HttpResponse("Accepted for processing", status_code=202)
except Exception as e:
logging.error(f'发生错误:{str(e)}')
return func.HttpResponse("调用回调时发生错误", status_code=500)
def process_and_callback(callback_url: str, file1_blob_url: str, file2_blob_url: str) -> None:
try:
# 使用 Blob URL 下载文件
blob_service_client = BlobServiceClient.from_connection_string(STORAGE_CONNECTION_STRING)
# ...(下载文件的逻辑)...
# 处理 Excel 文件比较和转换
# ...(的处理逻辑,使用下载的文件)...
# ...(构建 callback_data)...
# 进行回调请求
response = requests.post(callback_url, json=callback_data)
response.raise_for_status()
logging.info(f'回调成功,状态码:{response.status_code}')
except Exception as e:
logging.error(f'调用回调时发生错误:{str(e)}')
其他提示:
- 日志记录: 在整个函数执行过程中实现详细的日志记录,以帮助确定任何性能瓶颈或错误的根本原因。
- 错误处理: 实现稳健的错误处理机制,以便在发生错误时可以优雅地处理并向逻辑应用报告。
- 可扩展性: 设计的解决方案时要考虑到可扩展性。 如果预计数据量会随着时间的推移而增加,请确保的体系结构能够处理增加的负载。
通过实施这些更改,可以解决“请求实体过大”错误、优化函数的性能并创建一个更可靠的解决方案来处理 Azure 函数中的长时间运行任务。
标签:python,azure-functions,azure-logic-apps,long-running-processes From: 78781447