بررسی عملی سه روش پردازش داده‌های آفلاین‌ در اسپارک [بخش اول]

بررسی عملی سه روش پردازش داده‌های آفلاین‌ در اسپارک [بخش اول]

نوشته شده توسط: مجتبی بنائی
تاریخ انتشار: ۱۰ آبان ۱۳۹۹
آخرین بروزرسانی: ۲۱ بهمن ۱۴۰۰
زمان مطالعه: 18 دقیقه
۵
(۱)

مقدمه

یکی از ابزارهای پرکاربرد و رایج در پردازش داده‌های کلان، اسپارک است. این چارچوب پردازش داده محبوب، مشابه با هدوپ، پردازش‌ها را در سطح شبکه توزیع میکند و پس از جمع آوری نتایج میانی، آنها را به کاربر نمایش می‌دهد اما بر خلاف هدوپ، استفاده موثرتری از حافظه انجام میدهد که باعث افزایش سرعتی گاها تا صدبرابر هدوپ می شود و نحوه کار و برنامه نویسی با آن، بسیار راحت‌تر و محدود به سبک کلید/مقداری هدوپ نیست (کتابخانه نگاشت/کاهش یا Map/Reduce).
به عنوان یک مهندس داده،‌ خیلی از اوقات نیاز دارید روی داده‌های موجود پردازشی انجام بدهید و خروجی را در یک دیتابیس یا فضای ذخیره سازی فایل،‌ ذخیره کنید. در این سری مقالات، قصد داریم مروری سریع بر سه پارادایم پردازش داده‌های معمولی (داده‌های ذخیره شده و جدول‌مانند) در اسپارک با استفاده از چند مثال ساده اما کاربردی داشته باشیم.

  •  پردازش به کمک RDD‌ های پایه اسپارک .
  •  پردازش با دیتافریم‌ها (مشابه با دیتافریم‌های پانداز در پایتون)
  • پردازش با Spark SQL
  • و در انتها هم یک مثال پردازش گراف را با هم بررسی خواهیم کرد.

برای این منظور، ابتدا مجموعه سوالاتی را بر روی چند دیتاست تعریف نموده،‌ سپس به کمک روش‌های سه گانه فوق، به آنها پاسخ خواهیم داد.
برای انجام این کارگاه عملی، بهتر است با معماری و مولفه‌های اصلی اسپارک آشنا باشید. معماری و دستورات پایه اسپارک را می‌توانید در این آدرس فرابگیرید. برای کار با اسپارک هم از محیط آنلاین Databricks‌ استفاده خواهد شد و یک ارتباط خوب اینترنتی می‌تواند تنها نیازمندی این کارگاه باشد.
فایل‌های مورد نیاز برا ی بخش‌های مختلف این مقاله آموزشی در انتهای مقاله قرار داده شده است.

سوالات و خواسته‌ها

سوالات با دید تمرین و کار عملی طراحی شده است. بنابراین ابتدا سوالات را به دقت بخوانید و اگر قبلا با اسپارک کار کرده‌اید، سعی کنید آنها را خودتان انجام دهید. در هر صورت، می توانید به بخش کارگاه عملی در انتهای سوالات مراجعه و راه حل‌های پیشنهادی را مشاهده کرده و به صورت عملی، آنها را انجام دهید.

دستورات پایه

در این بخش قصد داریم عملیات نگاشت و کاهش که سبک پیش فرض پردازش در هدوپ است را شبیه سازی کنیم. در این حالت، باید ورودی‌ها را به شکل زوج مرتب کلید مقدار (k,v) در آوریم و خروجی‌ها را هم به همین شکل تولید کنیم. همانطور که خواهید داد، علیرغم امکانات متعدد اسپارک، اگر بخواهیم به این سبک وفادار بمانیم، سرعت توسعه و تولید کدها کاهش خواهد یافت چون برای هر مساله، علاوه بر تمرکز بر خود ورودی و خروجی و الگوریتم پردازش، باید روی تبدیل آنها به این قالب هم وقت بگذاریم.

بخش اول

در این قسمت با استفاده از مدل نگاشت-کاهش(Map/Reduce) تعداد لغات فایل input.txt (نسخه متنی کتاب جنگ و صلح تولستوی) را شمارش کرده و نمایش دهید. همچنین گزارش کنید که هر کلمه چند بار تکرار شده است و خروجی را در یک فایل .txt ذخیره کنید.
در این گام تنها علائم نقطه گذاری (علامت تعجب، سوال، نقطه و …) را حذف کنید و پیش‌پردازش دیگری لازم نیست.

بخش دوم

حال در این گام با استفاده از مدل (Map/Reduce) تعداد تمامی کلماتی که با حرف (M) آغاز می‌شوند را بیابید. (مستقل از کوچک و بزرگ بودن M)

بخش سوم

در این بخش نیز همانند دو گام قبلی، تعداد لغات ۵ حرفی موجود درفایل input.txt را یافته، لغاتی که با حروف صدادار شروع می‌شوند را از خروجی حذف کنید و نتیجه نهایی را به صورت مرتب نمایش دهید.

بخش چهارم

به کمک مراحل قبلی، ایست واژه‌ها(stop words) را بیابید. کلمه‌ای را ایست واژه درنظر بگیرید که جزء ده درصد کلمات پرتکرار این فایل قرار بگیرد. سپس تابعی بنویسید که یک خط را گرفته، تمام حروف غیر الفبایی و ایست ‌واژه‌های آنرا حذف کند. این تابع را روی تمام خطوط اعمال کرده، نتیجه را در یک فایل، ذخیره کنید.

بخش پنجم

تعداد دو کلمه‌ای‌هایی که بیشتر از یک بار در فایل اصلی (input.txt) کنار هم آمده‌اند را به ترتیب فرکانس، یافته و نمایش دهید. منظور از دوکلمه‌ای(bigram)، دو لغتی هستند که پشت سر هم به کار رفته‌اند

بررسی یک فایل لاگ وب سرور

