Wednesday, 13 December 2023

PySpark -

# Documentation Links

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

https://spark.apache.org/developer-tools.html

https://spark.apache.org/examples.html

https://spark.apache.org/docs/latest/api/python/reference/

https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#tuning


# To practice PySpark online, run the code below in a Google Colab notebook

-- Google Colab

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

#################################################################################

# Taking a list and converting it into an RDD

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)

for i in rdd.collect():
    print(i)
# flatMap: split each element into words
rdd2=rdd.flatMap(lambda x: x.split(" "))
for k in rdd2.collect():
    print(k)

##########################################################################
# Converting an RDD to a DataFrame
sc = spark.sparkContext   # SparkContext handle used by the RDD examples below
rddtodf = sc.parallelize([(1, 2, 3, 'a b c'),
                          (4, 5, 6, 'd e f'),
                          (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3', 'col4'])
rddtodf.show()
##########################################################################
# Displaying RDD data. show() is a DataFrame method and does not work on RDDs; use collect()
myData = sc.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])
myData.collect()
##########################################################################
# Reading a CSV file into a DataFrame
#df_california = spark.read.options(header='true', inferSchema='true').csv('/content/sample_data/california_housing_test.csv')
#df_california.show()
df_california = spark.read.format('csv').options(header='true').load('/content/sample_data/california_housing_test.csv')
df_california.show()
##########################################################################
# Joining RDDs with a left outer join
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c', 10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', 6), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()
##########################################################################
# Defining a schema using StructType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

k = [{'name': 'Alice', 'age': 1}]
sc.parallelize(k).collect()
# Applying the schema while creating the DataFrame
schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
dataframe = spark.createDataFrame(k, schema)
dataframe.collect()
##########################################################################
# Creating a temporary view and querying it with SQL
dataframe.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT * from table1")
df2.collect()
###########################################################################
# Intersection of two RDDs
rdd1intersect = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2intersect = sc.parallelize([1, 6, 2, 3, 7, 8])
resultinter = rdd1intersect.intersection(rdd2intersect)
resultinter.collect()
###########################################################################
# Applying the map function on an RDD
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())
###########################################################################
#creating directories through magic commands
%mkdir /content/sample_data/Parquet/
%cd /content/sample_data/Parquet/
%ls
%mkdir /content/sample_data/Parquet_out/
###########################################################################
#Reading Parquet files from the Drive
df_parquet = spark.read.format("parquet").load('/content/sample_data/Parquet/')
#Writing Parquet files into Storage

df_parquet.write.format("parquet").mode("overwrite").save("/content/sample_data/Parquet_out/")
###########################################################################
# Saving the DataFrame as a table (overwrite mode) and querying it with SQL
df_parquet.write.mode("overwrite").saveAsTable("parqu_tab")
df1_parquet = spark.sql("SELECT timeperiod,flow1,flow2,flow3 from parqu_tab")
df1_parquet.show()
###########################################################################
# Common DataFrame inspection methods
df.dtypes                  # column names and data types
df.show()                  # display rows in tabular form
df.head()                  # first row(s) as Row objects
df.take(n)                 # first n rows as a list of Row objects
df.schema                  # schema object
df.describe().show()       # summary statistics for numeric columns
df.columns                 # list of column names
df.count()                 # number of rows
df.distinct().count()      # number of distinct rows
df.printSchema()           # print the schema tree
df.explain()               # print the execution plan
###########################################################################
#Adding a Column
from pyspark.sql.functions import col
df1_parquet1 = df1_parquet.withColumn('flow4',col('flow1'))
df1_parquet1.show()
###########################################################################
#Adding a derived column (withColumn replaces the column if the name already exists)
df3 = df1_parquet1.withColumn("salary",col("flow2")*100)
df3.show()
###########################################################################
#Renaming a column
df4 = df3.withColumnRenamed("salary","salary_modified")
df4.show()
###########################################################################
#Dropping a Column
df5=df4.drop("salary_modified")
df5.show()
###########################################################################
#Adding constant value to a dataframe
from pyspark.sql.functions import lit

df6 = df5.withColumn("Country", lit("USA"))
df6.show()
###########################################################################
#Doing Filter Operation
df7=df6.filter(df6["flow4"]>3)
df7.show()
###########################################################################
#Doing Order by Operation
#df8=df7.sort(df7.flow4.desc())
#df8.show()
#df8=df7.sort("flow4", ascending=False)
#df8.show()
df8=df7.orderBy(["flow4","flow3"],ascending=[0,1])
df8.show()
###########################################################################
#Deleting and replacing the null values
df9=df8.na.fill(20)
df9.show()
#df.na.drop().show()
#df.na.replace(10, 20).show()
###########################################################################
#Reading Data from Files into Collections
orders_file = open('/content/sample_data/orders.txt')
orders_raw = orders_file.read()
# orders_raw holds the entire file contents as a single string
orders_file.close()
orders = orders_raw.splitlines()
# splitlines() turns each line into a string element of a list
#len(orders)
#orders[0]
orders[0:10]
#for i in orders[0:10]: print(i)
###########################################################################
#Assigning column names while reading the CSV
orders_df=spark.read.csv('/content/sample_data/orders.txt').toDF('order_id','order_date','order_customer_id','order_status')
orders_df.printSchema()
###########################################################################
# The first argument to withColumn is the column name and the second is the transformation. If the name does not match an existing column, a new column is added to the DataFrame.
from pyspark.sql.types import *
orders_wth=orders_df.withColumn('order_id',orders_df.order_id.cast(IntegerType())). \
withColumn('order_customer_id',orders_df.order_customer_id.cast(IntegerType()))
orders_wth.printSchema()
###########################################################################
#DSL Language
from pyspark.sql.functions import *
orders_df.select(substring('order_date',1,20).alias('order_date')).show()
from pyspark.sql import functions
help(functions.substring)
----------------------------------------------------------------------
from pyspark.sql.functions import *
#orders_df1=orders_df.select(orders_df.order_id,orders_df.order_date,date_format(orders_df.order_date,"YYYYMM"))
orders_df1=orders_df.select('*')
###########################################################################
# Applying a date function without a module alias (orders_strn is the string-typed orders DataFrame)
from pyspark.sql.functions import *
orders_strn.select(date_format('order_date', 'MM/dd/yyyy').alias('date')).collect()
-----------------------------------------------------------------------------------
# Importing functions under a module alias
# Whenever a column is referenced together with its DataFrame it must be written as
# dataframe_name.column_name; column names passed on their own can be given as strings in single quotes.
from pyspark.sql import functions as f
orders_strn.select(orders_strn.order_id,orders_strn.order_date,f.date_format(orders_strn.order_date, "MMyyyy").alias('dateformat')).show()
###########################################################################
#left outer join
orders_join.join(order_item_join,orders_join.order_id==order_item_join.order_itemid,'left').show()
------------------------------------------------------------------------------------
#left outer join and applying where clause
orders_join.join(order_item_join,orders_join.order_id==order_item_join.order_itemid,'left').where(order_item_join.order_itemid.isNotNull()).show()
------------------------------------------------------------------------------------

# Left outer join, filtering out null keys, and selecting columns from orders_join
orders_join.join(order_item_join,orders_join.order_id==order_item_join.order_itemid,'left').where(order_item_join.order_itemid.isNotNull()). \
select(orders_join.order_id,orders_join.order_date,orders_join.order_customer_id,orders_join.order_status).show()
###########################################################################
# Here column names are passed as strings to groupby and the aggregation
order_item_join.groupby('orderitem_order_id').agg(round(sum('orderitem_order_id'),2).alias('Order_Revenue')).show()
###########################################################################
# Joining DataFrames and computing revenue per order date and product
from pyspark.sql.functions import sum, round
orders.where('order_status in ("COMPLETE", "CLOSED")'). \
join(orderItems, orders.order_id == orderItems.order_item_order_id). \
groupBy('order_date', 'order_item_product_id'). \
agg(round(sum('order_item_subtotal'), 2).alias('revenue')).show()
###########################################################################

What is hash partitioning (as used in bucketing)?
  • Here's a breakdown of how hash functions are applied to specific columns in various contexts (a PySpark sketch follows this list):

    Databases:

    • Hash partitioning:
      • Divides a table into partitions based on the hash values of a specific column.
      • Rows with the same hash value are stored in the same partition, improving query performance for queries that filter on that column.
    • Hash indexes:
      • Create indexes using hash values of a column for faster lookups and equality comparisons.
      • Efficiently locate rows with specific values in the indexed column.

    Data structures:

    • Hash tables:
      • Store data in key-value pairs, using a hash function to map keys to indices in an array.
      • The hash value of a key determines its storage location, enabling fast lookups and insertions in O(1) time on average.

    Programming languages:

    • Built-in hash functions:
      • Most languages offer built-in hash functions (e.g., hash() in Python, GetHashCode() in C#) to generate hash values from data.
      • These hash values can be used for various purposes, such as:
        • Checking for duplicates
        • Implementing hash-based data structures
        • Generating unique identifiers
        • Creating hash-based data structures like sets and dictionaries
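
As noted above, here is a minimal PySpark sketch of hash-based bucketing: Spark's built-in hash function maps each key to a bucket via pmod(hash(column), num_buckets), and bucketBy persists the same assignment on disk. The toy DataFrame, the order_id column, and the bucket count of 4 are illustrative assumptions, not data from these notes.

# A minimal sketch of hash partitioning as used in bucketing (assumed column names and bucket count)
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Toy orders data; in the notes above the real data comes from the retail CSV files
df = spark.createDataFrame(
    [(1, "COMPLETE"), (2, "CLOSED"), (3, "PENDING"), (4, "COMPLETE")],
    ["order_id", "order_status"])

num_buckets = 4

# Each row's bucket is pmod(hash(order_id), num_buckets), so equal keys always land in the same bucket
df.withColumn("bucket", expr(f"pmod(hash(order_id), {num_buckets})")).show()

# Persisting with bucketBy applies the same hash-based assignment on disk,
# which helps later joins and equality filters on order_id
df.write.bucketBy(num_buckets, "order_id") \
    .sortBy("order_id") \
    .mode("overwrite") \
    .saveAsTable("orders_bucketed")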
##################################################################################
