راه اندازی یک پروژه مهندسی داده برای تازه کاران

راه اندازی یک پروژه مهندسی داده برای تازه کاران

نوشته شده توسط: تیم فنی نیک آموز
تاریخ انتشار: ۱۹ بهمن ۱۴۰۰
آخرین بروزرسانی: 23 دی 1403
زمان مطالعه: 15 دقیقه
۰
(۰)

مقدمه

یک پروژه مهندسی داده واقعی معمولاً شامل تعداد زیادی کامپوننت است. راه‌اندازی یک پروژه مهندسی داده، مطابق با به روزترین معیارها می‌تواند کاری بسیار زمان‌بر باشد. اگر شما با چالش‌های زیر مواجه هستید:

  • یک تحلیلگر داده، دانشجو، دانشمند یا مهندس هستید که به دنبال کسب تجربه مهندسی داده هستید، اما قادر به یافتن یک پروژه خوب برای شروع نیستید.
  • می‌خواهید روی یک پروژه مهندسی داده کار کنید که یک پروژه واقعی را شبیه‌سازی می‌کند.
  • به دنبال یک پروژه مهندسی داده انتها به انتها می‌گردید.
  • به دنبال یک پروژه خوب برای کسب تجربه مهندسی داده برای مصاحبه شغلی هستید.

باید گفت این مقاله دقیقا مناسب شماست. در این مقاله، شما با موارد زیر آشنا می‌شوید.

  • راه‌اندازی Apache Airflow، AWS EMR، AWS Redshift، AWS Spectrum، AWS S3
  • یادگیری بهترین شیوه های راه‌اندازی خطوط پردازش داده.
  • یادگیری نحوه تشخیص نقاط خرابی در خطوط پردازش داده و چگونگی پیاده‌سازی سیستم‌های مقاوم در برابر خرابی.
  • یادگیری نحوه طراحی و ایجاد یک خط پردازش داده بر اساس نیازهای تجاری.

اهداف

فرض می‌کنیم که شما در یک شرکت شاغل هستید که روی تجزیه و تحلیل رفتار کاربران کار می‌کند. به این صورت که داده‌های کاربر را جمع‌آوری می‌کند و یک پروفایل برای هر کاربر ایجاد می‌کند. در نظر بگیرید شما به عنوان یک مهندس داده در این شرکت، تسکی به شما ارجاع شده که یک خط پردازش داده‌ای طراحی کنید تا جدول user_behavior_metric را پر نماید. جدول user_behavior_metric یک جدول OLAP است که برای استفاده توسط تحلیلگران، نرم‌افزار داشبورد و غیره طراحی خواهد شد. موارد زیر را در نظر بگیرید:

  • user_purchase: جدول OLTP با اطلاعات خرید کاربر.
  • csv: داده‌هایی که هر روز توسط یک تامین کننده داده خارجی ارسال می‌شود.

طراحی

ما از Airflow برای هماهنگ کردن تسک‌های زیر استفاده خواهیم کرد:

  • طبقه‌بندی بازبینی‌های فیلم با Apache Spark.
  • بارگذاری داده‌های طبقه‌بندی بازبینی‌های فیلم در انبار داده.
  • برکهاج داده‌های خرید کاربر از پایگاه داده OLTP و بارگذاری آن در انبار داده.
  • پیوند میان داده‌های طبقه‌بندی شده بازبینی فیلم و داده‌های خرید کاربر برای رسیدن به داده‌هایی که معیار رفتار کاربر را نشان می‌دهد.

نصب

۱-۴) پیش نیازها

  • Docker
  • psql
  • حساب AWS
  • نصب AWS CLI و پیکره بندی آن

برای تنظیم جداول زیرساخت و پایه، از اسکریپتی به نام setup_infra.sh استفاده می‌کنیم.

git clone https://github.com/josephmachado/beginner_de_project.git
cd beginner_de_project
./setup_infra.sh {your-bucket-name}

در کد بالا به جای your-bucket-name نام مورد نظر خود را قرار دهید. به عنوان مثال:

üsetup_infra.sh sde-sample-bkt

دقت داشته باشید نامی که انتخاب می‌کنید، باید منحصر به فرد باشد. اگر هر یک از دستورات ناموفق بود، setup_infra.sh را باز کنید و دستورات ناموفق را به صورت دستی اجرا کنید.