فایل لاگ پیوست این تمرین با نام” Log “که مربوط به در خواست‌های HTTP است. بااستفاده از این فایل به سوال زیر پاسخ دهید (برای این بخش از دستورات پایه اسپارک -کار با RDD‌ – استفاده کنید)

بخش اول

چند Host یکتا در این لاگ فایل وجود دارد؟

بخش دوم

متوسط تعداد درخواست‌های روزانه برای هر میزبان منحصر به فرد (آی‌پی یا نام دامنه) چقدر است؟ ابتدا متوسط تعداد درخواست‌های هر دامنه در هر روز را به دست آورید و سپس، متوسط نهایی را برای هر دامنه یا آی‌پی، تعیین کنید.

بخش سوم

تعداد فایل‌های گیف درخواست شده در این فایل لاگ چقدر است؟

بخش چهارم

دامنه‌های پرتقاضا (بیش از ۳ بار ) را یافته، آنها را به صورت مرتب شده نمایش دهید. آی پی‌ها را جزء این دامنه‌ها در نظر نگیرید. سپس دامنه پرتقاضا به ازای هر روز را پیدا کنید (دامنه‌ای با بیشترین تعداد درخواست در یک روز).

بخش پنجم

خطاهای HTTP‌ (غیر ازکد ۲۰۰، بقیه را همه خطا در نظر بگیرید.) را یافته، تعداد تکرار آنها در یک نمودار ستونی نمایش دهید.

کار با دیتافریم‌ها / Spark SQL

با توجه به دیتاست stock.csv به سوالات زیر پاسخ دهید. این دیتاست، داده‌های بورس مربوط به یکی از کمپانی‌های بزرگ از سال ۲۰۱۲ تا ۲۰۱۷ می‌باشد.
برای انجام این تمرین از دو روش استفاده کنید یعنی برای هر بخش، خروجی مورد نظر را با هر کدام از دو روش زیر به صورت جداگانه به دست آورید:

  •  Spark DataFrames- با توابع دیتافریم‌ ( DataFrame Operations such as min,avg,…)
  • Spark SQL – با دستورات SQL (spark.sql)

بخش اول:

یک ستون اطلاعات جدید با ستونی به نام HV ایجاد کنید که این نسبت بالاترین قیمت بر حجم سهام معامله شده برای یک روز است.

بخش دوم:

پیک بالاترین قیمت، برای چه روزی بوده است؟

بخش سوم:

میانگین ستون، Close، چه مقدار است؟

بخش چهارم:

مقدار ماکزیمم و مینیمم ستون Volum را مشخص کنید.

بخش پنجم:

چند روز ستون Close کمتر از ۶۰ دلار بوده است؟

بخش ششم:

Pearson correlation بین ستون های High و Volum چقدر است؟

بخش هفتم:

ماکزیمم ستون High در هر سال چقدر است؟

Spark GraphX

فایل پیوست edgs.txt یال‌ها و فایل پیوست vertex.txt درجه‌های یک گراف هستند. گراف مورد نظر ما از مقالات ویکی پدیا استخراج شده‌اند. هر گره یک مقاله ویکی پدیا و یال از مقاله A به مقاله B نشان دهنده این است که مقاله A به مقاله B ارجاع داده است.
نکته: می‌توانید برای کار با گراف در اسپارک از کتابخانه GraphFrames استفاده کنید.

بخش اول:

با استفاده از فایل یال‌ها و گره‌ها، این گراف را ایجاد کنید.

بخش دوم:

بیشترین درجه ورودی در این گراف چقدر است؟ بیشترین درجه خروجی (مقاله‌ای که احتمالا Survey‌ بوده و شامل لینک زیادی به سایر مقالات است.) چند است؟‌

بخش سوم:

سایز هرکدام از ConnectedComponentها چقدر است؟

بخش چهارم:

ده تا از مقالات برتر را بیابید (مقالاتی که بیشترین درجه ورودی را داشته‌اند).

راه اندازی محیط کار

در این بخش به چگونگی راه اندازی و استفاده از بستر Databricks Cloud می‌پردازیم.

  •  ثبت نام در سرویس ابری Databricks

ابتدا لازم است تا عملیات ثبت‌نام درون این سایت انجام شود. این سایت یک پلن رایگان با نام Community ارائه می‌کند که امکان ساخت یک کلاستر با ۱۵ گیگابایت حافظه موقت و دو هسته پردازنده را بدست می‌دهد.

  •  ایجاد کلاستر پردازشی

پس از ورود به پنل کاربری، ابتدا لازم است تا یک کلاستر پردازشی ساخته شود. برای اینکار از منوی سمت چپ و بخش Clusters اقدام می‌کنیم. همانند تصویر زیر، لازم است تا یک نام برای کلاستر انتخاب کرده و تنظیمات مربوط به نسخه ابزارهای نصب شده روی کلاستر، زون قرارگیری کلاستر بر روی سرورهای سرویس‌دهنده و همچنین تنظیمات مربوط به کانفیگ اسپارک را مشخص کنیم. در این مرحله موارد پیش فرض انتخاب شده را تغییر نمی‌دهیم (نسخه اسپارک ۲.۴.۵ بدون GPU).لازم به ذکر است که در پلن رایگان Community Edition، تنها امکان ایجاد یک کلاستر برای کاربر وجود داشته و همچنین در صورت عدم استفاده از کلاستر ایجاد شده به مدت ۲ ساعت، کلاستر به طور خودکار به حالت Terminated رفته و پس از آن امکان Resume برای آن وجود نخواهد داشت. در این حالت می‌بایست کلاستر را از لیست کلاسترهای ایجاد شده حذف کرده و مجدداً یک کلاستر جدید ایجاد کرد.
پس از ایجاد یک کلاستر، پیکربندی و راه اندازی آن به مدت زمانی حدود پنج دقیقه زمان احتیاج دارد که در طول این مدت، کلاستر در حالت Pending بوده و امکان استفاده از آن مهیا نمی‌باشد. پس از اتمام عملیات راه اندازی، وضعیت کلاستر به حالت Ready در خواهد آمد.

  •  نصب کتابخانه‌ها

