خانه مهندسی داده بررسی عملی سه روش پردازش دادههای آفلاین در اسپارک [بخش دوم] مهندسی داده Spark نوشته شده توسط: مجتبی بنائی تاریخ انتشار: ۲۹ آبان ۱۳۹۹ آخرین بروزرسانی: ۱۹ بهمن ۱۴۰۰ زمان مطالعه: 40 دقیقه ۵ (۱) کار با دیتافریمها / Spark SQL راهکار اول: استفاده از جداول دیتابریکز ابتدا فایل stock را بهصورت یک جدول با schemaای بهصورت زیر در بخش دادههای سامانهی databricks آپلود میکنیم: چند نمونه از رکوردهای این جدول نیز بهصورت زیر میباشد: بهمنظور حل سوالات با دیتافریم، با دستور spark.read.table، جدول stock را خوانده و در یک دیتافریم بهنام df ذخیره میکنیم. در ادامه با عملیات بر روی این دیتافریم، به سوالات پاسخ میدهیم. df = spark.read.table("stock") ولی بهمنظور حل سوالات با اسپارک sql، در همان بخش مربوطه، مستقیما از دستور ()spark.sql برای پاسخ به سوال استفاده میکنیم. بخش اول: یک ستون اطلاعات جدید بهنام HV ایجاد کنید که نسبت بالاترین قیمت بر حجم سهام معامله شده برای یک روز را نشان میدهد. Spark dataframe: با دستور withColumn، یک ستون جدید بهنام ‘HV’ ایجاد میکنیم که مقدار هر سطر آن از تقسیم مقدار ستون ‘High’ بر ستون ‘Volume’ در آن سطر بهدست میآید. بهمنظور درستی عملیات انجام شده، ما این دیتافریم جدید را new_df نامیده و در ادامه با دستور display() آنرا نمایش میدهیم. new_df_sql = spark.sql("SELECT *, (High/Volume) AS HV FROM stock") display(new_df_sql) Spark.sql: new_df_sql = spark.sql("SELECT *, (High/Volume) AS HV FROM stock") display(new_df_sql) بخش دوم: پیک بالاترین قیمت برای چه روزی بوده است؟ Spark dataframe: ابتدا با دستور orderBy(‘High’, ascending=False)، سطرهای دیتافریم بر حسب ستون ‘High’ بهصورت نزولی مرتب میکنیم. سپس اولین سطر آن (که شامل ماکزیمم مقدار ستون ‘High’ است) را با دستور limit(1) استخراج میکنیم. در نهایت، مقدار ستون ‘Date’ این دیتافریم جدید را با دستور select(‘Date’) انتخاب کرده و سپس با دستور show() خروجی را نمایش میدهیم. - df.orderBy('High', ascending=False).limit(1).select('Date').show() Spark.sql: • spark.sql("""SELECT Date FROM stock • WHERE High = (SELECT MAX(High) FROM stock) • """).show() بخش سوم: میانگین ستون Close چقدر است؟ Spark dataframe در اینجا ابتدا با دستور agg({‘Close’:’mean’})، میانگین ستون Close را استخراج کرده و سپس با دستور show() خروجی را نمایش میدهیم. - df.agg({'Close':'mean'}).show() بخش چهارم: مقدار ماکزیمم و مینیمم ستون Volume را مشخص کنید. • Spark dataframe: در اینجا ابتدا با دستور agg({‘Volume’:’max’})، ماکزیمم مقدار ستون ‘Volume’ را استخراج میکنیم. سپس با دستور join، دیتافریم حاصل که تنها شامل یک ستون ‘max(Volume)’ و آنهم تنها با یک سطر است را با دیتافریم حاصل از agg({‘Volume’:’min’}) که تنها شامل یک ستون ‘min(Volume)’ و آنهم تنها با یک سطر است، ضرب دکارتی میکنیم (چونکه مشخص نکردیم که join بر روی چه ستونی انجام گیرد، بههمین دلیل ضرب دکارتی انجام میشود و مقدار دو ستون ‘max(Volume)’ و ‘min(Volume)’ در یک سطر در کنارهم قرار میگیرند). در نهایت خروجی را با دستور show() نمایش میدهیم. - df.agg({'Volume':'max'}).join(df.agg({'Volume':'min'})).show() Spark.sql: spark.sql("SELECT MAX(Volume),MIN(Volume) FROM stock").show() بخش پنجم: چند روز ستون Close کمتر از ۶۰دلار بوده است؟ Spark dataframe: در اینجا ابتدا با دستور filter(df.Close < 60)، سطرهایی که مقدار ستون ‘Close’ آنها کمتر از ۶۰ دلار بوده است را استخراج میکنیم. سپس با دستور count()، تعداد سطرهای حاصل را شمارش مینماییم. - df.filter(df.Close < 60).count() Spark.sql: spark.sql("SELECT count(*) FROM stock WHERE Close<60").show() بخش ششم: Pearson correlation بین ستونهای High و Volume چقدر است؟ Spark dataframe: در اینجا با دستور corr(‘High’, ‘Volume’, method=’pearson’)، کورلیشن بین ستون ‘High’ و ‘Volume’ را با روش pearson محاسبه میکنیم. البته لازم به ذکر است که اگر از پارامتر method=’pearson’ نیز استفاده نمیکردیم مشکلی نبود، چونکه بهطور پیشفرض از روش pearson برای محاسبهی کورلیشن استفاده میشود. df.corr('High','Volume',method='pearson') spark.sql("select corr(High,Volume) from stock").show() بخش هفتم: ماکزیمم ستون High در هر سال چقدر است؟ Spark dataframe: در اینجا ابتدا با دستور withColumn(‘year’, df.Date.substr(0,4))، یک ستون جدید بهنام ‘year’ اضافه میکنیم که حاوی مقدار سال ستون ‘Date’ است. * لازم به ذکر است که در اینجا بهجای df.Date.substr(0,4) میتوانستیم از دستور year(‘Date’) نیز استفاده کنیم. ولی چونکه year تابعی از spark.sql است، از استفاده از آن خودداری نمودیم. سپس با دستور GroupBy(‘year’).max(‘High’)، ماکزیمم مقدار ستون ‘High’ در هر سال را محاسبه میکنیم. در نهایت، با دستور orderBy(‘year’)، خروجی را بر حسب سال و بهصورت صعودی مرتب میکنیم و با دستور show() آنرا نمایش میدهیم - df_with_year = df.withColumn('year', df.Date.substr(0,4)) - df_with_year.groupBy('year').max('High').orderBy('year').show() sql: • spark.sql(""" SELECT year(Date) as year, max(High) • FROM stock • GROUP BY year • ORDER BY year • """).show() کار با دیتافریمها/Spark SQL – راهکار دوم : تبدیل یک دیتافریم به جدول موقت بخش اول کد DataFrame: from pyspark.sql import SQLContext #from pyspark.sql import Row import pyspark.sql.functions as F sqlContext = SQLContext(sc) lines = sc.textFile("dbfs:/FileStore/tables/stock.csv") parsed_lines = (lines.zipWithIndex() .filter(lambda line_index: line_index[1] > 0) .map(lambda line_index: line_index[0]) .map(lambda line: line.split(',')) .map(lambda line: [line[0]]+[float(record) for record in line[1:]]) #Method2 to create dataframe #.map(lambda line: Row(Date=line[0], Open=float(line[1]), High=float(line[2]), Low=float(line[3]), Close=float(line[4]), Volume=float(line[5]), AdjClose=float(line[6]))) ) #Method1 to create dataframe df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"]) #Method3 to create dataframe #df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dbfs:/FileStore/tables/stock.csv') #Method1 df = df.withColumn("HV", df["High"] / df["Volume"]) #Method2 #df = df.withColumn("HV", F.col("High") / F.col("Volume")) display(df.head(10)) کد SQL: ## This Part must be run after the previous one df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"]) df.registerTempTable('df_table') df_table = sqlContext.sql(''' SELECT *, High/Volume as HS from df_table ''' ) display(df_table.head(10)) خروجی: بخش دوم کد DataFrame: df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"]) ( df.join(df.agg(F.max("High").alias("High")).select("High"), "High") .select(["Date","High"]) .show() ) df.registerTempTable('df_table') df_table = sqlContext.sql(''' SELECT a.Date, a.High FROM df_table a INNER JOIN ( SELECT MAX(High) High FROM df_table ) b ON a.High = b.High ''' ) df_table.show() خروجی توضیحات: در بخش DataFrame، ابتدا با استفاده از تابع aggregation اقدام به پیدا کردن بالاترین مقدار در ستون High میکنیم. در اینجا نام ستون بازگردانده شده را با استفاده از دستور alias به همان نام High تغییر میدهیم تا در مراحل بعدی راحتتر بتوانیم به این ستون ارجاع بدهیم. با توجه به اینکه دستورات aggregation بر روی مقادیر کار میکنند و به ازای مقادیر امکان بازگرداندن کل ردیف مربوطه را ندارند، لازم است تا پس از یافتن بالاترین مقدار، با استفاده از join روی همان جدول و بر روی متغیر High، کل ردیف دادهای که شامل آن بالاترین مقدار است یافت شود. در انتها نیز تنها ستونهای High و Date را با استفاده از تابع select باز میگردانیم و دیتافریم نهایی را با دستور show نمایش میدهیم. در بخش SQL لازم است تا ابتدا با یک پرس و جو، بیشترین مقدار ستون High یافت شود. با توجه به اینکه توابع Aggregation در SQL امکان بازگرداندن کل ردیف را ندارند، در اینجا نیز لازم است تا یک عملیات JOIN صورت گیرد تا کل ردیف داده بازگردانده شود. بخش سوم کد DataFrame df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"]) df.agg(F.avg("Close").alias("averageClose")).show() کد خروجی df.registerTempTable('df_table') df_table = sqlContext.sql(''' SELECT AVG(Close) averageClose FROM df_table ''' ) df_table.show() خروجی توضیحات: در بخش DataFrame، با استفاده از تابع agg و انجام عملیات avg (با استفاده از مجموعه توابع sql.functions)، مقدار میانگین خواسته شده محاسبه شده است. همچنین یک تغییر نام با استفاده از تابع alias به صورت زنجیرهای روی تابع aggregation میانگین نیز انجام شده است تا نام ستون خواناتر باشد. در بخش SQL نیز به راحتی با تابع Aggregation با نام AVG مقدار میانگین برای ستون خواسته شده محاسبه شده است. بخش چهارم کد DataFrame: df.agg(F.max("Volume").alias("maxVolume"),F.min("Volume").alias("minVolume")).show() df_table = sqlContext.sql(''' SELECT MAX(Volume) maxVolume, MIN(Volume) minVolume FROM df_table ''' ) df_table.show() توضیحات: در بخش DataFrame، با استفاده از تابع agg و انجام دو عملیات max و min (با استفاده از مجموعه توابع sql.functions) به صورت همزمان، مقادیر خواسته شده محاسبه شده است. نتیجه هر دو مقدار در یک دیتافریم واحد بازگردانده شده است. در بخش SQL نیز به با استفاده از توابع MIN و MAX پرس و جو انجام شده است. بخش پنجم کد DataFrame: df.agg(F.count(F.when(F.col("Close") < 60, True)).alias("Count")).show() کد SQL df_table = sqlContext.sql(''' SELECT COUNT(*) Count FROM df_table WHERE Close < 60 ''' ) df_table.show() خروجی توضیحات: در بخش DataFrame، با استفاده از تابع agg و انجام عملیات count با شرایطی که تابع when مشخص کرده است، مقدار شمارش صورت گرفته است. در بخش SQL نیز به با استفاده از توابع COUNT و شرط WHERE پرس و جو انجام شده است. بخش ششم کد DataFrame: print("The pearson correlation between High and Volume is: ") print(df.corr("High","Volume",method="pearson")) کد SQL df_table = sqlContext.sql(''' SELECT ((SUM(High*Volume)-((SUM(High)*SUM(Volume))/COUNT(*)))/(SQRT(SUM(High*High)-((SUM(High)*SUM(High))/COUNT(*)))*SQRT(SUM(Volume*Volume)-((SUM(Volume)*SUM(Volume))/COUNT(*))))) AS pearson FROM df_table ''' ) df_table.show() توضیحات: در بخش DataFrame، با استفاده از تابع corr مقدار Pearson Correlation برای ستون خواسته شده محاسبه شده است. این تابع در میان توابع Spark SQL موجود میباشد و در حال حاضر تنها از method ای برابر pearson پشتیبانی میکند. در بخش SQL، در صورتی که بخواهیم به توابع استاندارد SQL پایبند باشیم، تابعی برای محاسبه Pearson Correlation نداریم. به همین منظور و با توجه به جذابیت مسئله، مقدار Pearson Correlation را با استفاده از توابع پایه SUM، COUNT و SQRT پیادهسازی کردهایم. محض اطلاع، فرمول Pearson Correlation به صورت زیر تعریف میگردد: بخش هفتم کد DataFrame: udf_year = udf(lambda x: x[:4]) df.withColumn("year",udf_year("Date")).groupby("year").agg(F.max("High").alias("High")).sort(F.asc("year")).show() df_table = sqlContext.sql(''' SELECT SUBSTRING(Date,1,4) as year, MAX(High) as High FROM df_table GROUP BY SUBSTRING(Date,1,4) ORDER BY year ''' ) df_table.show() توضیحات: در بخش DataFrame، با توجه به اینکه مقدار سال در فیلد Date الحاق است، با استفاده از تابع udf_year، به ازای هر رکورد ستون Date، چهار کاراکتر اول که مقادیر سال میباشد را جدا کردهایم و توسط تابع withColumn اقدام به تولید یک ستون جدید با نام year کردهایم. در ادامه با استفاده از تابع groupby روی مقادیر year، عملیات Aggregation از نوع max را روی مقدار ستون High انجام دادهایم. در نهایت نتیجه را با استفاده از تابع sort بر روی مقادیر year به صورت صعودی برگرداندهایم. در بخش SQL، عملیات GROUP BY را بر روی مقادیر سال انجام دادهایم. مقادیر سال به راحتی با اعمال تابع SUBSTRING بر روی چهار کاراکتر ستون Date بدست میآیند. در نهایت این مقدار را به صورت صعودی مرتب کرده و به همراه MAX مقدار ستون High بر میگردانیم. Spark GraphX بخش اول کد: from pyspark.sql import SQLContext from graphframes import GraphFrame import pyspark.sql.functions as F sqlContext = SQLContext(sc) lines = sc.textFile("dbfs:/FileStore/tables/Vertex.txt") parsed_lines = lines.map(lambda line:line.split('\t')) vertex_df = sqlContext.createDataFrame(parsed_lines,["id","name"]) lines = sc.textFile("dbfs:/FileStore/tables/edges.txt") parsed_lines = lines.map(lambda line:line.split('\t')) edges_df = sqlContext.createDataFrame(parsed_lines,["src","dst"]) g = graphFrame(vertex_df,edges_df) g.edges.limit(5).show() g.vertices.limit(5).show() توضیحات: با توجه به توضیحات کتابخانه GraphFrames، لازم است تا ابتدا دو دیتافریم، یکی شامل اطلاعات نودهای گراف و یکی شامل اطلاعات یالهای گراف ایجاد کنیم. دیتافریم اطلاعات نودهای گراف میبایست شامل ستونی با نام id باشد که این ستون نشاندهنده شناسه یکتای هر نود باشد. دیتافریم یالها نیست میبایست شامل دو ستون با نامهای src و dst باشد که هر یک نشاندهنده شناسه یکتای نود مبدأ و مقصد در یال باشد. در اینجا با خواندن فایلهای نودها و یالها، با استفاده از یک map اقدام به جداسازی هر خط با جداساز «Tab» کرده و در ادامه به ازای هر خط لیستی از اجزای آن خط بر میگردانیم. پس از آن استفاده از تابع createDataFrame و تعیین نام ستونهای دیتافریم، برای هر کدام از لیست نودها و یالها یک دیتافریم میسازیم. در مرحله بعد با استفاده از کلاس GraphFrame این دو دیتافریم را به عنوان ورودی به این کلاس میدهیم تا گراف مربوطه ساخته شود. در ادامه با صدا زدن متدهای edges و vertices دیتافریمهایی شامل اطلاعات نودها و یالهای گراف ساخته شده قابل دریافت است. بخش دوم کد: g.vertices.join(g.inDegrees.sort(F.desc("inDegree")).limit(1),"id").show() g.vertices.join(g.outDegrees.sort(F.desc("outDegree")).limit(1),"id").show() خروجی توضیحات: در این مرحله با صدا زدن توابع inDegrees و outDegrees، دو دیتافریم که هر یک شامل دو ستون id و in/outDegree است بدست میآید. در این مرحله لازم است تا برای یافتن بیشترین درجه، این دیتافریمها را بر اساس ستون in/outDegree مرتب کرده و با دستور limit(1) اولین عنصر آنها را بیابیم. در این مرحله تنها اطلاعات id نود با بیشترین درجه را در اختیار داریم، فلذا لازم است یک عملیات join بر اساس ستون id با دیتافریم نودها که با متد vertices از گراف بدست میآید انجام شده و اطلاعات کامل نود مربوطه نمایش داده شود. بخش سوم کد: cdf = g.connectedComponents() cdf.groupBy("component").count().sort(F.desc("count")).show() توضیحات: در این بخش از تابع connectedComponents بر روی گراف استفاده میکنیم. این تابع یک دیتافریم بر میگرداند که شامل دو ستون id و component میباشد که به ازای هر نود نشانمیدهد که آن نود عضو کدام Component هست. شناسههای Component نیز به صورت تصادفی و نامرتب توسط کتابخانه انتخاب میشوند. پس از دریافت دیتافریم ناشی از اجرای تابع connectedComponents، به ازای هر مقدار از ستون component یک عملیات aggregation با شمارش تعداد مقادیر آن انجام میگیرد. در نهایت نیز نتایج به صورت نزولی بر اساس تعداد اعضای آن Component صورت میگیرد. توضیحات: در این بخش از تابع connectedComponents بر روی گراف استفاده میکنیم. این تابع یک دیتافریم بر میگرداند که شامل دو ستون id و component میباشد که به ازای هر نود نشانمیدهد که آن نود عضو کدام Component هست. شناسههای Component نیز به صورت تصادفی و نامرتب توسط کتابخانه انتخاب میشوند. پس از دریافت دیتافریم ناشی از اجرای تابع connectedComponents، به ازای هر مقدار از ستون component یک عملیات aggregation با شمارش تعداد مقادیر آن انجام میگیرد. در نهایت نیز نتایج به صورت نزولی بر اساس تعداد اعضای آن Component صورت میگیرد. بخش چهارم کد: g.vertices.join(g.inDegrees.sort(F.desc("inDegree")).limit(10),"id").sort(F.desc("inDegree")).show() خروجی توضیحات: در این بخش همانند بخش دوم با اجرای تابع inDegrees روی گراف، شناسه نودها همراه تعداد درجات ورودی آنها بدست میآید. در ادامه با مرتبسازی روی inDegree و جداسازی ۱۰ مقدار بیشترین درجه، شناسه ۱۰ نود با بیشترین درجه ورودی نتیجه میشود. حال لازم است تا مابقی مشخصات این نودها با انجام عملیات join روی دیتافریم ناشی از اجرای vertices بدست آمده و در اینجا با توجه به اینکه مرتبسازی انجام شده به هم میریزد، مجدد یک مرتب سازی نزولی روی مشخصه inDegree انجام گردد. نمایش گرافیکی نودها کد: import networkx as nx import matplotlib.pyplot as plt edges = g.edges nxGraph=nx.Graph() for edge in edges.select('src','dst').collect(): nxGraph.add_edge(edge['src'],edge['dst']) plt.figure(1,figsize=(12,12)) nx.draw(nxGraph, node_size=10, linewidths=0.5, width=0.5) توضیحات: در این بخش میخواهیم با استفاده از کتابخانه NetworkX از پایتون، گراف مورد نظر را رسم کنیم. برای اینکار ابتدا لیست یالها را با اجرای متد edges از گراف به صورت یک دیتافریم استخراج میکنیم. با توجه به اینکه در نمایش گراف نمیخواهیم اسامی نودها را نمایش دهیم (با توجه به طولانی بودن اسامی و همچنین تعداد بالای نودهای گراف)، در این مرحله نیازی به استخراج دیتافریم نودها نمیباشد. در مرحله بعد با اجرای یک حلقه روی دیتافریم collect شده، یالها را به گراف NetworkX اضافه میکنیم. در ادامه با تنظیم سایز نمودار به مقدار مناسب، با استفاده از تابع draw روی گراف NetworkX گراف را با پارامترهای مناسب رسم مینماییم. لازم به ذکر است که NetworkX برای رسم گراف از نمودارهای Matplotlib بهره میبرد و به همین جهت میتوان جزئیات نمودار را با دستورات این کتابخانه تنظیم کرد. چه رتبه ای میدهید؟ میانگین ۵ / ۵. از مجموع ۱ اولین نفر باش معرفی نویسنده مقالات 3 مقاله توسط این نویسنده محصولات 5 دوره توسط این نویسنده مجتبی بنائی دانشجوی دکترای نرمافزار دانشگاه تهران، مدرس دانشگاه و فعال در حوزه مهندسی نرمافزار و علم داده که تمرکز کاری خود را در چند سال اخیر بر روی مطالعه و تحقیق در حوزه کلانداده و تولید محتوای تخصصی و کاربردی به زبان فارسی و انتشار آنها در سایت مهندسی داده گذاشته است. مدیریت پروژههای نرمافزاری و طراحی سامانههای مقیاسپذیر اطلاعاتی از دیگر فعالیتهای صورت گرفته ایشان در چند سال گذشته است.مهندس بنائی همچنین: مدرس دورههای BigDataی نیک آموز، دانشجوی دکترای نرم افزار و مدرس دانشگاه تهران، مجری و مشاور پروژههای کلانداده در سطح ملی و بین المللی، فعال در حوزه تولید محتوای تخصصی در زمینه پردازش داده می باشد. معرفی محصول مجتبی بنائی دوره آموزش مهندسی داده [Data Engineering] 2.380.000 تومان مقالات مرتبط ۰۴ مهر مهندسی داده معماری Data Lakehouse چیست و چگونه کار میکند؟ نگین فاتحی ۲۴ شهریور مهندسی داده ردیس چیست و انواع آن کدامند؟ نگین فاتحی ۱۸ شهریور مهندسی داده مراحل ساده برای تحلیل داده با ChatGPT و پایتون نگین فاتحی ۱۰ شهریور مهندسی داده NoSQL چیست؟ هر آن چیزی که درباره پایگاه داده NoSQL باید بدانید تیم فنی نیک آموز دیدگاه کاربران لغو پاسخ دیدگاه نام و نام خانوادگی ایمیل ذخیره نام، ایمیل و وبسایت من در مرورگر برای زمانی که دوباره دیدگاهی مینویسم. موبایل برای اطلاع از پاسخ لطفاً مرا با خبر کن ثبت دیدگاه Δ