بررسی عملی سه روش پردازش داده‌های آفلاین‌ در اسپارک [بخش دوم]

بررسی عملی سه روش پردازش داده‌های آفلاین‌ در اسپارک [بخش دوم]

نوشته شده توسط: مجتبی بنائی
تاریخ انتشار: ۲۹ آبان ۱۳۹۹
آخرین بروزرسانی: ۱۹ بهمن ۱۴۰۰
زمان مطالعه: 40 دقیقه
۵
(۱)

کار با دیتافریم‌ها / Spark SQL راهکار اول:‌ استفاده از جداول دیتابریکز

ابتدا فایل stock را به‌صورت یک جدول با schemaای به‌صورت زیر در بخش داده‌های سامانه‌ی databricks آپلود می‌کنیم:

چند نمونه از رکوردهای این جدول نیز به‌صورت زیر می‌باشد:

  • به‌منظور حل سوالات با دیتافریم، با دستور spark.read.table، جدول stock را خوانده و در یک دیتافریم به‌نام df ذخیره می‌کنیم. در ادامه با عملیات بر روی این دیتافریم، به سوالات پاسخ می‌دهیم.
df = spark.read.table("stock")
  • ولی به‌منظور حل سوالات با اسپارک sql، در همان بخش مربوطه، مستقیما از دستور ()spark.sql برای پاسخ به سوال استفاده می‌کنیم.

بخش اول: یک ستون اطلاعات جدید به‌نام HV ایجاد کنید که نسبت بالاترین قیمت بر حجم سهام معامله شده برای یک روز را نشان می‌دهد.

Spark dataframe:

با دستور withColumn، یک ستون جدید به‌نام ‘HV’ ایجاد می‌کنیم که مقدار هر سطر آن از تقسیم مقدار ستون ‘High’ بر ستون ‘Volume’ در آن سطر به‌دست می‌آید. به‌منظور درستی عملیات انجام شده، ما این دیتافریم جدید را new_df نامیده و در ادامه با دستور display() آنرا نمایش می‌دهیم.

new_df_sql = spark.sql("SELECT *, (High/Volume) AS HV FROM stock")
display(new_df_sql)

Spark.sql:

new_df_sql = spark.sql("SELECT *, (High/Volume) AS HV FROM stock")
display(new_df_sql)

بخش دوم: پیک بالاترین قیمت برای چه روزی بوده است؟

Spark dataframe:

  •  ابتدا با دستور orderBy(‘High’, ascending=False)، سطرهای دیتافریم بر حسب ستون ‘High’ به‌صورت نزولی مرتب می‌کنیم.
  •  سپس اولین سطر آن (که شامل ماکزیمم مقدار ستون ‘High’ است) را با دستور limit(1) استخراج می‌کنیم.
  •  در نهایت، مقدار ستون ‘Date’ این دیتافریم جدید را با دستور select(‘Date’) انتخاب کرده و سپس با دستور show() خروجی را نمایش می‌دهیم.
-  df.orderBy('High', ascending=False).limit(1).select('Date').show()

Spark.sql:

•  spark.sql("""SELECT Date FROM stock 
•               WHERE High = (SELECT MAX(High) FROM stock)
•            """).show()

 

بخش سوم: میانگین ستون Close چقدر است؟

Spark dataframe

  •  در اینجا ابتدا با دستور agg({‘Close’:’mean’})، میانگین ستون Close را استخراج کرده و سپس با دستور show() خروجی را نمایش می‌دهیم.
-  df.agg({'Close':'mean'}).show()

بخش چهارم: مقدار ماکزیمم و مینیمم ستون Volume را مشخص کنید.

• Spark dataframe:

  •  در اینجا ابتدا با دستور agg({‘Volume’:’max’})، ماکزیمم مقدار ستون ‘Volume’ را استخراج می‌کنیم.
  •  سپس با دستور join، دیتافریم حاصل که تنها شامل یک ستون ‘max(Volume)’ و آن‌هم تنها با یک سطر است را با دیتافریم حاصل از agg({‘Volume’:’min’}) که تنها شامل یک ستون ‘min(Volume)’ و آن‌هم تنها با یک سطر است، ضرب دکارتی می‌کنیم (چونکه مشخص نکردیم که join بر روی چه ستونی انجام گیرد، به‌همین دلیل ضرب دکارتی انجام می‌‌شود و مقدار دو ستون ‘max(Volume)’ و ‘min(Volume)’ در یک سطر در کنارهم قرار می‌گیرند).
  • در نهایت خروجی را با دستور show() نمایش می‌دهیم.