اسکریپت راه‌اندازی حدود ۱۰ دقیقه طول می‌کشد تا زیرساخت مورد نیاز را تنظیم کند. لاگ‌های مرحله راه‌اندازی به صورت محلی در فایلی به نام setup.log ذخیره می‌شوند. برای مشاهده رابط کاربری Airflow به آدرس زیر مراجعه نمایید.

www.localhost:8080

username: airflow

password: airflow

پس از اتمام پروژه، فراموش نکنید که با استفاده از اسکریپت tear_down_infra.sh زیرساخت را down کنید.

./tear_down_infra.sh {your-bucket-name}

۲-۴) زیر ساخت

برای پروژه خود موارد زیر را خواهیم داشت:

کامپوننت‌های محلی:

  • وب سرور Apache Airflow و زمانبندی در کانتینرهای داکر.
  • پایگاه داده متادیتا Apache Airflow (Postgres) در یک کانتینر داکر.

کامپوننت‌های AWS

  • AWS S3 به عنوان برکه داده
    AWS Redshift Spectrum به عنوان انبار داده.
    AWS IAM اجازه می‌دهد به Spectrum تا در زمان اجرای کوئری به داده‌های موجود در S3 دسترسی داشته باشد.
    AWS EMR برای اجرای کارهای Apache Spark نیاز است.

۳-۴) ساختار برکه داده

از AWS S3 به عنوان برکه داده خود استفاده خواهیم کرد. داده‌های سیستم‌های خارجی برای پردازش بیشتر در اینجا ذخیره می‌شوند. AWS S3 به عنوان فضای ذخیره سازی برای استفاده با AWS Redshift Spectrum استفاده خواهد شد. در پروژه خود از یک باکِت با چندین فولدر استفاده خواهیم کرد.

کامپوننت‌های AWS S3

  • Raw: برای ذخیره داده‌های خام به کار می رود. در بخش طراحی به عنوان Raw Area نشان داده می‌شود.
  • Stage: در قسمت طراحی به عنوان Stage Area نشان داده می‌شود.
  • Scripts: جهت ذخیره اسکریپت های اسپارک برای استفاده توسط AWS EMR استفاده می‌شود.

این الگو به ما اجازه می‌دهد باکِت‌های مختلفی را برای محیط‌های مختلف ایجاد کنیم.

۵-۴) ایجاد جداول و پیکره بندی Airflow

اسکریپت راه‌اندازی، جداول مورد نیاز خط پردازش داده ما را نیز ایجاد می‌کند. جداول زیر را ایجاد می‌کنیم:

جدول retail.user_purchase: اسکریپت ایجاد این جدول در فایل create_user_purchase.sql در فولدر pgsetup در مخزن گیت تعریف شده است. داده‌ها در سیستم فایل کانتینر Postgres ذخیره می‌شوند. این داده‌ها با استفاده از دستور COPY در جدول بارگذاری می‌شوند.

جدول spectrum.user_purchase_staging: به این صورت تعریف می‌شود که داده‌های آن در محل استیج برکه داده ذخیره می‌شود. توجه داشته باشید که جدول یک پارتیشن نیز دارد که در insert_date تعریف شده است.

جدول spectrum.classified_movie_review: به این صورت تعریف می‌شود که داده‌های آن در محل استیج برکه داده ذخیره می‌شود.

جدول public.user_behavior_metric: جدولی است که می‌خواهیم داده‌ها را در آن بارگذاری کنیم.

علاوه بر جداول، اسکریپت راه‌اندازی، اتصالات و متغیرهای Airflow را نیز ایجاد می‌کند.

اتصال redshift: برای اتصال به خوشه AWS Redshift استفاده می‌شود.

اتصال postgres_default: برای اتصال به پایگاه داده محلی Postgres مورد استفاده قرار می‌گیرد.

متغیر BUCKET: برای نشان دادن باکِت مورد استفاده به عنوان برکه داده برای خط پردازش داده استفاده می‌شود.

متغیر EMR_ID: برای ارسال دستورات به یک خوشه AWS EMR استفاده می‌شود.

اتصالات و متغیرها را در محیط Airflow مطابق شکل زیر می‌توانید ببینید.

مرور کد

داده‌های جدول user_behavior_metric از دو مجموعه داده اصلی تولید می‌شود. ما به نحوه جمع‌آوری، تبدیل و استفاده از هر یک از آن‌ها برای تولید داده‌های جدول نهایی خواهیم پرداخت.

۱-۵) بارگذاری داده‌های خرید کاربر در انبار داده

