목차
안녕하세요 윤도현입니다. 이번 글에서는 Airflow의 기본 개념에 대해 정리하였습니다.
관련글
[2] [MLOps] Airflow 2 - 환경세팅(with docker)
1. Airflow란?
- Apache Airflow는 초기 Airbnb 엔지니어링 팀에서 개발한 워크플로우 오픈소스 플랫폼입니다.
(워크플로우란?: 의존성으로 연결된 작업(task)들의 집합) - 프로그래밍 방식으로 워크플로우를 작성하여 예약 및 모니터링을 손쉽게 만들어줍니다.
2. Airflow 기본 구성 및 동작원리
2.1 Airflow key concept
a. DAG(Directed Acyclic Graph)
- 워크플로우 전체 구조(작업 흐름, 순서 정의)를 나타냅니다.
- 영어단어 뜻은 "방향성을 갖는 순환하지 않는 그래프"이고, DAG(대그)라고 부릅니다.
- 순차적으로 작업(task)를 실행하며, 순환 실행을 허용하지 않습니다.
- 만약 개발자가 실수로 Task A → Task B → Task C → 다시 Task A처럼 작업들을 연결하면, 이건 **사이클(cycle)**이 생긴 거고, 논리적 오류(DAG의 정의를 위반)가 발생하게 됩니다.
- 이러한 논리적 오류는 교착상태(deadlock)을 야기합니다.
❗Airflow에서 순환 실행이 문제가 되는이유?
1. 무한 루프(Deadlock 혹은 Infinite Loop) 발생 가능성
Airflow는 Task A를 실행하려면 Task C가 끝나야 하고, Task C는 Task B가 끝나야 하고, Task B는 다시 Task A가 끝나야 됨. 그럼… 아무것도 시작할 수 없게 됨. 무한 대기 상태 = Deadlock
2. 실행 순서를 판단할 수 없음
DAG는 작업이 어떤 순서로 실행되어야 하는지를 정의하는 건데, 사이클이 생기면 실행 순서를 정확히 정할 수 없음.
3. Airflow 자체에서 DAG을 파싱하지 못하고 에러를 냄
실행 전에 DAG 파일을 로딩하면서 구조 분석을 하는데, 그때 순환이 발견되면 아예 DAG가 실행조차 되지 않음.
b. Task & Task Instance
- Task
- DAG 안의 하나의 노드(작업 단위)를 나타냅니다.
- 이 작업은 Operator를 기반으로 정의됩니다.
- Task는 사실상 operator의 인스턴스라고 할 수 있습니다.
from airflow.operators.bash import BashOperator
task = BashOperator(
task_id='say_hello',
bash_command='echo Hello',
dag=dag
)
이 코드에서 task가 바로 DAG의 하나의 Task가 됩니다.
- Task Instance
- 특정 DAG(전체 워크플로우) 실행중에 하나의 Task가 실제로 수행된 기록을 말합니다.
- 즉, Task + 실행시간(execution_date or logical_date)이 결합된 게 Task Instance입니다.
- 예를들어, 아래 세 가지 정보가 결합된 것이 하나의 Task Instance가 됩니다.
- DAG 이름: daily_report
- Task 이름: fetch_data
- 실행일: 2025-03-25
c. Operator
- 앞서 설명한 Task가 실제로 어떤 작업을 할지 정의하는 클래스 or 함수입니다.
- DAG가 전체 워크플로우의 흐름을 나타내는 설계도라면, Operator는 그 흐름 안에서 각각의 구체적인 작업을 수행하는 실행 단위를 나타냅니다.
- Operator Type는 크게 세 종류로 나뉩니다.
- Action Operators
- 기능이나 명령 등의 실행에 중점을 둔 오퍼레이터
- 실제 연산을 수행, 데이터 추출 및 프로세싱
- (참고) 내장 Operators는 BashOperator, PythonOperator, EmailOperator
- 이외의 오퍼레이터는 공식 document 참고 (링크)
- Transfer Operaters
- 하나의 시스템에서 다른 시스템으로 데이터를 이동시키는데 중점을 둔 오퍼레이터
- 예를 들어, ETL에서 T(Transform)와 L(Load) 사이의 전송 역할
- Sensor Operators
- 특정 조건이 만족될 때까지 대기(wait)하는 Operator
- 예를 들어, 특정 경로에 파일이 나타날 때까지 대기했다가 파일이 나타나면 실행
- Action Operators
2.2 Airflow component
- 에어플로우는 웹서버, 스케쥴러, Executor, Worker, Metastore 크게 5개로 구성됩니다.
- 웹서버: 웹 대시보드 UI로 스케쥴러에서 분석한 DAG를 시각화하고, DAG 실행 및 결과를 확인할 수 있는 인터페이스를 제공
- 스케쥴러: DAG 파일을 주기적으로 스캔하면서 실행 시간이 도래한 DAG에 대해 Task를 실행 큐에 올리는 역할
- Excutor: 작업이 어떻게 실행되는지 정의
- Worker: Excutor로부터 전달받은 작업을 실제로 실행
- Metastore: 에어플로우에 있는 DAG, Task등 메타데이터 관리
- # 스케쥴러 역할
- DAG 스캔
- 설정된 주기마다 DAG 폴더를 스캔해서 DAG를 파싱하고 등록
- DAG 파일 안에 포함된 Task들, 그 순서, 실행 조건등을 이해
- 스케쥴 확인
- DAG마다 schedule_interval이 있음 (@daily, */5 * * * * 등)
- 현재 시점과 비교해서, **“실행 시점이 도래했는가?”**를 판단함
- 실행이 필요하면 Task Instance 생성
- Task 상태 추적 및 의존성 계산
- 이전 Task가 성공/실패 했는지, 어떤 Task가 다음에 실행되야 하는지 계산
- 의존성 충족 여부 확인 후 Task Instance를 큐잉(queuing)
- Excutor에 작업 위임
- 실행이 가능하다고 판단된 Task Instance들을 Excutor에 전달
- Excutor는 이를 Worker에게 전달해서 실제로 실행
- DAG 스캔
2.3 Airflow 동작 원리
- DAG 파일 작성
- 유저가 python 코드로 DAG 파일 작성(DAG 폴더 안에 배치)
- Operator로 Task 정의
- schedule_interval, start_date, dependencies 등 설정
- DAG 로딩 및 파싱
- 스케쥴러와 웹서버는 주기적으로 DAG 폴더를 스캔
- DAG 파일들을 읽고 DAG Object로 파싱
- DAG 구조, Task 간의 관계, 스케쥴 정보 등을 Metastore(DB)에 저장
- 스케쥴링 조건 판단
- 스케쥴러가 DAG마다 schedule_interval, start_date, catchup 등을 기준으로 "지금 DAG를 실행해야 하는 시점인가?"를 판단함
- 실행 조건이 만족되면 해당 DAG 실행을 예약
- Task Instance 생성 및 상태 확인
- 스케쥴러는 DAG의 각 Task에 대해 Task Instance 생성
- 이전 Task의 상태나 의존성(set_upstream, set_downstream, TriggerRule 등)을 확인
- 실행 준비가 된 Task Instance는 Executor에 전달
- Executor가 Task를 실행 큐에 전달
- Executor는 Task Instance를 큐(대기열)에 전달
- Worker가 큐(대기열)에서 Task를 가져와서 실행
- Worker는 큐(대기열)를 감시하다가 실행 가능한 Task Instance를 꺼내서 실행
- Operator의 execute() 매서드가 실행됨
- 결과 저장 및 상태 업데이트
- 실행 결과(성공/실패/재시도)를 Metastore(DB)에 저장
- 웹서버를 통해 사용자에게 실행 결과 시각화
- 웹서버(사용자 UI)에서 DAG 모니터링 및 제어
- 사용자는 Web UI를 통해 DAG 상태 확인, 실행 로그 조회, 수동 실행, 실패 Task 재실행 등 가능
- DAG Pause/Resume, SLA 모니터링 등도 UI에서 제어
3. Airflow 장단점
3.1 장점
항목 | 설명 |
Web UI 제공 | DAG 실행 모니터링, 로그 확인, Task 상태 시각화 등 운영에 최적화된 시각화 인터페이스 제공 |
뛰어난 확장성 | 다양한 시스템과 연동 가능 (예: AWS, GCP, MySQL, Hadoop 등), Custom Operator & Sensor 쉽게 추가 가능 |
재실행 및 의존성 제어 | Task 실패 시 특정 Task만 재실행 가능, 의존성 기반 실행 흐름 제어 (upstream, downstream, TriggerRule) |
스케쥴링 기능 | DAG 단위의 정기 실행, Backfill, Catchup 등 풍부한 스케줄링 옵션 제공 |
가장 큰 오픈소스 커뮤니티 | 에어플로우는 MLOps 프레임워크중 가장 큰 커뮤니티를 보유함 |
3.2 단점
항목 | 설명 |
실시간 처리 부적합 | 스케쥴러 기반 실행방식이기 때문에 실시간 스트리밍이나 초저지연 처리에는 적합하지 않음 (주로 배치 기반) |
복잡한 DAG의 유지보수 어려움 | 동적 DAG, 분기(branch), 많은 의존성을 가진 DAG은 디버깅과 테스트가 어려워질 수 있음 |
설정이 다소 복잡함 | DB, 로그 저장소, Executor, Queue, Worker, Scheduler 등 많은 구성요소가 필요하고 설정 복잡도 있음 |
Task 병렬성 제한됨 | 기본 설정값으로는 동시 실행 제한 (concurrency, max_active_runs)에 걸릴 수 있음 |
Task간 데이터 전달 기능의 비효율성 | Task 간 데이터 전달 기능(XCom)이 있지만, 대용량/빈번한 사용에는 성능 및 구조상 불리함 |
Reference
[1] Apache Airflow 기반의 데이터 파이프라인, 바스 하렌슬락, 율리안 더라위터르(도서)
'기본기 > MLOps' 카테고리의 다른 글
[MLOps] Deepstream 3 - 실습(Multi-stream Pose Estimation) (0) | 2025.03.27 |
---|---|
[MLOps] DeepStream 2 - 환경세팅(with docker) (0) | 2025.03.26 |
[MLOps] Deepstream 1 - 기본개념 (0) | 2025.03.26 |
[MLOps] Airflow 3 - 실습(ML 모델 학습/실험 자동화) (0) | 2025.03.25 |
[MLOps] Airflow 2 - 환경세팅(with docker) (0) | 2025.03.25 |