- df.agg({'Volume':'max'}).join(df.agg({'Volume':'min'})).show()

Spark.sql:

spark.sql("SELECT MAX(Volume),MIN(Volume) FROM stock").show()

بخش پنجم: چند روز ستون Close کمتر از ۶۰دلار بوده است؟

Spark dataframe:

  •  در اینجا ابتدا با دستور filter(df.Close < 60)، سطرهایی که مقدار ستون ‘Close’ آنها کمتر از ۶۰ دلار بوده است را استخراج می‌کنیم.
  •  سپس با دستور count()، تعداد سطرهای حاصل را شمارش می‌نماییم.
-  df.filter(df.Close < 60).count()

Spark.sql:

spark.sql("SELECT count(*) FROM stock WHERE Close<60").show()

بخش ششم: Pearson correlation بین ستون‌های High و Volume چقدر است؟

Spark dataframe:

  • در اینجا با دستور corr(‘High’, ‘Volume’, method=’pearson’)، کورلیشن بین ستون ‘High’ و ‘Volume’ را با روش pearson محاسبه می‌کنیم.
  •  البته لازم به ذکر است که اگر از پارامتر method=’pearson’ نیز استفاده نمی‌کردیم مشکلی نبود، چونکه به‌طور پیش‌فرض از روش pearson برای محاسبه‌ی کورلیشن استفاده می‌شود.
df.corr('High','Volume',method='pearson')

spark.sql("select corr(High,Volume) from stock").show()

 

 

بخش هفتم: ماکزیمم ستون High در هر سال چقدر است؟

Spark dataframe:

  •  در اینجا ابتدا با دستور withColumn(‘year’, df.Date.substr(0,4))، یک ستون جدید به‌نام ‘year’ اضافه می‌کنیم که حاوی مقدار سال ستون ‘Date’ است.
    * لازم به ذکر است که در اینجا به‌جای df.Date.substr(0,4) می‌توانستیم از دستور year(‘Date’) نیز استفاده کنیم. ولی چونکه year تابعی از spark.sql است، از استفاده از آن خودداری نمودیم.
  • سپس با دستور GroupBy(‘year’).max(‘High’)، ماکزیمم مقدار ستون ‘High’ در هر سال را محاسبه می‌کنیم.
  • در نهایت، با دستور orderBy(‘year’)، خروجی را بر حسب سال و به‌صورت صعودی مرتب می‌کنیم و با دستور show() آنرا نمایش می‌دهیم
-  df_with_year = df.withColumn('year', df.Date.substr(0,4))
-  df_with_year.groupBy('year').max('High').orderBy('year').show()


sql:

•  spark.sql(""" SELECT year(Date) as year, max(High) 
•                FROM stock 
•                GROUP BY year 
•                ORDER BY year
•            """).show()

کار با دیتافریم‌ها/Spark SQL – راهکار دوم :‌ تبدیل یک دیتافریم به جدول موقت

بخش اول

کد DataFrame:

from pyspark.sql import SQLContext
#from pyspark.sql import Row
import pyspark.sql.functions as F
sqlContext = SQLContext(sc)
lines = sc.textFile("dbfs:/FileStore/tables/stock.csv")
parsed_lines = (lines.zipWithIndex()
                     .filter(lambda line_index: line_index[1] > 0)
                     .map(lambda line_index: line_index[0])
                     .map(lambda line: line.split(','))
                     .map(lambda line: [line[0]]+[float(record) for record in line[1:]])
                     #Method2 to create dataframe
                     #.map(lambda line: Row(Date=line[0], Open=float(line[1]), High=float(line[2]), Low=float(line[3]), Close=float(line[4]), Volume=float(line[5]), AdjClose=float(line[6])))
               )
