보통은 라이브러리를 import 할때는 아래처럼 가장 상단에 작성하는것이 일반적이다.
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
그러나 아래처럼 task decorator 안에서 라이브러리를 불러와서 사용하는 경우도 존재한다.
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_calc()
이는 스케줄러 부하 경감을 위해서이다.
스케줄러는 우리가 만든 dag를 주기적으로 파싱한다. 다시말해, dag에 문법적인 오류가 있는지 없는지 주기적으로 검토한다. dag가 실행되지 않아도 파싱은 진행되며, 아래 3가지 부분이 검토 대상이다.
① 코드 가장 상단에 라이브러리 import 하는 부분
② dag 선언 직전의 부분
③ task operator 선언하기 직전 부분
따라서, 이 세가지 영역에서 내용이 많을수록 스케줄러에 많은 부하가 발생한다. 규모가 작다면 문제가 되지 않을수도 있겠지만, 대규모 환경에서는 스케줄러 부하 문제가 종종 문제가되며 따라서 가급적 스케줄러 부하를 줄이는 방법으로 코드를 작성할 필요가 있다.
그리고 부하를 줄이는 한가지 방법이 operator 안에서만 사용할 함수는 operator 안에서 라이브러리를 호출해서 사용하는 것이다. 스케줄러는 operator 내부는 검사하지 않아 (위 세가지 경우에 속하지 않는다), operator 내부에 함수를 선언하면 스케줄러 부하가 줄어든다.
'airflow' 카테고리의 다른 글
[airflow] hook (0) | 2024.01.10 |
---|---|
[airflow] docker volume mount (0) | 2023.12.25 |
[airflow] Xcom (0) | 2023.12.12 |
[airflow] WSL (0) | 2023.12.03 |
[airflow] why airflow? (0) | 2023.09.17 |