이동준1
아웃풋 공부
이동준1
전체 방문자
오늘
어제
  • 분류 전체보기 (84)
    • airflow (8)
    • sql (23)
    • aws (12)
    • python (3)
    • 네트워크 (12)
    • 알고리즘 (2)
    • 짧은서평 (24)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • 네트워크
  • 고통의 비밀
  • 고통의비밀
  • 서평
  • AWS
  • regexp
  • 유연함의힘
  • 퓨처셀프
  • Network

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
이동준1

아웃풋 공부

airflow

[airflow] CustomOperator

2024. 1. 23. 00:20

airflow에서는 기본적으로 제공해주는 오퍼레이터들이 많다. HTTP 요청을 보내는 SimpleHttpOperator, S3에서 데이터를 읽어와 MySQL 데이터베이스에 쓰는 데 사용되는 S3ToMySqlTransfer, 그리고 AWS S3에서 파일을 가져와서 사용자 지정 함수를 적용한 후 다시 S3에 저장하는 S3FileTransformOperator 등이 있다.

 

이런 기본적으로 제공해주는 오퍼레이터도 충분히 훌륭하지만, 상황에 따라서 원하는 기능을 충분히 구현하지 못할수도 있다. 예를들어, 여러 url에 HTTP 요청을 보내고싶다면 SimpleHttpOperator 로는 목적을 충분히 달성하지 못한다. 해당 operator는 하나의 url에 대한 리턴값만 출력하기 때문이다.

 

이때 사용되는것이 CustomOperator이다. airflow는 오퍼레이터를 직접만들어 사용할수 있도록 BaseOperator라는 클래스를 제공한다. BaseOperator를 상속하면 직접 커스텀한 operator를 만들수 있는데, 상속시 2가지 메서드를 재정의(override) 해야한다.

 

    ① def __init__

    ② def execute

 

1번은 객체 생성시 객체에대한 초기값을 지정하는 함수이며, 2번에는 구현하고자하는 커스텀한 로직을 작성하면 된다. 기존 operator에서는 제공하지 않아 불편했던 기능들을 정리해서 2번에 코드로 적으면 되는 것이다. 

 

▼ custom operator 생성 예시 ▼

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

 

위처럼 BaseOperator를 상속받아서 클래스를 생성해두면, 우리는 dag에서 생성해둔 클래스(operator)를 불러와서 사용할 수 있다.

 

/dags
/plugins
    ㄴ operators
        ㄴ hello_operator.py
            ㄴ HelloOperator

 

만약, 생성한 custom operator 파이썬 파일이 dags 폴더와 같은 레벨에 있는 /plugins 폴더 내의 operators 폴더내에 있다면, 생성한 커스텀 오퍼레이터를 아래처럼 호출해서 사용하면 된다.

from operators.hello_operator import HelloOperator

with dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar")

 

이런식으로 BaseOperator를 상속해서 커스텀한 로직을 만들어 사용할 수 있다. 뿐만 아니라 자주쓰이는 기능이라면 재사용에도 편리하고, PythonOperator를 사용할때보다 관리가 더 용이해진다는 장점이 있다.

 

다만, 기존의 오퍼레이터 기능이 충분하다면 굳이 커스텀 오퍼레이터를 만드는것은 비효율이다. 때문에 커스텀 오퍼레이터를 생성하기전에 기존의 오퍼레이터를 충분히 탐색하는 과정은 필수적이다.

'airflow' 카테고리의 다른 글

[airflow] CustomHook 1  (0) 2024.02.03
[airflow] hook  (0) 2024.01.10
[airflow] docker volume mount  (0) 2023.12.25
[airflow] Xcom  (0) 2023.12.12
[airflow] task 내 함수 선언  (1) 2023.12.10
    'airflow' 카테고리의 다른 글
    • [airflow] CustomHook 1
    • [airflow] hook
    • [airflow] docker volume mount
    • [airflow] Xcom
    이동준1
    이동준1

    티스토리툴바