Column Level Encryption using PySpark

If you need to encrypt sensitive or personally identifiable information (PII) in specific columns before storing the data, you are in the right place.
The step-by-step code blocks below show one way to do it.

This demonstration uses the Fernet class from the cryptography library to generate a key, which is then used to encrypt and decrypt the data. To keep things simple, the encryption logic is applied to a small sample DataFrame created with Spark.

The script was run in a Synapse Spark notebook.

  • Generate Fernet Key
# Generate a key using the Fernet class from the cryptography library
from cryptography.fernet import Fernet
key = Fernet.generate_key()
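A Fernet key is 32 random bytes encoded as URL-safe base64. When the key comes from somewhere other than `Fernet.generate_key()` (a secret store, for instance), a quick format check before handing it to the UDFs can catch a truncated or wrong secret early. The sketch below uses only the standard library; the helper name `looks_like_fernet_key` is an assumption made for illustration and is not part of the cryptography package.

```python
import base64
import binascii


def looks_like_fernet_key(key):
    """Return True if key is URL-safe base64 that decodes to exactly 32 bytes."""
    try:
        if isinstance(key, str):
            key = key.encode("ascii")
        raw = base64.urlsafe_b64decode(key)
    except (binascii.Error, ValueError, TypeError):
        # Bad padding, non-ASCII text, or an unsupported input type
        return False
    return len(raw) == 32
```

This checks only the shape of the key, not whether it is the key that was used to encrypt existing data.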
  • Define User-Defined Function
# Define the encrypt function (registered as a UDF later)
def encrypt_val(clear_text, MASTER_KEY):
    # Import inside the function so it runs wherever the UDF executes
    from cryptography.fernet import Fernet
    f = Fernet(MASTER_KEY)
    clear_text_b = bytes(clear_text, 'utf-8')
    cipher_text = f.encrypt(clear_text_b)
    # Fernet tokens are URL-safe base64, so they decode cleanly to ASCII
    return cipher_text.decode('ascii')

# Define the decrypt function (registered as a UDF later)
def decrypt_val(cipher_text, MASTER_KEY):
    from cryptography.fernet import Fernet
    f = Fernet(MASTER_KEY)
    clear_val = f.decrypt(cipher_text.encode()).decode()
    return clear_val
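One thing to watch for: Spark passes `None` to a Python UDF when the column value is NULL, and `bytes(None, 'utf-8')` raises a `TypeError`, so the functions above would fail on a nullable column. A minimal sketch of one way around this is a small wrapper that lets `None` pass through untouched. The names `null_safe` and `fake_encrypt` are hypothetical, and `fake_encrypt` is only a stand-in so the example runs without the cryptography package.

```python
import functools


def null_safe(func):
    """Wrap a (value, key) function so that a None value is returned as None."""
    @functools.wraps(func)
    def wrapper(value, key):
        if value is None:
            return None
        return func(value, key)
    return wrapper


# Stand-in for encrypt_val, used only to demonstrate the wrapper
def fake_encrypt(clear_text, key):
    return bytes(clear_text, "utf-8").hex()


safe_encrypt = null_safe(fake_encrypt)
```

With the real functions, the same wrapper could be applied at registration time, for example `udf(null_safe(encrypt_val), StringType())`.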
  • Create Dataframe
# Create a sample DataFrame
from pyspark.sql import SparkSession

columns = ["Name", "Phone"]
data = [("Tom", "8989767656"), ("Jhon", "9997878676"), ("Sam", "8990344323")]

spark = SparkSession.builder.appName('Enc').getOrCreate()
rdd = spark.sparkContext.parallelize(data)

# Convert the RDD to a DataFrame with named columns
df = rdd.toDF(columns)
df.show()
  • Apply Encryption
# Apply encryption
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

# Register the UDFs
encrypt = udf(encrypt_val, StringType())
decrypt = udf(decrypt_val, StringType())

# Fetch the key from secrets (commented out; the key generated above is used instead)
# encryptionKey = dbutils.preview.secret.get(scope = "encrypt", key = "fernetkey")
encryptionKey = key

# Encrypt the data
# df = spark.table("Test_Encryption")
encrypted = df.withColumn("Phone", encrypt("Phone", lit(encryptionKey)))
encrypted.show()
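The encrypted values all begin with `gAAAAA`. That is expected: per the Fernet specification, a token starts with a version byte (0x80) followed by an 8-byte big-endian Unix timestamp, then the IV, the ciphertext and an HMAC. The timestamp is not encrypted, so anyone who can read the column can see when each value was encrypted. The sketch below reads that timestamp using only the standard library; the function name `fernet_token_created_at` is an assumption made for illustration.

```python
import base64
import struct
from datetime import datetime, timezone


def fernet_token_created_at(token):
    """Return the creation time embedded in a Fernet token as a UTC datetime."""
    if isinstance(token, str):
        token = token.encode("ascii")
    raw = base64.urlsafe_b64decode(token)
    # Layout: 1 version byte (0x80), then an 8-byte big-endian timestamp
    if len(raw) < 9 or raw[0] != 0x80:
        raise ValueError("not a Fernet token")
    (seconds,) = struct.unpack(">Q", raw[1:9])
    return datetime.fromtimestamp(seconds, tz=timezone.utc)
```

This only inspects the token header. It does not verify the HMAC or decrypt anything, so it is not a substitute for `decrypt_val`.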
  • Apply Decryption
# Apply decryption
decrypted = encrypted.withColumn("Phone", decrypt("Phone", lit(encryptionKey)))
decrypted.show()

Feedback is always appreciated. Please leave a comment if you have any.
Cheers 🥂
