What if ... you had to use parameters in SQL queries



SQL is a powerful language fueling analytics, product, and operations. It directly impacts decision-making and, ultimately, revenue.


Maintaining an analytics or feature store pipeline involves a lot of SQL and a lot of parameters. Here is a useful tip on how to serve those parameters smoothly and cut down on headaches and errors.


The hellish game of literals in BI queries


As I was designing and reviewing BI queries at Qonto – a French fintech – I had to integrate constants such as:

  • contractual revenue commissions on certain transactions

  • compliance thresholds for certain batch reporting

  • marketing segmentation parameters (e.g. number of employees)

  • time offsets in top management dashboards to account for different moving averages


Use Case


Need

Let's take the following example, where the marketing team wants reporting based on a medium-bracket customer segment. We agreed with stakeholders on the following definition:

The medium bracket holds clients with between 10 and 30 valid transactions over the last 20 days.

We want a base table that stores the ids of the clients belonging to that bracket and summarizes their number of transactions as well as the total amount of those transactions.
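
For clarity, here is a minimal sketch of what the target base table could look like. The columns match the outputs of the queries built below; the table name and column types are illustrative assumptions, in a PostgreSQL-style dialect:

CREATE TABLE customer_transactions_medium (  -- illustrative table name
	day DATE,                        -- date the bracket was computed
	customer_id BIGINT,              -- client belonging to the medium bracket
	transaction_count INTEGER,       -- number of valid transactions over the period
	transaction_sum_euros NUMERIC    -- total amount of those transactions
);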


First approach

The base query groups transactions by customer_id over the period and then filters on the aforementioned thresholds.

  1. Select all valid transactions within the timeframe

  2. Group them by customer_id and aggregate count of transactions

  3. Filter lines where the count is within the range

We can use a CTE (common table expression) for the first two steps and produce the following:


WITH grouped AS (
	SELECT
		CURRENT_DATE AS day,
		c.id AS customer_id,
		COUNT(t.id) AS transaction_count,
		SUM(t.amount_euros) AS transaction_sum_euros
	FROM customers AS c
	LEFT JOIN transactions AS t ON (
		t.customer_id = c.id
		AND t.status = 'validated'
	)
	WHERE t.date::DATE
		BETWEEN (CURRENT_DATE - INTERVAL '20 days')
		AND CURRENT_DATE
	GROUP BY CURRENT_DATE, c.id
)

SELECT
	g.day,
	g.customer_id,
	g.transaction_count,
	g.transaction_sum_euros
FROM grouped AS g
WHERE g.transaction_count BETWEEN 10 AND 29


Introducing tricks

One nice improvement over scattering constants all over lengthy SQL files is to group all constants and parameters in a dedicated CTE (common table expression).


Another is to start the WHERE clause with WHERE TRUE and add one AND clause per filter, which makes editing easy, as illustrated below.
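
With this pattern, every condition becomes an independent line that can be commented out, reordered, or appended without touching the rest of the clause. A minimal sketch on the transactions table (the amount filter is purely illustrative):

SELECT
	t.customer_id,
	t.amount_euros
FROM transactions AS t
WHERE TRUE
	AND t.status = 'validated'
	-- AND t.amount_euros > 0  -- extra filter, easy to toggle on or off
	AND t.date::DATE >= CURRENT_DATE - INTERVAL '20 days'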


Here is what the query could look like in a PostgreSQL dialect (the dialect we stick to for the rest of the examples):

WITH parameters AS (
	SELECT
		CURRENT_DATE AS start_date,
		'validated' AS transaction_status_ok,
		10 AS transaction_count_low,
		30 AS transaction_count_high,
		20 AS account_period_days
)

,grouped AS (
	SELECT
		p.start_date AS day,
		c.id AS customer_id,
		COUNT(t.id) AS transaction_count,
		SUM(t.amount_euros) AS transaction_sum_euros
	FROM customers AS c
	CROSS JOIN parameters AS p
	LEFT JOIN transactions AS t ON (
		t.customer_id = c.id
		AND t.status = p.transaction_status_ok
	)
	WHERE TRUE
		AND t.date::DATE
			BETWEEN (p.start_date - p.account_period_days * INTERVAL '1 day')
			AND p.start_date
	GROUP BY p.start_date, c.id
)

SELECT
	g.day,
	g.customer_id,
	g.transaction_count,
	g.transaction_sum_euros
FROM grouped AS g
CROSS JOIN parameters AS p
WHERE TRUE
	AND g.transaction_count
		BETWEEN p.transaction_count_low
		AND p.transaction_count_high - 1

This example has a minimal footprint. In real-life base tables, you could face 10-20 parameters and 50+ output columns. The upsides are manifold:

  • DRY (don't repeat yourself): declare once, use as many times as needed

  • Easy maintenance: fewer errors and easier updates for anyone

  • Easy to understand: parameters come first, at the top of the query


Real-world usage

Let's explore how to use this trick in the most common use cases:

  • data science: pandas

  • data engineering: Airflow and dbt

  • data visualization: Metabase

We use Python 3.9 syntax throughout these use cases (including the new dictionary union operator |).


pandas

Most database connection engines allow binding parameters:

  • Object Relational Mapping tools - ORM (think SQLAlchemy)

  • SQL tools (think DBeaver)

This is done using the :parameter syntax:

from dataclasses import dataclass, asdict

import pandas as pd
from sqlalchemy import Engine, create_engine, text

_URL = "postgresql+psycopg2://postgres:postgres@localhost:5432/data"

_COUNTS = (0, 10, 30, 10 ** 9)

_PERIOD_DAYS = 20

_QUERY = """
WITH parameters AS (
	SELECT
		CURRENT_DATE AS start_date,
		:transaction_status_ok AS transaction_status_ok,
		:transaction_count_low AS transaction_count_low,
		:transaction_count_high AS transaction_count_high,
		:account_period_days AS account_period_days
)

,grouped AS ( ... )

SELECT ...
"""

@dataclass
class CustomersRevenueParams:
    transaction_count_low: int
    transaction_count_high: int
    account_period_days: int

def customer_transactions(
    engine: Engine, crp: CustomersRevenueParams
) -> pd.DataFrame:
    # merge the dataclass fields with the constant status parameter
    params = asdict(crp) | {"transaction_status_ok": "validated"}
    # text() lets SQLAlchemy bind the :name placeholders
    return pd.read_sql(text(_QUERY), con=engine, params=params)

_PARAMS = (
    CustomersRevenueParams(c_low, c_high, _PERIOD_DAYS)
    for c_low, c_high in zip(_COUNTS[:-1], _COUNTS[1:])
)

engine = create_engine(_URL)

for i, crp in enumerate(_PARAMS):
    d = customer_transactions(engine, crp)
    d.to_csv(f"customer_{i:02d}.csv")


dbt

You can mix this trick with configuration, references, and variables in customer_transaction.sql:

{{ config(materialized='table') }}

WITH parameters AS (
	SELECT
		CURRENT_DATE AS start_date,
		'validated' AS transaction_status_ok,
		{{ var('customer_transaction_count')[0] }} AS transaction_count_low,
		{{ var('customer_transaction_count')[1] }} AS transaction_count_high,
		{{ var('customer_account_period_days') }} AS account_period_days
),

customers AS (
	SELECT * FROM {{ ref('staging_customers') }}
),

transactions AS (
	SELECT * FROM {{ ref('staging_transactions') }}
),

grouped AS ( ... )

SELECT ...

Set the variables in the dbt_project.yml configuration file as such (they can also be overridden at run time with dbt's --vars flag):

name: bi
version: 1.0.0

config-version: 2

vars:
  bi:
    customer_transaction_count: [10, 30]
    customer_account_period_days: 20

models:
    ...


Airflow

You can use a SQL operator such as the PostgresOperator in conjunction with:

  • a SQL file dags/bi/customer_transaction/transform.sql

  • a parameters file dags/bi/customer_transaction/parameters.py

  • a file loader helper dags/load_file.py

  • a DAG definition file dags/customers.py

Define the transform.sql file with parameter placeholders:

WITH parameters AS (
	SELECT
		CURRENT_DATE AS start_date,
		'{{ params.validated }}' AS transaction_status_ok,
		{{ params.transaction_count_low }} AS transaction_count_low,
		{{ params.transaction_count_high }} AS transaction_count_high,
		{{ params.account_period_days }} AS account_period_days
)

,grouped AS ( ... )

SELECT ...

Define the parameters.py file with constant values:

_BASE = {
    "validated": "validated",
    "account_period_days": 20,
}

_COUNTS = (0, 10, 30, 10 ** 9)

def _params(count_low: int, count_high: int) -> dict:
    # merge the shared constants with the bracket-specific bounds
    transactions = {
        "transaction_count_low": count_low,
        "transaction_count_high": count_high,
    }
    return _BASE | transactions

PARAMETERS = {
    "low": _params(_COUNTS[0], _COUNTS[1]),
    "medium": _params(_COUNTS[1], _COUNTS[2]),
    "high": _params(_COUNTS[2], _COUNTS[3]),
}

Add the file loader load_file.py:

from os import path as op, getcwd

_LOCATION = op.realpath(op.join(getcwd(), op.dirname(__file__)))

def load_file(path: str) -> str:
    """accepts a relative path to load from file"""
    filepath = op.join(_LOCATION, path)

    with open(filepath, "r") as f:
        return f.read()

You can feed the parameters at task invocation in the DAG file customers.py:

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

from load_file import load_file
from bi.customer_transaction.parameters import PARAMETERS

DAG_ID = "bi_customers"
START_DATE = datetime(2021, 4, 27)
OWNER = "bi_team"

SQL = {
    "customers": load_file("bi/customer_transaction/customers.sql"),
    "transactions": load_file("bi/customer_transaction/transactions.sql"),
    "transform": load_file("bi/customer_transaction/transform.sql"),
}

with DAG(
    dag_id=DAG_ID,
    start_date=START_DATE,
    schedule_interval="@daily",
    default_args={"owner": OWNER},
    catchup=False,
) as dag:
    # load data tasks
    loads: dict[str, PostgresOperator] = {}
    for source in ("customers", "transactions"):
        loads[source] = PostgresOperator(
            task_id=f"bi_customers_load_{source}",
            postgres_conn_id="postgres",
            sql=SQL[source],
        )

    # transform data tasks: one task per customer bracket
    transform_customers: dict[str, PostgresOperator] = {}
    for name, params in PARAMETERS.items():
        transform_customers[name] = PostgresOperator(
            task_id=f"bi_customers_transform_{name}",
            postgres_conn_id="postgres",
            sql=SQL["transform"],
            params=params,
        )

    # declare dependencies
    loads["customers"] >> loads["transactions"]
    for name in PARAMETERS:
        loads["transactions"] >> transform_customers[name]

See the Airflow documentation for more details on the PostgresOperator.


Metabase

Metabase allows you to use variables in queries with the {{ }} syntax:

WITH parameters AS (
	SELECT
		CURRENT_DATE AS start_date,
		{{ status }} AS transaction_status_ok,
		{{ count_low }} AS transaction_count_low,
		{{ count_high }} AS transaction_count_high,
		{{ period_days }} AS account_period_days
)

,grouped AS ( ... )

SELECT ...

With the following variable types:

  • status as Text

  • count_low, count_high and period_days as Number


Finding Data

Writing and maintaining complex and efficient SQL queries requires experience and skill. It requires wrapping your mind around ever-growing data assets. You want to make the most of your data knowledge.


I am a co-founder at Castor, a data discovery platform that helps anyone find, understand, and use data assets across the whole company. We developed a SQL query history feature to help data people find and share queries. Check it out.