Apache Airflow
Apache Airflow 설치 방법
What is Airflow?
a platform created by the community to programmatically author, schedule and monitor workflows.
Apache Airflow는 초기 에어비엔비(Airfbnb) 엔지니어링 팀에서 개발한 워크플로우 오픈 소스 플랫폼입니다.
DAG(Directed Acyclic Graph) 기반으로 데이터 파이프라인을 정의하고, 스케줄링, 모니터링, 관리할 수 있습니다. 사용자는 Python 코드를 이용해 파이프라인을 정의하며, 워크플로우의 각 단계는 작업(task)으로 표현됩니다.
파이프라인 내의 작업들 간의 의존성을 DAG로 시각화하고, Airflow UI를 통해 실시간으로 작업의 상태를 확인하거나 재시도, 중지 등의 관리를 할 수 있습니다.
Why Airflow?
그렇다면 Airflow를 장점이 무엇인지 왜 사용하는지 설명하겠습니다.
airflow는 크게 아래의 특징을 가지고 있습니다.
- 확장성: Airflow는 수백에서 수천 개의 작업을 포함하는 복잡한 파이프라인도 효율적으로 처리할 수 있습니다. DAG로 작업의 흐름을 정의하고, 병렬 처리나 의존성 관리를 쉽게 할 수 있습니다.
- 유연성: 작업을 Python 코드로 정의하기 때문에 매우 유연한 로직을 구현할 수 있으며, 다양한 시스템이나 API와 쉽게 통합됩니다.
- 모니터링과 에러 처리: Airflow는 웹 UI를 통해 작업의 진행 상태를 모니터링하고, 실패 시 재시도, 경고 알림 등의 기능을 제공합니다.
- 스케줄링: 반복적인 작업(예: 매일 데이터 처리)도 간단하게 설정할 수 있으며, 특정 시간대 또는 조건부로 작업을 실행할 수 있습니다.
특히 Airflow는 python 코드로 작성하여 보다 유연하고 세밀한 조정이 가능합니다.
Airflow vs Apache NiFi
NiFi는 데이터 흐름(ingestion)을 중심으로 데이터 이동을 시각적으로 정의하는 도구로, GUI 기반의 사용이 가능하고, 실시간 데이터 스트리밍 처리가 강점입니다.
Airflow는 주로 배치(batch) 처리에 중점을 두며, Python 코드를 사용해 복잡한 데이터 파이프라인을 관리합니다.
Airflow는 데이터 오케스트레이션 도구로, ETL 파이프라인뿐만 아니라, 다양한 종류의 파이프라인(데이터 처리, 모델 훈련, 시스템 관리)을 관리할 수 있는 것이 특징입니다.
다른 전통적인 ETL 도구들(예: Informatica, Talend)과 비교했을 때, Airflow는 매우 유연하고 코드 중심적이라는 점에서 큰 차이가 있습니다. 전통적인 ETL 도구들은 GUI 기반의 워크플로우 정의를 지원하지만, Airflow는 Python을 통해 더 복잡하고 유연한 파이프라인 로직을 정의할 수 있습니다.
Start Airflow
Airflow를 설치하는 과정을 설명합니다.
install activate poetry를 사용하여 가상환경을 활성화 하고 Airflow를 설치합니다.
1 2
poetry init poetry add airflow
init database Airflow에서 사용하는 기본 스키마를 생성하며 사용자를 추가합니다. 이 과정까지 진행한다면 ~/airflow 하위에
airflow.cfg
파일 등이 생깁니다.1 2 3 4 5 6 7 8 9
airflow db init airflow users create \ --username admin \ --firstname Admin \ --lastname User \ --role Admin \ --email [admin@example.com](mailto:admin@example.com) \ --password admin
create sample dags dags 파일을 하나 만들어 테스트를 진행합니다.
[test.py]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( 'example_dag', default_args={'retries': 1}, description='A simple tutorial DAG', schedule_interval='@daily', start_date=datetime(2023, 10, 1), catchup=False, ) as dag: t1 = BashOperator( task_id='print_date', bash_command='date', ) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', ) t1 >> t2
위의 코드에서는 bashOperator를 사용하였으나 pythonOperator도 사용가능합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
import os import sys from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator # TriggerDagRunOperator 추가 from datetime import datetime from test1 import test_function1 from test2 import test_function2 from _scproxy import _get_proxy_settings # mac용 설정 _get_proxy_settings() os.environ["NO_PROXY"] = "*" with DAG( "python_example_dag", default_args={"retries": 1}, schedule_interval=None, start_date=datetime(2023, 10, 1), catchup=False, ) as dag: task1 = PythonOperator( task_id="test_function_task", python_callable=test_function1, ) task2 = TriggerDagRunOperator( task_id="test_function2_task", python_callable=test_function2, ) task1 >> task2
start Airflow airflow webserver와 scheduler를 실행합니다.
1 2
airflow webserver --port 8080 airflow scheduler
web gui login & check 2번에서 생성했던 username과 password로 로그인하면 등록한
example_dag
를 확인할 수 있습니다. 여기서 해당 dag를 실행하면 해당 작업이 queue에 들어가게 됩니다.그리고 나서 job이 수행되고
아래와 같이 종료됩니다.
update database connection
Airflow에서 기본으로 내장되어있는 sqllite를 사용합니다. 따로 Airflow가 관리하는 DB를 따로 관리하고자 한다면 아래와 같이
airflow.cfg
파일을 수정합니다. (airflow.cfg 파일은 ~/airflow/airflow.cfg에 위치합니다.)1 2 3
[core] sql_alchemy_conn = postgresql+psycopg2://airflow_user:testtest!@localhost/airflow_db sql_alchemy_schema = airflow_schema # 스키마를 따로 지정하지않는다면 postgresql 기준 public을 사용함.
update dags folder dag들의 기본 경로를 변경도 가능합니다.
1 2 3
[core] # dags_folder = /Users/yoodongjin/airflow/dags dags_folder = /Users/yoodongjin/Desktop/dev/project/open-datify-scrapper/dags
create database & user 6번에서 정의한 connection 정보와 동일하게 database와 user를 생성하고 권한을 부여합니다.
1 2 3
CREATE DATABASE airflow_db; CREATE USER airflow_user WITH PASSWORD 'airflow_user'; GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
6번, 8번을 수행하였다면 2번 과정인
init database
를 다시 수행하여 airflow에서 관리하는 table을 생성합니다.
logs는
~/airflow/logs
경로에서 확인 가능합니다.