기본기/MLOps

[MLOps] Airflow 3 - 실습(ML 모델 학습/실험 자동화)

dohyeon2 2025. 3. 25. 16:45

목차

    안녕하세요 윤도현입니다. 이번 글에서는 Airflow와 IRIS 데이터셋을 이용하여

    데이터 수집 → 전처리 → 모델 학습 → 성능 저장까지 자동화하는 방법에 대해 정리하였습니다.

     

    관련글

    [1] [MLOps] Airflow 1 - 기본개념

    [2][MLOps] Airflow 2 - 환경세팅(with docker)

    [3][MLOps] Airflow 3 - 실습

     

     

     

    1. 초기 세팅

    1.1 Airflow docker-compose.yaml 파일이 있는 디렉토리에 필요한 디렉토리 생성

    mkdir -p ./data ./scripts
    • data/: 결과 저장용
    • scripts/: 전처리/학습 코드 넣는 폴더

    1.2 docker-compose.yaml 파일 내 볼륨 마운트 설정

    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./plugins:/opt/airflow/plugins
      - ./scripts:/opt/airflow/scripts
      - ./data:/opt/airflow/data
    # docker compose 재실행
    docker compose restart

    2. 기본 폴더 구조 설정

    airflow-docker/
    ├── dags/
    │   └── ml_pipeline.py   ← DAG 파일
    ├── scripts/
    │   ├── preprocess.py
    │   └── train.py

     

    3. 스크립트 작성(전처리, 학습, DAG 파일)

    3.1 전처리 스크립트(scripts/preprocess.py)

    # scripts/preprocess.py
    import pandas as pd
    from sklearn.datasets import load_iris
    
    def preprocess():
        iris = load_iris()
        df = pd.DataFrame(iris.data, columns=iris.feature_names)
        df["target"] = iris.target
        df.to_csv("/opt/airflow/data/iris.csv", index=False)
        print("Preprocessing completed.")

     

    3.2 모델 학습 스크립트(scripts/train.py)

    # scripts/train.py
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    
    def train():
        df = pd.read_csv("/opt/airflow/data/iris.csv")
        X = df.drop("target", axis=1)
        y = df["target"]
    
        model = RandomForestClassifier()
        model.fit(X, y)
        acc = accuracy_score(y, model.predict(X))
    
        with open("/opt/airflow/data/result.txt", "w") as f:
            f.write(f"Train Accuracy: {acc:.4f}")
        print("Training completed.")

     

    3.3 DAG 파일(dags/ml_pipeline.py)

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    import sys
    sys.path.append("/opt/airflow/scripts")  # 경로 등록
    
    from preprocess import preprocess
    from train import train
    
    with DAG(
        dag_id="ml_pipeline_dag",
        start_date=datetime(2023, 1, 1),
        schedule_interval="@daily",
        catchup=False,
        tags=["ml", "pipeline"]
    ) as dag:
    
        t1 = PythonOperator(
            task_id="preprocess_data",
            python_callable=preprocess
        )
    
        t2 = PythonOperator(
            task_id="train_model",
            python_callable=train
        )
    
        t1 >> t2  # 전처리 후 학습

     

    4. Dag 실행

    해당하는 Dag 선택
    Dag 실행

    5. 결과 확인(data/result.txt)

    학습 결과

     

    여기까지 Airflow를 이용하여 IRIS 데이터셋을 전처리하고 RandomForest 모델을 학습한 뒤 학습결과를 테스트하는 일련의 

    워크플로우를 자동화해보았습니다!!!