بدیهی است که تمامی کتابخانه‌های مورد نیاز به صورت built-in در ابزارهایی نظیر پایتون و اسپارک که بر روی کلاستر به صورت خودکار نصب شده‌اند موجود نخواهد بود. برای اینکار لازم است تا کتابخانه‌های مورد نیاز به صورت دستی نصب گردند.

  • نصب کتابخانه‌های PyPI پایتون

در طی انجام این تمرین به کتابخانه‌های unidecode (سؤال اول) و matplotlib (سؤال دوم) و networkx (سؤال چهارم) احتیاج خواهیم داشت. برای نصب این کتابخانه‌ها لازم است تا از منوی سمت راست، گزینه Workspace انتخاب شده و از زیر منوی Shared با کلیک بر فلش جهت‌دار، از منوی مربوطه گزینه Create و سپس Library انتخاب گردد:در صفحه باز شده با انتخاب گزینه PyPI می‌توان اسم کتابخانه مورد نظر را وارد کرده و سپس با فشردن دکمه Create داده‌های مربوط به این کتابخانه به Workspace کاربری منتقل شده و در صفحه بعد در صورتی که کلاستر ایجاد شده در حالت Ready باشد، به صورت خودکار روی آن نصب می‌گردد.لازم به ذکر است که در صورتی که کلاستر در حالت Ready نبوده و یا کلاستر جدیدی پس از تولید کتابخانه در Workspace ایجاد شده باشد، لازم است تا از منوی Workspace و بخش Shared و انتخاب کتابخانه مورد نظر، طبق تصویر زیر با انتخاب کلاستر مربوطه و دکمه Install، کتابخانه پایتون را بر روی آن نصب کرد:

  •  نصب کتابخانه‌های جاوا (بسته‌های اسپارک)

در این تمرین لازم است تا کتابخانه Graphframes (سؤال چهارم) بر روی کلاستر پردازشی نصب گردد. برای اینکار ابتدا لازم است تا از این سایت (Spark Packages) نسخه مناسب برای نسخه اسپارک موجود بر روی کلاستر پردازشی بارگذاری گردد.
در اینجا فایل jar کتابخانه Graphframes با نسخه ۰.۸.۰-spark2.4-s_2.11 مورد نظر می‌باشد.
پس از بارگذاری فایل jar مربوطه، لازم است تا از منوی سمت راست پنل Databricks، انتخاب گزینه Workspace انتخاب شده و از زیر منوی Shared با کلیک بر فلش جهت‌دار، از منوی مربوطه گزینه Create و سپس Library انتخاب گردد. در صفحه باز شده نیز با انتخاب بخش Upload و سپس Jar، فایل مربوط به کتابخانه را بارگذاری کرده و با فشردن دکمه Create، کتابخانه مورد نظر به صورت خودکار بر روی کلاستر مربوطه (پس از انتقال به Workspace) نصب می‌گردد.
همانند بخش قبل لازم است تا در صورت ایجاد کلاستر جدید پس از ایجاد کتابخانه در Workspace، از طریق صفحه مربوط به کتابخانه موجود در Workspace اقدام به نصب آن روی کلاستر مربوطه کرد.

بارگذاری دیتا و نحوه آدرس‌دهی در DBFS

سرویس Databricks برای راحتی کار با داده‌ها برای کاربران، بستر فایل سرویس خود را از بستر کلاسترهای پردازشی به صورت جداگانه و با نام DBFS ارائه می‌دهد. این بستر به صورت مستقل از کلاستر پردازشی کار کرده و در صورت انقضای یک کلاستر، داده‌های موجود در DBFS کماکان برای هر کاربر در دسترس می‌باشد.
بر این اساس، در صورت حذف یک کلاستر، کماکان می‌توان به فایل‌های بارگذاری شده در این بستر دسترسی داشت. فایل‌هایی که به طور معمول در این بستر ذخیره می‌شوند عبارتند از:

  •  فایل داده‌های خام و ورودی (نظیر CSV، TXT و …)
  • فایل خروجی برنامه‌ها (متن، نمودار، …)
  • فایل‌های Temporary
  • کتابخانه‌های مختلف مورد استفاده در برنامه‌ها
  • لاگ‌فایل‌های اجرای برنامه‌ها

لازم به ذکر است که آدرس‌دهی فایل‌سیستم DBFS می‌بایست به صورت خاص در برنامه‌های اجرایی صورت گیرد، در غیر اینصورت فایل‌ها به صورت Local از کلاستر پردازشی آدرس‌دهی خواهند شد و این باعث می‌شود که دسترسی به این فایل‌ها دشوار و در صورتی که کلاستر منقضی شود، این فایل‌ها از بین بروند.

  •  بارگذاری دیتاست‌ها

برای بارگذاری دیتاست‌ها در فایل‌سیستم Databricks می‌بایست از منوی Data دکمه Add Data را فشرد. در صفحه باز شده از بخش Upload File با کلیک در باکس File و انتخاب فایل دیتاست از کامپیوتر شخصی خود، فایل مورد نظر در فایل‌سیستم DBFS بارگذاری خواهد شد. پس از بارگذاری موفقیت آمیز، مسیر فایل باگذاری شده در پایین بخش مربوطه نمایش داده می‌شودلازم به ذکر است که تمامی دیتاهای بارگذاری شده به این نحو، در زیر مسیر /FileStore/tables قرار خواهند گرفت. می‌توان در این زیر مسیر از بخش Upload to DBFS دایرکتوری‌های زیرینی نیز برای مسیر این فایل تعیین کرد.

  •  دسترسی و مشاهده فایل‌ها

فایل‌های موجود در DBFS کاربری از طریق بخش DBFS از صفحه Create New Table (که در بخش قبلی ذکر شد) قابل مشاهده خواهد بود:در این صفحه می‌توان لیست فایل‌های موجود در DBFS را مشاهده کرده و همچنین به ازای هر فایل، مسیر مربوط به آن فایل در DBFS را مشاهده کرد. اما متأسفانه امکان دسترسی به محتویات فایل‌ها ازین بخش ممکن نیست.
برای دسترسی به محتویات فایل‌ها می‌توان از URL تأمین شده توسط Databricks استفاده کرد. به طور مثال برای دسترسی به یک فایل با مسیر /FileStore/HW3/Sec1/1.txt می‌بایست از طریق آدرس زیر اقدام کرد:

