https://indistract.tistory.com/68
[airflow] CustomOperator
airflow에서는 기본적으로 제공해주는 오퍼레이터들이 많다. HTTP 요청을 보내는 SimpleHttpOperator, S3에서 데이터를 읽어와 MySQL 데이터베이스에 쓰는 데 사용되는 S3ToMySqlTransfer, 그리고 AWS S3에서 파일
indistract.tistory.com
위 게시글에서 CustomOperator를 사용해서 각 상황에 맞는 기능을 확장해 새로운 operator를 생성하는법을 배웠다. 커스텀한 operator를 만드는것은 중요하지만 나는 게시글의 마지막에 이런말을 남겼다.
다만, 기존의 오퍼레이터 기능이 충분하다면 굳이 커스텀 오퍼레이터를 만드는것은 비효율이다. 때문에 커스텀 오퍼레이터를 생성하기전에 기존의 오퍼레이터를 충분히 탐색하는 과정은 필수적이다.
우리는 이번시간에 api를 통해 받아둔 csv 파일을 postgres db 에 insert하는 dag를 하나 만들고자 한다. 그리고 바로 커스텀한 로직을 구현하기 전에 과연 airflow 내의 기능을 사용해서 주어진 문제를 해결할수는 없는지 탐색하는 과정을 거쳐보려한다.
먼저, postgres db에 연결하기 위해 postgres hook을 사용할 것이다. hook은 특정 솔루션을 제어할 수 있도록 method를 제공하는데, 이를 이용해보고자 한다. 우리의 목적은 csv파일을 postgres db에 올리는것이므로 bulk_load라는 method를 한번 사용해볼것이다. postgres hook의 명세는 아래에서 확인 가능하니 bulk_load 메서드를 사용하기 위해 확인해보자.
명세의 bluk_load의 설명을 우선 읽어보면 상당히 불친절하다.
"Loads a tab-delimited file into a database table"
이 짤막한 설명 하나가 전부다. 파라미터를 어떤 형태로 줘야하는지 명료하지 않다. 단지 tab으로 구분된 파일을 database에 전달한다는 말밖에 없다. 이런 경우에는 source code를 한번 확인해볼 필요가 있다.
bulk_load 메서드는 한줄로 정의되어있다. 이 함수는 클래스의 copy_expert라는 메서드를 사용하고 있고, 이 copy_expert라는 메서드를 다시 찾아가 확인해볼 필요가 있어보인다.
위처럼 copy_expert 메서드를 찾아가보면 postgres와의 연결을 만들어주고, cur 객체에서 copy_expert 메서드를 사용하며 sql문과 파일을 인자로 전달해주는 것을 확인할 수 있다. copy_expert가 어떤 기능을 하는지 알아보려면 구글에 psycopg2 라이브러리를 검색해서 해당 매서드를 확인하면되는데, 들어가서 확인해보면 마찬가지로 설명이 충분하지 않다.
bulk_load 메서드는 아마 설명대로 tab으로 구분된 파일을 database에 전달하긴 하겠지만 아래 3가지 정도가 불확실하다.
1. delimiter가 tab이 아니면 안되는지
2. 테이블이 없다면 생성해주는지
3. 헤더가 포함되도 되는지
그래서 결국 이런 경우는 코드를 직접 실행해보며 trouble shooting하는 방법밖에 없다. 결론적으로 직접 실행해보면 delimiter가 ',' 인 경우와 테이블이 존재하지 않는 경우는 에러가 발생한다. 그리고 데이터에 헤더가 포함되면 헤더를 하나의 row로 인식해서 db에 insert 해버린다.
이러한 bulk_load의 한계점때문에 우리는 아래 4가지 조건을 만족하는 CustomHook을 하나 생성하고자 한다.
1. delimiter 유형을 입력받게 하고
2. header 여부를 선택하게하며
3. 데이터에 특수문자가 있다면 제거하는 로직을 추가하고
4. 테이블이 없다면 생성후에 데이터를 업로드
이 보완점을 개선해서 CustomHook 생성하는 방법은 다음 게시글에서 알아보겠다.
'airflow' 카테고리의 다른 글
[airflow] CustomOperator (0) | 2024.01.23 |
---|---|
[airflow] hook (0) | 2024.01.10 |
[airflow] docker volume mount (0) | 2023.12.25 |
[airflow] Xcom (0) | 2023.12.12 |
[airflow] task 내 함수 선언 (1) | 2023.12.10 |