خانه SQL Server چگونه روی کوئری SQL با استفاده از Apache Airflow عمل Backfilling را انجام دهیم؟ SQL Server مهندسی داده مسیر مهندسی داده نوشته شده توسط: تیم فنی نیک آموز تاریخ انتشار: ۲۴ آذر ۱۴۰۰ آخرین بروزرسانی: ۲۱ مهر ۱۴۰۱ زمان مطالعه: 20 دقیقه ۳ (۱) مقدمه (Backfilling) به هر فرآیندی اطلاق می شود که شامل اصلاح یا اضافه کردن داده های جدید به رکوردهای موجود در یک مجموعه داده باشد. این یک مورد رایج در مهندسی داده است. مثال های زیر نمونه هایی از Backfilling می تواند باشد: ممکن است نیاز باشد که یک تغییر در تعدادی از منطق های کسب و کار، روی یک مجموعه داده از قبل پردازش شده اعمال شود. ممکن است متوجه شوید که در منطق پردازش شما خطایی وجود دارد و می خواهید داده های قبلاً پردازش شده را دوباره پردازش کنید. ممکن است بخواهید یک ستون اضافه کنید و آن را با یک مقدار معین در مجموعه داده موجود پر کنید. اکثر فریمورک های هماهنگ ساز ETL از Backfilling پشتیبانی می کنند. اگر سوالات زیر ذهن شما را به خود مشغول داشته است، چگونه می توانم کوئری های SQL خود را به گونهای تغییر دهم که امکان Backfillingبا استفاده از Airflow فراهم گردد؟ چگونه می توانم Execution_date خود را با استفاده از ماکروهای Airflow دستکاری کنم؟ باید گفت این مقاله دقیقا مناسب شماست. تصویر زیر فرایند Backfilling را نشان می دهد. نصب ما یک مثال ساده را با استفاده از Apache Airflow اجرا خواهیم کرد و خواهیم دید که چگونه می توانیم یک بکفیل روی یک مجموعه داده از قبل پردازش شده اجرا کنیم. پیشنیازها Docker docker-compose pgcli یک پوشه پروژه ایجاد و وارد آن شوید. mkdir airflow_backfill && cd airflow_backfill فایلی به نام docker-compose-LocalExecutor.yml با محتوای زیر از Puckel airflow repo دریافت کنید. version: "3.7" services: postgres: image: postgres:9.6 environment: - POSTGRES_USER=airflow - POSTGRES_PASSWORD=airflow - POSTGRES_DB=airflow logging: options: max-size: 10m max-file: "3" ports: - "۵۴۳۲:۵۴۳۲" webserver: image: puckel/docker-airflow:1.10.9 restart: always depends_on: - postgres environment: - LOAD_EX=n - EXECUTOR=Local - AIRFLOW_CONN_POSTGRES_DEFAULT=postgres://airflow:airflow@postgres:5432/airflow logging: options: max-size: 10m max-file: "3" volumes: - ./dags:/usr/local/airflow/dags # - ./plugins:/usr/local/airflow/plugins ports: - "۸۰۸۰:۸۰۸۰" command: webserver healthcheck: test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"] interval: 30s timeout: 30s retries: 3 هنگامی که این مورد را دارید، می توانید استفاده از سرویس های Airflow را به صورت محلی راه اندازی کنید. docker-compose -f docker-compose-LocalExecutor.yml up –d چند ثانیه صبر کنید تا یک سرویس Airflow به صورت محلی اجرا شود. از Airflow’s postgres DB برای ایجاد یک مجموعه داده نمونه استفاده می کنیم. pgcli -h localhost -p 5432 -U airflow -d airflow # the password is also airflow چند نمونه، جدول و داده ایجاد می کنیم. SCHEMA sample; DROP TABLE IF EXISTS sample.input_data; CREATE TABLE sample.input_data ( id SERIAL PRIMARY KEY, input_text VARCHAR(10), datetime_created TIMESTAMP ); DROP TABLE IF EXISTS sample.output_data; CREATE TABLE sample.output_data ( id int UNIQUE, event_id VARCHAR(40), input_text VARCHAR(10), processed_text VARCHAR(50), datetime_created TIMESTAMP, datetime_inserted TIMESTAMP ); INSERT INTO sample.input_data(input_text, datetime_created) VALUES ('hello 00', '2021-01-01 00:00:00'), ('hello 00_2', '2021-01-01 00:10:00'), ('hello 01', '2021-01-01 01:00:00'), ('hello 01_2', '2021-01-01 01:10:00'), ('hello 02', '2021-01-01 02:00:00'), ('hello 02_2', '2021-01-01 02:10:00'), ('hello 03', '2021-01-01 03:00:00'), ('hello 03_2', '2021-01-01 03:10:00'), ('hello 04', '2021-01-01 04:00:00'), ('hello 04_2', '2021-01-01 04:10:00'), ('hello 05', '2021-01-01 05:00:00'), ('hello 05_2', '2021-01-01 05:10:00'), ('hello 06', '2021-01-01 06:00:00'), ('hello 06_2', '2021-01-01 06:10:00'), ('hello 07', '2021-01-01 07:00:00'), ('hello 07_2', '2021-01-01 07:10:00'), ('hello 08', '2021-01-01 08:00:00'), ('hello 08_2', '2021-01-01 08:10:00'), ('hello 09', '2021-01-01 09:00:00'), ('hello 09_2', '2021-01-01 09:10:00'), ('hello 10', '2021-01-01 10:00:00'), ('hello 10_2', '2021-01-01 10:10:00'), ('hello 11', '2021-01-01 11:00:00'), ('hello 11_2', '2021-01-01 11:10:00'), ('hello 12', '2021-01-01 12:00:00'), ('hello 12_2', '2021-01-01 12:10:00'), ('hello 13', '2021-01-01 13:00:00'), ('hello 13_2', '2021-01-01 13:10:00'), ('hello 14', '2021-01-01 14:00:00'), ('hello 14_2', '2021-01-01 14:10:00'); Apache Airflow در Apache Airflow می توانید روز شروع یک DAG و برنامه زمانی که می خواهید با آن اجرا شود را مشخص کنید. اجرا برای یک بازه زمانی (بر اساس برنامه انتخاب شده) پس از سپری شدن آن بازه زمانی آغاز می شود. Execution_date یک شی Pendulum است که روی زمان شروع برنامه ریزی شده تنظیم می شود تا بازهای که اجرای فعلی قرار است انجام شود را پوشش دهد. به عنوان مثال، در تصویر زیر می بینید که یک DAG تنظیم شده است تا هر ساعت اجرا شود. از ۰۰-۰۱-۲۰۲۱ شروع می شود. (backfill) اکنون که می دانیم Execution_date چیست، می توانیم از آن برای backfill داده های پردازش شده قبلی استفاده کنیم. فرض می کنیم که یک Airflow DAG داریم که برای اجرا در هر ساعت تنظیم شده است، از ۰۱-۰۱-۲۰۲۰ UTC آغاز به کار میکند، یک مقدار ورودی میگیرد و یک خروجی تولید میکند. یک فایل به نام Sample_dag.py در دایرکتوری فعلی در پوشه dags ایجاد می کنیم. from datetime import datetime, timedelta from airflow import DAG from airflow.operators.postgres_operator import PostgresOperator data_proc_script = """ INSERT INTO sample.output_data ( event_id, id, input_text, processed_text, datetime_created, datetime_inserted ) SELECT '{{ macros.uuid.uuid4() }}' as event_id, id, input_text, CONCAT(input_text, ' World') as processed_text, datetime_created, now() as datetime_inserted from sample.input_data WHERE datetime_created::DATE = '{{ ds }}' AND EXTRACT( HOUR from datetime_created ) = {{ execution_date.hour }} ON CONFLICT (id) DO UPDATE SET event_id = EXCLUDED.event_id, id = EXCLUDED.id, input_text = EXCLUDED.input_text, processed_text = EXCLUDED.processed_text, datetime_created = EXCLUDED.datetime_created, datetime_inserted = EXCLUDED.datetime_inserted; """ default_args = { "owner": "startDataEngineering", "depends_on_past": True, "wait_for_downstream": True, "start_date": datetime(2021, 1, 1), "email": ["sde@startdataengineering.com"], "email_on_failure": False, "email_on_retry": False, "retries": 0, } dag = DAG( "sample_dag", default_args=default_args, schedule_interval="0 * * * *", max_active_runs=1, ) process_data = PostgresOperator( dag=dag, task_id="process_data", sql=data_proc_script, postgres_conn_id="postgres_default", depends_on_past=True, wait_for_downstream=True, ) process_data این یک DAG ساده با یک تسک برای خواندن داده ها از یک جدول، اضافه کردن متن به آن و درج آن در جدول دیگر است. DAG را اجرا می کنیم و به Airflow حدود ۵ دقیقه فرصت دهیم تا DAG بارگذاری شود و سپس به آدرس زیر بروید و DAG را روشن کنید. http://localhost:8080/admin/airflow/tree?dag_id=sample_dag از آنجایی که یک روز پس از ۰۱-۰۱-۲۰۲۰ اجرا می شود و ما catchup را روی true تنظیم کرده ایم، DAG های ما شروع به اجرا خواهند کرد تا به تاریخ فعلی برسند. پس از چند دقیقه که DAG اجرا شد می توانیم اسکریپت SQL مورد استفاده در DAG را بررسی کنیم. INSERT INTO sample.output_data ( event_id, id, input_text, processed_text, datetime_created, datetime_inserted ) SELECT '{{ macros.uuid.uuid4() }}' as event_id, id, input_text, CONCAT(input_text, ' World') as processed_text, datetime_created, now() as datetime_inserted from sample.input_data WHERE datetime_created::DATE = '{{ ds }}' AND EXTRACT( HOUR from datetime_created ) = {{ execution_date.hour }} ON CONFLICT (id) DO UPDATE SET event_id = EXCLUDED.event_id, id = EXCLUDED.id, input_text = EXCLUDED.input_text, processed_text = EXCLUDED.processed_text, datetime_created = EXCLUDED.datetime_created, datetime_inserted = EXCLUDED.datetime_inserted; این اسکریپت SQL داده ها را از input_data که بر اساس تاریخ و زمان فیلتر شده است، می گیرد و سپس آن را در جدول output_data قرار می دهد. برخی از نکات مهم و قابل توجه عبارتند از: {{ macros.uuid.uuid4 () }} : uuid دسترسی به ماژول استاندارد پایتون UUID را برای ما فراهم می کند. ما می توانیم از هر یک از امکانات ماژول UUID در اینجا استفاده کنیم. در اینجا، ما از آن برای تولید شناسه منحصر به فرد برای هر ردیف در داده های خروجی خود استفاده نموده ایم. {{ ds }} : فرمت تاریخ را به صورت YYYY-MM-DD تنظیم می کند. {{ execution_date.hour }} : از آنجایی که execution_date یک آبجکت زمان تاریخ Pendulum است، می توانیم از هر یک از توابع Pendulum استفاده کنیم. hour یکی از آن توابع است که ساعت را به صورت عددی بین ۰ تا ۲۳ ارائه میکند. ON CONFLICT (id) DO UPDATE : از این مورد برای تضمین منحصر به فرد بودن رکوردهای خروجی استفاده می شود. این یک ویژگی postgres است که به ما امکان میدهد کوئریهای بهروزرسانی یا درج را بر اساس یک شناسه یکتا (از id در اینجا استفاده شده) بنویسیم. در این کد، اگر یک ردیف مربوط به یک شناسه داده شده در output_data وجود داشته باشد، عمل به روز رسانی انجام می شود، در غیر این صورت یک رکورد جدید در جدول sample.output_data درج می شود. به عنوان یک مثال، در نظر بگیرید DAG ما تا الان چند بار اجرا شده است. فرض کنید می خواهیم متن پردازششده را تغییر دهیم تا متن World, Good day را به جای کلمه World اضافه کنیم، از ساعت ۱۰ صبح UTC به تاریخ ۰۱-۰۱-۲۰۲۰ شروع شود و در ساعت ۱۳ (۱۳:۰۰) UTC به پایان برسد. برای این کار، ابتدا DAG در حال اجرا را متوقف میکنیم، World را به World, Good day در sample_dag.py را تغییر می دهیم و سپس دستورات زیر را اجرا می کنیم. docker exec -it airflow_backfill_webserver_1 /entrypoint.sh bash # sh into your docker container airflow backfill -s 2021-01-01T10:00:00+00:00 -e 2021-01-01T13:00:00+00:00 --reset_dagruns sample_dag # enter yes when prompted پس از تکمیل backfill می توانید متن را به World تغییر دهید و DAG خود را دوباره شروع کنید. می توانید از pgcli برای ورود به postgres db استفاده کنید و با اجرای کوئری زیر، مجموعه داده های پر شده را مشاهده کنید. Select * from sample.output_data تصویر زیر ورژن مجموعه داده قبل و بعد از بکفیل را نشان می دهد. با اجرای کد زیر میتوانید نمونه ایجاد شده Airflow را down کنید.docker-compose -f docker-compose-LocalExecutor.yml down نتیجه گیری امیدواریم این مقاله به شما دید خوبی درباره نحوه استفاده ازexecution_date در Airflow برای بکفیل اسکریپت SQL و همچنین نحوه استفاده از Airflow Macros برای آوردن قابلیت های pythonic در اسکریپت SQL به شما داده باشد. همچنین توجه داشته باشید که فرایند وارد کردن رکورد را طوری پیکربندی کرده ایم که یک UPSERT باشد یعنی در صورت وجود رکورد با شناسه منحصر به فرد عمل به روز رسانی را انجام دهد و در غیر این صورت عمل درج رکورد جدید را انجام دهد. این عمل از درج ردیف های تکراری در خروجی جلوگیری می کند. دفعه بعد که در حال نوشتن خط لوله ETL هستید، در نظر بگیرید که در صورت نیاز به انجام یک بکفیل، حتما از UPSERT به جای INSERT استفاده کنید. منابع https://www.startdataengineering.com/post/how-to-backfill-sql-query-using-apache-airflow/ چه رتبه ای میدهید؟ میانگین ۳ / ۵. از مجموع ۱ اولین نفر باش معرفی نویسنده مقالات 402 مقاله توسط این نویسنده محصولات 0 دوره توسط این نویسنده تیم فنی نیک آموز معرفی محصول مجتبی بنائی دوره آموزش مهندسی داده [Data Engineering] 2.380.000 تومان مقالات مرتبط ۰۲ آبان SQL Server ابزار Database Engine Tuning Advisor؛ مزایا، کاربردها و روش استفاده تیم فنی نیک آموز ۱۵ مهر SQL Server معرفی Performance Monitor ابزار مانیتورینگ SQL Server تیم فنی نیک آموز ۱۱ مهر SQL Server راهنمای جامع مانیتورینگ بکاپ ها در SQL Server تیم فنی نیک آموز ۰۸ مهر SQL Server Resource Governor چیست؟ آشنایی با نحوه پیکربندی و اهمیت های آن تیم فنی نیک آموز دیدگاه کاربران لغو پاسخ دیدگاه نام و نام خانوادگی ایمیل ذخیره نام، ایمیل و وبسایت من در مرورگر برای زمانی که دوباره دیدگاهی مینویسم. موبایل برای اطلاع از پاسخ لطفاً مرا با خبر کن ثبت دیدگاه Δ