???????????=https://community.cloud.databricks.com/files/HW3/Sec1/1.txt?o

که در آن، مقدار پارامتر o برابر همان مقدار موجود در URL دسترسی به پنل Databricks می‌باشد.
لازم به ذکر است که دسترسی به فایل‌ها از این طریق تنها برای فایل‌هایی ممکن است که در زیر مسیر FileStore ذخیره باشند.[/vc_column_text][vc_empty_space height=”20px”][/vc_column][/vc_row][vc_row][vc_column][vc_column_text]

  •  مدیریت و آدرس‌دهی فایل‌ها از طریق برنامه

دسترسی به فایل‌های موجود در DBFS و یا نوشتن یک فایل بر روی DBFS با استفاده از برنامه پایتون (نوت‌بوک) می‌بایست با آدرس‌دهی‌های مخصوصی انجام گردد. در غیر اینصورت مسیرها به صورت Local در کلاستر پردازشی در نظر گرفته خواهند شد.
بر این اساس، و بر اساس نیازهای این تمرین، از دو نوع دسترسی مختلف که Databricks تأمین کرده است استفاده می‌کنیم. لازم به ذکر است که به صورت مشروح دسترسی‌های مختلف در این لینک توضیح داده شده اند.
اولین نوع دسترسی از طریق دستورات مربوط به اسپارک صورت می‌گیرد. برای دسترسی به فایل‌های موجود بر روی DBFS از طریق توابع اسپارک، نظیر sc.textFile می‌توان از این نوع آدرس‌دهی استفاده کرد. برای یک فایل با آدرس /FileStore/tables/input.txt می‌بایست از آدرس زیر استفاده کرد:

dbfs:/FileStore/tables/input.txt

این نوع از آدرس‌دهی را می‌توان در تمامی توابع اسپارک استفاده نمود. لازم به ذکر است که این نوع از آدرس دهی در توابع IO پایتون معتبر نمی‌باشد.
برای آدرس‌دهی فایل‌ها در توابع IO پایتون، به طور مثال، برای نوشتن محتویات درون یک فایل با آدرس /FileStore/HW3/Sec1/1.txt می‌بایست از آدرس‌دهی زیر استفاده کرد:

/dbfs/FileStore/HW3/Sec1/1.txt

لازم به ذکر است که این نوع آدرس‌دهی در توابع مربوط به اسپارک معتبر نمی‌باشد. زیر مسیر dbfs یک دایرکتوری mount شده توسط Databricks در کلاسترهای پردازشی بوده که امکان دسترسی مستقیم به DBFS را فراهم می‌کند.

ایجاد نوت‌بوک

برای ایجاد نوت‌بوک در پنل Databricks لازم است تا از منوی Workspace زیر منوی Users و زیر منوی «نام کاربری» و فشردن فلش جهت‌دار، گزینه Create از منوی ظاهر شده و انتخاب گزینه Notebook اقدام کرد.
در صفحه جدید باز شده، می‌توان با نام‌ گذاری نوت‌بوک و انتخاب زبان آن، یک کلاستر فعال را برای آن انتخاب کرد. پس از آن صفحه نوت‌بوک ایجاد شده نمایش داده خواهد شد. برای دسترسی آتی به این نوت‌بوک می‌توان از همان مسیر یاد شده اقدام کرد.

سؤالات پایه

بخش اول

from unidecode import unidecode
import string
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
def cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  line = line.lower()
  for word in line.split():
    yield word, 1
transformation = (lines.flatMap(cleaner)
                  .reduceByKey(lambda a,b: a+b)
                  .map(lambda t: (t[1],t[0]))
                  .sortByKey(ascending=False)
                  .map(lambda t: (t[1],t[0])))
print(transformation.take(10))
with open("/dbfs/FileStore/HW3/Sec1/1.txt", "w") as f:
  for (word, count) in transformation.collect():
    f.write(f'{word}: {count}\n')

خروجی: لیست ۱۰ کلمه پر تعداد به صورت زیر چاپ شده است:

