Apache airflow

예를들면
plc_run_now_30sec_dag -> convert_to_nosql -> insert_abac, insert_ptn
위 순서대로 실행해야한다고 가정해보겠습니다. (맨마지막 2개는 병렬로 실행)
A dag가 끝나야 B dag 가 실행되게 하고싶었습니다.
여러가지 방법이 있었는데 저는 dataset을 활용했습니다.
dag가 성공적으로 종료되면 dataset에 업데이트 신호를 전파하고
B dag에서 그 업데이트 신호를 인지하면 실행되게끔 trigger를 설정했습니다.
결과적으로 A dag에서 B dag를 직접 호출 및 실행하지 않고도 실행할 수 있습니다.
1. plc_run_now_30sec_dag.py
API 호출하는 dag입니다.
API호출을 위해 SimpleHttpOperator를 사용합니다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.datasets import Dataset
# A-DAG가 성공적으로 API를 호출했음을 알리는 Dataset 정의
ds_run_now_done = Dataset("dataset://run-now-done")
# DAG 정의: 30초 간격으로 실행되며, 과거 실행은 catchup=False로 건너뜀
with DAG(
dag_id="plc_run_now_30sec_dag", # DAG 고유 이름
start_date=datetime(2025, 6, 30), # 스케줄 기준 시작일 (고정된 과거 날짜)
schedule_interval=timedelta(seconds=30), # 30초마다 실행
catchup=False, # 과거 미실행 주기 건너뜀
max_active_runs=1, # 동시에 하나의 실행만 허용
default_args={ # 기본 인자: 재시도 설정
"retries": 2, # 실패 시 최대 2회 재시도
"retry_delay": timedelta(minutes=1), # 재시도 간격 1분
},
) as dag:
# 시작 표시용 빈 연산자(의존성 구분용)
start = EmptyOperator(
task_id="start", # 태스크 아이디
)
# 외부 API를 호출하는 태스크
call_run_now_api = SimpleHttpOperator(
task_id="call_run_now_api", # 태스크 아이디
http_conn_id="backend_api", # Admin > Connections에 등록된 HTTP 커넥션 아이디
endpoint="/api/run-now", # 호출할 API 경로
method="POST", # HTTP 메서드
headers={"Content-Type": "application/json"}, # HTTP 헤더
data='{"trigger":"airflow"}', # POST 바디 데이터
log_response=True, # 응답 내용을 로그에 남김
outlets=[ds_run_now_done], # 성공 시 Dataset에 “업데이트” 신호 전파
)
# 태스크 순서 정의: start → call_run_now_api
start >> call_run_now_api
outlets에 넣어주면 업데이트 신호가 전파됩니다.
하단에 backend_api 생성하는법



2. convert_to_sql.py
postgreSQL에서 함수를 호출하는 dag입니다.
PostgresOperator를 사용합니다.
앞단에서 만든 데이터셋을 schedule에 넣으면
데이터셋이 업데이트 될때 실행되게 됩니다.
마찬가지로 convert_to_sql 도 다음에 실행해야할 dag가 있기때문에
새 dataset을 만들어줍니다.(ds_nosql_done)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.postgres.operators.postgres import PostgresOperator
# ─────────────────────────────────────────────────────────────────────────────
# Dataset 정의
# A-DAG(plc_run_now_30sec_dag)가 API 호출 성공 시 생산하는 신호
ds_run_now_done = Dataset("dataset://run-now-done")
# B-DAG(convert_to_nosql)이 완료된 후 C-DAG로 전달할 신호
ds_nosql_done = Dataset("dataset://nosql-done")
# ─────────────────────────────────────────────────────────────────────────────
with DAG(
dag_id="convert_to_nosql", # DAG 고유 이름
start_date=datetime(2025, 6, 30), # 스케줄 기준 시작일 (고정된 과거 날짜)
schedule=[ds_run_now_done], # ds_run_now_done 업데이트 시 자동 트리거
catchup=False, # 과거 미실행 주기는 건너뜀
max_active_runs=1, # 동시에 하나의 실행만 허용
default_args={ # 기본 인자: 재시도 설정
"retries": 1, # 실패 시 1번 재시도
"retry_delay": timedelta(minutes=2), # 재시도 간격 2분
},
) as dag:
# PostgreSQL 함수 호출 태스크
run_pg_function = PostgresOperator(
task_id="run_meas_func", # 태스크 아이디
postgres_conn_id="postgres_db", # Admin > Connections에 등록된 Postgres 커넥션 ID
sql='SELECT "YOURSCHEME".func_meas_data_to_raw_jsonb();', # 실행할 SQL (함수 호출)
autocommit=True, # 함수 내부 트랜잭션 처리 시 자동 커밋
outlets=[ds_nosql_done], # 성공 시 ds_nosql_done Dataset 업데이트
)
마찬가지로 작업전에 postgres connetion을 생성해야합니다.

3.insert_data.py
그다음 순서의 dag도 동일한 형식으로
앞단의 dataset을 schedule에 넣어줍니다.
from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.postgres.operators.postgres import PostgresOperator
ds_nosql_done = Dataset("dataset://nosql-done") # B-DAG이 생산
with DAG(
dag_id="insert_data",
start_date=datetime(2025, 6, 30),
schedule=[ds_nosql_done], # Dataset 트리거
catchup=False,
max_active_runs=1,
) as dag:
run_insert_data = PostgresOperator(
task_id="run_insert_data",
postgres_conn_id="postgres_db",
sql='SELECT "YOURSCHEME".insert_datata();',
autocommit=True,
)
run_insert_data
다 작성하면 Datasets탭을 눌러 flow를 확인할 수 있습니다.

'백엔드 > 🐧 Linux' 카테고리의 다른 글
| virtual box 용량늘리기 (0) | 2025.10.13 |
|---|---|
| Apache Airflow 설치 (0) | 2025.06.27 |
| Linux | 도커 용량문제 (0) | 2025.05.29 |
| Ubuntu | 특정사용자 sudo 비밀번호 요구 예외거는법 (0) | 2025.05.15 |
| 리눅스 우분투 설치시 필수사항 (0) | 2025.03.25 |