airflow

[airflow] CustomHook 1
https://indistract.tistory.com/68 [airflow] CustomOperator airflow에서는 기본적으로 제공해주는 오퍼레이터들이 많다. HTTP 요청을 보내는 SimpleHttpOperator, S3에서 데이터를 읽어와 MySQL 데이터베이스에 쓰는 데 사용되는 S3ToMySqlTransfer, 그리고 AWS S3에서 파일 indistract.tistory.com 위 게시글에서 CustomOperator를 사용해서 각 상황에 맞는 기능을 확장해 새로운 operator를 생성하는법을 배웠다. 커스텀한 operator를 만드는것은 중요하지만 나는 게시글의 마지막에 이런말을 남겼다. 다만, 기존의 오퍼레이터 기능이 충분하다면 굳이 커스텀 오퍼레이터를 만드는것은 비효율이다. 때문..
[airflow] CustomOperator
airflow에서는 기본적으로 제공해주는 오퍼레이터들이 많다. HTTP 요청을 보내는 SimpleHttpOperator, S3에서 데이터를 읽어와 MySQL 데이터베이스에 쓰는 데 사용되는 S3ToMySqlTransfer, 그리고 AWS S3에서 파일을 가져와서 사용자 지정 함수를 적용한 후 다시 S3에 저장하는 S3FileTransformOperator 등이 있다. 이런 기본적으로 제공해주는 오퍼레이터도 충분히 훌륭하지만, 상황에 따라서 원하는 기능을 충분히 구현하지 못할수도 있다. 예를들어, 여러 url에 HTTP 요청을 보내고싶다면 SimpleHttpOperator 로는 목적을 충분히 달성하지 못한다. 해당 operator는 하나의 url에 대한 리턴값만 출력하기 때문이다. 이때 사용되는것이 Cus..

[airflow] hook
https://indistract.tistory.com/62 [python] session과 cursor 우리는 python 코드를 이용해서 DB서버와 소통을 해야할 일이 종종있다. 코드를 이용해서 DB에 데이터를 읽거나 쓰려면은 ①session과 ②cursor에 대한 개념을 이해할 필요가 있다. 아래 코드에서 두 indistract.tistory.com 이전 게시글에서는 python 코드를 이용해서 Postgres DB와 소통하는 방법을 알아보았으며, 더불어 session과 cursor개념도 설명했다. 위 게시글의 마지막에 dag도 하나 작성해보았는데, 해당 dag는 치명적인 단점이 2가지 있다. ① DB 연결정보 변경시 관리가 번거롭다는점과 ② DB 연결정보가 코드에 노출된다는 것이다. 게시글의 마지..

[airflow] docker volume mount
docker container에서 사용된 파일은 container가 종료될때 함께 사라진다. 즉, airflow DAG도 마찬가지로 컨테이너가 종료되면 사라진다. 우리는 보통은 일회성으로 DAG를 사용하길 희망하진 않는다. 그래서 container가 생성될 때 외부(WSL)의 volume을 마운트 할 수 있도록 설정하면 DAG를 지속적으로 사용할 수 있다. ① WSL 내에 실행을 희망하는 DAG 파일이 존재한다는 전제하에, ② 'docker-compose.yaml' 파일을 수정하여 위 조건을 달성할 수 있다. 'docker-compose.yaml' 파일내에 'volumes' 항목을 수정하면 airflow와 WSL 내의 DAG를 연결해줄수 있다. 'volumes' 항목은 WSL의 디렉토리와 연결해줄 컨테이..

[airflow] Xcom
airflow에서 사용되는 xcom 은 Cross Communication의 약어이다. 이는 airflow dag 내부에서 task간 데이터 공유를 위해서 사용된다. 예를들어, task1의 수행결과를 task2의 입력으로 사용하고싶을때 xcom 을 이용해 구현할 수 있다. 이 기능은 주로 작은 데이터를 공유할때 사용되며 xcom의 내용은 메타 DB의 Xcom 테이블에 값이 저장된다. task에서 xcom을 사용하게되면 airflow 에서 task를 선택했을때 'Xcom'이라는 영역이 나타난다. 이를 클릭하면 xcom 테이블에 어떤값이 key-value 형태로 저장되었는지 확인할 수 있다. xcom을 사용하는 방법으로는 크게 2가지가 있다. ① **kwargs에 존재하는 'ti' (task instance..
[airflow] task 내 함수 선언
보통은 라이브러리를 import 할때는 아래처럼 가장 상단에 작성하는것이 일반적이다. from airflow import DAG import pendulum from airflow.decorators import task with DAG( dag_id="dags_python_with_macro", schedule="10 0 * * *", start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"), catchup=False ) as dag: 그러나 아래처럼 task decorator 안에서 라이브러리를 불러와서 사용하는 경우도 존재한다. @task(task_id='task_direct_calc') def get_datetime_calc(**kwargs): fr..

[airflow] WSL
Airflow는 Windos에 직접 설치가 불가능하다. 그래서 Window에 리눅스 서버를 올려서 Airflow를 설치해야하는데, 이를 위해서 WSL을 많이 이용한다. WSL은 Windows Subsystem for Linux의 약어로, 윈도우에서 리눅스 실행환경을 지원해주는 Windows의 확장 기능이다. WSL 이 있기 이전에도 Windows에서 가상머신(Virtual Machine, VM)을 이용해 Linux를 사용할 수 있었다. 가상머신은 하나의 물리적인 컴퓨터 안에 구축된 가상 컴퓨터이며, 온전한 컴퓨터 시스템(CPU/메모리/네트워크 인터페이스 및 스토리지까지 갖춘)으로 작동한다. 가상화 VM은 메모리 오버헤드가 심하다. 가상 머신은 하나의 물리적인 서버에서 독립적인 가상 환경을 운영할 수 있도..

[airflow] why airflow?
데이터는 일반적으로 추출(Extract), 적재(Load), 그리고 가공(Transfrom)하는 과정을 거쳐 최종적으로 사용된다. 이 데이터 흐름은 항상 원활하게 이루어지지는 않은데, 각 단계에서 오류가 발생할수 있기 때문이다. 예를들어, 데이터를 추출하기 위해 필요한 API가 갑자기 작동하지 않을 수 있고, 데이터를 적재하는 snowflake에서 오류가 발생할수 있고, 그리고 DBT를 이용해 데이터를 변환하는 과정에서 실수가 있었을 수도있다. 이렇게 각각의 단계에서 이슈가 발생할수 있고, 때문에 이런 것들을 관리할 도구가 필요하다. 이를 도와주는것이 airflow이다. 위와같은 데이터흐름이 하나만 존재한다면 airflow가 필요하지 않을지도 모르지만, 데이터 파이프라인이 늘어나면 늘어날수록 airflo..