I'm running Airflow version 2.7.2, and I keep seeing this warning in the triggerer logs:
Triggerer's async thread was blocked for 0.50 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines.
My trigger's run function is the following; it is an implementation of BaseTrigger:
async def run(self):
    while not await self.check_if_job_finished():
        await asyncio.sleep(60)
    # Send our single event and then we're done
    self.finished = True
    yield TriggerEvent(self.job_id)
This is the function that checks whether the job is finished; it sends an HTTP request:
async def check_if_job_finished(self) -> bool:
    self.log.info("Check if job finished.")
    endpoint = 'abc.com'
    headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
    response_json = await async_http_util.async_get(url=endpoint + "/api/runtime/" + self.job_id,
                                                    headers=headers, verify_ssl=False)
    if response_json['code'] == '0000':
        status = response_json['result']['status']
        if status == 'SUCCESS' or status == 'FAILED':
            return True
        else:
            return False
    else:
        raise AirflowException("job status failed, response is: {}".format(response_json))
To send the HTTP request, I use aiohttp:
import asyncio
import logging
from aiohttp import ClientSession
from airflow import AirflowException
log = logging.getLogger(__name__)
async def async_get(url, retry_times=10, retry_wait_seconds=10, **kwargs):
    """
    Send an asynchronous HTTP GET request with retry logic.
    :param url: Request URL
    :param retry_times: Maximum number of retry attempts
    :param retry_wait_seconds: Retry interval (seconds)
    :param kwargs: Additional parameters
    :return: Response.json()
    """
    for i in range(retry_times):
        try:
            async with ClientSession() as session:
                async with session.get(url, **kwargs) as response:
                    if response.status == 200:
                        log.info(f"Request successful with {response.status}, text: {await response.text()}")
                        return await response.json()
                    else:
                        raise Exception(f"Http code is {response.status}, text: {await response.text()}, need to retry.")
        except Exception as e:
            log.error(f"Request failed with {e}, retrying in {retry_wait_seconds} seconds...")
            if i < retry_times - 1:
                await asyncio.sleep(retry_wait_seconds)
            else:
                raise AirflowException(f"All retries failed. {e}")
I tried increasing the resources of the pod, but it didn't help.
The error message "Triggerer's async thread was blocked for 0.50 seconds" indicates that your trigger's asynchronous operation (specifically the check_if_job_finished function) is taking too long to complete, causing the triggerer thread to stall. Here's a breakdown of potential bottlenecks and solutions:

1. Network Latency:
- Problem: The async_get function makes an HTTP request to abc.com. Network latency can significantly delay response times, especially if the external service is experiencing slowdowns.
- Solutions:
  - Optimize Network: Ensure a stable and fast network connection between your Airflow worker and abc.com.
  - Caching: If the job status doesn't change frequently, implement a caching mechanism to reduce the number of requests to abc.com. Consider using Airflow's built-in caching features or external tools like Redis.
  - Increase Timeout: Adjust the timeout value in async_get to a more generous limit if the external service is inherently slow.
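One library-agnostic way to put a hard cap on how long a single status check may take is asyncio.wait_for; this is a minimal sketch (check_with_deadline and _slow_check are hypothetical names, not part of the original code), which treats a timed-out check as "job not finished yet" instead of letting it stall the event loop:

import asyncio

async def check_with_deadline(coro_factory, timeout_seconds=30):
    # Cancel the status check if it exceeds the deadline instead of
    # letting a slow endpoint hang the triggerer's async thread.
    try:
        return await asyncio.wait_for(coro_factory(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return None  # timed out: treat as "not finished yet"

async def _slow_check():
    # Stand-in for a status request that hangs far past the deadline.
    await asyncio.sleep(10)
    return True

# A check that exceeds the deadline yields None rather than blocking:
result = asyncio.run(check_with_deadline(_slow_check, timeout_seconds=0.1))

The caller then decides how to interpret None (e.g., retry on the next poll cycle), so a single slow response never shows up as a blocked triggerer thread.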
2. External Service Performance:
- Problem: The performance of the external service (abc.com) directly impacts your trigger. If the service is overloaded or inefficient, your trigger will wait longer for a response.
- Solution: Investigate the performance of abc.com and consider optimizing it or switching to a more performant alternative if possible.
3. Inefficient async_get Implementation:
- Problem: While the async_get function uses aiohttp, the retry logic is the most delicate part. The key requirement is that every wait between retries must be awaited (await asyncio.sleep(...)) rather than performed with a blocking call such as time.sleep, which would stall the event loop for the full wait interval.
- Solution:
  - Asynchronous Retries: Keep the waits between retries asynchronous by awaiting asyncio.sleep inside the loop, so the event loop stays free:

    async def async_get(url, retry_times=10, retry_wait_seconds=10, **kwargs):
        for _ in range(retry_times):
            try:
                async with ClientSession() as session:
                    async with session.get(url, **kwargs) as response:
                        if response.status == 200:
                            return await response.json()
                        else:
                            raise Exception(f"Http code is {response.status}, need to retry.")
            except Exception as e:
                log.error(f"Request failed with {e}, retrying in {retry_wait_seconds} seconds...")
                await asyncio.sleep(retry_wait_seconds)
        raise AirflowException("All retries failed.")
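Going one step further than a fixed wait, retries can use exponential backoff with jitter so repeated failures space themselves out; this is only a sketch (retry_async, flaky, and calls are hypothetical names introduced here for illustration):

import asyncio
import random

async def retry_async(func, retry_times=5, base_wait=1.0, max_wait=30.0):
    # Generic async retry helper: waits grow exponentially (capped at
    # max_wait) and are awaited, so the event loop is never blocked.
    last_exc = None
    for attempt in range(retry_times):
        try:
            return await func()
        except Exception as exc:
            last_exc = exc
            if attempt < retry_times - 1:
                wait = min(max_wait, base_wait * 2 ** attempt)
                # Jitter avoids many triggers retrying in lock-step.
                await asyncio.sleep(wait * random.uniform(0.5, 1.0))
    raise last_exc

# Example: a call that fails twice, then succeeds on the third attempt.
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(retry_async(flaky, base_wait=0.01))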
4. Resource Constraints (Less Likely):
- Problem: While you mentioned increasing pod resources didn't help, ensure your Airflow worker nodes have sufficient CPU and memory if the check_if_job_finished function performs CPU-intensive tasks.
Debugging Tips:
- PYTHONASYNCIODEBUG=1: As the error message suggests, enable this environment variable to get detailed debug information about asynchronous tasks.
- Profiling: Use Python profilers (e.g., cProfile) to pinpoint performance bottlenecks within your trigger code.
- Logging: Add more verbose logging statements within your check_if_job_finished and async_get functions to monitor execution times and identify potential delays.
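For the logging tip, one lightweight pattern is a small wrapper that records how long each awaited step takes, so you can see which call is eating the 0.5 seconds; this is a sketch, and timed and _demo are hypothetical names, not part of the original code:

import asyncio
import time

async def timed(coro, label, log=print):
    # Await the wrapped coroutine and log its wall-clock duration,
    # making slow steps inside the trigger easy to spot.
    start = time.monotonic()
    try:
        return await coro
    finally:
        log(f"{label} took {time.monotonic() - start:.3f}s")

async def _demo():
    # Stand-in for a status-check call inside the trigger.
    await asyncio.sleep(0.05)
    return 42

durations = []
value = asyncio.run(timed(_demo(), "demo", log=durations.append))

In the trigger itself, log could be self.log.info so the timings appear in the triggerer logs next to the blocked-thread warnings.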
Remember:
- The goal is to make your check_if_job_finished function as lightweight and asynchronous as possible to avoid blocking the triggerer thread.
- Prioritize optimizing network communication and the performance of the external service (abc.com), as they are the most likely culprits for delays in this scenario.
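If any part of the status check turns out to be synchronous after all (for example SSL context setup, DNS resolution, or heavy JSON parsing), one option is to offload it to a worker thread with asyncio.to_thread (Python 3.9+), keeping the triggerer's async thread responsive; blocking_status_check below is a hypothetical placeholder, not code from the post:

import asyncio

def blocking_status_check(job_id):
    # Placeholder for synchronous work that would otherwise stall
    # the event loop if called directly inside the trigger.
    return job_id == "done"

async def check_without_blocking(job_id):
    # asyncio.to_thread runs the blocking function in a thread pool
    # and returns an awaitable, so the event loop stays free.
    return await asyncio.to_thread(blocking_status_check, job_id)

result = asyncio.run(check_without_blocking("done"))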