이동준1
아웃풋 공부
이동준1
전체 방문자
오늘
어제
  • 분류 전체보기 (87)
    • airflow (8)
    • sql (23)
    • aws (12)
    • python (3)
    • 네트워크 (12)
    • 알고리즘 (2)
    • 짧은서평 (27)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

  • 고통의비밀
  • 서평
  • 통증해방 서평
  • 마음의기술 서평
  • 네트워크
  • 유연함의힘
  • AWS
  • 퓨처셀프
  • 고통의 비밀
  • regexp
  • 마음의기술
  • Network
  • 통증해방

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
이동준1

아웃풋 공부

[python] session과 cursor
python

[python] session과 cursor

2024. 1. 9. 00:21

우리는 python 코드를 이용해서 DB서버와 소통을 해야할 일이 종종있다. 코드를 이용해서 DB에 데이터를 읽거나 쓰려면은 ①session과 ②cursor에 대한 개념을 이해할 필요가 있다. 아래 코드에서 두 개념을 살펴보고자 한다. DB는 postgres를 이용한다.

 

먼저, python에서 postgres DB에 접속해서 sql을 수행하고, 그 결과를 가지고 올수있도록하기위해서 psycopg2 라이브러리를 불러와야한다.

 

해당 라이브러리를 이용해서, psgcopg2.connect() 내에 ip정보, db명, user명, password, 포트번호를 입력해주면 DB서버와의 연결을 만들수 있다. 그리고 이 DB서버와의 연결을 'session' 이라고 부른다.

import psycopg2
from contextlib import closing

with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
	..

 

 

그러나 위에서만든 session만으로는 DB에 바로 쿼리를 날려서 원하는 데이터를 가져오거나 줄 수 없다. session을 생성했으면 session 안에서 쿼리를 날리고 받아올수 있도록 하는 cursor 객체가 필요하다. cursor 객체가 실제로 sql문을 실행하는 주체이며, 이 객체는 session 객체를 통해서 생성할수 있다.  

 

import psycopg2
from contextlib import closing

with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
	with closing(conn.cursor()) as cursor:
    	..

 

이렇게 cursor 객체를 생성했으면, 수행하고자하는 로직들을 아래에 작성해준다. 필자는 airflow에서 코드를 작성했으며, dag_id, task_id, run_id, 그리고 'insrt 수행'이라는 텍스트를 DB에 insert하는 로직을 작성했다. 각각의 정보를 먼저 변수로 받은뒤, 이후 바인딩 변수에 각각의 정보가 기입되도록 작성되어있다.

 

cursor.execute() 부분에서 수행해야할 sql문과 바인딩해야할 변수들을 아래와 같이 기입해주면, 각각의 변수들이 '%s' 부분에 매핑되어 sql문에 실행되게 된다.

 

import psycopg2
from contextlib import closing

with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
    with closing(conn.cursor()) as cursor:
        dag_id = kwargs.get('ti').dag_id
        task_id = kwargs.get('ti').task_id
        run_id = kwargs.get('ti').run_id
        msg = 'insrt 수행'
        sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
        cursor.execute(sql, (dag_id, task_id, run_id, msg))
        conn.commit()

 

 

위 코드를 airflow dag로 구성하면 아래와 같이 작성할수 있다. 위의 코드들을 'insrt_postgres'라는 함수로 선언해주고, 이 함수를 이용해서 PythonOperator를 생성한뒤, task를 실행했다.

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='dags_python_with_postgres',
    start_date=pendulum.datetime(2024,1,1, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:

    def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
        import psycopg2
        from contextlib import closing

        with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn:
            with closing(conn.cursor()) as cursor:
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
                cursor.execute(sql, (dag_id, task_id, run_id, msg))
                conn.commit()
        
    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_args=['172.28.0.3', '5432', 'djlee', 'djlee', 'djlee']
    )

    insrt_postgres

 

위 DAG를 실행하면 해당값이 DB에 잘 insert된것을 확인할 수 있다.

'python' 카테고리의 다른 글

[python] OOP  (0) 2024.01.20
[python] decorator  (0) 2023.12.16
    'python' 카테고리의 다른 글
    • [python] OOP
    • [python] decorator
    이동준1
    이동준1

    티스토리툴바