1. 함수생성
postgre에서 먼저 파티션 생성 함수를 만들어야합니다.
CREATE OR REPLACE FUNCTION "MEASURE".create_monthly_partition(target_date date)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
start_date date := date_trunc('month', target_date);
end_date date := start_date + interval '1 month';
partition_name text := format('meas_data_%s', to_char(start_date, 'YYYYMM'));
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = partition_name AND n.nspname = 'MEASURE'
) THEN
EXECUTE format('
CREATE TABLE "MEASURE".%I
PARTITION OF "MEASURE".meas_data
FOR VALUES FROM (%L) TO (%L);',
partition_name, start_date, end_date
);
END IF;
END;
$function$
;
2. dag작성
현재달 기준 다음달의 파티션이 생성되고
이 dag는 매달 1일 01:00에 실행됩니다.
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
with DAG(
dag_id="monthly_partition_creator",
start_date=datetime(2025, 6, 30),
schedule_interval="0 1 1 * *",
catchup=False,
max_active_runs=1,
tags=["partition"]
) as dag:
create_partition = PostgresOperator(
task_id="create_next_month_partition",
postgres_conn_id="postgres_db",
sql='SELECT "MEASURE".create_monthly_partition((current_date + interval \'1 month\')::date);',
autocommit=True,
)
create_partition
반응형
'웹 개발 > 🐍 Python' 카테고리의 다른 글
Python 타입 힌트에서 tuple, list, dict (1) | 2025.07.01 |
---|---|
Python 자주 쓰는 예외 클래스 (1) | 2025.07.01 |
Python 정규표현식 re.match 사용법 (1) | 2025.07.01 |
Python 타입 힌트 (0) | 2025.07.01 |
리스트 컴프리헨션(list comprehension) (1) | 2025.06.30 |