#Method1 to create dataframe
df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"])
#Method3 to create dataframe
#df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dbfs:/FileStore/tables/stock.csv')
#Method1
df = df.withColumn("HV", df["High"] / df["Volume"])
#Method2
#df = df.withColumn("HV", F.col("High") / F.col("Volume"))
display(df.head(10))

کد SQL:

## This Part must be run after the previous one
df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"])
df.registerTempTable('df_table')
df_table = sqlContext.sql('''
                          SELECT *, High/Volume as HS from df_table
                          '''
                         )
display(df_table.head(10))

خروجی:

بخش دوم

کد DataFrame:

df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"])
(
  df.join(df.agg(F.max("High").alias("High")).select("High"), "High")
    .select(["Date","High"])
    .show()
)
df.registerTempTable('df_table')
df_table = sqlContext.sql('''
                          SELECT a.Date, a.High
                          FROM df_table a
                          INNER JOIN (
                              SELECT MAX(High) High
                              FROM df_table
                          ) b ON a.High = b.High
                          '''
                         )
df_table.show()

خروجی

توضیحات:

در بخش DataFrame، ابتدا با استفاده از تابع aggregation اقدام به پیدا کردن بالاترین مقدار در ستون High می‌کنیم. در اینجا نام ستون بازگردانده شده را با استفاده از دستور alias به همان نام High تغییر می‌دهیم تا در مراحل بعدی راحت‌تر بتوانیم به این ستون ارجاع بدهیم. با توجه به اینکه دستورات aggregation بر روی مقادیر کار می‌کنند و به ازای مقادیر امکان بازگرداندن کل ردیف مربوطه را ندارند، لازم است تا پس از یافتن بالاترین مقدار، با استفاده از join روی همان جدول و بر روی متغیر High، کل ردیف داده‌ای که شامل آن بالاترین مقدار است یافت شود. در انتها نیز تنها ستون‌های High و Date را با استفاده از تابع select باز می‌گردانیم و دیتافریم نهایی را با دستور show نمایش می‌دهیم.
در بخش SQL لازم است تا ابتدا با یک پرس و جو، بیشترین مقدار ستون High یافت شود. با توجه به اینکه توابع Aggregation در SQL امکان بازگرداندن کل ردیف را ندارند، در اینجا نیز لازم است تا یک عملیات JOIN صورت گیرد تا کل ردیف داده بازگردانده شود.

بخش سوم

کد DataFrame

df = sqlContext.createDataFrame(parsed_lines, ["Date","Open","High","Low","Close","Volume","Adj Close"])
df.agg(F.avg("Close").alias("averageClose")).show()

کد خروجی

df.registerTempTable('df_table')
df_table = sqlContext.sql('''
                          SELECT AVG(Close) averageClose FROM df_table
                          '''
                         )
df_table.show()

خروجی

توضیحات:

در بخش DataFrame، با استفاده از تابع agg و انجام عملیات avg (با استفاده از مجموعه توابع sql.functions)، مقدار میانگین خواسته شده محاسبه شده است. همچنین یک تغییر نام با استفاده از تابع alias به صورت زنجیره‌ای روی تابع aggregation میانگین نیز انجام شده است تا نام ستون خوانا‌تر باشد.
در بخش SQL نیز به راحتی با تابع Aggregation با نام AVG مقدار میانگین برای ستون خواسته شده محاسبه شده است.

بخش چهارم
کد DataFrame:

df.agg(F.max("Volume").alias("maxVolume"),F.min("Volume").alias("minVolume")).show()
df_table = sqlContext.sql('''
                          SELECT MAX(Volume) maxVolume, MIN(Volume) minVolume  FROM df_table
                          '''
                         )
df_table.show()

توضیحات:

در بخش DataFrame، با استفاده از تابع agg و انجام دو عملیات max و min (با استفاده از مجموعه توابع sql.functions) به صورت همزمان، مقادیر خواسته شده محاسبه شده است. نتیجه هر دو مقدار در یک دیتافریم واحد بازگردانده شده است.

در بخش SQL نیز به با استفاده از توابع MIN و MAX پرس و جو انجام شده است.

