https://indistract.tistory.com/62
[python] session과 cursor
우리는 python 코드를 이용해서 DB서버와 소통을 해야할 일이 종종있다. 코드를 이용해서 DB에 데이터를 읽거나 쓰려면은 ①session과 ②cursor에 대한 개념을 이해할 필요가 있다. 아래 코드에서 두
indistract.tistory.com
이전 게시글에서는 python 코드를 이용해서 Postgres DB와 소통하는 방법을 알아보았으며, 더불어 session과 cursor개념도 설명했다. 위 게시글의 마지막에 dag도 하나 작성해보았는데, 해당 dag는 치명적인 단점이 2가지 있다.
① DB 연결정보 변경시 관리가 번거롭다는점과
② DB 연결정보가 코드에 노출된다는 것이다.
게시글의 마지막 코드부분을 다시 살펴보면 ip정보, port, db명, user, password 정보를 argument로 전달하며, 이 정보는 코드를 열람하는 모두가 확인할 수 있다. 심지어 나는 이 정보를 public repository에 올렸다. 내 git을 방문하는 사람은 모두 이 내 접속정보를 가져갈수 있다.
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'djlee', 'djlee', 'djlee']
)
insrt_postgres
이 두가지 문제점을 해결하기위해 airflow의 hook을 이용할 수 있다. hook을 이용하기 위해서는 먼저 ① airflow UI 화면에서 Connection 정보를 등록해야한다.
정보를 등록하려면, airflow UI 화면에서 Admin을 클릭후 Connections 메뉴에 들어가 커넥션 추가버튼(파란 + 버튼)을 누른다.
해당버튼을 클릭하면 신규 Connection을 생성할수 있고, 나는 'conn-db-postgres-custom' 이라는 Conn Id로 신규 posgres 컨테이너 커넥션을 만들었다. 해당 postgres 커넥션에는 ip정보, port, shema, user, password, 그리고 description을 입력할수 있다.
생성을 정상적으로 마치면 아래와 같이 List Connection에 커넥션이 추가된다.
② 이제 이렇게 등록된 Connection 정보를 이용해서 Hook 객체를 생성할 수 있다. Hook 객체는 우리가 Connection에 등록했던 연결 정보를 가져와서 DB서버와 연결을 만들어준다 (session 생성).
이전에 우리가 코드에 접속정보를 모두 적었던반면, hook은 접속정보를 Connection을 통해 받아옴으로 접속정보가 코드상에서 노출되지 않는다.
hook 객체를 사용하게되면 아래와같이 코드를 변경할 수 있다. hook을 사용함으로 session을 정의하는 부분이 변경되었으며, PythonOperator에서는 접속정보를 전달하는것이 아닌 Conn Id ('conn-db-postgres-custom')를 전달하는것으로 변경되었다.
이로써 우리는 코드에 접속정보를 직접 노출시키지 않음으로 보안을 강화했으며, 접속정보를 일원화해서 관리할수 있게 되는 이점 또한 얻었다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres_hook',
start_date=pendulum.datetime(2024, 1, 1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(postgres_conn_id, **kwargs):
from airflow.providers.postgres.hooks.postgres import PostgresHook
from contextlib import closing
postgres_hook = PostgresHook(postgres_conn_id)
with closing(postgres_hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'hook insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql, (dag_id, task_id, run_id, msg))
conn.commit()
insrt_postgres_with_hook = PythonOperator(
task_id='insrt_postgres_with_hook',
python_callable=insrt_postgres,
op_kwargs={'postgres_conn_id':'conn-db-postgres-custom'}
)
insrt_postgres_with_hook
위 dag를 실행시, 아래처럼 Postgres에 행이 잘 추가된것을 확인할 수 있다.
'airflow' 카테고리의 다른 글
[airflow] CustomHook 1 (0) | 2024.02.03 |
---|---|
[airflow] CustomOperator (0) | 2024.01.23 |
[airflow] docker volume mount (0) | 2023.12.25 |
[airflow] Xcom (0) | 2023.12.12 |
[airflow] task 내 함수 선언 (1) | 2023.12.10 |