[('the', 336), ('and', 238), ('of', 162), ('to', 154), ('in', 128), ('a', 106), ('is', 76), ('this', 74), ('people', 62), ('that', 61)]

 توضیحات: در این کد ابتدا یک تابع cleaner به عنوان map عملیات پاکسازی و پیش‌پردازش‌های مورد نیاز را روی هر خط از فایل ورودی انجام می‌دهد. بدین منظور در مرحله اول تمامی کاراکترهای Unicode موجود در متن به نزدیک‌ترین کاراکتر ASCII توسط کتابخانه Unidecode تبدیل شده‌اند. علت انجام این امر، استفاده از کاراکترهای گرامری )نظیر (“ در متن داده شده بوده که اگرچه در ظاهر قابل مشاهده بوده ولی در عمل با کاراکترهای ASCII (نظیر “) مطابقت ندارند. همین امر باعث می‌شود که امکان تشخیص و حذف آن‌ها در متن ممکن نباشد. در مرحله بعدی مقادیر فاصله از ابتدا و انتهای خطوط حذف شده و سپس به حذف تمامی علامت‌های گرامری (Punctuations) از خط پرداخته می‌شود. در ادامه تمامی کاراکترهای موجود در خط به صورت Minimal تبدیل شده و در نهایت کلمات موجود در خط جداسازی و به صورت یک generator بازگردانده می‌شوند. مقدار خروجی لیستی از تاپل‌های کلمات بوده که هر کلمه با شمارنده خود (عدد ۱) جفت شده است.

در مرحله بعد از Tranformation، یک عملیات به صورت reduceByKey صورت گرفته و در طی آن به ازای هر کلمه شمارش لازم روی شمارنده‌های کلمات صورت می‌گیرد.
در انتها نیز با انجام یک عملیات map برای معکوس کردن تاپل‌ها، مرتب‌سازی آن‌ها و انجام یک map دیگر برای تبدیل آن‌ها به فرمت تاپل اولیه، کار مرتب‌سازی نزولی بر اساس تعداد کلمات انجام می‌گیرد. لازم به ذکر است که در این مرحله به جای استفاده از تابع sortByKey امکان استفاده از تابع sortBy نیز وجود داشت که در آن تابع نگاشت مرتب‌سازی برای انجام مرتب‌سازی بر اساس عضو دوم تاپل تعریف می‌شود، اما برای ذکر روش‌های متنوع مرتب‌سازی استفاده از این تابع در سؤال ۲ صورت گرفته است. در صورت استفاده از تابع sortBy دیگر نیازی به انجام دو مرحله map قبل و بعد از مرتب‌سازی نمی‌باشد.

در انتهای عملیات پردازش، یک action برای نمایش ۱۰ رکورد ابتدایی با استفاده از تابع take صورت گرفته است. همچنین با استفاده از action ای برابر collect، مقادیر نهایی محاسبه شده در یک فایل متنی به صورت ستونی نیز چاپ شده است. لازم به ذکر است که استفاده از take و collect در این مرحله هر یک باعث می‌شود تا یک job مجزا برای پردازش صورت گیرد، که در این سؤال با توجه به حجم کم داده‌ها از هر دو action استفاده شده است. ولی در یک پردازش با حجم داده بالا، اینکار باعث انجام پردازش duplicate می‌شود که مطلوب نیست و تنها باید یکبار action صورت گیرد

بخش دوم

from unidecode import unidecode
import string
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
def cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  line = line.lower()
  for word in line.split():
    yield word
transformation = (lines.flatMap(cleaner)
                  .filter(lambda word: word.startswith('m'))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a,b: a+b)
                  .map(lambda t: (t[1],t[0]))
                  .sortByKey(ascending=False)
                  .map(lambda t: (t[1],t[0])))
print(transformation.take(10))
print(f'Number of distinct words that starts with <m>: {transformation.count()}')

خروجی

[('more', 18), ('many', 17), ('me', 14), ('my', 9), ('most', 9), ('make', 6), ('much', 6), ('modern', 5), ('m', 5), ('marjane', 5)] Number of distinct words that starts with <m>: 56

توضیحات:
پس از انجام عملیات پاکسازی، یک فیلتر برای جداسازی کلماتی که با حرف m آغاز می‌شوند تعریف شده و در ادامه با ایجاد تاپل‌های همراه با شمارنده، به شمارش این کلمات اقدام شده و با مرتب سازی نزولی، در نهایت به ازای هر کلمه، تعداد آن در خروجی چاپ شده است

بخش سوم

from unidecode import unidecode
import string
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
def cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  line = line.lower()
  for word in line.split():
    yield word
transformation = (lines.flatMap(cleaner)
                  .filter(lambda word: len(word) == 5)
                  .filter(lambda word: not word.startswith(('a', 'e', 'i', 'o', 'u')))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a,b: a+b)
                  .map(lambda t: (t[1],t[0]))
                  .sortByKey(ascending=False)
                  .map(lambda t: (t[1],t[0])))
print(transformation.take(10))
print(f'Number of distinct words that starts with <m>: {transformation.count()}')

خروجی

[('these', 20), ('which', 19), ('their', 19), ('there', 18), ('games', 16), ('could', 15), ('style', 13), ('where', 12), ('think', 12), ('place', 11)] Number of distinct words that starts with <m>: 180

توضیحات:
پس از انجام عملیات پاکسازی، یک فیلتر برای جداسازی کلماتی با طول ۵ تعریف شده و در ادامه یک فیلتر برای جداسازی کلماتی که با حروف غیرصدا دار آغاز می‌شوند اعمال شده است. پس از آن نیز مجدداً همانند بخش‌های پیشین عملیات شمارش صورت گرفته است

بخش چهارم

from unidecode import unidecode
import string
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
def cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  line = line.lower()
  for word in line.split():
    yield word, 1
transformation = (lines.flatMap(cleaner)
                  .reduceByKey(lambda a,b: a+b)
                  .map(lambda t: (t[1],t[0]))
                  .sortByKey(ascending=False)
                  .map(lambda t: t[1]))
t10p = round(0.1*transformation.count())
print(f'10 percent of the number of distinct words is: {t10p}')
stop_words = list(transformation.take(t10p))
print(f'list of stop words are: ')
print(stop_words)
def stop_words_cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  words = line.split()
  words = [word for word in words if word.lower() not in stop_words]
  return ' '.join(words)
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
transformation = lines.map(stop_words_cleaner)
with open("/dbfs/FileStore/HW3/Sec1/4.txt", "w") as f:
  for line in transformation.collect():
    f.write(f'{line}\n')

خروجی

۱۰ percent of the number of distinct words is: 125 list of stop words are: ['the', 'and', 'of', 'to', 'in', 'a', 'is', 'this', 'people', 'that', 'it', 'they', 'i', 'was', 'on', 'have', 'with', 'you', 'them', 'are', 'all', 'were', 'for', 'square', 'as', 'be', 'can', 'up', 'when', 'time', 'what', 'one', …, 'gender']ش

توضیحات:
در این بخش از دو مرحله Transformation-Action استفاده شده است. مرحله اول اقدام به استخراج کلمات Stop Words کرده و مرحله دوم اقدام به حذف این کلمات از فایل ورودی و پاکسازی فایل به صورت خواسته شده می‌نماید.
در مرحله اول، برای تشخیص کلمات Stop Words ابتدا به پاکسازی فایل به نحوه ذکر شده در مراحل قبلی صورت گرفته می‌شود. پس از آن کل تعداد کلمات فایل مطابق بخش‌های پیشین شمارش می‌گردد. پس از آن تعداد ۱۰ درصد بیشترین کلمات یکتا در فایل بدست آمده و این تعداد از ابتدای لیست نزولی تعداد کلمات فایل جداسازی می‌شود (در اینجا ۱۲۵ کلمه برابر ۱۰ درصد کل کلمات یکتا در فایل می‌باشد). در نهایت این کلمات در یک لیست به عنوان کلمات Stop Words ذخیره می‌گردد.
در مرحله دوم، همانند بخش‌های قبلی عملیات پاکسازی صورت گرفته و در همان تابع cleaner از map اولیه، اقدام به حذف لیست کلمات Stop Words که از مرحله پیشین بدست آمده است صورت می‌گیرد. پس از این هیچ Transformation دیگری روی فایل صورت نگرفته و در ادامه با یک Action برابر collect، خطوط فایل پاکسازی شده در یک فایل خروجی نوشته می‌شود.

