首页 > 编程问答 >使用 Python 通过逻辑应用运行长时间运行的 Azure Functions

使用 Python 通过逻辑应用运行长时间运行的 Azure Functions

时间:2024-07-23 11:33:17浏览次数:13  
标签:python azure-functions azure-logic-apps long-running-processes

我已经尝试解决这个问题有一段时间了,但我似乎找不到解决方案。

因此,正如标题所示,我试图通过函数在逻辑应用程序中运行长时间运行的操作。我有一个 python 代码,可以比较 2 个 excel 文件并进行一些转换。它工作正常,但是,Excel 文件包含近 20k 行(它是动态的,将来会添加更多行),因此需要一段时间才能完成。实际的 Python 代码运行大约 8 分钟,但当我将相同的代码转换为 Azure Function 时,它在本地运行大约 11 分钟。这显然会达到逻辑应用程序的超时限制。在做一些研究时,我发现使用 HTTP webhook 是最简单可行的解决方法。

我找到的示例博客 https://medium.com/@jeffhollan/calling-long-running-functions-from-logic- apps-6d7ba5044701

这个想法是创建 2 个函数。其中之一是 Webhook 将调用、传递callbackUri 并立即返回 202 响应。第二个将执行长时间运行的操作,并在完成后调用回调uri。

所有示例实现都使用 C# 或其他语言,没有 python 示例。我最终使用了线程。

import logging
import azure.functions as func
import time
import requests
import threading


