خانه مهندسی داده راه اندازی یک پروژه مهندسی داده برای تازه کاران مهندسی داده مسیر مهندسی داده نوشته شده توسط: تیم فنی نیک آموز تاریخ انتشار: ۱۹ بهمن ۱۴۰۰ آخرین بروزرسانی: 23 دی 1403 زمان مطالعه: 15 دقیقه ۰ (۰) مقدمه یک پروژه مهندسی داده واقعی معمولاً شامل تعداد زیادی کامپوننت است. راهاندازی یک پروژه مهندسی داده، مطابق با به روزترین معیارها میتواند کاری بسیار زمانبر باشد. اگر شما با چالشهای زیر مواجه هستید: یک تحلیلگر داده، دانشجو، دانشمند یا مهندس هستید که به دنبال کسب تجربه مهندسی داده هستید، اما قادر به یافتن یک پروژه خوب برای شروع نیستید. میخواهید روی یک پروژه مهندسی داده کار کنید که یک پروژه واقعی را شبیهسازی میکند. به دنبال یک پروژه مهندسی داده انتها به انتها میگردید. به دنبال یک پروژه خوب برای کسب تجربه مهندسی داده برای مصاحبه شغلی هستید. باید گفت این مقاله دقیقا مناسب شماست. در این مقاله، شما با موارد زیر آشنا میشوید. راهاندازی Apache Airflow، AWS EMR، AWS Redshift، AWS Spectrum، AWS S3 یادگیری بهترین شیوه های راهاندازی خطوط پردازش داده. یادگیری نحوه تشخیص نقاط خرابی در خطوط پردازش داده و چگونگی پیادهسازی سیستمهای مقاوم در برابر خرابی. یادگیری نحوه طراحی و ایجاد یک خط پردازش داده بر اساس نیازهای تجاری. اهداف فرض میکنیم که شما در یک شرکت شاغل هستید که روی تجزیه و تحلیل رفتار کاربران کار میکند. به این صورت که دادههای کاربر را جمعآوری میکند و یک پروفایل برای هر کاربر ایجاد میکند. در نظر بگیرید شما به عنوان یک مهندس داده در این شرکت، تسکی به شما ارجاع شده که یک خط پردازش دادهای طراحی کنید تا جدول user_behavior_metric را پر نماید. جدول user_behavior_metric یک جدول OLAP است که برای استفاده توسط تحلیلگران، نرمافزار داشبورد و غیره طراحی خواهد شد. موارد زیر را در نظر بگیرید: user_purchase: جدول OLTP با اطلاعات خرید کاربر. csv: دادههایی که هر روز توسط یک تامین کننده داده خارجی ارسال میشود. طراحی ما از Airflow برای هماهنگ کردن تسکهای زیر استفاده خواهیم کرد: طبقهبندی بازبینیهای فیلم با Apache Spark. بارگذاری دادههای طبقهبندی بازبینیهای فیلم در انبار داده. برکهاج دادههای خرید کاربر از پایگاه داده OLTP و بارگذاری آن در انبار داده. پیوند میان دادههای طبقهبندی شده بازبینی فیلم و دادههای خرید کاربر برای رسیدن به دادههایی که معیار رفتار کاربر را نشان میدهد. نصب ۱-۴) پیش نیازها Docker psql حساب AWS نصب AWS CLI و پیکره بندی آن برای تنظیم جداول زیرساخت و پایه، از اسکریپتی به نام setup_infra.sh استفاده میکنیم. git clone https://github.com/josephmachado/beginner_de_project.git cd beginner_de_project ./setup_infra.sh {your-bucket-name} در کد بالا به جای your-bucket-name نام مورد نظر خود را قرار دهید. به عنوان مثال: üsetup_infra.sh sde-sample-bkt دقت داشته باشید نامی که انتخاب میکنید، باید منحصر به فرد باشد. اگر هر یک از دستورات ناموفق بود، setup_infra.sh را باز کنید و دستورات ناموفق را به صورت دستی اجرا کنید. اسکریپت راهاندازی حدود ۱۰ دقیقه طول میکشد تا زیرساخت مورد نیاز را تنظیم کند. لاگهای مرحله راهاندازی به صورت محلی در فایلی به نام setup.log ذخیره میشوند. برای مشاهده رابط کاربری Airflow به آدرس زیر مراجعه نمایید. www.localhost:8080 username: airflow password: airflow پس از اتمام پروژه، فراموش نکنید که با استفاده از اسکریپت tear_down_infra.sh زیرساخت را down کنید. ./tear_down_infra.sh {your-bucket-name} ۲-۴) زیر ساخت برای پروژه خود موارد زیر را خواهیم داشت: کامپوننتهای محلی: وب سرور Apache Airflow و زمانبندی در کانتینرهای داکر. پایگاه داده متادیتا Apache Airflow (Postgres) در یک کانتینر داکر. کامپوننتهای AWS AWS S3 به عنوان برکه داده AWS Redshift Spectrum به عنوان انبار داده. AWS IAM اجازه میدهد به Spectrum تا در زمان اجرای کوئری به دادههای موجود در S3 دسترسی داشته باشد. AWS EMR برای اجرای کارهای Apache Spark نیاز است. ۳-۴) ساختار برکه داده از AWS S3 به عنوان برکه داده خود استفاده خواهیم کرد. دادههای سیستمهای خارجی برای پردازش بیشتر در اینجا ذخیره میشوند. AWS S3 به عنوان فضای ذخیره سازی برای استفاده با AWS Redshift Spectrum استفاده خواهد شد. در پروژه خود از یک باکِت با چندین فولدر استفاده خواهیم کرد. کامپوننتهای AWS S3 Raw: برای ذخیره دادههای خام به کار می رود. در بخش طراحی به عنوان Raw Area نشان داده میشود. Stage: در قسمت طراحی به عنوان Stage Area نشان داده میشود. Scripts: جهت ذخیره اسکریپت های اسپارک برای استفاده توسط AWS EMR استفاده میشود. این الگو به ما اجازه میدهد باکِتهای مختلفی را برای محیطهای مختلف ایجاد کنیم. ۵-۴) ایجاد جداول و پیکره بندی Airflow اسکریپت راهاندازی، جداول مورد نیاز خط پردازش داده ما را نیز ایجاد میکند. جداول زیر را ایجاد میکنیم: جدول retail.user_purchase: اسکریپت ایجاد این جدول در فایل create_user_purchase.sql در فولدر pgsetup در مخزن گیت تعریف شده است. دادهها در سیستم فایل کانتینر Postgres ذخیره میشوند. این دادهها با استفاده از دستور COPY در جدول بارگذاری میشوند. جدول spectrum.user_purchase_staging: به این صورت تعریف میشود که دادههای آن در محل استیج برکه داده ذخیره میشود. توجه داشته باشید که جدول یک پارتیشن نیز دارد که در insert_date تعریف شده است. جدول spectrum.classified_movie_review: به این صورت تعریف میشود که دادههای آن در محل استیج برکه داده ذخیره میشود. جدول public.user_behavior_metric: جدولی است که میخواهیم دادهها را در آن بارگذاری کنیم. علاوه بر جداول، اسکریپت راهاندازی، اتصالات و متغیرهای Airflow را نیز ایجاد میکند. اتصال redshift: برای اتصال به خوشه AWS Redshift استفاده میشود. اتصال postgres_default: برای اتصال به پایگاه داده محلی Postgres مورد استفاده قرار میگیرد. متغیر BUCKET: برای نشان دادن باکِت مورد استفاده به عنوان برکه داده برای خط پردازش داده استفاده میشود. متغیر EMR_ID: برای ارسال دستورات به یک خوشه AWS EMR استفاده میشود. اتصالات و متغیرها را در محیط Airflow مطابق شکل زیر میتوانید ببینید. مرور کد دادههای جدول user_behavior_metric از دو مجموعه داده اصلی تولید میشود. ما به نحوه جمعآوری، تبدیل و استفاده از هر یک از آنها برای تولید دادههای جدول نهایی خواهیم پرداخت. ۱-۵) بارگذاری دادههای خرید کاربر در انبار داده برای بارگذاری اطلاعات خرید کاربر از Postgres در AWS Redshift، تسکهای زیر را اجرا میکنیم. extract_user_purchase_data: دادهها را از Postgres به یک سیستم فایل محلی در کانتینر Postgres بارگذاری میکند. این فایل سیستم بین کانتینرهای محلی Postgres و Airflow همگامسازی میشود. این روال اجازه میدهد تا Airflow به این دادهها دسترسی داشته باشد. user_purchase_to_stage_data_lake: دادههای استخراج شده را به ناحیه استیج برکه داده منتقل میکند. در مسیر زیر میتوانید آن را بیابید. stage/user_purchase/{{ ds }}/user_purchase.csv ds با تاریخ اجرا با فرمت YYYY-MM-DD جایگزین میشود. ds با پارتیشن insert_date که در ایجاد جدول تعریف شده است تنظیم میگردد. user_purchase_stage_data_lake_to_stage_tbl: یک کوئری Redshift را اجرا میکند تا جدول spectrum.user_purchase_staging را از پارتیشن تاریخ جدید آگاه کند. extract_user_purchase_data = PostgresOperator( dag=dag, task_id="extract_user_purchase_data", sql="./scripts/sql/unload_user_purchase.sql", postgres_conn_id="postgres_default", params={"user_purchase": "/temp/user_purchase.csv"}, depends_on_past=True, wait_for_downstream=True, ) user_purchase_to_stage_data_lake = PythonOperator( dag=dag, task_id="user_purchase_to_stage_data_lake", python_callable=_local_to_s3, op_kwargs={ "file_name": "/temp/user_purchase.csv", "key": "stage/user_purchase/{{ ds }}/user_purchase.csv", "bucket_name": BUCKET_NAME, "remove_local": "true", }, ) user_purchase_stage_data_lake_to_stage_tbl = PythonOperator( dag=dag, task_id="user_purchase_stage_data_lake_to_stage_tbl", python_callable=run_redshift_external_query, op_kwargs={ "qry": "alter table spectrum.user_purchase_staging add if not exists partition(insert_date='{{ ds }}') location 's3://" + BUCKET_NAME + "/stage/user_purchase/{{ ds }}'", }, ) extract_user_purchase_data >> user_purchase_to_stage_data_lake >> user_purchase_stage_data_lake_to_stage_tbl ./scripts/sql/unload_user_purchase.sql COPY ( select invoice_number, stock_code, detail, quantity, invoice_date, unit_price, customer_id, country from retail.user_purchase -- we should have a date filter here to pull only required date's data ) TO '{{ params.user_purchase }}' WITH (FORMAT CSV, HEADER); -- user_purchase will be replaced with /temp/user_purchase.csv from the params in extract_user_purchase_data task ذخیره کل مجموعه داده در سیستم فایل محلی به طور کلی نمی تواند الگوی خوبی باشد. زیرا اگر مجموعه داده ما خیلی بزرگ باشد، میتواند منجر به خطای سرریز حافظه شود. راه حل جایگزین وجود یک فرآیند جریان[۱] است که به صورت دستهای در برکه داده نوشته شود. ۲-۵) بارگذاری دادههای طبقهبندی بازبینی فیلم در انبار داده برای دریافت دادههای طبقهبندی شده بازبینی فیلم در AWS Redshift، تسکهای زیر را اجرا میکنیم: movie_review_to_raw_data_lake: دادههای فایل محلی (data/movie_review.csv) را در ناحیه دادههای خام برکه داده کپی میکند. spark_script_to_S3: اسکریپت اسپارک را در ناحیه اسکریپتهای برکه داده کپی میکند. این کار به AWS EMR اجازه میدهد تا بتواند به آن اسکریپتها ارجاع دهد. start_emr_movie_classification_script: گامهای EMR تعریف شده در فایل dags/scripts/emr/clean_movie_review.json را به دسته EMR اضافه میکند. این کار ۳ مرحله EMR را به دسته اضافه میکند، این سه مرحله عبارتند از: دادههای خام را از S3 به HDFS منتقل میکند: دادهها را از ناحیه خام برکه داده به HDFS EMR کپی میکند. بازبینیهای فیلم را طبقهبندی میکند: اسکریپت اسپارک طبقهبندی بازبینی را اجرا میکند. دادههای طبقهبندی شده را از HDFS به S3 منتقل میکند: دادهها را از HDFS EMR به ناحیه استیج برکه داده کپی میکند. Wait_for_movie_classification_transformation: این یک تسک حسگر است که منتظر می ماند تا مرحله نهایی (انتقال دادههای طبقهبندی شده از HDFS به S3) به پایان برسد. [۱] Streaming Process movie_review_to_raw_data_lake = PythonOperator( dag=dag, task_id="movie_review_to_raw_data_lake", python_callable=_local_to_s3, op_kwargs={ "file_name": "/data/movie_review.csv", "key": "raw/movie_review/{{ ds }}/movie.csv", "bucket_name": BUCKET_NAME, }, ) spark_script_to_s3 = PythonOperator( dag=dag, task_id="spark_script_to_s3", python_callable=_local_to_s3, op_kwargs={ "file_name": "./dags/scripts/spark/random_text_classification.py", "key": "scripts/random_text_classification.py", "bucket_name": BUCKET_NAME, }, ) start_emr_movie_classification_script = EmrAddStepsOperator( dag=dag, task_id="start_emr_movie_classification_script", job_flow_id=EMR_ID, aws_conn_id="aws_default", steps=EMR_STEPS, params={ "BUCKET_NAME": BUCKET_NAME, "raw_movie_review": "raw/movie_review", "text_classifier_script": "scripts/random_text_classifier.py", "stage_movie_review": "stage/movie_review", }, depends_on_past=True, ) last_step = len(EMR_STEPS) - 1 wait_for_movie_classification_transformation = EmrStepSensor( dag=dag, task_id="wait_for_movie_classification_transformation", job_flow_id=EMR_ID, step_id='{{ task_instance.xcom_pull("start_emr_movie_classification_script", key="return_value")[' + str(last_step) + "] }}", depends_on_past=True, ) [ movie_review_to_raw_data_lake, spark_script_to_s3, ] >> start_emr_movie_classification_script >> wait_for_movie_classification_transformation ۳-۵) ایجاد معیارهای رفتار کاربر با دادههای خرید کاربر و همچنین با دادههای طبقهبندی شده فیلم که در انبار داده ذخیره شده است، میتوانیم دادههای جدول user_behavior_metric را تولید کنیم. این کار با استفاده از تسک gene_user_behavior_metric انجام میشود. این کار یک اسکریپت SQL shift را برای پر کردن جدول public.user_behavior_metric اجرا میکند. generate_user_behavior_metric = PostgresOperator( dag=dag, task_id="generate_user_behavior_metric", sql="scripts/sql/generate_user_behavior_metric.sql", postgres_conn_id="redshift", ) end_of_data_pipeline = DummyOperator(task_id="end_of_data_pipeline", dag=dag) # dummy operator to indicate DAG complete [ user_purchase_stage_data_lake_to_stage_tbl, wait_for_movie_classification_transformation, ] >> generate_user_behavior_metric >> end_of_data_pipeline کوئری SQL با استفاده از spectrum.user_purchase_staging و spectrum.classified_movie_review، معیارهای رفتار مشتری را تولید میکند. -- scripts/sql/generate_user_behavior_metric.sql DELETE FROM public.user_behavior_metric WHERE insert_date = '{{ ds }}'; INSERT INTO public.user_behavior_metric ( customerid, amount_spent, review_score, review_count, insert_date ) SELECT ups.customerid, CAST( SUM(ups.Quantity * ups.UnitPrice) AS DECIMAL(18, 5) ) AS amount_spent, SUM(mrcs.positive_review) AS review_score, count(mrcs.cid) AS review_count, '{{ ds }}' FROM spectrum.user_purchase_staging ups JOIN ( SELECT cid, CASE WHEN positive_review IS True THEN 1 ELSE 0 END AS positive_review FROM spectrum.classified_movie_review WHERE insert_date = '{{ ds }}' ) mrcs ON ups.customerid = mrcs.cid WHERE ups.insert_date = '{{ ds }}' GROUP BY ups.customerid; برای مشاهده رابط کاربری Airflow به آدرس زیر مراجعه کنید: www.localhost:8080 username: airflow password: airflow DAG را روشن کنید. دقت داشته باشید ممکن است حدود ۱۰ دقیقه طول بکشد تا کامل اجرا شود. ۴-۵) بررسی نتایج همانطور که در زیر نشان داده شده است میتوانید جدول public.user_behavior_metric را از قسمت ترمینال بررسی کنید. export REDSHIFT_HOST=$(aws redshift describe-clusters --cluster-identifier sde-batch-de-project --query 'Clusters[0].Endpoint.Address' --output text) psql postgres://sde_user:sdeP0ssword0987@$REDSHIFT_HOST:5439/dev در SQL prompt، از کوئریهای زیر برای مشاهده دادههای تولید شده استفاده کنید. select insert_date, count(*) as cnt from spectrum.classified_movie_review group by insert_date order by cnt desc; -- 100,000 per day select insert_date, count(*) as cnt from spectrum.user_purchase_staging group by insert_date order by cnt desc; -- 541,908 per day select insert_date, count(*) as cnt from public.user_behavior_metric group by insert_date order by cnt desc; -- 908 per day q پس از اتمام پروژه، فراموش نکنید که با استفاده از اسکریپت tear_down_infra.sh زیرساخت را down کنید. ./tear_down_infra.sh {your-bucket-name} ۲) نکات کلیدی طراحی اکنون که خط پردازش داده با موفقیت اجرا میشود، وقت آن است که برخی از انتخابهای مربوط به طراحی را مرور کنیم. ۱. خط پردازش داده ناتوان بررسی کنید که همه تسکها ناتوان هستند. اگر یک تسک نیمهکاره اجرا شده است و یا یک تسک ناموفق را مجدداً اجرا کنید، خروجی شما نباید با حالتی که تسک با موفقیت اجرا شده بود متفاوت باشد. ۲. نظارت و هشدار خط پردازش داده را میتوان از طریق رابط کاربری Airflow نظارت کرد. مراحل EMR را میتوان از طریق رابط کاربری AWS نظارت کرد. در صورت خرابی تسکها، مشکلات کیفیت دادهها، وظایف معلق و … هیچ هشداری در این پروژه نداریم. در پروژههای واقعی معمولاً سیستم نظارت و هشدار وجود دارد. برخی از سیستمهای متداول مورد استفاده برای نظارت و هشدار عبارتند از: Cloud Watch، datadog، newrelic. ۳. کنترل کیفیت ما کیفیت داده را در این خط پردازش داده بررسی نمیکنیم. قبل از بارگذاری دادهها در جدول نهایی، میتوانیم تعداد اولیه، انحراف استاندارد، و غیره را بررسی کنیم. برای الزامات تست پیشرفته، از فریمورکهای بررسی کیفیت داده، مانند great_expectations میتوان استفاده کرد. ۴. اجراهای همزمان اگر مجبور به اجرای مجدد خط پردازش داده پس از چند ماه هستید، ایده آل آن است که آنها را به طور همزمان و نه پشت سر هم اجرا کنید. در مورد سطوح همزمانی و تنظیم آنها میتوانید تحقیق کنید. ۵. بکفیل فرض کنید یک تغییر منطقی در اسکریپت طبقهبندی فیلم دارید. میخواهید این تغییر در ۳ هفته گذشته اعمال شود. میتوانید از اجرای بکفیل استفاده کنید. نتیجهگیری امیدواریم این مقاله ایده خوبی در مورد نحوه طراحی و ساخت خطوط پردازش داده انتها به انتها به شما داده باشد. به عنوان جمعبندی موارد زیر در این مقاله مورد بررسی قرار گرفته شد: • راهاندازی زیرساخت • بهترین شیوههای خطوط پردازش داده • نکات کلیدی طراحی منابع https://www.startdataengineering.com/post/data-engineering-project-for-beginners-batch-edition چه رتبه ای میدهید؟ میانگین ۰ / ۵. از مجموع ۰ اولین نفر باش معرفی نویسنده مقالات 401 مقاله توسط این نویسنده محصولات 0 دوره توسط این نویسنده تیم فنی نیک آموز معرفی محصول مجتبی بنائی دوره آموزش مهندسی داده [Data Engineering] 2.380.000 تومان مقالات مرتبط ۰۴ مهر مهندسی داده معماری Data Lakehouse چیست و چگونه کار میکند؟ نگین فاتحی ۲۴ شهریور مهندسی داده ردیس چیست و انواع آن کدامند؟ نگین فاتحی ۱۸ شهریور مهندسی داده مراحل ساده برای تحلیل داده با ChatGPT و پایتون نگین فاتحی ۱۰ شهریور مهندسی داده NoSQL چیست؟ هر آن چیزی که درباره پایگاه داده NoSQL باید بدانید تیم فنی نیک آموز دیدگاه کاربران لغو پاسخ دیدگاه نام و نام خانوادگی ایمیل ذخیره نام، ایمیل و وبسایت من در مرورگر برای زمانی که دوباره دیدگاهی مینویسم. موبایل برای اطلاع از پاسخ لطفاً مرا با خبر کن ثبت دیدگاه Δ