بخش پنجم

from unidecode import unidecode
import string
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
def cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  line = line.lower()
  for word in line.split():
    yield word, 1
transformation = (lines.flatMap(cleaner)
                  .reduceByKey(lambda a,b: a+b)
                  .map(lambda t: (t[1],t[0]))
                  .sortByKey(ascending=False)
                  .map(lambda t: t[1]))
t10p = round(0.1*transformation.count())
print(f'10 percent of the number of distinct words is: {t10p}')
stop_words = list(transformation.take(t10p))
print(f'list of stop words are: ')
print(stop_words)
def stop_words_cleaner(line):
  line = unidecode(line)
  line = line.strip()
  punctuations = string.punctuation
  line = line.translate(line.maketrans(punctuations, ' '*len(punctuations)))
  words = line.split()
  words = [word for word in words if word.lower() not in stop_words]
  return ' '.join(words)
lines = sc.textFile("dbfs:/FileStore/tables/input.txt")
transformation = lines.map(stop_words_cleaner)
with open("/dbfs/FileStore/HW3/Sec1/4.txt", "w") as f:
  for line in transformation.collect():
    f.write(f'{line}\n')

خروجی:

Out[46]: [(('of', 'the'), 56), (('in', 'the'), 53), (('and', 'the'), 15), (('this', 'is'), 15), (('all', 'of'), 13), (('it', 'is'), 13), (('on', 'the'), 12), (('to', 'the'), 11), (('soho', 'square'), 11), (('of', 'this'), 10), (('people', 'to'), 10), (('of', 'a'), 10), (('to', 'get'), 9), (('with', 'the'), 9),…

توضیحات:
در این بخش پس از انجام عملیات پاکسازی روی هر خط، کلمات آن جداسازی می‌گردند. سپس به ازای هر خط، تاپل‌هایی شامل کلمات متوالی (تاپل اول شامل کلمه اول و دوم، تاپل دوم شامل کلمه دوم و سوم، …) بازگردانده می‌شوند. این تابع توسط Transformation ای برابر flatMap باز گردانده می‌شود، چرا که در این مرحله نیازی به حفظ ساختار خطوط نداریم. در ادامه با شمارش تعداد هر تاپل، یک فیلتر برای جداسازی تاپل‌هایی که تعداد تکراری بیشتر از ۱ دارند را تعبیه میکنیم. در نهایت نیز عملیات مرتب‌سازی را انجام می‌دهیم.

بررسی یک فایل لاگ وب سرور

بخش اول

import logging
import re
import sys
regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$'
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  return groups[0]
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .distinct())
print(transformation.count())

خروجی:
۸۱۹۶۹
توضیحات:
در این بخش ابتدا یک Regular Expression برای جداسازی بخش‌های مختلف این فایل طراحی شده‌ است. با بررسی بیشتر فایل به نظر می‌آید که خطوطی از فایل دارای مشکل و خطا بوده و خط لاگ به صورت استاندارد درج نشده است. کار طراحی این Regex بر اساس بیشترین مطابقت با خطوط انجام شده و در نهایت حدود ۸۰۰ خط از فایل لاگ قابلیت Parse شدن را به علت خرابی داده‌ نداشته‌اند.
به همین منظور، تابع Parser در صورتی که یک خط از فایل با Regex طراحی شده مطابقت نداشت، به ازای آن خط مقدار None را باز می‌گرداند و در غیر اینصورت، دیتای Parse شده را باز خواهد گرداند. در مرحله بعد لازم است تا با استفاده از یک filter، مقادیر None را از map مرحله قبل حذف کنیم و سپس داده‌ها را به مراحل بعد ارجاع دهیم. لازم به ذکر است که این الگو در تمامی بخش‌های این سؤال تکرار شده است که توضیح آن در اینجا داده شد.
در این بخش، تابع Parser اقدام به بازگرداندن مقدار Host از هر خط کرده، در مرحله بعد داده‌های مجاز فیلتر شده و در انتها تابع Distinct برای حذف هاست‌های تکراری اعمال شده است. پس از آن با اجرای Action ای برابر count، اقدام به شمارش هاست‌های یکتای فایل صورت گرفته است

بخش دوم

import logging
import re
import sys
from datetime import datetime
regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$'
import logging
import re
import sys
from datetime import datetime
regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$'
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  try:
    datetimeobject = datetime.strptime(groups[3], "%d/%b/%Y:%H:%M:%S %z")
  except:
    return None
  return ((groups[0],str(datetimeobject.date())), 1)
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .reduceByKey(lambda a,b: a+b)
                 .map(lambda ip_date_count: (ip_date_count[0][0], (ip_date_count[1], 1)))
                 .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
                 .mapValues(lambda sum_count: round(sum_count[0]/sum_count[1], 2))
                 .sortBy(lambda t: t[1],ascending=False))
print(transformation.take(10))

خروجی

[('rush.internic.net', 1119.0), ('siltb10.orl.mmc.com', 841.44), ('piweba3y.prodigy.com', 627.57), ('marina.cea.berkeley.edu', 599.67), ('alyssa.prodigy.com', 523.47), ('nidhogg.srl.caltech.edu', 506.5), ('piweba4y.prodigy.com', 482.96), ('bill.ksc.nasa.gov', 445.14), ('spidey.cor.epa.gov', 401.0), ('hoohoo.ncsa.uiuc.edu', 367.0)]

