우리는 python 코드를 이용해서 DB서버와 소통을 해야할 일이 종종있다. 코드를 이용해서 DB에 데이터를 읽거나 쓰려면은 ①session과 ②cursor에 대한 개념을 이해할 필요가 있다. 아래 코드에서 두 개념을 살펴보고자 한다. DB는 postgres를 이용한다.
먼저, python에서 postgres DB에 접속해서 sql을 수행하고, 그 결과를 가지고 올수있도록하기위해서 psycopg2 라이브러리를 불러와야한다.
해당 라이브러리를 이용해서, psgcopg2.connect() 내에 ip정보, db명, user명, password, 포트번호를 입력해주면 DB서버와의 연결을 만들수 있다. 그리고 이 DB서버와의 연결을 'session' 이라고 부른다.
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
..
그러나 위에서만든 session만으로는 DB에 바로 쿼리를 날려서 원하는 데이터를 가져오거나 줄 수 없다. session을 생성했으면 session 안에서 쿼리를 날리고 받아올수 있도록 하는 cursor 객체가 필요하다. cursor 객체가 실제로 sql문을 실행하는 주체이며, 이 객체는 session 객체를 통해서 생성할수 있다.
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
with closing(conn.cursor()) as cursor:
..
이렇게 cursor 객체를 생성했으면, 수행하고자하는 로직들을 아래에 작성해준다. 필자는 airflow에서 코드를 작성했으며, dag_id, task_id, run_id, 그리고 'insrt 수행'이라는 텍스트를 DB에 insert하는 로직을 작성했다. 각각의 정보를 먼저 변수로 받은뒤, 이후 바인딩 변수에 각각의 정보가 기입되도록 작성되어있다.
cursor.execute() 부분에서 수행해야할 sql문과 바인딩해야할 변수들을 아래와 같이 기입해주면, 각각의 변수들이 '%s' 부분에 매핑되어 sql문에 실행되게 된다.
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql, (dag_id, task_id, run_id, msg))
conn.commit()
위 코드를 airflow dag로 구성하면 아래와 같이 작성할수 있다. 위의 코드들을 'insrt_postgres'라는 함수로 선언해주고, 이 함수를 이용해서 PythonOperator를 생성한뒤, task를 실행했다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres',
start_date=pendulum.datetime(2024,1,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql, (dag_id, task_id, run_id, msg))
conn.commit()
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'djlee', 'djlee', 'djlee']
)
insrt_postgres
위 DAG를 실행하면 해당값이 DB에 잘 insert된것을 확인할 수 있다.
'python' 카테고리의 다른 글
[python] OOP (0) | 2024.01.20 |
---|---|
[python] decorator (0) | 2023.12.16 |