Azure Cloud, Databricks

Azure Cloud

Azure Key Vault

This tutorial explains how to save the keys of a blob storage account in Azure Key Vault (and use that connection in Azure Databricks).

This tutorial <https://microsoft-bitools.blogspot.com/2020/02/use-azure-key-vault-for-azure-databricks.html> explains how to store secrets in Azure Key Vaults

Note that within Azure Databricks, if the workspace was created with the Standard plan, then when creating the secret scope (by appending #secrets/createScope to the workspace URL, so that the URL looks like https://<location>.azuredatabricks.net/?o=<orgID>#secrets/createScope), the Manage Principal option must be set to “All Users”; otherwise it will not work.
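
Once the scope exists, it can be checked from a notebook with dbutils.secrets; a minimal sketch (the scope and key names below are hypothetical):

# list the secret scopes visible from this workspace
print(dbutils.secrets.listScopes())

# list the keys available in a given scope
print(dbutils.secrets.list("my-keyvault-scope"))

# fetch one secret value (redacted when displayed in a notebook)
storage_key = dbutils.secrets.get(scope="my-keyvault-scope", key="storage-account-key")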

Azure Databricks

Sasha Dittmann’s lectures “Databricks MLOps - Deploy Machine Learning Model On Azure” are extremely useful for linking Azure Databricks and Azure DevOps:

https://www.youtube.com/watch?v=NLXis7FlnMM

https://www.youtube.com/watch?v=HL36Q-eU5wU&t=198s

https://www.youtube.com/watch?v=fv3p3r3ByfY&t=1016s

Here is the full chain with Databricks’ MLflow and Azure ML: https://databricks.com/blog/2020/10/13/using-mlops-with-mlflow-and-azure.html (very good tutorial).

This tutorial explains how to connect Azure Blob Storage to Azure Databricks from within Databricks notebooks (with the keys of the blob storage stored in Azure Key Vault).
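
As a rough sketch of that pattern (the account, container, scope, and key names here are made up), the storage key is read from the Key Vault-backed scope and handed to Spark:

# hypothetical names
storage_account = "mystorageaccount"
container = "mycontainer"

# read the storage account key from the Key Vault-backed secret scope
storage_key = dbutils.secrets.get(scope="my-keyvault-scope", key="storage-account-key")

# make the key available to Spark, then read directly from the container
spark.conf.set(f"fs.azure.account.key.{storage_account}.blob.core.windows.net", storage_key)
df = spark.read.csv(f"wasbs://{container}@{storage_account}.blob.core.windows.net/some/path.csv")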

Deploying an MLflow model as a container on ACI and AKS: https://docs.azuredatabricks.net/_static/notebooks/mlflow/mlflow-quick-start-deployment-azure.html

Databricks workflow with databricks-connect: https://menziess.github.io/howto/enhance/your-databricks-workflow/

DBX - Databricks CLI eXtensions (supersedes databricks-connect)

Main documentation:

Some great examples:

What steps are needed to deploy and launch custom code?

For a project that is meant to be deployed on the Databricks platform as a package, use:

dbx deploy --jobs=validation --deployment-file=./conf/deployment-validation.json
dbx launch --job=validation --trace

For code that is just deployed and run (without installing the project as a package on Databricks), use:

dbx deploy --jobs=validation --no-rebuild --no-package --deployment-file=./conf/deployment-validation.json
dbx launch --job=validation --trace

Databricks-connect

Intro blog from databricks: https://databricks.com/blog/2019/06/14/databricks-connect-bringing-the-capabilities-of-hosted-apache-spark-to-applications-and-microservices.html

Also, a more recent one (2021): https://docs.databricks.com/dev-tools/databricks-connect.html

To install it: https://menziess.github.io/howto/install/databricks-connect/ (short intro)

See also https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect for in-depth documentation

Good example of use of databricks-connect with pyspark UDF: https://medium.com/swlh/productionizing-a-spark-job-with-databricks-notebook-dd950a242c7d

How is databricks-connect maintained: https://pypi.org/project/databricks-connect/#history

Before installing databricks-connect, we need to uninstall pyspark, since the two packages conflict:

$ pip uninstall pyspark

Then install databricks-connect, choosing the version that matches the Databricks cluster runtime version:

$ pip install -U databricks-connect==7.3.*

Then we can configure the connection: https://docs.databricks.com/dev-tools/databricks-connect.html#step-2-configure-connection-properties

Example here: https://menziess.github.io/howto/install/databricks-connect/

$ databricks-connect configure
The current configuration is:
* Databricks Host: https://westeurope.azuredatabricks.net/
* Databricks Token: dapi5c376de3a2a54a2b03016c8c3b123456 (build it yourself from settings)
* Cluster ID: 0214-195926-aptin821
* Org ID: 3892784943666666 (get it from the /?o= argument in the cluster URL)
* Port: 8787

Run databricks-connect test to test your installation. You’ll hopefully see something along the lines of:

$ databricks-connect test

Tricks to set up and optimize databricks-connect: https://dev.to/aloneguid/tips-and-tricks-for-using-python-with-databricks-connect-593k

Fixing out-of-memory issues: when using Databricks Connect you might encounter an error like “Java heap space”. This simply means your local Spark node (the driver) is running out of memory, which is limited to 2 GB by default. If you need more memory, it’s easy to increase it. First, find out where PySpark’s home directory is:

$ databricks-connect get-spark-home
/home/philippe/Documents/Github/Time-series-prediction/.venv/lib/python3.7/site-packages/pyspark

This directory should contain a subfolder conf (create it if it doesn’t exist) with a file spark-defaults.conf (again, create it if it doesn’t exist). The full file path would be /home/philippe/Documents/Github/Time-series-prediction/.venv/lib/python3.7/site-packages/pyspark/conf/spark-defaults.conf. Add a line such as (4g or 8g, depending on your machine):

spark.driver.memory 8g
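
To check that the new driver memory is picked up, one can inspect the local Spark configuration (a quick sanity check, not part of the original tip):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# should print the value set in spark-defaults.conf, e.g. 8g (defaults to 2g if unset)
print(spark.sparkContext.getConf().get("spark.driver.memory", "2g"))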

List of limitations of databricks-connect: https://datathirst.net/blog/2019/3/7/databricks-connect-limitations

How to connect to data (Azure Storage, for example) and also explore dbutils?

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

setting = spark.conf.get("spark.master")
if "local" in setting:
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)  # pass the SparkSession itself (some examples pass spark.sparkContext, which is wrong/outdated)
else:
    print("Do nothing - dbutils should be available already")

print(setting)
# local[*] #when running from local laptop
# spark... #when running from databricks notebook

print(dbutils.fs.ls("dbfs:/"))

# assumes /mnt/demo is ALREADY mounted in your Databricks cluster (mount it from Databricks, not from local)
cwd = "/dbfs/mnt/demo/"

# read from mnt point (could be Azure storage mounted there!)
df = spark.read.csv("/mnt/demo/sampledata.csv")
df.show()

+---+----------+---------+
|_c0|       _c1|      _c2|
+---+----------+---------+
| id| firstname| lastname|
|  1|        JC|   Denton|
+---+----------+---------+

# write to mount point
(df.write
   .mode("overwrite")
   .parquet("/mnt/demo/sampledata_copy.parquet"))

Databricks CLI

Main doc: https://docs.databricks.com/dev-tools/cli/index.html

Installation and configuration:

# installation
pip install databricks-cli

# configuration
databricks configure --token

> Databricks Host (should begin with https://): https://yourpath.azuredatabricks.net
> Token: (put your token, get it from "Generate tokens" in User Settings)

After you complete the prompts, your access credentials are stored in the file ~/.databrickscfg on Unix, Linux, or macOS, or %USERPROFILE%\.databrickscfg on Windows.

# list clusters:
databricks clusters list
> 1211-084728-chalk447  small_73ML   TERMINATED
> 1217-223436-cab783    job-6-run-1  TERMINATED
> 1217-222539-aunt76    job-5-run-1  TERMINATED

# delete a cluster permanently:
databricks clusters permanent-delete --cluster-id 1217-223436-cab783

# check again:
databricks clusters list
> 1211-084728-chalk447  small_73ML   TERMINATED
> 1217-222539-aunt76    job-5-run-1  TERMINATED

Note for multiple workspaces:

# Multiple connection profiles are also supported with
databricks configure --profile <profile> [--token]

# To use the profile associated to a workspace, use
databricks <group> <command> --profile <profile-name>

# For example:
databricks workspace ls --profile <profile>

The Databricks CLI is subdivided into sub-CLIs:

More info: https://docs.databricks.com/dev-tools/cli/index.html

Centralized Databricks workspace

One can create a Databricks workspace containing centralized MLflow and Feature Store instances that can be used from other workspaces (dev, staging, prod).

To connect such a centralized workspace to each of the other ones, the following is useful:

For MLflow, simply follow this: https://cprosenjit.medium.com/mlflow-azure-databricks-7e7e666b7327

For Feature Store, one needs to use the metastore of the centralized workspace and refer to it when working from clusters in the other workspaces. See here for the metastore declaration in the other workspaces: https://docs.microsoft.com/en-us/azure/databricks/data/metastores/external-hive-metastore. Then follow this to connect the different workspaces together: https://docs.microsoft.com/en-us/azure/databricks/applications/machine-learning/feature-store/multiple-workspaces. It is also very important to make sure that the Hive client is connected to a centralized Azure Blob Storage: https://docs.databricks.com/data/data-sources/azure/azure-storage.html#access-azure-blob-storage-from-the-hive-client

To link a local workspace to the centralized workspace, one first has to create a personal access token (PAT) in the centralized workspace; it looks something like dapi1232142.

Then, using the Databricks CLI (first configure it to talk to the local workspace, see the “Databricks CLI” section just above), run the following in a bash shell:

databricks secrets create-scope --scope connection-to-data-workspace --initial-manage-principal users

where “connection-to-data-workspace” is the name of the scope (chosen here), and “--initial-manage-principal users” is needed when the workspace is not on the Premium plan.

Then

databricks secrets put --scope connection-to-data-workspace --key data-workspace-host

This will prompt for the URL of the CENTRALIZED workspace. Then

databricks secrets put --scope connection-to-data-workspace --key data-workspace-token

This will prompt for the PAT token created in the CENTRALIZED workspace. Then finally

databricks secrets put --scope connection-to-data-workspace --key data-workspace-workspace-id

This will prompt for the workspace ID of the CENTRALIZED workspace (usually contained in the URL of that workspace; if not, it can be obtained using the CLI).
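
With these three secrets in place (host, token, and workspace ID sharing the data-workspace prefix), a cluster in the local workspace can point MLflow’s model registry and the Feature Store client at the centralized workspace. A sketch, based on the scope and prefix chosen above:

import mlflow
from databricks.feature_store import FeatureStoreClient

# "databricks://<scope>:<prefix>" resolves <prefix>-host, <prefix>-token and <prefix>-workspace-id
registry_uri = "databricks://connection-to-data-workspace:data-workspace"

# use the centralized workspace as the model registry
mlflow.set_registry_uri(registry_uri)

# use the centralized workspace for the Feature Store as well
fs = FeatureStoreClient(feature_store_uri=registry_uri, model_registry_uri=registry_uri)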

Note:

Delta Lake

Delta documentation: https://docs.delta.io/latest/delta-batch.html#overwrite, https://delta.io/

Introduction to delta lake: https://books.japila.pl/delta-lake-internals/installation/

Delta is the default file format in Databricks.
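
For example, writing and reading a Delta table looks like this (a minimal sketch; the path is made up, and df stands for any existing Spark DataFrame):

# write a DataFrame as a Delta table
(df.write
   .format("delta")
   .mode("overwrite")
   .save("/mnt/demo/events_delta"))

# read it back
events = spark.read.format("delta").load("/mnt/demo/events_delta")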

Delta Lake brings ACID to object storage:

Atomicity: transactions either succeed or fail completely

Consistency: guarantees that the state of the data is seen consistently by simultaneous operations

Isolation: simultaneous operations must not conflict with one another

Durability: committed changes are permanent

Problems solved by ACID:

  1. Hard to append data: appends will not fail due to conflicts, even when writing from multiple sources simultaneously

  2. Modification of existing data is difficult: upserts allow updates and deletes to be applied as a single atomic transaction

  3. Jobs failing mid-way: changes are not committed until a job has succeeded; jobs either fail or succeed completely (atomicity)

  4. Real-time operations are hard: Delta allows atomic micro-batch transaction processing in near real time (within Structured Streaming), so real-time and batch operations can be used on the same set of Delta Lake tables

  5. Costly to keep historical data versions: time travel lets you query earlier versions of a Delta table without keeping manual copies (see the sketch below)
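
A short sketch of the upsert and time-travel features mentioned above (the table path, join condition, and updates_df DataFrame are hypothetical):

from delta.tables import DeltaTable

# upsert (MERGE): apply updates and inserts as one atomic transaction
target = DeltaTable.forPath(spark, "/mnt/demo/events_delta")
(target.alias("t")
    .merge(updates_df.alias("u"), "t.id = u.id")   # updates_df: new/changed rows with the same schema
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# time travel: read an earlier version of the table
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/demo/events_delta")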

How to build a database in Databricks (based on a Databricks lecture): time travel is enabled on Delta Lake tables thanks to transaction properties such as atomicity, consistency, and isolation.

username = "my_name"
dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

Download some data to a raw location:

%sh
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json

# Then have a look to raw place:
%sh ls

conf derby.log eventlogs health_tracker_data_2020_1.json

Then move the data to the raw directory:

#Step 3: Move the data to the raw directory

dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json",
              health_tracker + "raw/health_tracker_data_2020_1.json")

Load the data as a Spark DataFrame from the raw directory. This is done using the .format("json") option:

file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
health_tracker_data_2020_1_df = (spark.read.format("json").load(file_path))

# Next, we remove the files in the /dbacademy/DLRS/healthtracker/processed directory. This step will make the notebook idempotent. In other words, it could be run more than once without throwing errors or introducing extra files.

dbutils.fs.rm(health_tracker + "processed", recurse=True)

Then transform data:

from pyspark.sql.functions import col, from_unixtime

def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )

processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

Then write the files to the processed directory (note that we are partitioning the data by device id):

(processedDF.write
 .mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .save(health_tracker + "processed"))

Next, register the table in the metastore:

%sql

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING PARQUET
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

File optimization with Delta Lake: https://docs.databricks.com/delta/optimizations/file-mgmt.html#language-python

  1. Compaction (bin-packing) (https://docs.databricks.com/delta/optimizations/file-mgmt.html#compaction-bin-packing):

Delta Lake on Databricks can improve the speed of read queries from a table by coalescing small files into larger ones. You trigger compaction by running the OPTIMIZE command (the Python API below is available starting from Databricks Runtime 11):

from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events") # by table path
deltaTable.optimize().executeCompaction()

# or

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events") # by table name
deltaTable.optimize().executeCompaction()

If you have a large amount of data and only want to optimize a SUBSET of it, you can specify an optional partition predicate using WHERE:

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().where("date='2021-11-18'").executeCompaction() # COMPACTION ONLY FOR THE DATE SELECTED

  2. Z-Ordering (https://docs.databricks.com/delta/optimizations/file-mgmt.html#z-ordering-multi-dimensional-clustering):
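
Z-Ordering uses the same builder pattern as compaction; a sketch (the eventType column is hypothetical, and executeZOrderBy requires a recent runtime):

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().where("date='2021-11-18'").executeZOrderBy("eventType")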

Example notebooks: https://docs.databricks.com/delta/optimizations/optimization-examples.html

Azure Data Factory (ADF)

Azure ML

Deployment of Python ML Databricks notebooks on Azure ML (through MLflow): https://medium.com/pgs-software/mlflow-tracking-ml-model-changes-deployment-in-azure-7bc6ba74f47e
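
The core of that approach is pointing MLflow at the Azure ML workspace tracking URI. A rough sketch, assuming the azureml-sdk / azureml-mlflow packages are installed and a config.json describing the Azure ML workspace is available locally (the experiment name is made up):

import mlflow
from azureml.core import Workspace

# load the Azure ML workspace definition (from a local config.json)
ws = Workspace.from_config()

# send MLflow runs to Azure ML instead of the Databricks-hosted tracking server
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
mlflow.set_experiment("databricks-to-azureml-demo")

with mlflow.start_run():
    mlflow.log_param("alpha", 0.5)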