برای بارگذاری اطلاعات خرید کاربر از Postgres در AWS Redshift، تسک‌های زیر را اجرا می‌کنیم.

extract_user_purchase_data: داده‌ها را از Postgres به یک سیستم فایل محلی در کانتینر Postgres بارگذاری می‌کند. این فایل سیستم بین کانتینرهای محلی Postgres و Airflow همگام‌سازی می‌شود. این روال اجازه می‌دهد تا Airflow به این داده‌ها دسترسی داشته باشد.

user_purchase_to_stage_data_lake: داده‌های استخراج ‌شده را به ناحیه استیج برکه داده منتقل می‌کند. در مسیر زیر می‌توانید آن را بیابید.

stage/user_purchase/{{ ds }}/user_purchase.csv

ds با تاریخ اجرا با فرمت YYYY-MM-DD جایگزین می‌شود. ds با پارتیشن insert_date که در ایجاد جدول تعریف شده است تنظیم می‌گردد.

user_purchase_stage_data_lake_to_stage_tbl: یک کوئری Redshift را اجرا می‌کند تا جدول spectrum.user_purchase_staging را از پارتیشن تاریخ جدید آگاه کند.

extract_user_purchase_data = PostgresOperator(
dag=dag,
task_id="extract_user_purchase_data",
sql="./scripts/sql/unload_user_purchase.sql",
postgres_conn_id="postgres_default",
params={"user_purchase": "/temp/user_purchase.csv"},
depends_on_past=True,
wait_for_downstream=True,
)
user_purchase_to_stage_data_lake = PythonOperator(
dag=dag,
task_id="user_purchase_to_stage_data_lake",
python_callable=_local_to_s3,
op_kwargs={
"file_name": "/temp/user_purchase.csv",
"key": "stage/user_purchase/{{ ds }}/user_purchase.csv",
"bucket_name": BUCKET_NAME,
"remove_local": "true",
},
)
user_purchase_stage_data_lake_to_stage_tbl = PythonOperator(
dag=dag,
task_id="user_purchase_stage_data_lake_to_stage_tbl",
python_callable=run_redshift_external_query,
op_kwargs={
"qry": "alter table spectrum.user_purchase_staging add if not exists partition(insert_date='{{ ds }}') 
location 's3://"
+ BUCKET_NAME
+ "/stage/user_purchase/{{ ds }}'",
},
)
extract_user_purchase_data >> user_purchase_to_stage_data_lake >> user_purchase_stage_data_lake_to_stage_tbl

./scripts/sql/unload_user_purchase.sql

COPY (
select invoice_number,
stock_code,
detail,
quantity,
invoice_date,
unit_price,
customer_id,
country
from retail.user_purchase -- we should have a date filter here to pull only required date's data
) TO '{{ params.user_purchase }}' WITH (FORMAT CSV, HEADER);
-- user_purchase will be replaced with /temp/user_purchase.csv from the params in extract_user_purchase_data task

ذخیره کل مجموعه داده در سیستم فایل محلی به طور کلی نمی تواند الگوی خوبی باشد. زیرا اگر مجموعه داده ما خیلی بزرگ باشد، می‌تواند منجر به خطای سرریز حافظه شود. راه حل جایگزین وجود یک فرآیند جریان[۱] است که به صورت دسته‌ای در برکه داده نوشته شود.

۲-۵) بارگذاری داده‌های طبقه‌بندی بازبینی فیلم در انبار داده

برای دریافت داده‌های طبقه‌بندی شده بازبینی فیلم در AWS Redshift، تسک‌های زیر را اجرا می‌کنیم:

movie_review_to_raw_data_lake: داده‌های فایل محلی (data/movie_review.csv) را در ناحیه داده‌های خام برکه داده کپی می‌کند.
spark_script_to_S3: اسکریپت اسپارک را در ناحیه اسکریپت‌های برکه داده کپی می‌کند. این کار به AWS EMR اجازه می‌دهد تا بتواند به آن اسکریپت‌ها ارجاع دهد.
start_emr_movie_classification_script: گام‌های EMR تعریف شده در فایل dags/scripts/emr/clean_movie_review.json را به دسته EMR اضافه می‌کند. این کار ۳ مرحله EMR را به دسته اضافه می‌کند، این سه مرحله عبارتند از:

  • داده‌های خام را از S3 به HDFS منتقل می‌کند: داده‌ها را از ناحیه خام برکه داده به HDFS EMR کپی می‌کند.
  • بازبینی‌های فیلم را طبقه‌بندی می‌کند: اسکریپت اسپارک طبقه‌بندی بازبینی را اجرا می‌کند.
  • داده‌های طبقه‌بندی شده را از HDFS به S3 منتقل می‌کند: داده‌ها را از HDFS EMR به ناحیه استیج برکه داده کپی می‌کند.

