# Documentation Links
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://spark.apache.org/developer-tools.html
https://spark.apache.org/examples.html
https://spark.apache.org/docs/latest/api/python/reference/
https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#tuning
# To practice PySpark online, use the code below in a Google Colab notebook
-- Google Colab setup
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext  # several snippets below use sc directly
spark
#################################################################################
# Taking a list and converting it into an RDD
data = ["Project Gutenberg’s",
"Alice’s Adventures in Wonderland",
"Project Gutenberg’s",
"Adventures in Wonderland",
"Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
for i in rdd.collect():
    print(i)
#Flatmap
rdd2=rdd.flatMap(lambda x: x.split(" "))
for k in rdd2.collect():
    print(k)
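# A minimal word-count sketch building on rdd2 above: pair each word with 1, then sum per word
word_counts = rdd2.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
for word, count in word_counts.collect():
    print(word, count)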
##########################################################################
#Converting an RDD to a DataFrame with toDF()
rddtodf = sc.parallelize([(1, 2, 3, 'a b c'),
                          (4, 5, 6, 'd e f'),
                          (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3', 'col4'])
rddtodf.show()
##########################################################################
#Displaying RDD data. show() is a DataFrame method, so on an RDD use collect()
myData = sc.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])
myData.collect()
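# For larger RDDs, collect() can overwhelm the driver; a sketch of lighter-weight alternatives
myData.count()    # number of elements, computed on the executors
myData.take(3)    # brings only the first 3 elements to the driver
myData.first()    # a single element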
##########################################################################
#Reading a CSV file into a DataFrame
df_california = spark.read.format('csv').options(header='true').load('/content/sample_data/california_housing_test.csv')
df_california.show()
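# A variant of the same read (sketch, same sample file) that also infers column types instead of leaving everything as strings
df_california_typed = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('/content/sample_data/california_housing_test.csv')
df_california_typed.printSchema()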
##########################################################################
# Joining pair RDDs with leftOuterJoin
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c', 10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', 6), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()
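# For comparison, a sketch of the other join flavours on the same pair RDDs
rdd1.join(rdd2).collect()           # inner join: only keys present in both RDDs
rdd1.fullOuterJoin(rdd2).collect()  # full outer join: every key, missing side is None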
##########################################################################
#Defining a schema using StructType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
k = [{'name': 'Alice', 'age': 1}]
sc.parallelize(k).collect()
#Applying the schema when building the DataFrame
Schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])
df_schema = spark.createDataFrame(k, Schema)
df_schema.collect()
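# A quick check (sketch) that the explicit schema was applied
df_schema.printSchema()   # name: string, age: integer
df_schema.show()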
##########################################################################
#Creating a temporary view and querying it with Spark SQL
df_schema.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT * from table1")
df2.collect()
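# Any Spark SQL can run against the view; a small sketch with a filter and an aggregate
spark.sql("SELECT name, age FROM table1 WHERE age >= 1").show()
spark.sql("SELECT COUNT(*) AS n FROM table1").show()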
###########################################################################
# Intersection of two RDDs
rdd1intersect = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2intersect = sc.parallelize([1, 6, 2, 3, 7, 8])
resultinter = rdd1intersect.intersection(rdd2intersect)
resultinter.collect()
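# The related set operations union() and subtract() work the same way (sketch on the same RDDs)
rdd1intersect.union(rdd2intersect).collect()      # keeps duplicates from both RDDs
rdd1intersect.subtract(rdd2intersect).collect()   # elements only in the first RDD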
###########################################################################
# Applying map() to an RDD
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())
###########################################################################
#creating directories through magic commands
%mkdir /content/sample_data/Parquet/
%cd /content/sample_data/Parquet/
%ls
%mkdir /content/sample_data/Parquet_out/
###########################################################################
#Reading Parquet files from local storage
df_parquet = spark.read.format("parquet").load('/content/sample_data/Parquet/')
#Writing Parquet files into Storage
df_parquet.write.format("parquet").mode("overwrite").save("/content/sample_data/Parquet_out/")
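# Sketch of a partitioned write; timeperiod is used as the key since it appears in the query below, and the output path is illustrative
df_parquet.write.format("parquet") \
    .partitionBy("timeperiod") \
    .mode("overwrite") \
    .save("/content/sample_data/Parquet_out_partitioned/")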
###########################################################################
# Saving the DataFrame as a table in overwrite mode and querying it with SQL
df_parquet.write.mode("overwrite").saveAsTable("parqu_tab")
df1_parquet = spark.sql("SELECT timeperiod,flow1,flow2,flow3 from parqu_tab")
df1_parquet.show()
###########################################################################
# Common DataFrame inspection methods (these apply to DataFrames, not RDDs; only some are actions)
>>> df.dtypes
>>> df.show()
>>> df.head()
>>> df.take(n)
>>> df.schema
>>> df.describe().show()
>>> df.columns
>>> df.count()
>>> df.distinct().count()
>>> df.printSchema()
>>> df.explain()
###########################################################################
#Adding a Column
from pyspark.sql.functions import col
df1_parquet1 = df1_parquet.withColumn('flow4',col('flow1'))
df1_parquet1.show()
###########################################################################
#Deriving a new column from an existing one (withColumn replaces the column only when the name already exists)
df3 = df1_parquet1.withColumn("salary",col("flow2")*100)
df3.show()
###########################################################################
#Renaming a column
df4 = df3.withColumnRenamed("salary","salary_modified")
df4.show()
###########################################################################
#Dropping a Column
df5=df4.drop("salary_modified")
df5.show()
###########################################################################
#Adding constant value to a dataframe
from pyspark.sql.functions import lit
df6 = df5.withColumn("Country", lit("USA"))
df6.show()
###########################################################################
#Doing Filter Operation
df7=df6.filter(df6["flow4"]>3)
df7.show()
###########################################################################
#Doing Order by Operation
#df8=df7.sort(df7.flow4.desc())
#df8.show()
#df8=df7.sort("flow4", ascending=False)
#df8.show()
df8=df7.orderBy(["flow4","flow3"],ascending=[0,1])
df8.show()
###########################################################################
#Deleting and replacing null values
df9=df8.na.fill(20)
df9.show()
#df.na.drop().show()
#df.na.replace(10, 20).show()
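# Sketch: fillna can target specific columns with a dict, and dropna can look at a subset of columns
df8.na.fill({"flow3": 0, "flow4": 0}).show()
df8.na.drop(subset=["flow4"]).show()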
###########################################################################
#Reading Data from Files into Collections
orders_file = open('/content/sample_data/orders.txt')
orders_raw = orders_file.read()
# orders_raw holds the entire file contents as a single string
orders_file.close()
orders = orders_raw.splitlines()
# splitlines() converts the string into a list with one entry per line
#len(orders)
#orders[0]
orders[0:10]
#for i in orders[0:10]: print(i)
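# Sketch: the same list can then be distributed as an RDD, bridging plain Python I/O and Spark
orders_rdd = sc.parallelize(orders)
orders_rdd.take(5)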
###########################################################################
#Reading the CSV and naming the columns with toDF() (all columns are strings at this stage)
orders_df=spark.read.csv('/content/sample_data/orders.txt').toDF('order_id','order_date','order_customer_id','order_status')
###########################################################################
# The first argument to withColumn is the column name and the second is the transformation; if the name does not already exist, a new column is added to the DataFrame
from pyspark.sql.types import *
orders_wth=orders_df.withColumn('order_id',orders_df.order_id.cast(IntegerType())). \
withColumn('order_customer_id',orders_df.order_customer_id.cast(IntegerType()))
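# Sketch: confirm that the casts took effect
orders_wth.printSchema()
orders_wth.dtypes   # order_id and order_customer_id should now show as int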
###########################################################################
#DataFrame DSL (column functions instead of SQL)
from pyspark.sql.functions import *
orders_df.select(substring('order_date',1,20).alias('order_date')).show()
from pyspark.sql import functions
help(functions.substring)
----------------------------------------------------------------------
from pyspark.sql.functions import *
#orders_df1=orders_df.select(orders_df.order_id,orders_df.order_date,date_format(orders_df.order_date,"YYYYMM"))
orders_df1=orders_df.select('*')
###########################################################################
# Applying date_format with functions imported directly (no module alias)
from pyspark.sql.functions import *
orders_strn.select(date_format('order_date', 'MM/dd/yyyy').alias('date')).collect()
-----------------------------------------------------------------------------------
# Importing the functions module under an alias
# Columns can be referenced either as dataframe.column_name or passed directly as strings in single quotes
from pyspark.sql import functions as f
orders_strn.select(orders_strn.order_id,orders_strn.order_date,f.date_format(orders_strn.order_date, "MMyyyy").alias('dateformat')).show()
###########################################################################
#left outer join
orders_join.join(order_item_join,orders_join.order_id==order_item_join.order_itemid,'left').show()
------------------------------------------------------------------------------------
#left outer join and applying where clause
orders_join.join(order_item_join,orders_join.order_id==order_item_join.order_itemid,'left').where(order_item_join.order_itemid.isNotNull()).show()
------------------------------------------------------------------------------------
#Left outer join, keeping only matched rows, and selecting columns from orders_join
orders_join.join(order_item_join,orders_join.order_id==order_item_join.order_itemid,'left').where(order_item_join.order_itemid.isNotNull()). \
select (orders_join.order_id,orders_join.order_date,orders_join.order_customer_id,orders_join.order_status).show()
###########################################################################
# Passing column names as strings to groupby and agg
order_item_join.groupby('orderitem_order_id').agg(round(sum('orderitem_order_id'),2).alias('Order_Revenue')).show()
###########################################################################
# Joining dataframes and doing sum
from pyspark.sql.functions import sum, round
orders.where('order_status in ("COMPLETE", "CLOSED")'). \
join(orderItems, orders.order_id == orderItems.order_item_order_id). \
groupBy('order_date', 'order_item_product_id'). \
agg(round(sum('order_item_subtotal'), 2).alias('revenue')).show()
###########################################################################
What is hash partitioning in bucketing? When a table is bucketed, Spark assigns each row to a bucket by hashing the bucketing column(s) and taking the hash modulo the number of buckets, so all rows with the same key value land in the same bucket and joins or aggregations on that key can avoid a shuffle.
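# A minimal bucketing sketch reusing orders_wth from above; bucketBy must be combined with saveAsTable
orders_wth.write \
    .bucketBy(8, "order_id") \
    .sortBy("order_id") \
    .mode("overwrite") \
    .saveAsTable("orders_bucketed")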
##################################################################################