app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.function_name("http_trigger1 ")
@app.route("http_trigger1 ")
def http_trigger1 (req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Webhook request from Logic Apps received.')

    callback_url = req.params.get('callback_url')
    data = req.params.get('data')
    if not callback_url:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            callback_url = req_body.get('callback_url')
            data = req_body.get('data')
            
    if not callback_url:
        return func.HttpResponse(
            "Enter a valid callback URL",
            status_code=400
        )
        
    try:
        # Immediately return 202 Accepted
        threading.Thread(target=process_and_callback, args=(callback_url, data)).start()
        
        return func.HttpResponse(
            "Accepted for processing",
            status_code=202
        )
    
    except Exception as e:
        logging.error(f'An error occurred: {str(e)}')
        return func.HttpResponse(
            "Error occurred while invoking callback",
            status_code=500
        )

def process_and_callback(callback_url: str, data: str) -> None:
    try:

        time.sleep(720)
        

        callback_data = {
            "Subject": data 
        }
        
        # Make callback request
        response = requests.post(callback_url, json=callback_data)
        response.raise_for_status() 

        logging.info(f'Callback successful with status code: {response.status_code}')
    
    except Exception as e:
        logging.error(f'An error occurred while invoking callback: {str(e)}')

我有 12 分钟的睡眠来模拟长时间运行的动作。这似乎有效,并且确实绕过了逻辑应用程序中的超时限制。然而,问题在于,在我用实际代码替换睡眠之后:

  1. 逻辑应用程序间歇性地返回以下错误: 请求的实体太大 请求失败,状态代码为“RequestEntityTooLarge”。

我的请求正文由callbackUri、file1、file2(均为base64编码)组成

  1. 通常,一旦收到此错误,我只需“重新提交”它,它就会继续然而,运行时,它被卡住了(或者看起来是这样)。我还没有看到它完成,但它处于运行状态超过 1 小时 30 分钟(而本地只需要 11 分钟)

我希望有人可以帮助我解决这个问题。如果需要,很乐意分享更多信息。

编辑: 我还应该注意到,我使用虚拟文件(行数较少的文件)对此进行了测试,并使用上述方法仅运行并执行了 3 分钟。仅在使用实际文件时才会出现此问题。


可以理解在尝试使用 Python Azure 函数处理长时间运行的任务时遇到的问题。 使用 HTTP webhook 来处理超时的方法是正确的,但遇到的错误表明其中存在一些优化点。 我们来分析一下情况并找到解决方案:

1. “请求实体过大”错误:

  • 此错误明确表明发送到 webhook 函数的数据量超出了允许的限制。Azure 函数对请求大小有特定限制(根据不同的触发器,几 MB 不等)。 使用 base64 编码大型 Excel 文件会导致大小显着增加,从而可能导致此问题。

  • 解决方案: 不要通过请求正文传递整个 Excel 文件,而应考虑将它们上传到 Azure Blob 存储等服务,并将 Blob URL 传递给函数。这不仅可以解决大小限制问题,还可以提高性能。

2. 函数执行时间过长:

  • 虽然 webhook 方法解决了逻辑应用超时问题,但提到即使使用实际代码替换睡眠后,函数的运行时间也比本地时间长得多。 这表明可能存在性能瓶颈。

  • 解决方案:

    • 优化 Python 代码: 仔细检查的代码是否存在任何潜在的瓶颈。分析 Excel 文件比较和转换逻辑以获得最佳性能。考虑使用专用库(如 pandas openpyxl )来提高效率。
    • Azure 函数计划: 确保的 Azure 函数计划具有足够的资源来处理工作负载。 如果使用的是“消费计划”,则在代码执行过程中启动时可能会出现延迟,这称为“冷启动”。 考虑使用“专用计划”或“高级计划”来确保持续可用性和更好的性能。
    • 异步操作: 如果可能,将长时间运行的操作分解为更小的异步任务。 这使可以利用 Azure 函数的可扩展性,并有可能减少总体处理时间。

修订后的代码实现(概念性):

import logging
import azure.functions as func
import time
import requests
import threading
from azure.storage.blob import BlobServiceClient  # 添加 Azure Blob 存储库

# ...(现有的代码)...

def http_trigger1(req: func.HttpRequest) -> func.HttpResponse:
    # ...(解析 callback_url)...

    # 从请求中获取 Blob URL(假设已上传文件)
    file1_blob_url = req.params.get('file1_blob_url')
    file2_blob_url = req.params.get('file2_blob_url')

    if not all([file1_blob_url, file2_blob_url]):
        return func.HttpResponse(
            "缺少文件 URL",
            status_code=400
        )

    try:
        threading.Thread(target=process_and_callback, args=(callback_url, file1_blob_url, file2_blob_url)).start()
        return func.HttpResponse("Accepted for processing", status_code=202)
    except Exception as e:
        logging.error(f'发生错误:{str(e)}')
        return func.HttpResponse("调用回调时发生错误", status_code=500)

def process_and_callback(callback_url: str, file1_blob_url: str, file2_blob_url: str) -> None:
    try:
        # 使用 Blob URL 下载文件
        blob_service_client = BlobServiceClient.from_connection_string(STORAGE_CONNECTION_STRING)
        # ...(下载文件的逻辑)...

        # 处理 Excel 文件比较和转换
        # ...(的处理逻辑,使用下载的文件)...

        # ...(构建 callback_data)...

        # 进行回调请求
        response = requests.post(callback_url, json=callback_data)
        response.raise_for_status()

        logging.info(f'回调成功,状态码:{response.status_code}')
    except Exception as e:
        logging.error(f'调用回调时发生错误:{str(e)}')

其他提示:

  • 日志记录: 在整个函数执行过程中实现详细的日志记录,以帮助确定任何性能瓶颈或错误的根本原因。
  • 错误处理: 实现稳健的错误处理机制,以便在发生错误时可以优雅地处理并向逻辑应用报告。
  • 可扩展性: 设计的解决方案时要考虑到可扩展性。 如果预计数据量会随着时间的推移而增加,请确保的体系结构能够处理增加的负载。

通过实施这些更改,可以解决“请求实体过大”错误、优化函数的性能并创建一个更可靠的解决方案来处理 Azure 函数中的长时间运行任务。

标签:python,azure-functions,azure-logic-apps,long-running-processes
From: 78781447

相关文章

  • 使用递归函数计算阶乘时,我在 python 中遇到类型错误
    defcalc_fact(n):如果(n==1或n==0):返回1别的:n*calc_fact(n-1)print(calc_fact(5))试图创建函数来计算阶乘,不支持类型错误操作数*:对于int或Nonetype我不知道为什么谢谢Python代码中出现“类型错误:不支持的操作数类型为*:'int'和'NoneType'”表明你......
  • 如何调试 python Flask [84] [CRITICAL] WORKER TIMEOUT?
    调试:gtts.tts:保存到temp.mp37月22日09:10:56PM[2024-07-2215:40:56+0000][84][严重]工作超时(pid:87)|||7月22日09:10:56PM[2024-07-2215:40:56+0000][87][INFO]工人退出(pid:87)7月22日09:10:57PM[2024-07-2215:40:57+0000][95][INF......
  • 类型错误:无法将函数返回值转换为 Python 类型!签名是 () -> 处理 anaconda spider
    这是代码:importosimportrandomimportnumpyasnpimportpandasaspdimporttensorflowastffromtensorflow.kerasimportbackendasKfromtensorflow.keras.layersimportDense,Dropout,Flatten,Conv2D,MaxPool2D,Input......
  • python进阶---闭包与装饰器
    一、闭包        在Python中,闭包是指一个函数内部定义的函数,这个内部函数可以访问并修改其外部函数的局部变量,即使外部函数已经执行完毕。闭包可以通过多层函数嵌套来实现。    闭包的三要素:    1、外部函数嵌套内部函数    2、外部函数返......
  • 强制从当前包自动导入的 Python 以此包的名称为前缀
    我在VSCode中使用Python和Pylance扩展。在我正在编辑的自己的包中,自动添加的导入(设置“导入格式:绝对”)如下所示:frommydirectory.myfileimportmyclass但是,我的Python包正在被被一个(非常愚蠢且不可协商的)外部系统消耗,该系统拒绝正确解释它,除非导入的格式特别......
  • Python语言-面向对象
    知识代码classJobSalary(object):job=''def__init__(self,city):self.jobname="数据分析师"self.exp=''self.city=city#方法defdata_normalize(self,data):print(f'正在规范化......
  • 需要帮助使用 Selenium Python 单击 Microsoft Teams 按钮
    我将Python与Selenium结合使用,并自动登录MicrosoftTeams。进入后,弹出窗口显示我需要单击“立即切换”以切换到V2版本。我似乎无法使用SeleniumPython成功单击此按钮。谁能帮我自动点击这个按钮?这是我不成功的尝试:self.driver.find_element(By.CLASS_NAME,......
  • python接口自动化(四十)- logger 日志 - 下(超详解)
    宏哥微信粉丝群:https://bbs.csdn.net/topics/618423372 有兴趣的可以扫码加入 1.简介按照上一篇的计划,这一篇给小伙伴们讲解一下:(1)多模块使用logging,(2)通过文件配置logging模块,(3)自己封装一个日志(logging)类。可能有的小伙伴在这里会有个疑问一个logging为什么分两篇的篇幅......
  • Python面试题:使用NumPy进行高效数组运算
    NumPy是Python中进行高效数组运算的基础库。以下是一些示例,展示了如何使用NumPy进行高效的数组运算,包括创建数组、数组操作、数学运算以及一些高级操作。安装NumPy如果你还没有安装NumPy,可以通过以下命令进行安装:pipinstallnumpy示例代码1.创建数组import......
  • Python面试题:使用Matplotlib和Seaborn进行数据可视化
    使用Matplotlib和Seaborn进行数据可视化是数据分析中非常重要的一部分。以下示例展示了如何使用这两个库来创建各种图表,包括基本的线图、柱状图、散点图和高级的分类数据可视化图表。安装Matplotlib和Seaborn如果你还没有安装这两个库,可以使用以下命令进行安装:pipins......