
Airflow 2.7.2: Triggerer's async thread blocked by a badly-written trigger

Posted: 2024-07-24 09:50:18
Tags: python async-await triggers airflow coroutine

I'm running Airflow 2.7.2, and I keep seeing this warning in the triggerer logs:

Triggerer's async thread was blocked for 0.50 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.

This is my trigger's run method (the trigger is a subclass of BaseTrigger):

async def run(self):
    while not await self.check_if_job_finished():
        await asyncio.sleep(60)
    # Send our single event and then we're done
    self.finished = True
    yield TriggerEvent(self.job_id)

This is the function that checks whether the job has finished; it sends an HTTP request:

async def check_if_job_finished(self) -> bool:
    self.log.info("Check if job finished.")
    endpoint = 'abc.com'
    headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    response_json = await async_http_util.async_get(url=endpoint + "/api/runtime/" + self.job_id,
                                               headers=headers, verify_ssl=False)
    if response_json['code'] == '0000':
        status = response_json['result']['status']
        if status == 'SUCCESS' or status == 'FAILED':
            return True
        else:
            return False
    else:
        raise AirflowException("job status failed, response is: {}".format(response_json))

To send the HTTP requests, I use aiohttp:

import asyncio
import logging
from aiohttp import ClientSession
from airflow import AirflowException

log = logging.getLogger(__name__)

async def async_get(url, retry_times=10, retry_wait_seconds=10, **kwargs):
    """
    Send an asynchronous HTTP GET request with retry logic.
    :param url: Request URL
    :param retry_times: Maximum number of retry attempts
    :param retry_wait_seconds: Retry interval (seconds)
    :param kwargs: Additional parameters
    :return: Response.json()
    """
    for i in range(retry_times):
        try:
            async with ClientSession() as session:
                async with session.get(url, **kwargs) as response:
                    if response.status == 200:
                        log.info(f"Request successful with {response.status}, text: {await response.text()}")
                        return await response.json()
                    else:
                        raise Exception(f"Http code is {response.status}, text: {response.text()} need to retry.")
        except Exception as e:
            log.error(f"Request failed with {e}, retrying in {retry_wait_seconds} seconds...")
            if i < retry_times - 1:
                await asyncio.sleep(retry_wait_seconds)
            else:
                raise AirflowException(f"All retries failed. {e}")

I tried increasing the pod's resources, but it didn't help.


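The warning "Triggerer's async thread was blocked for 0.50 seconds" means that, for about half a second, code running on the triggerer's event loop did not hand control back to the loop. Every trigger assigned to a triggerer runs as a coroutine on that one event loop, so while any of them executes synchronous code, none of the others can make progress. A slow await on network I/O does not cause this on its own: while your coroutine is suspended in await session.get(...) or await asyncio.sleep(...), the loop keeps serving other triggers. The warning is caused by code that runs between awaits without suspending, such as a blocking library call, time.sleep, CPU-heavy work, or a slow synchronous logging handler. The warning does not name the trigger responsible, so the blocking code may be in a different trigger running on the same triggerer, not necessarily in check_if_job_finished.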
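The difference between awaiting slow I/O and blocking the loop can be seen in a small standalone sketch using only the standard library. A watchdog coroutine measures how late its own asyncio.sleep wakes up; this follows the same idea as the triggerer's check but is not Airflow's actual code. The functions watchdog, polite_trigger, blocking_trigger and measure are hypothetical names for illustration.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def watchdog(stop: asyncio.Event, interval: float = 0.05) -> float:
    """Return the largest delay (in seconds) by which the loop woke this coroutine up late."""
    worst = 0.0
    while not stop.is_set():
        start = time.monotonic()
        await asyncio.sleep(interval)
        # Time beyond `interval` is time the loop spent stuck running something else
        worst = max(worst, time.monotonic() - start - interval)
    return worst


async def polite_trigger() -> None:
    # Simulated slow network call: awaiting it suspends this coroutine, so the loop stays free
    await asyncio.sleep(0.5)


async def blocking_trigger() -> None:
    # A synchronous call inside a coroutine holds the loop for its whole duration
    time.sleep(0.5)


async def measure(trigger: Callable[[], Awaitable[None]]) -> float:
    stop = asyncio.Event()
    dog = asyncio.create_task(watchdog(stop))
    await asyncio.sleep(0)  # let the watchdog take its first timestamp
    await trigger()
    stop.set()
    return await dog


if __name__ == "__main__":
    print(f"awaited I/O: worst loop lag {asyncio.run(measure(polite_trigger)):.2f}s")
    print(f"time.sleep:  worst loop lag {asyncio.run(measure(blocking_trigger)):.2f}s")
```

Both "triggers" take half a second, but only the one calling time.sleep delays the watchdog, by roughly 0.45 seconds (the exact figure depends on the machine). That is the situation the triggerer's warning reports.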

Here's a breakdown of potential bottlenecks and solutions:

1. Network Latency:

  • Problem: The async_get function makes an HTTP request to abc.com. Network latency lengthens each status check, especially if the external service is experiencing slowdowns. Because the request is awaited through aiohttp, however, latency only delays this trigger; it does not block the event loop and should not, by itself, produce this warning.
  • Solutions:
    • Optimize Network: Ensure a stable and fast network connection between your Airflow triggerer and abc.com.
    • Caching: If the job status doesn't change frequently, consider caching recent responses (for example in Redis) to reduce the number of requests to abc.com. With a 60-second polling interval, the request rate is already low.
    • Set an Explicit Timeout: async_get does not set one, so each attempt uses aiohttp's default total timeout of 5 minutes. Pass timeout=aiohttp.ClientTimeout(total=...) through kwargs, or wrap the call in asyncio.wait_for, to choose a limit that suits the external service.
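A minimal sketch of bounding a single status check with the standard library's asyncio.wait_for, which cancels the awaited call when the limit is exceeded. fake_status_request is a hypothetical stand-in for async_get that simply sleeps before returning a response shaped like the one in the question; check_with_timeout is likewise a hypothetical helper.

```python
import asyncio
from typing import Optional


async def fake_status_request(delay: float) -> dict:
    # Hypothetical stand-in for async_get(): a response that takes `delay` seconds to arrive
    await asyncio.sleep(delay)
    return {"code": "0000", "result": {"status": "SUCCESS"}}


async def check_with_timeout(delay: float, timeout: float) -> Optional[str]:
    """Return the job status, or None if the request did not finish within `timeout` seconds."""
    try:
        response = await asyncio.wait_for(fake_status_request(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # The slow request was cancelled; the trigger can simply try again on its next poll
        return None
    return response["result"]["status"]


if __name__ == "__main__":
    print(asyncio.run(check_with_timeout(delay=0.1, timeout=1.0)))  # SUCCESS
    print(asyncio.run(check_with_timeout(delay=2.0, timeout=0.2)))  # None
```

Returning None instead of raising lets the polling loop treat a timed-out request like "not finished yet"; whether that is appropriate depends on how you want failures of abc.com to surface.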

2. External Service Performance:

  • Problem: The performance of the external service (abc.com) directly affects how long your trigger waits for a response. As with network latency, a slow service delays this trigger but does not block the event loop, because the response is awaited.
  • Solution: Investigate the performance of abc.com and optimize it, or switch to a more performant alternative if possible.

3. Inefficient async_get Implementation:

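  • Problem: async_get already uses aiohttp and waits between retries with await asyncio.sleep, so its retry loop does not block the event loop. It does have a few smaller issues: in the non-200 branch, response.text() is not awaited, so the error message contains a coroutine object instead of the response body (and Python warns that the coroutine was never awaited); a new ClientSession is created for every attempt instead of being reused; and no explicit timeout is set. Note also that aiohttp has deprecated the verify_ssl=False argument in favor of ssl=False.
  • Solutions:
    • Tidy Up async_get: await response.text(), reuse a single session across attempts, and set a timeout:

      import asyncio
      import logging

      from aiohttp import ClientSession, ClientTimeout
      from airflow.exceptions import AirflowException

      log = logging.getLogger(__name__)

      async def async_get(url, retry_times=10, retry_wait_seconds=10, timeout_seconds=30, **kwargs):
          last_error = None
          # Reuse one session (and its connection pool) for every attempt
          async with ClientSession(timeout=ClientTimeout(total=timeout_seconds)) as session:
              for attempt in range(retry_times):
                  try:
                      async with session.get(url, **kwargs) as response:
                          if response.status == 200:
                              return await response.json()
                          # response.text() is a coroutine, so it must be awaited
                          body = await response.text()
                          raise Exception(f"Http code is {response.status}, text: {body}, need to retry.")
                  except Exception as e:
                      last_error = e
                      log.error(f"Request failed with {e}, retrying in {retry_wait_seconds} seconds...")
                      if attempt < retry_times - 1:
                          # Non-blocking wait: other triggers keep running in the meantime
                          await asyncio.sleep(retry_wait_seconds)
          raise AirflowException(f"All retries failed. {last_error}")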

4. Resource Constraints (Less Likely):

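  • Problem: You mentioned that increasing pod resources didn't help, which is consistent with the explanation above: extra CPU and memory generally don't help when a coroutine holds the event loop with a blocking call. Resources matter only if check_if_job_finished (or another trigger) does CPU-intensive work, and even then the better fix is to run that work off the event loop, in a thread or process pool, so it doesn't block the triggerer.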
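If part of a status check has to call blocking code (a synchronous HTTP client, a slow SDK method, file I/O), one way to keep it off the event loop is loop.run_in_executor with the default thread pool; it works on every Python version Airflow 2.7 supports, whereas asyncio.to_thread needs Python 3.9+. The sketch below assumes a hypothetical blocking_status_lookup function and uses a heartbeat task to show that the loop stays responsive while the blocking call runs.

```python
import asyncio
import time
from typing import Tuple


def blocking_status_lookup(job_id: str) -> str:
    # Hypothetical synchronous call, e.g. requests.get() or a blocking SDK method
    time.sleep(0.3)
    return "SUCCESS"


async def check_if_job_finished(job_id: str) -> bool:
    loop = asyncio.get_running_loop()
    # Runs in the default ThreadPoolExecutor; this coroutine is suspended meanwhile
    status = await loop.run_in_executor(None, blocking_status_lookup, job_id)
    return status in ("SUCCESS", "FAILED")


async def main() -> Tuple[bool, int]:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.05)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    finished = await check_if_job_finished("job-123")
    beat.cancel()
    return finished, ticks


if __name__ == "__main__":
    finished, ticks = asyncio.run(main())
    # The heartbeat keeps ticking while the blocking call runs in a worker thread
    print(finished, ticks)
```

If blocking_status_lookup were called directly inside the coroutine instead, the heartbeat would not tick at all during those 0.3 seconds. For genuinely CPU-bound work, a ProcessPoolExecutor passed as the first argument avoids contention on the GIL.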

Debugging Tips:

  • PYTHONASYNCIODEBUG=1: As the warning suggests, set this environment variable on the triggerer process to enable asyncio debug mode. asyncio then logs a warning of the form "Executing <Task ...> took N seconds" for each callback or task step that runs longer than loop.slow_callback_duration (0.1 seconds by default), which identifies the coroutine that is blocking the loop.
  • Profiling: Use a Python profiler (e.g., cProfile) to pinpoint performance bottlenecks within your trigger code.
  • Logging: Add timing logs inside check_if_job_finished and async_get to monitor execution times and identify where delays occur.
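To see what asyncio debug mode reports without deploying anything, here is a small standalone sketch. It turns debug mode on with asyncio.run(..., debug=True), which enables the same debug mode as PYTHONASYNCIODEBUG=1, leaves slow_callback_duration at its 0.1-second default, and collects the warnings asyncio emits on its "asyncio" logger. badly_written_trigger, ListHandler and run_with_asyncio_debug are hypothetical names for illustration.

```python
import asyncio
import logging
import time
from typing import List


class ListHandler(logging.Handler):
    """Collect formatted log messages in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def badly_written_trigger() -> None:
    time.sleep(0.2)  # blocking call inside a coroutine


def run_with_asyncio_debug() -> List[str]:
    handler = ListHandler()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(handler)
    asyncio_logger.setLevel(logging.WARNING)
    try:
        # debug=True enables asyncio debug mode, the same mode PYTHONASYNCIODEBUG=1 turns on
        asyncio.run(badly_written_trigger(), debug=True)
    finally:
        asyncio_logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in run_with_asyncio_debug():
        # Slow steps are reported as "Executing <...> took N seconds"
        print(message)
```

In the triggerer, the equivalent messages appear in its own log once the environment variable is set, and the task representation in them names the coroutine that overran.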

Remember:

  • The goal is to keep check_if_job_finished, and every other trigger running on the same triggerer, free of blocking calls between awaits, so that no coroutine holds the triggerer's event loop.
  • Network communication and the external service (abc.com) determine how long each status check takes, but the warning itself points to synchronous code on the event loop. Use PYTHONASYNCIODEBUG=1 to find that code before tuning the network.

From: 78783289