بخش پنجم
کد DataFrame:

df.agg(F.count(F.when(F.col("Close") < 60, True)).alias("Count")).show()

کد SQL

df_table = sqlContext.sql('''
                          SELECT COUNT(*) Count FROM df_table
                          WHERE Close < 60
                          '''
                         )
df_table.show()

خروجی

توضیحات:

در بخش DataFrame، با استفاده از تابع agg و انجام عملیات count با شرایطی که تابع when مشخص کرده است، مقدار شمارش صورت گرفته است.

در بخش SQL نیز به با استفاده از توابع COUNT و شرط WHERE پرس و جو انجام شده است.

بخش ششم

کد DataFrame:

print("The pearson correlation between High and Volume is: ")
print(df.corr("High","Volume",method="pearson"))

کد SQL

df_table = sqlContext.sql('''
                          SELECT ((SUM(High*Volume)-((SUM(High)*SUM(Volume))/COUNT(*)))/(SQRT(SUM(High*High)-((SUM(High)*SUM(High))/COUNT(*)))*SQRT(SUM(Volume*Volume)-((SUM(Volume)*SUM(Volume))/COUNT(*)))))
                          AS pearson
                          FROM df_table
                          '''
                         )
df_table.show()

 

توضیحات:

در بخش DataFrame، با استفاده از تابع corr مقدار Pearson Correlation برای ستون خواسته شده محاسبه شده است. این تابع در میان توابع Spark SQL موجود می‌باشد و در حال حاضر تنها از method ای برابر pearson پشتیبانی می‌کند.
در بخش SQL، در صورتی که بخواهیم به توابع استاندارد SQL پایبند باشیم، تابعی برای محاسبه Pearson Correlation نداریم. به همین منظور و با توجه به جذابیت مسئله، مقدار Pearson Correlation را با استفاده از توابع پایه SUM، COUNT و SQRT پیاده‌سازی کرده‌ایم. محض اطلاع، فرمول Pearson Correlation به صورت زیر تعریف می‌گردد:

بخش هفتم

کد DataFrame:

udf_year = udf(lambda x: x[:4])
df.withColumn("year",udf_year("Date")).groupby("year").agg(F.max("High").alias("High")).sort(F.asc("year")).show()
df_table = sqlContext.sql('''
                          SELECT SUBSTRING(Date,1,4) as year, MAX(High) as High FROM df_table
                          GROUP BY SUBSTRING(Date,1,4)
                          ORDER BY year
                          '''
                         )
df_table.show()

 

توضیحات:
در بخش DataFrame، با توجه به اینکه مقدار سال در فیلد Date الحاق است، با استفاده از تابع udf_year، به ازای هر رکورد ستون Date، چهار کاراکتر اول که مقادیر سال می‌باشد را جدا کرده‌ایم و توسط تابع withColumn اقدام به تولید یک ستون جدید با نام year کرده‌ایم. در ادامه با استفاده از تابع groupby روی مقادیر year، عملیات Aggregation از نوع max را روی مقدار ستون High انجام داده‌ایم. در نهایت نتیجه را با استفاده از تابع sort بر روی مقادیر year به صورت صعودی برگردانده‌ایم.
در بخش SQL، عملیات GROUP BY را بر روی مقادیر سال انجام داده‌ایم. مقادیر سال به راحتی با اعمال تابع SUBSTRING بر روی چهار کاراکتر ستون Date بدست می‌آیند. در نهایت این مقدار را به صورت صعودی مرتب کرده و به همراه MAX مقدار ستون High بر می‌گردانیم.

Spark GraphX

بخش اول
کد:

from pyspark.sql import SQLContext
from graphframes import GraphFrame
import pyspark.sql.functions as F
sqlContext = SQLContext(sc)
lines = sc.textFile("dbfs:/FileStore/tables/Vertex.txt")
parsed_lines = lines.map(lambda line:line.split('\t'))
vertex_df =  sqlContext.createDataFrame(parsed_lines,["id","name"])
lines = sc.textFile("dbfs:/FileStore/tables/edges.txt")
parsed_lines = lines.map(lambda line:line.split('\t'))
edges_df =  sqlContext.createDataFrame(parsed_lines,["src","dst"])
g = graphFrame(vertex_df,edges_df)
g.edges.limit(5).show()
g.vertices.limit(5).show()

