چگونه روی کوئری SQL با استفاده از Apache Airflow عمل Backfilling را انجام دهیم؟

چگونه روی کوئری SQL با استفاده از Apache Airflow عمل Backfilling را انجام دهیم؟

نوشته شده توسط: تیم فنی نیک آموز
تاریخ انتشار: ۲۴ آذر ۱۴۰۰
آخرین بروزرسانی: ۲۱ مهر ۱۴۰۱
زمان مطالعه: 20 دقیقه
۳
(۱)

مقدمه

(Backfilling) به هر فرآیندی اطلاق می ‌شود که شامل اصلاح یا اضافه کردن داده ‌های جدید به رکوردهای موجود در یک مجموعه داده باشد. این یک مورد رایج در مهندسی داده است. مثال‌ های زیر نمونه ‌هایی از Backfilling می ‌تواند باشد:

  • ممکن است نیاز باشد که یک تغییر در تعدادی از منطق ‌های کسب و کار، روی یک مجموعه داده از قبل پردازش شده اعمال شود.
  • ممکن است متوجه شوید که در منطق پردازش شما خطایی وجود دارد و می‌ خواهید داده‌ های قبلاً پردازش شده را دوباره پردازش کنید.
  • ممکن است بخواهید یک ستون اضافه کنید و آن را با یک مقدار معین در مجموعه داده موجود پر کنید. اکثر فریم‌ورک‌ های هماهنگ‌ ساز ETL از Backfilling پشتیبانی می‌ کنند.

اگر سوالات زیر ذهن شما را به خود مشغول داشته است،

  • چگونه می‌ توانم کوئری ‌های SQL خود را به گونه‌ای تغییر دهم که امکان Backfillingبا استفاده از Airflow فراهم گردد؟
  • چگونه می ‌توانم Execution_date خود را با استفاده از ماکروهای Airflow دستکاری کنم؟

باید گفت این مقاله دقیقا مناسب شماست. تصویر زیر فرایند Backfilling را نشان می‌ دهد.

نصب

ما یک مثال ساده را با استفاده از Apache Airflow اجرا خواهیم کرد و خواهیم دید که چگونه می ‌توانیم یک بک‌فیل روی یک مجموعه داده از قبل پردازش شده اجرا کنیم.

پیش‌نیازها

  • Docker
  • docker-compose
  • pgcli

یک پوشه پروژه ایجاد و وارد آن شوید.

mkdir airflow_backfill && cd airflow_backfill

فایلی به نام docker-compose-LocalExecutor.yml با محتوای زیر از Puckel airflow repo دریافت کنید.

version: "3.7"
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    ports:
      - "۵۴۳۲:۵۴۳۲"

  webserver:
    image: puckel/docker-airflow:1.10.9
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - AIRFLOW_CONN_POSTGRES_DEFAULT=postgres://airflow:airflow@postgres:5432/airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/usr/local/airflow/dags
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "۸۰۸۰:۸۰۸۰"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

هنگامی که این مورد را دارید، می ‌توانید استفاده از سرویس ‌های Airflow را به صورت محلی راه اندازی کنید.

docker-compose -f docker-compose-LocalExecutor.yml up –d

چند ثانیه صبر کنید تا یک سرویس Airflow به صورت محلی اجرا شود. از Airflow’s postgres DB برای ایجاد یک مجموعه داده نمونه استفاده می‌ کنیم.

pgcli -h localhost -p 5432 -U airflow -d airflow
# the password is also airflow

چند نمونه، جدول و داده ایجاد می‌ کنیم.

SCHEMA sample;
DROP TABLE IF EXISTS sample.input_data;
CREATE TABLE sample.input_data (
    id SERIAL PRIMARY KEY,
    input_text VARCHAR(10),
    datetime_created TIMESTAMP
);
DROP TABLE IF EXISTS sample.output_data;
CREATE TABLE sample.output_data (
    id int UNIQUE,
    event_id VARCHAR(40),
    input_text VARCHAR(10),
    processed_text VARCHAR(50),
    datetime_created TIMESTAMP,
    datetime_inserted TIMESTAMP
);
INSERT INTO sample.input_data(input_text, datetime_created)
VALUES ('hello 00', '2021-01-01 00:00:00'),
    ('hello 00_2', '2021-01-01 00:10:00'),
    ('hello 01', '2021-01-01 01:00:00'),
    ('hello 01_2', '2021-01-01 01:10:00'),
    ('hello 02', '2021-01-01 02:00:00'),
    ('hello 02_2', '2021-01-01 02:10:00'),
    ('hello 03', '2021-01-01 03:00:00'),
    ('hello 03_2', '2021-01-01 03:10:00'),
    ('hello 04', '2021-01-01 04:00:00'),
    ('hello 04_2', '2021-01-01 04:10:00'),
    ('hello 05', '2021-01-01 05:00:00'),
    ('hello 05_2', '2021-01-01 05:10:00'),
    ('hello 06', '2021-01-01 06:00:00'),
    ('hello 06_2', '2021-01-01 06:10:00'),
    ('hello 07', '2021-01-01 07:00:00'),
    ('hello 07_2', '2021-01-01 07:10:00'),
    ('hello 08', '2021-01-01 08:00:00'),
    ('hello 08_2', '2021-01-01 08:10:00'),
    ('hello 09', '2021-01-01 09:00:00'),
    ('hello 09_2', '2021-01-01 09:10:00'),
    ('hello 10', '2021-01-01 10:00:00'),
    ('hello 10_2', '2021-01-01 10:10:00'),
    ('hello 11', '2021-01-01 11:00:00'),
    ('hello 11_2', '2021-01-01 11:10:00'),
    ('hello 12', '2021-01-01 12:00:00'),
    ('hello 12_2', '2021-01-01 12:10:00'),
    ('hello 13', '2021-01-01 13:00:00'),
    ('hello 13_2', '2021-01-01 13:10:00'),
    ('hello 14', '2021-01-01 14:00:00'),
    ('hello 14_2', '2021-01-01 14:10:00');


