구조
airflow-docker/
├── airflow
└── dags # git을 활용하여 dag 버전관리
├── dags/ # DAG 정의 파일(.py) 보관 폴더
├── logs/ # Airflow 로그 저장 폴더
├── plugins/ # 사용자 플러그인 보관 폴더
└── docker-compose.yml # 전체 스택 구성 파일
└── .env # 환경변수 파일
docker-compose.yml 작성
services:
redis:
image: redis:7
container_name: airflow-redis
restart: always
ports:
- "6380:6379"
airflow-init:
image: apache/airflow:2.6.3
container_name: airflow-init
restart: "no"
env_file: .env
depends_on:
- redis
extra_hosts:
- "host.docker.internal:host-gateway"
entrypoint:
- bash
- -c
- |
airflow db init && \
airflow users create \
--username yourid \
--firstname Jaewon \
--lastname Lee \
--role Admin \
--email admin@example.com \
--password yourpassword
environment:
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://yourdbid:yourdbpassword@host.docker.internal:5432/yourdb
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW_UID=50000
- AIRFLOW_GID=50000
volumes:
- ./airflow/dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
webserver:
image: apache/airflow:2.6.3
container_name: airflow-webserver
restart: always
env_file: .env
depends_on:
- airflow-init
- redis
extra_hosts:
- "host.docker.internal:host-gateway"
command: webserver --port 8080
ports:
- "8388:8080" # Host 8388 → Container 8080
environment:
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://yourdbid:yourdbpassword@host.docker.internal:5432/yourdb
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW_UID=50000
- AIRFLOW_GID=50000
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
scheduler:
image: apache/airflow:2.6.3
container_name: airflow-scheduler
restart: always
env_file: .env
depends_on:
- airflow-init
- redis
extra_hosts:
- "host.docker.internal:host-gateway"
command: scheduler
environment:
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://yourdbid:yourdbpassword@host.docker.internal:5432/yourdb
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW_UID=50000
- AIRFLOW_GID=50000
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
networks:
default:
driver: bridge
.env 작성
# .env
# Airflow 컨테이너 내부 유저/그룹 ID (선택)
AIRFLOW_UID=50000
AIRFLOW_GID=50000
# PostgreSQL (공유 DB)
POSTGRES_HOST=서비스이름 # compose 내 서비스 이름
POSTGRES_PORT=5432 # 호스트↔컨테이너 매핑: 5432:5432
POSTGRES_USER=db계정
POSTGRES_PASSWORD=db비번
POSTGRES_DB=db명
# Airflow Core 설정
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__LOAD_EXAMPLES=False
도커 실행
docker compose up -d
스택상태 확인
docker compose ps
웹 UI 접속 & 로그인
http://localhost:8388
dag작성
위에서 생성한 airflow/dags폴더로 이동하여
예시로 hello_world.py 파일을 만듭니다.
vi hello_world.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id='hello_world',
start_date=datetime(2025, 6, 20),
schedule_interval='@daily',
catchup=False,
) as dag:
say_hi = BashOperator(
task_id='say_hi',
bash_command='echo "Hello, Airflow!"'
)
웹ui 확인