توضیحات:

با توجه به توضیحات کتابخانه GraphFrames، لازم است تا ابتدا دو دیتافریم، یکی شامل اطلاعات نودهای گراف و یکی شامل اطلاعات یال‌های گراف ایجاد کنیم. دیتافریم اطلاعات نودهای گراف می‌بایست شامل ستونی با نام id باشد که این ستون نشان‌دهنده شناسه یکتای هر نود باشد. دیتافریم یال‌ها نیست می‌بایست شامل دو ستون با نام‌های src و dst باشد که هر یک نشان‌دهنده شناسه یکتای نود مبدأ و مقصد در یال باشد.

در اینجا با خواندن فایل‌های نودها و یال‌ها، با استفاده از یک map اقدام به جداسازی هر خط با جداساز «Tab» کرده و در ادامه به ازای هر خط لیستی از اجزای آن خط بر می‌گردانیم. پس از آن استفاده از تابع createDataFrame و تعیین نام ستون‌های دیتافریم، برای هر کدام از لیست نودها و یال‌ها یک دیتافریم می‌سازیم. در مرحله بعد با استفاده از کلاس GraphFrame این دو دیتافریم را به عنوان ورودی به این کلاس می‌دهیم تا گراف مربوطه ساخته شود. در ادامه با صدا زدن متدهای edges و vertices دیتافریم‌هایی شامل اطلاعات نودها و یال‌های گراف ساخته شده قابل دریافت است.

بخش دوم
کد:

g.vertices.join(g.inDegrees.sort(F.desc("inDegree")).limit(1),"id").show()
g.vertices.join(g.outDegrees.sort(F.desc("outDegree")).limit(1),"id").show()

خروجی

 

توضیحات:
در این مرحله با صدا زدن توابع inDegrees و outDegrees، دو دیتافریم که هر یک شامل دو ستون id و in/outDegree است بدست می‌آید. در این مرحله لازم است تا برای یافتن بیشترین درجه، این دیتافریم‌ها را بر اساس ستون in/outDegree مرتب کرده و با دستور limit(1) اولین عنصر آن‌ها را بیابیم. در این مرحله تنها اطلاعات id نود با بیشترین درجه را در اختیار داریم، فلذا لازم است یک عملیات join بر اساس ستون id با دیتافریم نودها که با متد vertices از گراف بدست می‌آید انجام شده و اطلاعات کامل نود مربوطه نمایش داده شود.

بخش سوم

کد:

cdf = g.connectedComponents()
cdf.groupBy("component").count().sort(F.desc("count")).show()

توضیحات:

در این بخش از تابع connectedComponents بر روی گراف استفاده می‌کنیم. این تابع یک دیتافریم بر می‌گرداند که شامل دو ستون id و component می‌باشد که به ازای هر نود نشان‌میدهد که آن نود عضو کدام Component هست. شناسه‌های Component نیز به صورت تصادفی و نامرتب توسط کتابخانه انتخاب می‌شوند.

پس از دریافت دیتافریم ناشی از اجرای تابع connectedComponents، به ازای هر مقدار از ستون component یک عملیات aggregation با شمارش تعداد مقادیر آن انجام می‌گیرد. در نهایت نیز نتایج به صورت نزولی بر اساس تعداد اعضای آن Component صورت می‌گیرد.

توضیحات:

در این بخش از تابع connectedComponents بر روی گراف استفاده می‌کنیم. این تابع یک دیتافریم بر می‌گرداند که شامل دو ستون id و component می‌باشد که به ازای هر نود نشان‌میدهد که آن نود عضو کدام Component هست. شناسه‌های Component نیز به صورت تصادفی و نامرتب توسط کتابخانه انتخاب می‌شوند.

پس از دریافت دیتافریم ناشی از اجرای تابع connectedComponents، به ازای هر مقدار از ستون component یک عملیات aggregation با شمارش تعداد مقادیر آن انجام می‌گیرد. در نهایت نیز نتایج به صورت نزولی بر اساس تعداد اعضای آن Component صورت می‌گیرد.
بخش چهارم
کد:

g.vertices.join(g.inDegrees.sort(F.desc("inDegree")).limit(10),"id").sort(F.desc("inDegree")).show()

خروجی

توضیحات:

در این بخش همانند بخش دوم با اجرای تابع inDegrees روی گراف، شناسه نودها همراه تعداد درجات ورودی ‌آن‌ها بدست می‌آید. در ادامه با مرتب‌سازی روی inDegree و جداسازی ۱۰ مقدار بیشترین درجه، شناسه ۱۰ نود با بیشترین درجه ورودی نتیجه می‌شود. حال لازم است تا مابقی مشخصات این نودها با انجام عملیات join روی دیتافریم ناشی از اجرای vertices بدست آمده و در اینجا با توجه به اینکه مرتب‌سازی انجام شده به هم میریزد، مجدد یک مرتب سازی نزولی روی مشخصه inDegree انجام گردد.

نمایش گرافیکی نودها

کد:

import networkx as nx
import matplotlib.pyplot as plt
edges = g.edges
nxGraph=nx.Graph()
for edge in edges.select('src','dst').collect():
  nxGraph.add_edge(edge['src'],edge['dst'])
  
plt.figure(1,figsize=(12,12))
nx.draw(nxGraph, node_size=10, linewidths=0.5, width=0.5)

 

توضیحات:

در این بخش می‌خواهیم با استفاده از کتابخانه NetworkX از پایتون، گراف مورد نظر را رسم کنیم. برای اینکار ابتدا لیست یال‌ها را با اجرای متد edges از گراف به صورت یک دیتافریم استخراج می‌کنیم. با توجه به اینکه در نمایش گراف نمیخواهیم اسامی نودها را نمایش دهیم (با توجه به طولانی بودن اسامی و همچنین تعداد بالای نودهای گراف)، در این مرحله نیازی به استخراج دیتافریم نودها نمی‌باشد.
در مرحله بعد با اجرای یک حلقه روی دیتافریم collect شده، یال‌ها را به گراف NetworkX اضافه می‌‌کنیم. در ادامه با تنظیم سایز نمودار به مقدار مناسب، با استفاده از تابع draw روی گراف NetworkX گراف را با پارامترهای مناسب رسم می‌نماییم. لازم به ذکر است که NetworkX برای رسم گراف از نمودارهای Matplotlib بهره می‌برد و به همین جهت می‌توان جزئیات نمودار را با دستورات این کتابخانه تنظیم کرد.

چه رتبه ای می‌دهید؟

میانگین ۵ / ۵. از مجموع ۱

اولین نفر باش

title sign
معرفی نویسنده
مجتبی بنائی
مقالات
5 مقاله توسط این نویسنده
محصولات
3 دوره توسط این نویسنده
مجتبی بنائی

دانشجوی دکترای نرم‌افزار دانشگاه تهران، مدرس دانشگاه و فعال در حوزه مهندسی‌ نرم‌افزار و علم داده که تمرکز کاری خود را در چند سال اخیر بر روی مطالعه و تحقیق در حوزه کلان‌داده و تولید محتوای تخصصی و کاربردی به زبان فارسی و انتشار آنها در سایت مهندسی داده گذاشته است. مدیریت پروژه‌های نرم‌افزاری و طراحی سامانه‌های مقیاس‌پذیر اطلاعاتی از دیگر فعالیتهای صورت گرفته ایشان در چند سال گذشته است.مهندس بنائی همچنین: مدرس دوره‌های BigDataی نیک آموز، دانشجوی دکترای نرم افزار و مدرس دانشگاه تهران، مجری و مشاور پروژه‌های کلان‌داده در سطح ملی و بین المللی، فعال در حوزه تولید محتوای تخصصی در زمینه پردازش داده می باشد.

پروفایل نویسنده
title sign
دیدگاه کاربران

الکامپ ۲۷ با خبرهای هیجان انگیز در راه است | تنها چند روز تا شروع جشنواره 
از شروع جشنواره با خبر شو
close-image