Apache Airflow

در Apache Airflow می ‌توانید روز شروع یک DAG و برنامه زمانی که می ‌خواهید با آن اجرا شود را مشخص کنید. اجرا برای یک بازه زمانی (بر اساس برنامه انتخاب شده) پس از سپری شدن آن بازه زمانی آغاز می ‌شود. Execution_date یک شی Pendulum است که روی زمان شروع برنامه ‌ریزی شده تنظیم می ‌شود تا بازه‌ای که اجرای فعلی قرار است انجام شود را پوشش دهد.
به عنوان مثال، در تصویر زیر می ‌بینید که یک DAG تنظیم شده است تا هر ساعت اجرا شود. از ۰۰-۰۱-۲۰۲۱ شروع می‌ شود.

(backfill)

اکنون که می ‌دانیم Execution_date چیست، می‌ توانیم از آن برای backfill داده ‌های پردازش شده قبلی استفاده کنیم.
فرض می ‌کنیم که یک Airflow DAG داریم که برای اجرا در هر ساعت تنظیم شده است، از ۰۱-۰۱-۲۰۲۰ UTC آغاز به کار می‌کند، یک مقدار ورودی می‌گیرد و یک خروجی تولید می‌کند. یک فایل به نام Sample_dag.py در دایرکتوری فعلی در پوشه dags ایجاد می ‌کنیم.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
data_proc_script = """
INSERT INTO sample.output_data (
        event_id,
        id,
        input_text,
        processed_text,
        datetime_created,
        datetime_inserted
    )
SELECT '{{ macros.uuid.uuid4() }}' as event_id,
    id,
    input_text,
    CONCAT(input_text, ' World') as processed_text,
    datetime_created,
    now() as datetime_inserted
from sample.input_data
WHERE datetime_created::DATE = '{{ ds }}'
    AND EXTRACT(
        HOUR
        from datetime_created
    ) = {{ execution_date.hour }} ON CONFLICT (id) DO
UPDATE
SET event_id = EXCLUDED.event_id,
    id = EXCLUDED.id,
    input_text = EXCLUDED.input_text,
    processed_text = EXCLUDED.processed_text,
    datetime_created = EXCLUDED.datetime_created,
    datetime_inserted = EXCLUDED.datetime_inserted;
"""
default_args = {
    "owner": "startDataEngineering",
    "depends_on_past": True,
    "wait_for_downstream": True,
    "start_date": datetime(2021, 1, 1),
    "email": ["sde@startdataengineering.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}
dag = DAG(
    "sample_dag",
    default_args=default_args,
    schedule_interval="0 * * * *",
    max_active_runs=1,
)
process_data = PostgresOperator(
    dag=dag,
    task_id="process_data",
    sql=data_proc_script,
    postgres_conn_id="postgres_default",
    depends_on_past=True,
    wait_for_downstream=True,
)
process_data

این یک DAG ساده با یک تسک برای خواندن داده‌ ها از یک جدول، اضافه کردن متن به آن و درج آن در جدول دیگر است.
DAG را اجرا می‌ کنیم و به Airflow حدود ۵ دقیقه فرصت دهیم تا DAG بارگذاری شود و سپس به آدرس زیر بروید و DAG را روشن کنید.

http://localhost:8080/admin/airflow/tree?dag_id=sample_dag

از آنجایی که یک روز پس از ۰۱-۰۱-۲۰۲۰ اجرا می ‌شود و ما catchup را روی true تنظیم کرده ایم، DAG های ما شروع به اجرا خواهند کرد تا به تاریخ فعلی برسند.
پس از چند دقیقه که DAG اجرا شد می ‌توانیم اسکریپت SQL مورد استفاده در DAG را بررسی کنیم.

INSERT INTO sample.output_data (
        event_id,
        id,
        input_text,
        processed_text,
        datetime_created,
        datetime_inserted
    )
SELECT '{{ macros.uuid.uuid4() }}' as event_id,
    id,
    input_text,
    CONCAT(input_text, ' World') as processed_text,
    datetime_created,
    now() as datetime_inserted
from sample.input_data
WHERE datetime_created::DATE = '{{ ds }}'
    AND EXTRACT(
        HOUR
        from datetime_created
    ) = {{ execution_date.hour }} ON CONFLICT (id) DO
UPDATE
SET event_id = EXCLUDED.event_id,
    id = EXCLUDED.id,
    input_text = EXCLUDED.input_text,
    processed_text = EXCLUDED.processed_text,
    datetime_created = EXCLUDED.datetime_created,
    datetime_inserted = EXCLUDED.datetime_inserted;

 

این اسکریپت SQL داده ‌ها را از input_data که بر اساس تاریخ و زمان فیلتر شده است، می‌ گیرد و سپس آن را در جدول output_data قرار می‌ دهد.

برخی از نکات مهم و قابل توجه عبارتند از:

  • {{ macros.uuid.uuid4 () }} : uuid دسترسی به ماژول استاندارد پایتون UUID را برای ما فراهم می‌ کند. ما می ‌توانیم از هر یک از امکانات ماژول UUID در اینجا استفاده کنیم. در اینجا، ما از آن برای تولید شناسه منحصر به فرد برای هر ردیف در داده‌ های خروجی خود استفاده نموده ایم.
  • {{ ds }} : فرمت تاریخ را به صورت YYYY-MM-DD تنظیم می ‌کند.
    {{ execution_date.hour }}
  • : از آنجایی که execution_date یک آبجکت زمان تاریخ Pendulum است، می‌ توانیم از هر یک از توابع Pendulum استفاده کنیم. hour یکی از آن توابع است که ساعت را به صورت عددی بین ۰ تا ۲۳ ارائه می‌کند.
    ON CONFLICT (id) DO UPDATE
  • : از این مورد برای تضمین منحصر به فرد بودن رکوردهای خروجی استفاده می‌ شود. این یک ویژگی postgres است که به ما امکان می‌دهد کوئری‌های به‌روزرسانی یا درج را بر اساس یک شناسه یکتا (از id در اینجا استفاده شده) بنویسیم. در این کد، اگر یک ردیف مربوط به یک شناسه داده شده در output_data وجود داشته باشد، عمل به روز رسانی انجام می ‌شود، در غیر این صورت یک رکورد جدید در جدول sample.output_data درج می‌ شود.

به عنوان یک مثال، در نظر بگیرید DAG ما تا الان چند بار اجرا شده است. فرض کنید می ‌خواهیم متن پردازش‌شده را تغییر دهیم تا متن World, Good day را به ‌جای کلمه World اضافه کنیم، از ساعت ۱۰ صبح UTC به تاریخ ۰۱-۰۱-۲۰۲۰ شروع شود و در ساعت ۱۳ (۱۳:۰۰) UTC به پایان برسد.

برای این کار، ابتدا DAG در حال اجرا را متوقف می‌کنیم، World را به World, Good day در sample_dag.py را تغییر می دهیم و سپس دستورات زیر را اجرا می ‌کنیم.

docker exec -it airflow_backfill_webserver_1 /entrypoint.sh bash # sh into your docker container
airflow backfill -s 2021-01-01T10:00:00+00:00 -e 2021-01-01T13:00:00+00:00 --reset_dagruns sample_dag 
# enter yes when prompted

پس از تکمیل backfill می ‌توانید متن را به World تغییر دهید و DAG خود را دوباره شروع کنید. می ‌توانید از pgcli برای ورود به postgres db استفاده کنید و با اجرای کوئری زیر، مجموعه داده ‌های پر شده را مشاهده کنید.
Select * from sample.output_data
تصویر زیر ورژن مجموعه داده قبل و بعد از بک‌فیل را نشان می‌ دهد.

 

با اجرای کد زیر می‌توانید نمونه ایجاد شده Airflow را down کنید.docker-compose -f docker-compose-LocalExecutor.yml down

نتیجه گیری

امیدواریم این مقاله به شما دید خوبی درباره نحوه استفاده ازexecution_date در Airflow برای بک‌فیل اسکریپت SQL و همچنین نحوه استفاده از Airflow Macros برای آوردن قابلیت ‌های pythonic در اسکریپت SQL به شما داده باشد.

همچنین توجه داشته باشید که فرایند وارد کردن رکورد را طوری پیکربندی کرده ‌ایم که یک UPSERT باشد یعنی در صورت وجود رکورد با شناسه منحصر به فرد عمل به روز رسانی را انجام دهد و در غیر این صورت عمل درج رکورد جدید را انجام دهد. این عمل از درج ردیف ‌های تکراری در خروجی جلوگیری می ‌کند. دفعه بعد که در حال نوشتن خط لوله ETL هستید، در نظر بگیرید که در صورت نیاز به انجام یک بک‌فیل، حتما از UPSERT به جای INSERT استفاده کنید.

 منابع

https://www.startdataengineering.com/post/how-to-backfill-sql-query-using-apache-airflow/

چه رتبه ای می‌دهید؟

میانگین ۳ / ۵. از مجموع ۱

اولین نفر باش

title sign
معرفی نویسنده
تیم فنی نیک آموز
مقالات
402 مقاله توسط این نویسنده
محصولات
0 دوره توسط این نویسنده
تیم فنی نیک آموز
title sign
دیدگاه کاربران