توضیحات:
در این بخش عملیات Parse به صورتی که در بخش قبل گفته شد صورت می‌گیرد. تابع Parse داده‌های را به صورت یک تاپل با دو عضو باز می‌گرداند. عضو اول این تاپل، یک تاپل به صورت هاست-تاریخ بوده و عضو دوم آن شمارشگر برابر ۱ می‌باشد.
در مرحله دوم Transformation، عملیات حذف داده‌های غیر مجاز صورت گرفته، در مرحله سوم به ازای کلیدها (هاست-تاریخ) کار شمارش انجام می‌گردد. خروجی این مرحله نشان‌دهنده تعداد دسترسی به هر هاست در هر روز می‌باشد.

در مرحله سوم، لازم است تا عملیات میانگین‌گیری روی دسترسی‌های هر هاست به ازای روزهای مختلف انجام گیرد. کار میانگیری در الگوی MapReduce بایستی به صورتی صورت گیرد که در هنگام انجام عملیات Reduce، همزمان هم مقدار جمع اعداد و هم مقدار تعداد اعداد تا هر لحظه نگهداری شوند. بدین منظور لازم است تا دو مقدار به صورت یک تاپل، یکی مقدار جمع تا این لحظه و یکی تعداد تا این لحظه نگهداری گرند.
بر این اساس، در مرحله سوم تاپل‌ها به فرمت جدید map می‌گردند. در این فرمت یک تاپل خواهیم داشت که کلید آن برابر هاست و مقدار آن برابر یک تاپل که عضو اول آن تعداد مشاهده آن هاست در هر روز و عضو دوم آن شمارنده ۱ می‌باشد.

در مرحله چهارم اقدام به میانگین‌گیری روی مقادیر خواهیم کرد. بر این اساس به ازای هر کلید، مقدار برابر یک تاپل خواهد شد که عنصر اول آن برابر مجموع کل بازدیدهای هر روز آن هاست و عنصر دوم برابر تعداد روزهای مشاهده آن هاست می‌باشد.
در مرحله پنجم با یک عملیات map اقدام به تقسیم عنصر اول هر تاپل به عنصر دوم آن به ازای هر کلید می‌کنیم و در نتیجه، خروجی این مرحله یک تاپل با کلید هاست و مقدار میانگین بازدید روزانه آن هاست می‌باشد.
در محله آخر نیز خروجی را مرتب کرده و در انتها با Action ای برابر take، ۱۰ هاست با بیشترین بازدید روزانه را نمایش می‌دهیم.

بخش سوم:

logging
import re
import sys
from datetime import datetime
regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$'
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  return groups[5]
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .filter(lambda url: url.endswith(".gif"))
                 .distinct())
print(transformation.take(10))
print(f'Number of distinct gif files: {transformation.count()}')

خروجی:

['/shuttle/countdown/count.gif', '/images/KSC-logosmall.gif', '/images/ksclogo-medium.gif', '/software/winvn/bluemarb.gif', '/software/winvn/wvsmall.gif', '/history/apollo/images/footprint-small.gif', '/shuttle/resources/orbiters/endeavour.gif', '/shuttle/missions/sts-78/sts-78-patch-small.gif', '/shuttle/resources/orbiters/columbia-logo.gif', '/images/ksclogosmall.gif'] Number of distinct gif files: 1194

توضیحات:
در این بخش پس از انجام عملیات Parse، با بازگرداندن مقدار URL خط لاگ مربوطه توسط یک فیلتر مقادیری که به پسوند .gif ختم می‌شوند را جداسازی کرده و در نهایت به صورت یکتا در می‌آوریم. در انتها نیز تعدادی از آن‌‌ها را به صورت نمونه چاپ کرده و تعداد کل آن‌ها را شمارش می‌کنیم

بخش چهارم

 import logging
import re
import sys
from datetime import datetime
import ipaddress
regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$'
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  return (groups[0], 1)
def is_ip_address(host):
  try:
    ipaddress.ip_address(host)
  except:
    return False
  return True
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .filter(lambda host: not is_ip_address(host[0]))
                 .reduceByKey(lambda a,b: a+b)
                 .filter(lambda host_count: host_count[1] > 3)
                 .sortBy(lambda t: t[1],ascending=False)
                 )
print(f'Top 10 all time visited domains:')
print(transformation.take(10))
#############
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  try:
    datetimeobject = datetime.strptime(groups[3], "%d/%b/%Y:%H:%M:%S %z")
  except:
    return None
  return ((groups[0],str(datetimeobject.date())), 1)
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .filter(lambda host: not is_ip_address(host[0][0]))
                 .reduceByKey(lambda a,b: a+b)
                 .map(lambda host_date_count: (host_date_count[0][0], host_date_count[1]))
                 .sortBy(lambda t: t[1],ascending=False)
                 #If we want to retrieve only top record, we can use the below action
                 #.reduce(lambda a,b: a if a[1] > b[1] else b)
                 )
print(f'Top 10 per day visited domains:')
print(transformation.take(10))
############
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  try:
    datetimeobject = datetime.strptime(groups[3], "%d/%b/%Y:%H:%M:%S %z")
  except:
    return None
  return ((groups[0],str(datetimeobject.date())), 1)
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .filter(lambda host_date_count: not is_ip_address(host_date_count[0][0]))
                 .reduceByKey(lambda a,b: a+b)
                 .filter(lambda host_date_count: host_date_count[1] > 3)
                 .map(lambda host_date_count: (host_date_count[0][1], (host_date_count[0][0], host_date_count[1])))
                 .reduceByKey(lambda a,b: a if a[1] > b[1] else b)
                 .sortByKey()
                 )
print(f'Most visited domain per day:')
transformation.take(10)

خروجی:

Top 10 all time visited domains: [(('bill.ksc.nasa.gov', '1995-07-11'), 1394), (('indy.gradient.com', '1995-07-12'), 1356), (('siltb10.orl.mmc.com', '1995-07-21'), 1354), (('bill.ksc.nasa.gov', '1995-07-12'), 1317), (('piweba3y.prodigy.com', '1995-07-16'), 1280), (('piweba4y.prodigy.com', '1995-07-16'), 1269), (('piweba3y.prodigy.com', '1995-07-13'), 1202), …, ('۱۹۹۵-۰۷-۱۰', ('e659229.boeing.com', 358))]

