이동준1
아웃풋 공부
이동준1
전체 방문자
오늘
어제
  • 분류 전체보기 (84)
    • airflow (8)
    • sql (23)
    • aws (12)
    • python (3)
    • 네트워크 (12)
    • 알고리즘 (2)
    • 짧은서평 (24)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • regexp
  • Network
  • 유연함의힘
  • 네트워크
  • 퓨처셀프
  • 고통의 비밀
  • 고통의비밀
  • AWS
  • 서평

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
이동준1

아웃풋 공부

[airflow] Xcom
airflow

[airflow] Xcom

2023. 12. 12. 00:10

airflow에서 사용되는 xcom 은 Cross Communication의 약어이다. 이는 airflow dag 내부에서 task간 데이터 공유를 위해서 사용된다. 예를들어, task1의 수행결과를 task2의 입력으로 사용하고싶을때 xcom 을 이용해 구현할 수 있다.

 

이 기능은 주로 작은 데이터를 공유할때 사용되며 xcom의 내용은 메타 DB의 Xcom 테이블에 값이 저장된다. task에서 xcom을 사용하게되면 airflow 에서 task를 선택했을때 'Xcom'이라는 영역이 나타난다. 이를 클릭하면 xcom 테이블에 어떤값이 key-value 형태로 저장되었는지 확인할 수 있다.

 

 

XCom 테이블 예시

 

xcom을 사용하는 방법으로는 크게 2가지가 있다.

 

① **kwargs에 존재하는 'ti' (task instance) 객체 활용

② python 함수의 return 값 활용

 

airflow상에서 **kwargs에는 dictionary 형태로 많은 변수들이 존재하는데, 그중에 'ti' (task instance)라는 객체도 존재한다. ti 객체에는 'xcom_push'와 'xcom_pull'이라는 미리 만들어져있는 함수가있는데, 이를 활용하면 xcom에 데이터를 올리거나 받아오는것이 가능하다

 

아래 dag의 'python_xcom_push_task' 를 참고하면 kwargs에서 'ti' 객체를 먼저 불러온뒤, ti.xcom_push를 이용해서 데이터를 key-value 형태로 올리는것을 확인할 수 있다. 이 데이터는 xcom 테이블에 저장되며, 우리가 위에서 확인했던 스크린샷의 형태처럼 저장된다.

 

그리고 'python_xcom_pull_task' 에서 ti.xcom_pull 을 통해서 어떤 key에 해당하는 value를 취할것인지 key값을 입력해 해당하는 value를 각각 value1, value2에 저장해주는것을 확인할 수 있다. 추가적으로 task_ids 를 입력해줄수도있는데, 이는 각각의 다른 task에서 같은 key값에 다른 value를 저장하는경우에 사용하면 유용하다. 만약 xcom 테이블에 동일한 이름의 여러 key값이 존재하는데 task_ids를 지정해주지 않는다면, 가장 마지막에 테이블에 추가된 데이터가 출력되게 된다.

 

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="10 0 * * * ",
    start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    
    @task(task_id='python_xcom_push_task')
    def xcom_push(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)

    xcom_push() >> xcom_pull()

 

그리고 두번째로 python 함수의 return값을 이용해 xcom을 사용할수있는데, 이 방식이 독특하다.

 

아래 dag는 위처럼 xcom에 데이터를 push/pull 하는 과정이 없다. 그러나 airflow는 task decorator 내 함수에서 return을 하게되면 그 값이 자동으로 xcom 테이블에 올라가게 된다 (이는 task decorator를 사용하는 장점중 하나이다). 아래 dag는 이러한 특성을 이용한 사용법이다.

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'

    @task(task_id='python_xcom_pull')
    def xcom_pull(status, **kwargs):
        print('함수 입력값으로 받은 값:' + status)


    xcom_pull(xcom_push_result())

 

첫번째 task에서 출력한 'Success' 라는 return값은 XCom 테이블에 저장된다. 여기서 return값은 key값을 따로 지정하지는 않는데, key값은 'return_value' 라고 defalut로 지정된다. 그래서 위 dag처럼 return값을 두번째 task에 직접 입력값으로 주지 않더라도, ti.xcom_pull(key='return_value') 를 이용하면 Xcom 테이블의 데이터를 취할수 있다.

 

그리고 마지막에 task decorator를 사용할때, 함수의 입출력 관계를 xcom_pull(xcom_push_result()) 로 설정해주었는데 이 관계만으로 task flow가 정의되게 된다는점도 알아두면 좋다. ( python_xcom_push_by_return >> python_xcom_pull).

'airflow' 카테고리의 다른 글

[airflow] hook  (0) 2024.01.10
[airflow] docker volume mount  (0) 2023.12.25
[airflow] task 내 함수 선언  (1) 2023.12.10
[airflow] WSL  (0) 2023.12.03
[airflow] why airflow?  (0) 2023.09.17
    'airflow' 카테고리의 다른 글
    • [airflow] hook
    • [airflow] docker volume mount
    • [airflow] task 내 함수 선언
    • [airflow] WSL
    이동준1
    이동준1

    티스토리툴바