dag 파일 작성법
dag_id 는 보통 py파일 명과 동일하게 작성한다.
cron
# 매주 월요일 03:30 UTC 에 실행
schedule_interval='30 3 * * 1'
# 매 15분마다
schedule_interval='*/15 * * * *'
# 매 10초마다 (실험용)
schedule_interval=timedelta(seconds=10)
프리셋
'@hourly', '@daily' @hourly 매시 정각, @daily 매일 자정 등
dag 예시
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, timedelta
# 1. 공통 인자 설정
default_args = {
'owner': 'nivus_dev',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
# 2. DAG 정의
with DAG(
dag_id='example_full_dag',
default_args=default_args,
description='An example DAG showing scheduling, sensors, and tasks',
start_date=datetime(2025, 6, 27, 0, 0, 0),
schedule_interval='0 6 * * *', # 매일 06:00 UTC (cron 표현)
catchup=False,
max_active_runs=1,
tags=['example', 'full'],
) as dag:
# 3. 시간 센서: 매일 06:00이 될 때까지 대기
wait_for_6am = TimeSensor(
task_id='wait_for_6am',
target_time=time(hour=6, minute=0),
poke_interval=60, # 60초 간격으로 체크
timeout=3600, # 최대 1시간 대기
)
# 4. BashOperator: 현재 날짜 출력
print_date = BashOperator(
task_id='print_date',
bash_command='date "+%Y-%m-%d %H:%M:%S"',
)
# 5. PythonOperator: 간단한 파이썬 함수 실행
def greet(**context):
print("Hello from PythonOperator! Today is", context['ds'])
say_hello = PythonOperator(
task_id='say_hello',
python_callable=greet,
provide_context=True,
)
# 6. 의존성 설정
wait_for_6am >> print_date >> say_hello
오퍼레이터
Airflow에는 다양한 작업 목적에 맞춘 수십개의 Operator가 제공됩니다. 크게 카테고리별로 나누면 다음과 같습니다.
1. 기본연산자
BashOperator
: 셀 명령을 실행합니다.
from airflow.operators.bash import BashOperator
PythonOperator
: 파이썬 함수를 호출합니다.
from airflow.operators.python import PythonOperator
BranchPythonOperator
: 분기(branch) 로직을 수행해 다음 실행 경로를 결정합니다.
from airflow.operators.python import BranchPythonOperator
2. 스케줄링, 감시용 센서
TimeSensor, DateSensor
: 특정 시간, 날짜 도달을 기다립니다.
ExternalTaskSensor
: 다른 DAG/task의 완료를 기다립니다.
FileSensor, HttpSensor
:파일 존재/HTTP 응답을 폴링(polling) 방식으로 감시합니다.
3. 데이터베이스 연동
PostgresOperator
: PostgreSQL 에 SQL 쿼리를 실행합니다.
from airflow.providers.postgres.operators.postgres import PostgresOperator
MySqlOperator, MssqlOperator, SqliteOperator
: 각각 MySQL, MSSQL, SQLite 용 입니다.
4. 클라우드 서비스 연동
AWS
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
Google Cloud
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
5. 컨테이너·클러스터 연동
- DockerOperator
로컬/원격 Docker 컨테이너를 실행합니다. - KubernetesPodOperator
Kubernetes 클러스터에 Pod 를 스핀업합니다. - SparkSubmitOperator, DatabricksSubmitRunOperator
Spark/Databricks 작업 실행용
6. 이메일·알림
- EmailOperator
이메일 전송 - SlackAPIOperator, PagerDutyOperator
Slack, PagerDuty 등 외부 알림
7. 기타 유용한 Operator
- SqlSensor: SQL 쿼리 결과를 기다리는 센서
- SubDagOperator: 내부에 서브DAG을 호출
- TriggerDagRunOperator: 다른 DAG를 트리거(trigger)
- DummyOperator/EmptyOperator: 워크플로우 논리만 제어
- LatestOnlyOperator: 최신 실행만 수행
Operator 사용 예시
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
with DAG(
dag_id='multi_operator_example',
start_date=datetime(2025,6,27),
schedule_interval='@daily',
default_args={'retries': 1, 'retry_delay': timedelta(minutes=5)},
catchup=False
) as dag:
t1 = BashOperator(
task_id='run_shell',
bash_command='echo "Start!"'
)
t2 = PostgresOperator(
task_id='run_query',
postgres_conn_id='my_postgres',
sql='SELECT COUNT(*) FROM users;'
)
t3 = EmailOperator(
task_id='send_report',
to='team@example.com',
subject='User Count',
html_content="{{ ti.xcom_pull(task_ids='run_query') }}"
)
t1 >> t2 >> t3

왼쪽 젤위 버튼을 활성화 해주면(=unpause) 스케줄러가 실행됨

스케줄러가 실행되면 아래에 초록색 막대바가 생성된다
긴 막대바를 클릭하면 hello_world 의 전체 태스크를 확인할 수 있고

그 아래 작은 네모들은
hello_world 안의 각각의 태스트의 결과를 확인할 수 있다.

반응형
'서버&백엔드 > 🐧 Linux' 카테고리의 다른 글
| virtual box 용량늘리기 (0) | 2025.10.13 |
|---|---|
| Apache Airflow 사용법, 순차실행 (1) | 2025.06.30 |
| Linux | 도커 용량문제 (0) | 2025.05.29 |
| Ubuntu | 특정사용자 sudo 비밀번호 요구 예외거는법 (0) | 2025.05.15 |
| 리눅스 우분투 설치시 필수사항 (0) | 2025.03.25 |