Wait_for_movie_classification_transformation: این یک تسک حسگر است که منتظر می ماند تا مرحله نهایی (انتقال داده‌های طبقه‌بندی شده از HDFS به S3) به پایان برسد.

[۱] Streaming Process

movie_review_to_raw_data_lake = PythonOperator(
dag=dag,
task_id="movie_review_to_raw_data_lake",
python_callable=_local_to_s3,
op_kwargs={
"file_name": "/data/movie_review.csv",
"key": "raw/movie_review/{{ ds }}/movie.csv",
"bucket_name": BUCKET_NAME,
},
)
spark_script_to_s3 = PythonOperator(
dag=dag,
task_id="spark_script_to_s3",
python_callable=_local_to_s3,
op_kwargs={
"file_name": "./dags/scripts/spark/random_text_classification.py",
"key": "scripts/random_text_classification.py",
"bucket_name": BUCKET_NAME,
},
)
start_emr_movie_classification_script = EmrAddStepsOperator(
dag=dag,
task_id="start_emr_movie_classification_script",
job_flow_id=EMR_ID,
aws_conn_id="aws_default",
steps=EMR_STEPS,
params={
"BUCKET_NAME": BUCKET_NAME,
"raw_movie_review": "raw/movie_review",
"text_classifier_script": "scripts/random_text_classifier.py",
"stage_movie_review": "stage/movie_review",
},
depends_on_past=True,
)
last_step = len(EMR_STEPS) - 1
wait_for_movie_classification_transformation = EmrStepSensor(
dag=dag,
task_id="wait_for_movie_classification_transformation",
job_flow_id=EMR_ID,
step_id='{{ task_instance.xcom_pull("start_emr_movie_classification_script", key="return_value")['
+ str(last_step)
+ "] }}",
depends_on_past=True,
)
[
movie_review_to_raw_data_lake,
spark_script_to_s3,
] >> start_emr_movie_classification_script >> wait_for_movie_classification_transformation

۳-۵) ایجاد معیارهای رفتار کاربر

با داده‌های خرید کاربر و همچنین با داده‌های طبقه‌بندی شده فیلم که در انبار داده ذخیره شده است، می‌توانیم داده‌های جدول user_behavior_metric را تولید کنیم. این کار با استفاده از تسک gene_user_behavior_metric انجام می‌شود. این کار یک اسکریپت SQL shift را برای پر کردن جدول public.user_behavior_metric اجرا می‌کند.

generate_user_behavior_metric = PostgresOperator(
dag=dag,
task_id="generate_user_behavior_metric",
sql="scripts/sql/generate_user_behavior_metric.sql",
postgres_conn_id="redshift",
)
end_of_data_pipeline = DummyOperator(task_id="end_of_data_pipeline", dag=dag) # dummy operator to indicate DAG complete
[
user_purchase_stage_data_lake_to_stage_tbl,
wait_for_movie_classification_transformation,
] >> generate_user_behavior_metric >> end_of_data_pipeline

کوئری SQL با استفاده از spectrum.user_purchase_staging و spectrum.classified_movie_review، معیارهای رفتار مشتری را تولید می‌کند.

-- scripts/sql/generate_user_behavior_metric.sql
DELETE FROM public.user_behavior_metric
WHERE insert_date = '{{ ds }}';
INSERT INTO public.user_behavior_metric (
customerid,
amount_spent,
review_score,
review_count,
insert_date
)
SELECT ups.customerid,
CAST(
SUM(ups.Quantity * ups.UnitPrice) AS DECIMAL(18, 5)
) AS amount_spent,
SUM(mrcs.positive_review) AS review_score,
count(mrcs.cid) AS review_count,
'{{ ds }}'
FROM spectrum.user_purchase_staging ups
JOIN (
SELECT cid,
CASE
WHEN positive_review IS True THEN 1
ELSE 0
END AS positive_review
FROM spectrum.classified_movie_review
WHERE insert_date = '{{ ds }}'
) mrcs ON ups.customerid = mrcs.cid
WHERE ups.insert_date = '{{ ds }}'
GROUP BY ups.customerid;

برای مشاهده رابط کاربری Airflow به آدرس زیر مراجعه کنید:

www.localhost:8080

username: airflow

password: airflow

DAG را روشن کنید. دقت داشته باشید ممکن است حدود ۱۰ دقیقه طول بکشد تا کامل اجرا شود.

۴-۵) بررسی نتایج

همانطور که در زیر نشان داده شده است می‌توانید جدول public.user_behavior_metric را از قسمت ترمینال بررسی کنید.

export REDSHIFT_HOST=$(aws redshift describe-clusters --cluster-identifier sde-batch-de-project --query 'Clusters[0].Endpoint.Address' --output text)
psql postgres://sde_user:sdeP0ssword0987@$REDSHIFT_HOST:5439/dev

در SQL prompt، از کوئری‌های زیر برای مشاهده داده‌های تولید شده استفاده کنید.

select insert_date, count(*) as cnt from spectrum.classified_movie_review group by insert_date order by cnt desc; -- 100,000 per day
select insert_date, count(*) as cnt from spectrum.user_purchase_staging group by insert_date order by cnt desc; -- 541,908 per day
select insert_date, count(*) as cnt from public.user_behavior_metric group by insert_date order by cnt desc; -- 908 per day
q

پس از اتمام پروژه، فراموش نکنید که با استفاده از اسکریپت tear_down_infra.sh زیرساخت را down کنید.

./tear_down_infra.sh {your-bucket-name}

۲) نکات کلیدی طراحی

اکنون که خط پردازش داده با موفقیت اجرا می‌شود، وقت آن است که برخی از انتخاب‌های مربوط به طراحی را مرور کنیم.

۱. خط پردازش داده ناتوان

بررسی کنید که همه تسک‌ها ناتوان هستند. اگر یک تسک نیمه‌کاره اجرا شده است و یا یک تسک ناموفق را مجدداً اجرا کنید، خروجی شما نباید با حالتی که تسک با موفقیت اجرا شده بود متفاوت باشد.

۲. نظارت و هشدار

خط پردازش داده را می‌توان از طریق رابط کاربری Airflow نظارت کرد. مراحل EMR را می‌توان از طریق رابط کاربری AWS نظارت کرد. در صورت خرابی تسک‌ها، مشکلات کیفیت داده‌ها، وظایف معلق و … هیچ هشداری در این پروژه نداریم. در پروژه‌های واقعی معمولاً سیستم نظارت و هشدار وجود دارد. برخی از سیستم‌های متداول مورد استفاده برای نظارت و هشدار عبارتند از: Cloud Watch، datadog، newrelic.

۳. کنترل کیفیت

ما کیفیت داده را در این خط پردازش داده بررسی نمی‌کنیم. قبل از بارگذاری داده‌ها در جدول نهایی، می‌توانیم تعداد اولیه، انحراف استاندارد، و غیره را بررسی کنیم. برای الزامات تست پیشرفته، از فریم‌ورک‌های بررسی کیفیت داده، مانند great_expectations می‌توان استفاده کرد.

۴. اجراهای هم‌زمان

اگر مجبور به اجرای مجدد خط پردازش داده پس از چند ماه هستید، ایده آل آن است که آنها را به طور هم‌زمان و نه پشت سر هم اجرا کنید. در مورد سطوح هم‌زمانی و تنظیم آن‌ها می‌توانید تحقیق کنید.

۵. بک‌فیل

فرض کنید یک تغییر منطقی در اسکریپت طبقه‌بندی فیلم دارید. می‌خواهید این تغییر در ۳ هفته گذشته اعمال شود. می‌توانید از اجرای بک‌فیل استفاده کنید.

نتیجه‌گیری

امیدواریم این مقاله ایده خوبی در مورد نحوه طراحی و ساخت خطوط پردازش داده انتها به انتها به شما داده باشد. به عنوان جمع‌بندی موارد زیر در این مقاله مورد بررسی قرار گرفته شد:
• راه‌اندازی زیرساخت
• بهترین شیوه‌های خطوط پردازش داده
• نکات کلیدی طراحی

منابع

https://www.startdataengineering.com/post/data-engineering-project-for-beginners-batch-edition

چه رتبه ای می‌دهید؟

میانگین ۰ / ۵. از مجموع ۰

اولین نفر باش

title sign
معرفی نویسنده
تیم فنی نیک آموز
مقالات
401 مقاله توسط این نویسنده
محصولات
0 دوره توسط این نویسنده
تیم فنی نیک آموز
title sign
دیدگاه کاربران

close-image