توضیحات:
در این بخش سه عملیات مجزا را انجام می‌دهیم. قسمت اول پر درخواست‌ترین دامنه‌ها را می‌یابد، قسمت دوم دامنه‌هایی را می‌یابد که بیشترین تعداد درخواست در یک روز داشته‌اند و قسمت سوم به ازای هر روز، پر درخواست‌‌ترین دامنه را بدست می‌دهد.
در قسمت اول، پس از Parse کردن فایل لاگ، یک فیلتر برای تشخیص دامنه بودن هاست مورد نظر اعمال می‌گردد. این‌ کار توسط یک تابع انجام می‌گیرد که تشخیص می‌دهد این هاست با یک آدرس IP مطابقت دارد یا خیر. در مرحله بعدی از Transformation، تعداد درخواست‌های هر دامنه شمارش می‌شود (دقت کنید که در این مرحله چون هر رکورد از لاگ به معنی یک درخواست می‌باشد، نیازی به دخالت دادن تاریخ درخواست در این شمارش نیست)، و سپس با اعمال یک فیلتر، دامنه‌هایی که تعداد بازدید بیشتر از ۳ دارند فیلتر می‌گردد. در نهایت نیز با تابع sortByValue دامنه‌ها بر اساس تعداد بازدید به صورت نزولی مرتب می‌شوند.

در قسمت دوم، همانند بخش قبلی عمل می‌کنیم با این تفاوت که کلید تاپل‌های شمارش به جای نام دامنه، یک تاپل شامل نام دامنه و تاریخ بازدید است. به این صورت می‌تواند مجموع تعداد بازدید هر دامنه را به ازای هر روز بدست آورد. در ادامه با حذف تاریخ از این تاپل‌ها، عملیات مرتب‌سازی صورت گرفته و دامنه‌هایی با بیشترین بازدید در یک روز به صورت مرتب بدست می‌آیند.

در قسمت سوم، همانند قسمت دوم، تاپل‌هایی شامل نام دامنه و تاریخ را به عنوان کلید در نظر می‌گیریم. در ادامه عملیات شمارش را به ازای هر دامنه و هر روز انجام داده، دامنه‌هایی که تعداد بازدید زیر ۳ دارند را حذف و سپس با یک تبدیل، مقدار تاریخ را به عنوان کلید و نام دامنه و تعداد بازدید روزانه را به عنوان مقدار در نظر می‌گیریم. در این حالت، با انجام یک مرحله عملیات Reduce روی تاریخ به ازای هر روز، تابع Reduce را به صورت ماکسیمم‌گیری تعریف می‌کنیم. پس از این مرحله، به ازای هر تاریخ، دامنه‌ای که بیشترین بازدید را بین دامنه‌های آن تاریخ داشته است بدست خواهد آمد.

بخش پنجم:

import logging
import re
import sys
from datetime import datetime
import ipaddress
import matplotlib.pyplot as plt
regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)(?:\s+)?(\S+)?(?:\s+)?(\S+)?" (\d{3}|-) (\d+|-)$'
lines = sc.textFile("dbfs:/FileStore/tables/Log")
def parser(line):
  line = line.strip()
  regmatch = re.match(regex, line)
  if regmatch is None:
      return None
  groups = regmatch.groups()
  return (groups[7], 1)
transformation = (lines.map(parser)
                 .filter(lambda record: record is not None)
                 .filter(lambda code: code[0] != '200' and code[0] != '-')
                 .reduceByKey(lambda a,b: a+b)
                 .sortBy(lambda t: t[1],ascending=False)
                 )
errors = transformation.collect()
codes = [t[0] for t in errors]
counts = [t[1] for t in errors]
x = codes
y = counts
plt.barh(x, y, color=(0.1, 0.1, 0.1, 0.1),  edgecolor='blue')
axes = plt.gca()
axes.set_xlim([0,160000])
for index, value in enumerate(y):
    plt.text(value+2000, index-0.12, str(value))
plt.ylabel("Error Codes")
plt.xlabel("Counts")
plt.show()

خروجی:
توضیحات:

در این بخش پس از انجام عملیات Parse و خروجی گرفتن تاپل کد-شمارنده، با یک فیلتر تمامی کدهای غیر از «۲۰۰» و «-» را فیلتر می‌کنیم. در ادامه به ازای هر کد، تعداد تکرار را بدست آورده و در نهایت به صورت نزولی مرتب می‌کنیم. پس از آن با استفاده از کتابخانه matplotlib یک نمودار میله‌ای برای این توزیع رسم می‌کنیم.

چه رتبه ای می‌دهید؟

میانگین ۵ / ۵. از مجموع ۱

اولین نفر باش

title sign
معرفی نویسنده
مجتبی بنائی
مقالات
5 مقاله توسط این نویسنده
محصولات
3 دوره توسط این نویسنده
مجتبی بنائی

دانشجوی دکترای نرم‌افزار دانشگاه تهران، مدرس دانشگاه و فعال در حوزه مهندسی‌ نرم‌افزار و علم داده که تمرکز کاری خود را در چند سال اخیر بر روی مطالعه و تحقیق در حوزه کلان‌داده و تولید محتوای تخصصی و کاربردی به زبان فارسی و انتشار آنها در سایت مهندسی داده گذاشته است. مدیریت پروژه‌های نرم‌افزاری و طراحی سامانه‌های مقیاس‌پذیر اطلاعاتی از دیگر فعالیتهای صورت گرفته ایشان در چند سال گذشته است.مهندس بنائی همچنین: مدرس دوره‌های BigDataی نیک آموز، دانشجوی دکترای نرم افزار و مدرس دانشگاه تهران، مجری و مشاور پروژه‌های کلان‌داده در سطح ملی و بین المللی، فعال در حوزه تولید محتوای تخصصی در زمینه پردازش داده می باشد.

پروفایل نویسنده
title sign
دیدگاه کاربران