خانه مهندسی داده بررسی عملی سه روش پردازش دادههای آفلاین در اسپارک [بخش اول] مهندسی داده نوشته شده توسط: مجتبی بنائی ۱۰ آبان ۱۳۹۹ زمان مطالعه: 18 دقیقه ۰ (۰) مقدمه یکی از ابزارهای پرکاربرد و رایج در پردازش دادههای کلان، اسپارک است. این چارچوب پردازش داده محبوب، مشابه با هدوپ، پردازشها را در سطح شبکه توزیع میکند و پس از جمع آوری نتایج میانی، آنها را به کاربر نمایش میدهد اما بر خلاف هدوپ، استفاده موثرتری از حافظه انجام میدهد که باعث افزایش سرعتی گاها تا صدبرابر هدوپ می شود و نحوه کار و برنامه نویسی با آن، بسیار راحتتر و محدود به سبک کلید/مقداری هدوپ نیست (کتابخانه نگاشت/کاهش یا Map/Reduce). به عنوان یک مهندس داده، خیلی از اوقات نیاز دارید روی دادههای موجود پردازشی انجام بدهید و خروجی را در یک دیتابیس یا فضای ذخیره سازی فایل، ذخیره کنید. در این سری مقالات، قصد داریم مروری سریع بر سه پارادایم پردازش دادههای معمولی (دادههای ذخیره شده و جدولمانند) در اسپارک با استفاده از چند مثال ساده اما کاربردی داشته باشیم. پردازش به کمک RDD های پایه اسپارک . پردازش با دیتافریمها (مشابه با دیتافریمهای پانداز در پایتون) پردازش با Spark SQL و در انتها هم یک مثال پردازش گراف را با هم بررسی خواهیم کرد. برای این منظور، ابتدا مجموعه سوالاتی را بر روی چند دیتاست تعریف نموده، سپس به کمک روشهای سه گانه فوق، به آنها پاسخ خواهیم داد. برای انجام این کارگاه عملی، بهتر است با معماری و مولفههای اصلی اسپارک آشنا باشید. معماری و دستورات پایه اسپارک را میتوانید در این آدرس فرابگیرید. برای کار با اسپارک هم از محیط آنلاین Databricks استفاده خواهد شد و یک ارتباط خوب اینترنتی میتواند تنها نیازمندی این کارگاه باشد. فایلهای مورد نیاز برا ی بخشهای مختلف این مقاله آموزشی در انتهای مقاله قرار داده شده است. سوالات و خواستهها سوالات با دید تمرین و کار عملی طراحی شده است. بنابراین ابتدا سوالات را به دقت بخوانید و اگر قبلا با اسپارک کار کردهاید، سعی کنید آنها را خودتان انجام دهید. در هر صورت، می توانید به بخش کارگاه عملی در انتهای سوالات مراجعه و راه حلهای پیشنهادی را مشاهده کرده و به صورت عملی، آنها را انجام دهید. دستورات پایه در این بخش قصد داریم عملیات نگاشت و کاهش که سبک پیش فرض پردازش در هدوپ است را شبیه سازی کنیم. در این حالت، باید ورودیها را به شکل زوج مرتب کلید مقدار (k,v) در آوریم و خروجیها را هم به همین شکل تولید کنیم. همانطور که خواهید داد، علیرغم امکانات متعدد اسپارک، اگر بخواهیم به این سبک وفادار بمانیم، سرعت توسعه و تولید کدها کاهش خواهد یافت چون برای هر مساله، علاوه بر تمرکز بر خود ورودی و خروجی و الگوریتم پردازش، باید روی تبدیل آنها به این قالب هم وقت بگذاریم. بخش اول در این قسمت با استفاده از مدل نگاشت-کاهش(Map/Reduce) تعداد لغات فایل input.txt (نسخه متنی کتاب جنگ و صلح تولستوی) را شمارش کرده و نمایش دهید. همچنین گزارش کنید که هر کلمه چند بار تکرار شده است و خروجی را در یک فایل .txt ذخیره کنید. در این گام تنها علائم نقطه گذاری (علامت تعجب، سوال، نقطه و …) را حذف کنید و پیشپردازش دیگری لازم نیست. بخش دوم حال در این گام با استفاده از مدل (Map/Reduce) تعداد تمامی کلماتی که با حرف (M) آغاز میشوند را بیابید. (مستقل از کوچک و بزرگ بودن M) بخش سوم در این بخش نیز همانند دو گام قبلی، تعداد لغات ۵ حرفی موجود درفایل input.txt را یافته، لغاتی که با حروف صدادار شروع میشوند را از خروجی حذف کنید و نتیجه نهایی را به صورت مرتب نمایش دهید. بخش چهارم به کمک مراحل قبلی، ایست واژهها(stop words) را بیابید. کلمهای را ایست واژه درنظر بگیرید که جزء ده درصد کلمات پرتکرار این فایل قرار بگیرد. سپس تابعی بنویسید که یک خط را گرفته، تمام حروف غیر الفبایی و ایست واژههای آنرا حذف کند. این تابع را روی تمام خطوط اعمال کرده، نتیجه را در یک فایل، ذخیره کنید. بخش پنجم تعداد دو کلمهایهایی که بیشتر از یک بار در فایل اصلی (input.txt) کنار هم آمدهاند را به ترتیب فرکانس، یافته و نمایش دهید. منظور از دوکلمهای(bigram)، دو لغتی هستند که پشت سر هم به کار رفتهاند بررسی یک فایل لاگ وب سرور فایل لاگ پیوست این تمرین با نام” Log “که مربوط به در خواستهای HTTP است. بااستفاده از این فایل به سوال زیر پاسخ دهید (برای این بخش از دستورات پایه اسپارک -کار با RDD – استفاده کنید) بخش اول چند Host یکتا در این لاگ فایل وجود دارد؟ بخش دوم متوسط تعداد درخواستهای روزانه برای هر میزبان منحصر به فرد (آیپی یا نام دامنه) چقدر است؟ ابتدا متوسط تعداد درخواستهای هر دامنه در هر روز را به دست آورید و سپس، متوسط نهایی را برای هر دامنه یا آیپی، تعیین کنید. بخش سوم تعداد فایلهای گیف درخواست شده در این فایل لاگ چقدر است؟ بخش چهارم دامنههای پرتقاضا (بیش از ۳ بار ) را یافته، آنها را به صورت مرتب شده نمایش دهید. آی پیها را جزء این دامنهها در نظر نگیرید. سپس دامنه پرتقاضا به ازای هر روز را پیدا کنید (دامنهای با بیشترین تعداد درخواست در یک روز). بخش پنجم خطاهای HTTP (غیر ازکد ۲۰۰، بقیه را همه خطا در نظر بگیرید.) را یافته، تعداد تکرار آنها در یک نمودار ستونی نمایش دهید. کار با دیتافریمها / Spark SQL با توجه به دیتاست stock.csv به سوالات زیر پاسخ دهید. این دیتاست، دادههای بورس مربوط به یکی از کمپانیهای بزرگ از سال ۲۰۱۲ تا ۲۰۱۷ میباشد. برای انجام این تمرین از دو روش استفاده کنید یعنی برای هر بخش، خروجی مورد نظر را با هر کدام از دو روش زیر به صورت جداگانه به دست آورید: Spark DataFrames- با توابع دیتافریم ( DataFrame Operations such as min,avg,…) Spark SQL – با دستورات SQL (spark.sql) بخش اول: یک ستون اطلاعات جدید با ستونی به نام HV ایجاد کنید که این نسبت بالاترین قیمت بر حجم سهام معامله شده برای یک روز است. بخش دوم: پیک بالاترین قیمت، برای چه روزی بوده است؟ بخش سوم: میانگین ستون، Close، چه مقدار است؟ بخش چهارم: مقدار ماکزیمم و مینیمم ستون Volum را مشخص کنید. بخش پنجم: چند روز ستون Close کمتر از ۶۰ دلار بوده است؟ بخش ششم: Pearson correlation بین ستون های High و Volum چقدر است؟ بخش هفتم: ماکزیمم ستون High در هر سال چقدر است؟ Spark GraphX فایل پیوست edgs.txt یالها و فایل پیوست vertex.txt درجههای یک گراف هستند. گراف مورد نظر ما از مقالات ویکی پدیا استخراج شدهاند. هر گره یک مقاله ویکی پدیا و یال از مقاله A به مقاله B نشان دهنده این است که مقاله A به مقاله B ارجاع داده است. نکته: میتوانید برای کار با گراف در اسپارک از کتابخانه GraphFrames استفاده کنید. بخش اول: با استفاده از فایل یالها و گرهها، این گراف را ایجاد کنید. بخش دوم: بیشترین درجه ورودی در این گراف چقدر است؟ بیشترین درجه خروجی (مقالهای که احتمالا Survey بوده و شامل لینک زیادی به سایر مقالات است.) چند است؟ بخش سوم: سایز هرکدام از ConnectedComponentها چقدر است؟ بخش چهارم: ده تا از مقالات برتر را بیابید (مقالاتی که بیشترین درجه ورودی را داشتهاند). راه اندازی محیط کار در این بخش به چگونگی راه اندازی و استفاده از بستر Databricks Cloud میپردازیم. ثبت نام در سرویس ابری Databricks ابتدا لازم است تا عملیات ثبتنام درون این سایت انجام شود. این سایت یک پلن رایگان با نام Community ارائه میکند که امکان ساخت یک کلاستر با ۱۵ گیگابایت حافظه موقت و دو هسته پردازنده را بدست میدهد. ایجاد کلاستر پردازشی پس از ورود به پنل کاربری، ابتدا لازم است تا یک کلاستر پردازشی ساخته شود. برای اینکار از منوی سمت چپ و بخش Clusters اقدام میکنیم. همانند تصویر زیر، لازم است تا یک نام برای کلاستر انتخاب کرده و تنظیمات مربوط به نسخه ابزارهای نصب شده روی کلاستر، زون قرارگیری کلاستر بر روی سرورهای سرویسدهنده و همچنین تنظیمات مربوط به کانفیگ اسپارک را مشخص کنیم. در این مرحله موارد پیش فرض انتخاب شده را تغییر نمیدهیم (نسخه اسپارک ۲.۴.۵ بدون GPU).لازم به ذکر است که در پلن رایگان Community Edition، تنها امکان ایجاد یک کلاستر برای کاربر وجود داشته و همچنین در صورت عدم استفاده از کلاستر ایجاد شده به مدت ۲ ساعت، کلاستر به طور خودکار به حالت Terminated رفته و پس از آن امکان Resume برای آن وجود نخواهد داشت. در این حالت میبایست کلاستر را از لیست کلاسترهای ایجاد شده حذف کرده و مجدداً یک کلاستر جدید ایجاد کرد. پس از ایجاد یک کلاستر، پیکربندی و راه اندازی آن به مدت زمانی حدود پنج دقیقه زمان احتیاج دارد که در طول این مدت، کلاستر در حالت Pending بوده و امکان استفاده از آن مهیا نمیباشد. پس از اتمام عملیات راه اندازی، وضعیت کلاستر به حالت Ready در خواهد آمد. نصب کتابخانهها بدیهی است که تمامی کتابخانههای مورد نیاز به صورت built-in در ابزارهایی نظیر پایتون و اسپارک که بر روی کلاستر به صورت خودکار نصب شدهاند موجود نخواهد بود. برای اینکار لازم است تا کتابخانههای مورد نیاز به صورت دستی نصب گردند. نصب کتابخانههای PyPI پایتون در طی انجام این تمرین به کتابخانههای unidecode (سؤال اول) و matplotlib (سؤال دوم) و networkx (سؤال چهارم) احتیاج خواهیم داشت. برای نصب این کتابخانهها لازم است تا از منوی سمت راست، گزینه Workspace انتخاب شده و از زیر منوی Shared با کلیک بر فلش جهتدار، از منوی مربوطه گزینه Create و سپس Library انتخاب گردد:در صفحه باز شده با انتخاب گزینه PyPI میتوان اسم کتابخانه مورد نظر را وارد کرده و سپس با فشردن دکمه Create دادههای مربوط به این کتابخانه به Workspace کاربری منتقل شده و در صفحه بعد در صورتی که کلاستر ایجاد شده در حالت Ready باشد، به صورت خودکار روی آن نصب میگردد.لازم به ذکر است که در صورتی که کلاستر در حالت Ready نبوده و یا کلاستر جدیدی پس از تولید کتابخانه در Workspace ایجاد شده باشد، لازم است تا از منوی Workspace و بخش Shared و انتخاب کتابخانه مورد نظر، طبق تصویر زیر با انتخاب کلاستر مربوطه و دکمه Install، کتابخانه پایتون را بر روی آن نصب کرد: نصب کتابخانههای جاوا (بستههای اسپارک) در این تمرین لازم است تا کتابخانه Graphframes (سؤال چهارم) بر روی کلاستر پردازشی نصب گردد. برای اینکار ابتدا لازم است تا از این سایت (Spark Packages) نسخه مناسب برای نسخه اسپارک موجود بر روی کلاستر پردازشی بارگذاری گردد. در اینجا فایل jar کتابخانه Graphframes با نسخه ۰.۸.۰-spark2.4-s_2.11 مورد نظر میباشد. پس از بارگذاری فایل jar مربوطه، لازم است تا از منوی سمت راست پنل Databricks، انتخاب گزینه Workspace انتخاب شده و از زیر منوی Shared با کلیک بر فلش جهتدار، از منوی مربوطه گزینه Create و سپس Library انتخاب گردد. در صفحه باز شده نیز با انتخاب بخش Upload و سپس Jar، فایل مربوط به کتابخانه را بارگذاری کرده و با فشردن دکمه Create، کتابخانه مورد نظر به صورت خودکار بر روی کلاستر مربوطه (پس از انتقال به Workspace) نصب میگردد. همانند بخش قبل لازم است تا در صورت ایجاد کلاستر جدید پس از ایجاد کتابخانه در Workspace، از طریق صفحه مربوط به کتابخانه موجود در Workspace اقدام به نصب آن روی کلاستر مربوطه کرد. بارگذاری دیتا و نحوه آدرسدهی در DBFS سرویس Databricks برای راحتی کار با دادهها برای کاربران، بستر فایل سرویس خود را از بستر کلاسترهای پردازشی به صورت جداگانه و با نام DBFS ارائه میدهد. این بستر به صورت مستقل از کلاستر پردازشی کار کرده و در صورت انقضای یک کلاستر، دادههای موجود در DBFS کماکان برای هر کاربر در دسترس میباشد. بر این اساس، در صورت حذف یک کلاستر، کماکان میتوان به فایلهای بارگذاری شده در این بستر دسترسی داشت. فایلهایی که به طور معمول در این بستر ذخیره میشوند عبارتند از: فایل دادههای خام و ورودی (نظیر CSV، TXT و …) فایل خروجی برنامهها (متن، نمودار، …) فایلهای Temporary کتابخانههای مختلف مورد استفاده در برنامهها لاگفایلهای اجرای برنامهها لازم به ذکر است که آدرسدهی فایلسیستم DBFS میبایست به صورت خاص در برنامههای اجرایی صورت گیرد، در غیر اینصورت فایلها به صورت Local از کلاستر پردازشی آدرسدهی خواهند شد و این باعث میشود که دسترسی به این فایلها دشوار و در صورتی که کلاستر منقضی شود، این فایلها از بین بروند. بارگذاری دیتاستها برای بارگذاری دیتاستها در فایلسیستم Databricks میبایست از منوی Data دکمه Add Data را فشرد. در صفحه باز شده از بخش Upload File با کلیک در باکس File و انتخاب فایل دیتاست از کامپیوتر شخصی خود، فایل مورد نظر در فایلسیستم DBFS بارگذاری خواهد شد. پس از بارگذاری موفقیت آمیز، مسیر فایل باگذاری شده در پایین بخش مربوطه نمایش داده میشودلازم به ذکر است که تمامی دیتاهای بارگذاری شده به این نحو، در زیر مسیر /FileStore/tables قرار خواهند گرفت. میتوان در این زیر مسیر از بخش Upload to DBFS دایرکتوریهای زیرینی نیز برای مسیر این فایل تعیین کرد. دسترسی و مشاهده فایلها فایلهای موجود در DBFS کاربری از طریق بخش DBFS از صفحه Create New Table (که در بخش قبلی ذکر شد) قابل مشاهده خواهد بود:در این صفحه میتوان لیست فایلهای موجود در DBFS را مشاهده کرده و همچنین به ازای هر فایل، مسیر مربوط به آن فایل در DBFS را مشاهده کرد. اما متأسفانه امکان دسترسی به محتویات فایلها ازین بخش ممکن نیست. برای دسترسی به محتویات فایلها میتوان از URL تأمین شده توسط Databricks استفاده کرد. به طور مثال برای دسترسی به یک فایل با مسیر /FileStore/HW3/Sec1/1.txt میبایست از طریق آدرس زیر اقدام کرد: ???????????=https://community.cloud.databricks.com/files/HW3/Sec1/1.txt?o که در آن، مقدار پارامتر o برابر همان مقدار موجود در URL دسترسی به پنل Databricks میباشد. لازم به ذکر است که دسترسی به فایلها از این طریق تنها برای فایلهایی ممکن است که در زیر مسیر FileStore ذخیره باشند.[/vc_column_text][vc_empty_space height=”20px”][/vc_column][/vc_row][vc_row][vc_column][vc_column_text] مدیریت و آدرسدهی فایلها از طریق برنامه دسترسی به فایلهای موجود در DBFS و یا نوشتن یک فایل بر روی DBFS با استفاده از برنامه پایتون (نوتبوک) میبایست با آدرسدهیهای مخصوصی انجام گردد. در غیر اینصورت مسیرها به صورت Local در کلاستر پردازشی در نظر گرفته خواهند شد. بر این اساس، و بر اساس نیازهای این تمرین، از دو نوع دسترسی مختلف که Databricks تأمین کرده است استفاده میکنیم. لازم به ذکر است که به صورت مشروح دسترسیهای مختلف در این لینک توضیح داده شده اند. اولین نوع دسترسی از طریق دستورات مربوط به اسپارک صورت میگیرد. برای دسترسی به فایلهای موجود بر روی DBFS از طریق توابع اسپارک، نظیر sc.textFile میتوان از این نوع آدرسدهی استفاده کرد. برای یک فایل با آدرس /FileStore/tables/input.txt میبایست از آدرس زیر استفاده کرد: dbfs:/FileStore/tables/input.txt این نوع از آدرسدهی را میتوان در تمامی توابع اسپارک استفاده نمود. لازم به ذکر است که این نوع از آدرس دهی در توابع IO پایتون معتبر نمیباشد. برای آدرسدهی فایلها در توابع IO پایتون، به طور مثال، برای نوشتن محتویات درون یک فایل با آدرس /FileStore/HW3/Sec1/1.txt میبایست از آدرسدهی زیر استفاده کرد: /dbfs/FileStore/HW3/Sec1/1.txt لازم به ذکر است که این نوع آدرسدهی در توابع مربوط به اسپارک معتبر نمیباشد. زیر مسیر dbfs یک دایرکتوری mount شده توسط Databricks در کلاسترهای پردازشی بوده که امکان دسترسی مستقیم به DBFS را فراهم میکند. ایجاد نوتبوک برای ایجاد نوتبوک در پنل Databricks لازم است تا از منوی Workspace زیر منوی Users و زیر منوی «نام کاربری» و فشردن فلش جهتدار، گزینه Create از منوی ظاهر شده و انتخاب گزینه Notebook اقدام کرد. در صفحه جدید باز شده، میتوان با نام گذاری نوتبوک و انتخاب زبان آن، یک کلاستر فعال را برای آن انتخاب کرد. پس از آن صفحه نوتبوک ایجاد شده نمایش داده خواهد شد. برای دسترسی آتی به این نوتبوک میتوان از همان مسیر یاد شده اقدام کرد. سؤالات پایه بخش اول from unidecode import unidecode import string lines = sc.textFile("dbfs:/FileStore/tables/input.txt") def cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) line = line.lower() for word in line.split(): yield word, 1 transformation = (lines.flatMap(cleaner) .reduceByKey(lambda a,b: a+b) .map(lambda t: (t[1],t[0])) .sortByKey(ascending=False) .map(lambda t: (t[1],t[0]))) print(transformation.take(10)) with open("/dbfs/FileStore/HW3/Sec1/1.txt", "w") as f: for (word, count) in transformation.collect(): f.write(f'{word}: {count}\n') خروجی: لیست ۱۰ کلمه پر تعداد به صورت زیر چاپ شده است: [('the', 336), ('and', 238), ('of', 162), ('to', 154), ('in', 128), ('a', 106), ('is', 76), ('this', 74), ('people', 62), ('that', 61)] توضیحات: در این کد ابتدا یک تابع cleaner به عنوان map عملیات پاکسازی و پیشپردازشهای مورد نیاز را روی هر خط از فایل ورودی انجام میدهد. بدین منظور در مرحله اول تمامی کاراکترهای Unicode موجود در متن به نزدیکترین کاراکتر ASCII توسط کتابخانه Unidecode تبدیل شدهاند. علت انجام این امر، استفاده از کاراکترهای گرامری )نظیر (“ در متن داده شده بوده که اگرچه در ظاهر قابل مشاهده بوده ولی در عمل با کاراکترهای ASCII (نظیر “) مطابقت ندارند. همین امر باعث میشود که امکان تشخیص و حذف آنها در متن ممکن نباشد. در مرحله بعدی مقادیر فاصله از ابتدا و انتهای خطوط حذف شده و سپس به حذف تمامی علامتهای گرامری (Punctuations) از خط پرداخته میشود. در ادامه تمامی کاراکترهای موجود در خط به صورت Minimal تبدیل شده و در نهایت کلمات موجود در خط جداسازی و به صورت یک generator بازگردانده میشوند. مقدار خروجی لیستی از تاپلهای کلمات بوده که هر کلمه با شمارنده خود (عدد ۱) جفت شده است. در مرحله بعد از Tranformation، یک عملیات به صورت reduceByKey صورت گرفته و در طی آن به ازای هر کلمه شمارش لازم روی شمارندههای کلمات صورت میگیرد. در انتها نیز با انجام یک عملیات map برای معکوس کردن تاپلها، مرتبسازی آنها و انجام یک map دیگر برای تبدیل آنها به فرمت تاپل اولیه، کار مرتبسازی نزولی بر اساس تعداد کلمات انجام میگیرد. لازم به ذکر است که در این مرحله به جای استفاده از تابع sortByKey امکان استفاده از تابع sortBy نیز وجود داشت که در آن تابع نگاشت مرتبسازی برای انجام مرتبسازی بر اساس عضو دوم تاپل تعریف میشود، اما برای ذکر روشهای متنوع مرتبسازی استفاده از این تابع در سؤال ۲ صورت گرفته است. در صورت استفاده از تابع sortBy دیگر نیازی به انجام دو مرحله map قبل و بعد از مرتبسازی نمیباشد. در انتهای عملیات پردازش، یک action برای نمایش ۱۰ رکورد ابتدایی با استفاده از تابع take صورت گرفته است. همچنین با استفاده از action ای برابر collect، مقادیر نهایی محاسبه شده در یک فایل متنی به صورت ستونی نیز چاپ شده است. لازم به ذکر است که استفاده از take و collect در این مرحله هر یک باعث میشود تا یک job مجزا برای پردازش صورت گیرد، که در این سؤال با توجه به حجم کم دادهها از هر دو action استفاده شده است. ولی در یک پردازش با حجم داده بالا، اینکار باعث انجام پردازش duplicate میشود که مطلوب نیست و تنها باید یکبار action صورت گیرد بخش دوم from unidecode import unidecode import string lines = sc.textFile("dbfs:/FileStore/tables/input.txt") def cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) line = line.lower() for word in line.split(): yield word transformation = (lines.flatMap(cleaner) .filter(lambda word: word.startswith('m')) .map(lambda word: (word, 1)) .reduceByKey(lambda a,b: a+b) .map(lambda t: (t[1],t[0])) .sortByKey(ascending=False) .map(lambda t: (t[1],t[0]))) print(transformation.take(10)) print(f'Number of distinct words that starts with <m>: {transformation.count()}') خروجی [('more', 18), ('many', 17), ('me', 14), ('my', 9), ('most', 9), ('make', 6), ('much', 6), ('modern', 5), ('m', 5), ('marjane', 5)] Number of distinct words that starts with <m>: 56 توضیحات: پس از انجام عملیات پاکسازی، یک فیلتر برای جداسازی کلماتی که با حرف m آغاز میشوند تعریف شده و در ادامه با ایجاد تاپلهای همراه با شمارنده، به شمارش این کلمات اقدام شده و با مرتب سازی نزولی، در نهایت به ازای هر کلمه، تعداد آن در خروجی چاپ شده است بخش سوم from unidecode import unidecode import string lines = sc.textFile("dbfs:/FileStore/tables/input.txt") def cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) line = line.lower() for word in line.split(): yield word transformation = (lines.flatMap(cleaner) .filter(lambda word: len(word) == 5) .filter(lambda word: not word.startswith(('a', 'e', 'i', 'o', 'u'))) .map(lambda word: (word, 1)) .reduceByKey(lambda a,b: a+b) .map(lambda t: (t[1],t[0])) .sortByKey(ascending=False) .map(lambda t: (t[1],t[0]))) print(transformation.take(10)) print(f'Number of distinct words that starts with <m>: {transformation.count()}') خروجی [('these', 20), ('which', 19), ('their', 19), ('there', 18), ('games', 16), ('could', 15), ('style', 13), ('where', 12), ('think', 12), ('place', 11)] Number of distinct words that starts with <m>: 180 توضیحات: پس از انجام عملیات پاکسازی، یک فیلتر برای جداسازی کلماتی با طول ۵ تعریف شده و در ادامه یک فیلتر برای جداسازی کلماتی که با حروف غیرصدا دار آغاز میشوند اعمال شده است. پس از آن نیز مجدداً همانند بخشهای پیشین عملیات شمارش صورت گرفته است بخش چهارم from unidecode import unidecode import string lines = sc.textFile("dbfs:/FileStore/tables/input.txt") def cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) line = line.lower() for word in line.split(): yield word, 1 transformation = (lines.flatMap(cleaner) .reduceByKey(lambda a,b: a+b) .map(lambda t: (t[1],t[0])) .sortByKey(ascending=False) .map(lambda t: t[1])) t10p = round(0.1*transformation.count()) print(f'10 percent of the number of distinct words is: {t10p}') stop_words = list(transformation.take(t10p)) print(f'list of stop words are: ') print(stop_words) def stop_words_cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) words = line.split() words = [word for word in words if word.lower() not in stop_words] return ' '.join(words) lines = sc.textFile("dbfs:/FileStore/tables/input.txt") transformation = lines.map(stop_words_cleaner) with open("/dbfs/FileStore/HW3/Sec1/4.txt", "w") as f: for line in transformation.collect(): f.write(f'{line}\n') خروجی ۱۰ percent of the number of distinct words is: 125 list of stop words are: ['the', 'and', 'of', 'to', 'in', 'a', 'is', 'this', 'people', 'that', 'it', 'they', 'i', 'was', 'on', 'have', 'with', 'you', 'them', 'are', 'all', 'were', 'for', 'square', 'as', 'be', 'can', 'up', 'when', 'time', 'what', 'one', …, 'gender']ش توضیحات: در این بخش از دو مرحله Transformation-Action استفاده شده است. مرحله اول اقدام به استخراج کلمات Stop Words کرده و مرحله دوم اقدام به حذف این کلمات از فایل ورودی و پاکسازی فایل به صورت خواسته شده مینماید. در مرحله اول، برای تشخیص کلمات Stop Words ابتدا به پاکسازی فایل به نحوه ذکر شده در مراحل قبلی صورت گرفته میشود. پس از آن کل تعداد کلمات فایل مطابق بخشهای پیشین شمارش میگردد. پس از آن تعداد ۱۰ درصد بیشترین کلمات یکتا در فایل بدست آمده و این تعداد از ابتدای لیست نزولی تعداد کلمات فایل جداسازی میشود (در اینجا ۱۲۵ کلمه برابر ۱۰ درصد کل کلمات یکتا در فایل میباشد). در نهایت این کلمات در یک لیست به عنوان کلمات Stop Words ذخیره میگردد. در مرحله دوم، همانند بخشهای قبلی عملیات پاکسازی صورت گرفته و در همان تابع cleaner از map اولیه، اقدام به حذف لیست کلمات Stop Words که از مرحله پیشین بدست آمده است صورت میگیرد. پس از این هیچ Transformation دیگری روی فایل صورت نگرفته و در ادامه با یک Action برابر collect، خطوط فایل پاکسازی شده در یک فایل خروجی نوشته میشود. بخش پنجم from unidecode import unidecode import string lines = sc.textFile("dbfs:/FileStore/tables/input.txt") def cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) line = line.lower() for word in line.split(): yield word, 1 transformation = (lines.flatMap(cleaner) .reduceByKey(lambda a,b: a+b) .map(lambda t: (t[1],t[0])) .sortByKey(ascending=False) .map(lambda t: t[1])) t10p = round(0.1*transformation.count()) print(f'10 percent of the number of distinct words is: {t10p}') stop_words = list(transformation.take(t10p)) print(f'list of stop words are: ') print(stop_words) def stop_words_cleaner(line): line = unidecode(line) line = line.strip() punctuations = string.punctuation line = line.translate(line.maketrans(punctuations, ' '*len(punctuations))) words = line.split() words = [word for word in words if word.lower() not in stop_words] return ' '.join(words) lines = sc.textFile("dbfs:/FileStore/tables/input.txt") transformation = lines.map(stop_words_cleaner) with open("/dbfs/FileStore/HW3/Sec1/4.txt", "w") as f: for line in transformation.collect(): f.write(f'{line}\n') خروجی: Out[46]: [(('of', 'the'), 56), (('in', 'the'), 53), (('and', 'the'), 15), (('this', 'is'), 15), (('all', 'of'), 13), (('it', 'is'), 13), (('on', 'the'), 12), (('to', 'the'), 11), (('soho', 'square'), 11), (('of', 'this'), 10), (('people', 'to'), 10), (('of', 'a'), 10), (('to', 'get'), 9), (('with', 'the'), 9),… توضیحات: در این بخش پس از انجام عملیات پاکسازی روی هر خط، کلمات آن جداسازی میگردند. سپس به ازای هر خط، تاپلهایی شامل کلمات متوالی (تاپل اول شامل کلمه اول و دوم، تاپل دوم شامل کلمه دوم و سوم، …) بازگردانده میشوند. این تابع توسط Transformation ای برابر flatMap باز گردانده میشود، چرا که در این مرحله نیازی به حفظ ساختار خطوط نداریم. در ادامه با شمارش تعداد هر تاپل، یک فیلتر برای جداسازی تاپلهایی که تعداد تکراری بیشتر از ۱ دارند را تعبیه میکنیم. در نهایت نیز عملیات مرتبسازی را انجام میدهیم. بررسی یک فایل لاگ وب سرور بخش اول import logging import re import sys regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$' lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() return groups[0] transformation = (lines.map(parser) .filter(lambda record: record is not None) .distinct()) print(transformation.count()) خروجی: ۸۱۹۶۹ توضیحات: در این بخش ابتدا یک Regular Expression برای جداسازی بخشهای مختلف این فایل طراحی شده است. با بررسی بیشتر فایل به نظر میآید که خطوطی از فایل دارای مشکل و خطا بوده و خط لاگ به صورت استاندارد درج نشده است. کار طراحی این Regex بر اساس بیشترین مطابقت با خطوط انجام شده و در نهایت حدود ۸۰۰ خط از فایل لاگ قابلیت Parse شدن را به علت خرابی داده نداشتهاند. به همین منظور، تابع Parser در صورتی که یک خط از فایل با Regex طراحی شده مطابقت نداشت، به ازای آن خط مقدار None را باز میگرداند و در غیر اینصورت، دیتای Parse شده را باز خواهد گرداند. در مرحله بعد لازم است تا با استفاده از یک filter، مقادیر None را از map مرحله قبل حذف کنیم و سپس دادهها را به مراحل بعد ارجاع دهیم. لازم به ذکر است که این الگو در تمامی بخشهای این سؤال تکرار شده است که توضیح آن در اینجا داده شد. در این بخش، تابع Parser اقدام به بازگرداندن مقدار Host از هر خط کرده، در مرحله بعد دادههای مجاز فیلتر شده و در انتها تابع Distinct برای حذف هاستهای تکراری اعمال شده است. پس از آن با اجرای Action ای برابر count، اقدام به شمارش هاستهای یکتای فایل صورت گرفته است بخش دوم import logging import re import sys from datetime import datetime regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$' import logging import re import sys from datetime import datetime regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$' lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() try: datetimeobject = datetime.strptime(groups[3], "%d/%b/%Y:%H:%M:%S %z") except: return None return ((groups[0],str(datetimeobject.date())), 1) transformation = (lines.map(parser) .filter(lambda record: record is not None) .reduceByKey(lambda a,b: a+b) .map(lambda ip_date_count: (ip_date_count[0][0], (ip_date_count[1], 1))) .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) .mapValues(lambda sum_count: round(sum_count[0]/sum_count[1], 2)) .sortBy(lambda t: t[1],ascending=False)) print(transformation.take(10)) خروجی [('rush.internic.net', 1119.0), ('siltb10.orl.mmc.com', 841.44), ('piweba3y.prodigy.com', 627.57), ('marina.cea.berkeley.edu', 599.67), ('alyssa.prodigy.com', 523.47), ('nidhogg.srl.caltech.edu', 506.5), ('piweba4y.prodigy.com', 482.96), ('bill.ksc.nasa.gov', 445.14), ('spidey.cor.epa.gov', 401.0), ('hoohoo.ncsa.uiuc.edu', 367.0)] توضیحات: در این بخش عملیات Parse به صورتی که در بخش قبل گفته شد صورت میگیرد. تابع Parse دادههای را به صورت یک تاپل با دو عضو باز میگرداند. عضو اول این تاپل، یک تاپل به صورت هاست-تاریخ بوده و عضو دوم آن شمارشگر برابر ۱ میباشد. در مرحله دوم Transformation، عملیات حذف دادههای غیر مجاز صورت گرفته، در مرحله سوم به ازای کلیدها (هاست-تاریخ) کار شمارش انجام میگردد. خروجی این مرحله نشاندهنده تعداد دسترسی به هر هاست در هر روز میباشد. در مرحله سوم، لازم است تا عملیات میانگینگیری روی دسترسیهای هر هاست به ازای روزهای مختلف انجام گیرد. کار میانگیری در الگوی MapReduce بایستی به صورتی صورت گیرد که در هنگام انجام عملیات Reduce، همزمان هم مقدار جمع اعداد و هم مقدار تعداد اعداد تا هر لحظه نگهداری شوند. بدین منظور لازم است تا دو مقدار به صورت یک تاپل، یکی مقدار جمع تا این لحظه و یکی تعداد تا این لحظه نگهداری گرند. بر این اساس، در مرحله سوم تاپلها به فرمت جدید map میگردند. در این فرمت یک تاپل خواهیم داشت که کلید آن برابر هاست و مقدار آن برابر یک تاپل که عضو اول آن تعداد مشاهده آن هاست در هر روز و عضو دوم آن شمارنده ۱ میباشد. در مرحله چهارم اقدام به میانگینگیری روی مقادیر خواهیم کرد. بر این اساس به ازای هر کلید، مقدار برابر یک تاپل خواهد شد که عنصر اول آن برابر مجموع کل بازدیدهای هر روز آن هاست و عنصر دوم برابر تعداد روزهای مشاهده آن هاست میباشد. در مرحله پنجم با یک عملیات map اقدام به تقسیم عنصر اول هر تاپل به عنصر دوم آن به ازای هر کلید میکنیم و در نتیجه، خروجی این مرحله یک تاپل با کلید هاست و مقدار میانگین بازدید روزانه آن هاست میباشد. در محله آخر نیز خروجی را مرتب کرده و در انتها با Action ای برابر take، ۱۰ هاست با بیشترین بازدید روزانه را نمایش میدهیم. بخش سوم: logging import re import sys from datetime import datetime regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$' lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() return groups[5] transformation = (lines.map(parser) .filter(lambda record: record is not None) .filter(lambda url: url.endswith(".gif")) .distinct()) print(transformation.take(10)) print(f'Number of distinct gif files: {transformation.count()}') خروجی: ['/shuttle/countdown/count.gif', '/images/KSC-logosmall.gif', '/images/ksclogo-medium.gif', '/software/winvn/bluemarb.gif', '/software/winvn/wvsmall.gif', '/history/apollo/images/footprint-small.gif', '/shuttle/resources/orbiters/endeavour.gif', '/shuttle/missions/sts-78/sts-78-patch-small.gif', '/shuttle/resources/orbiters/columbia-logo.gif', '/images/ksclogosmall.gif'] Number of distinct gif files: 1194 توضیحات: در این بخش پس از انجام عملیات Parse، با بازگرداندن مقدار URL خط لاگ مربوطه توسط یک فیلتر مقادیری که به پسوند .gif ختم میشوند را جداسازی کرده و در نهایت به صورت یکتا در میآوریم. در انتها نیز تعدادی از آنها را به صورت نمونه چاپ کرده و تعداد کل آنها را شمارش میکنیم بخش چهارم import logging import re import sys from datetime import datetime import ipaddress regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$' lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() return (groups[0], 1) def is_ip_address(host): try: ipaddress.ip_address(host) except: return False return True transformation = (lines.map(parser) .filter(lambda record: record is not None) .filter(lambda host: not is_ip_address(host[0])) .reduceByKey(lambda a,b: a+b) .filter(lambda host_count: host_count[1] > 3) .sortBy(lambda t: t[1],ascending=False) ) print(f'Top 10 all time visited domains:') print(transformation.take(10)) ############# lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() try: datetimeobject = datetime.strptime(groups[3], "%d/%b/%Y:%H:%M:%S %z") except: return None return ((groups[0],str(datetimeobject.date())), 1) transformation = (lines.map(parser) .filter(lambda record: record is not None) .filter(lambda host: not is_ip_address(host[0][0])) .reduceByKey(lambda a,b: a+b) .map(lambda host_date_count: (host_date_count[0][0], host_date_count[1])) .sortBy(lambda t: t[1],ascending=False) #If we want to retrieve only top record, we can use the below action #.reduce(lambda a,b: a if a[1] > b[1] else b) ) print(f'Top 10 per day visited domains:') print(transformation.take(10)) ############ lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() try: datetimeobject = datetime.strptime(groups[3], "%d/%b/%Y:%H:%M:%S %z") except: return None return ((groups[0],str(datetimeobject.date())), 1) transformation = (lines.map(parser) .filter(lambda record: record is not None) .filter(lambda host_date_count: not is_ip_address(host_date_count[0][0])) .reduceByKey(lambda a,b: a+b) .filter(lambda host_date_count: host_date_count[1] > 3) .map(lambda host_date_count: (host_date_count[0][1], (host_date_count[0][0], host_date_count[1]))) .reduceByKey(lambda a,b: a if a[1] > b[1] else b) .sortByKey() ) print(f'Most visited domain per day:') transformation.take(10) خروجی: Top 10 all time visited domains: [(('bill.ksc.nasa.gov', '1995-07-11'), 1394), (('indy.gradient.com', '1995-07-12'), 1356), (('siltb10.orl.mmc.com', '1995-07-21'), 1354), (('bill.ksc.nasa.gov', '1995-07-12'), 1317), (('piweba3y.prodigy.com', '1995-07-16'), 1280), (('piweba4y.prodigy.com', '1995-07-16'), 1269), (('piweba3y.prodigy.com', '1995-07-13'), 1202), …, ('۱۹۹۵-۰۷-۱۰', ('e659229.boeing.com', 358))] توضیحات: در این بخش سه عملیات مجزا را انجام میدهیم. قسمت اول پر درخواستترین دامنهها را مییابد، قسمت دوم دامنههایی را مییابد که بیشترین تعداد درخواست در یک روز داشتهاند و قسمت سوم به ازای هر روز، پر درخواستترین دامنه را بدست میدهد. در قسمت اول، پس از Parse کردن فایل لاگ، یک فیلتر برای تشخیص دامنه بودن هاست مورد نظر اعمال میگردد. این کار توسط یک تابع انجام میگیرد که تشخیص میدهد این هاست با یک آدرس IP مطابقت دارد یا خیر. در مرحله بعدی از Transformation، تعداد درخواستهای هر دامنه شمارش میشود (دقت کنید که در این مرحله چون هر رکورد از لاگ به معنی یک درخواست میباشد، نیازی به دخالت دادن تاریخ درخواست در این شمارش نیست)، و سپس با اعمال یک فیلتر، دامنههایی که تعداد بازدید بیشتر از ۳ دارند فیلتر میگردد. در نهایت نیز با تابع sortByValue دامنهها بر اساس تعداد بازدید به صورت نزولی مرتب میشوند. در قسمت دوم، همانند بخش قبلی عمل میکنیم با این تفاوت که کلید تاپلهای شمارش به جای نام دامنه، یک تاپل شامل نام دامنه و تاریخ بازدید است. به این صورت میتواند مجموع تعداد بازدید هر دامنه را به ازای هر روز بدست آورد. در ادامه با حذف تاریخ از این تاپلها، عملیات مرتبسازی صورت گرفته و دامنههایی با بیشترین بازدید در یک روز به صورت مرتب بدست میآیند. در قسمت سوم، همانند قسمت دوم، تاپلهایی شامل نام دامنه و تاریخ را به عنوان کلید در نظر میگیریم. در ادامه عملیات شمارش را به ازای هر دامنه و هر روز انجام داده، دامنههایی که تعداد بازدید زیر ۳ دارند را حذف و سپس با یک تبدیل، مقدار تاریخ را به عنوان کلید و نام دامنه و تعداد بازدید روزانه را به عنوان مقدار در نظر میگیریم. در این حالت، با انجام یک مرحله عملیات Reduce روی تاریخ به ازای هر روز، تابع Reduce را به صورت ماکسیممگیری تعریف میکنیم. پس از این مرحله، به ازای هر تاریخ، دامنهای که بیشترین بازدید را بین دامنههای آن تاریخ داشته است بدست خواهد آمد. بخش پنجم: import logging import re import sys from datetime import datetime import ipaddress import matplotlib.pyplot as plt regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$' lines = sc.textFile("dbfs:/FileStore/tables/Log") def parser(line): line = line.strip() regmatch = re.match(regex, line) if regmatch is None: return None groups = regmatch.groups() return (groups[7], 1) transformation = (lines.map(parser) .filter(lambda record: record is not None) .filter(lambda code: code[0] != '200' and code[0] != '-') .reduceByKey(lambda a,b: a+b) .sortBy(lambda t: t[1],ascending=False) ) errors = transformation.collect() codes = [t[0] for t in errors] counts = [t[1] for t in errors] x = codes y = counts plt.barh(x, y, color=(0.1, 0.1, 0.1, 0.1), edgecolor='blue') axes = plt.gca() axes.set_xlim([0,160000]) for index, value in enumerate(y): plt.text(value+2000, index-0.12, str(value)) plt.ylabel("Error Codes") plt.xlabel("Counts") plt.show() خروجی: توضیحات: در این بخش پس از انجام عملیات Parse و خروجی گرفتن تاپل کد-شمارنده، با یک فیلتر تمامی کدهای غیر از «۲۰۰» و «-» را فیلتر میکنیم. در ادامه به ازای هر کد، تعداد تکرار را بدست آورده و در نهایت به صورت نزولی مرتب میکنیم. پس از آن با استفاده از کتابخانه matplotlib یک نمودار میلهای برای این توزیع رسم میکنیم. چه رتبه ای میدهید؟ میانگین ۰ / ۵. از مجموع ۰ اولین نفر باش برچسب ها # spark graphx# spark sql# آموزش مهندسی داده# اسپارک# بارگذاری دیتا و نحوه آدرسدهی در DBFS# کار با دیتافریم ها# مهندسی داده معرفی نویسنده مقالات 5 مقاله توسط این نویسنده محصولات 3 دوره توسط این نویسنده مجتبی بنائی دانشجوی دکترای نرمافزار دانشگاه تهران، مدرس دانشگاه و فعال در حوزه مهندسی نرمافزار و علم داده که تمرکز کاری خود را در چند سال اخیر بر روی مطالعه و تحقیق در حوزه کلانداده و تولید محتوای تخصصی و کاربردی به زبان فارسی و انتشار آنها در سایت مهندسی داده گذاشته است. مدیریت پروژههای نرمافزاری و طراحی سامانههای مقیاسپذیر اطلاعاتی از دیگر فعالیتهای صورت گرفته ایشان در چند سال گذشته است.مهندس بنائی همچنین: مدرس دورههای BigDataی نیک آموز، دانشجوی دکترای نرم افزار و مدرس دانشگاه تهران، مجری و مشاور پروژههای کلانداده در سطح ملی و بین المللی، فعال در حوزه تولید محتوای تخصصی در زمینه پردازش داده می باشد. پروفایل نویسنده معرفی محصول مجتبی بنائی دوره آموزش مهندسی داده [Data Engineering] 1.990.000 تومان مقالات مرتبط ۱۶ بهمن مهندسی داده آموزش Apache Superset تیم فنی نیک آموز ۱۳ بهمن مهندسی داده چگونه تستها را میتوان به خطوط پردازش داده اضافه کرد؟ تیم فنی نیک آموز ۰۴ بهمن مهندسی داده هوش تجاری طراحی یک سیستم ELTبا استفاده از Stitch و dbt تیم فنی نیک آموز ۲۷ دی مهندسی داده بررسی آپاچی Airflow معایب و مزایای آن تیم فنی نیک آموز دیدگاه کاربران لغو پاسخ دیدگاه نام و نام خانوادگی ایمیل ذخیره نام، ایمیل و وبسایت من در مرورگر برای زمانی که دوباره دیدگاهی مینویسم. موبایل برای اطلاع از پاسخ لطفاً مرا با خبر کن ثبت دیدگاه Δ