Testing Apache Hamilton code¶
A common question on Slack
is “how do I test my Hamilton functions?” – often with a worry that decorators
will get in the way. The good news: a Hamilton function is just a Python
function, so the standard pytest patterns you already know apply directly.
This guide walks through four cases, in order of increasing scope:
Unit-testing a plain function.
Unit-testing a decorated function.
Integration-testing the full DAG with the Driver, including inputs= and overrides=.
Driving an in-memory module for self-contained tests (e.g. of custom materializers).
The complete runnable code lives in
examples/testing.
Every code block on this page is a literalinclude from that folder, so the
docs and the example can never drift out of sync.
Prerequisites¶
Install the example’s dependencies and run it:
cd examples/testing
pip install -r requirements.txt
pytest
You should see all 13 tests pass.
1. Unit-testing plain functions¶
Hamilton encourages you to put your transformation logic in ordinary modules that don’t import the Driver. That makes them trivial to unit-test:
examples/testing/my_functions.py¶
"""A small marketing dataflow we will test.
Each public function below becomes a node in the Hamilton DAG. Functions are
ordinary Python -- nothing about them depends on the driver -- which is what
makes them straightforward to unit-test.
"""
import pandas as pd
def signups(raw_signups: pd.Series) -> pd.Series:
"""Drop the first row (which is always a header sentinel in our source)."""
return raw_signups.iloc[1:].reset_index(drop=True)
def spend(raw_spend: pd.Series) -> pd.Series:
"""Drop the first row to align with `signups`."""
return raw_spend.iloc[1:].reset_index(drop=True)
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
"""Rolling 3-week average spend."""
return spend.rolling(3).mean()
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
"""Cost per signup, in dollars."""
return spend / signups
def spend_mean(spend: pd.Series) -> float:
"""Mean of the spend column."""
return spend.mean()
def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
"""Spend with the mean subtracted off."""
return spend - spend_mean
def spend_std_dev(spend: pd.Series) -> float:
"""Standard deviation of the spend column."""
return spend.std()
def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
"""Standard-scaled spend (zero mean, unit variance)."""
return spend_zero_mean / spend_std_dev
Tests are just direct calls to the functions:
examples/testing/test_my_functions.py¶
"""Unit tests for the plain (un-decorated) Hamilton functions.
The point of these tests: a Hamilton function is just a Python function. You
can import it and call it directly -- no Driver, no DAG, no fixtures required.
"""
import my_functions
import pandas as pd
import pytest
def test_avg_3wk_spend_returns_rolling_mean() -> None:
spend = pd.Series([10.0, 20.0, 30.0, 40.0])
result = my_functions.avg_3wk_spend(spend)
# The first two entries are NaN (window not full), then rolling mean of 3.
expected = pd.Series([float("nan"), float("nan"), 20.0, 30.0])
pd.testing.assert_series_equal(result, expected)
def test_spend_per_signup_divides_elementwise() -> None:
spend = pd.Series([100.0, 200.0])
signups = pd.Series([10.0, 50.0])
result = my_functions.spend_per_signup(spend=spend, signups=signups)
pd.testing.assert_series_equal(result, pd.Series([10.0, 4.0]))
def test_spend_zero_mean_centres_the_series() -> None:
spend = pd.Series([10.0, 20.0, 30.0])
spend_mean = 20.0
result = my_functions.spend_zero_mean(spend=spend, spend_mean=spend_mean)
pd.testing.assert_series_equal(result, pd.Series([-10.0, 0.0, 10.0]))
@pytest.mark.parametrize(
("raw", "expected_first"),
[
(pd.Series([0, 1, 2]), 1), # header sentinel dropped
(pd.Series([99, 5, 5, 5]), 5),
],
)
def test_signups_drops_header_row(raw: pd.Series, expected_first: int) -> None:
"""`pytest.mark.parametrize` is a clean way to cover edge cases."""
result = my_functions.signups(raw_signups=raw)
assert result.iloc[0] == expected_first
assert len(result) == len(raw) - 1
Notes¶
No Driver is required. You import the module under test and call its functions like any other Python code.
pytest.mark.parametrize is a clean way to cover edge cases without copy-pasting test bodies.
Use pd.testing.assert_series_equal (or assert_frame_equal) for pandas outputs – it gives readable diffs on failure.
2. Unit-testing decorated functions¶
Hamilton’s function modifiers (@tag, @parameterize, @extract_columns,
…) tell Hamilton how to wire the function into the DAG. They do not
change what the function does when you call it directly. You can therefore
mix two complementary techniques:
Call the underlying function in a unit test (cheap, fast).
Build a Driver and assert on the expanded DAG, to verify the wiring (slower, but the only way to catch decorator misuse).
The decorated module:
examples/testing/decorated_functions.py¶
"""Functions that use Hamilton decorators.
Decorators are a common source of confusion when testing. The point of this
module is to show that decorators do not get in the way of unit testing -- the
function below the decorator is still a plain Python callable, so you can call
it directly from a test. To test what the decorator *expands to*, drive the
function through a Driver instead (see ``test_decorated_functions.py``).
"""
import pandas as pd
from hamilton.function_modifiers import extract_columns, parameterize, source, tag, value
@tag(owner="growth-team", pii="false")
def total_signups(signups: pd.Series) -> int:
"""Sum of signups across the time window."""
return int(signups.sum())
@parameterize(
spend_in_thousands={"raw_value": source("spend"), "divisor": value(1000.0)},
signups_in_hundreds={"raw_value": source("signups"), "divisor": value(100.0)},
)
def scaled(raw_value: pd.Series, divisor: float) -> pd.Series:
"""Scale a series by a constant divisor.
`@parameterize` produces one node per entry above. The function itself is
still a normal callable, so a unit test can call ``scaled(some_series, 1000)``
directly without a Driver.
"""
return raw_value / divisor
@extract_columns("scaled_spend", "scaled_signups")
def scaled_features(spend_in_thousands: pd.Series, signups_in_hundreds: pd.Series) -> pd.DataFrame:
"""Bundle the two scaled series into a frame, then expose each column as a node."""
return pd.DataFrame({"scaled_spend": spend_in_thousands, "scaled_signups": signups_in_hundreds})
The tests:
examples/testing/test_decorated_functions.py¶
"""Testing functions that use Hamilton decorators.
There are two complementary techniques here:
1. **Test the underlying callable.** Decorators such as ``@tag``,
``@parameterize`` and ``@extract_columns`` do not change what the function
*does* when you call it directly -- they change how Hamilton wires it into
the DAG. A direct call is the cheapest unit test.
2. **Test the expanded DAG.** Build a Driver, run it, and assert on the
generated nodes (e.g. ``spend_in_thousands``, ``scaled_spend``). This is the
only way to verify that the decorator wiring is correct.
"""
import decorated_functions
import pandas as pd
from hamilton import driver
def test_decorated_function_is_callable_directly() -> None:
"""`@parameterize` does not stop the function from being called directly."""
result = decorated_functions.scaled(raw_value=pd.Series([1000.0, 2000.0]), divisor=1000.0)
pd.testing.assert_series_equal(result, pd.Series([1.0, 2.0]))
def test_total_signups_ignores_tag() -> None:
"""`@tag` is metadata only -- the function still computes a sum."""
result = decorated_functions.total_signups(signups=pd.Series([1, 2, 3]))
assert result == 6
def test_parameterize_expands_into_two_nodes() -> None:
"""Build a Driver to verify `@parameterize` produced both nodes correctly."""
dr = driver.Builder().with_modules(decorated_functions).build()
inputs = {
"spend": pd.Series([1000.0, 2000.0, 3000.0]),
"signups": pd.Series([100.0, 200.0, 300.0]),
}
result = dr.execute(["spend_in_thousands", "signups_in_hundreds"], inputs=inputs)
pd.testing.assert_series_equal(
result["spend_in_thousands"], pd.Series([1.0, 2.0, 3.0]), check_names=False
)
pd.testing.assert_series_equal(
result["signups_in_hundreds"], pd.Series([1.0, 2.0, 3.0]), check_names=False
)
def test_extract_columns_exposes_each_column_as_a_node() -> None:
"""`@extract_columns` should make `scaled_spend` and `scaled_signups` queryable."""
dr = driver.Builder().with_modules(decorated_functions).build()
inputs = {
"spend": pd.Series([1000.0, 2000.0]),
"signups": pd.Series([100.0, 200.0]),
}
result = dr.execute(["scaled_spend", "scaled_signups"], inputs=inputs)
pd.testing.assert_series_equal(result["scaled_spend"], pd.Series([1.0, 2.0]), check_names=False)
pd.testing.assert_series_equal(
result["scaled_signups"], pd.Series([1.0, 2.0]), check_names=False
)
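The tests above verify @parameterize and @extract_columns through the Driver; the @tag wiring can be checked the same way. Here is a minimal sketch, not part of the example folder, assuming that list_available_variables() returns variables carrying a .tags dict:
import decorated_functions
from hamilton import driver
def test_tag_metadata_is_attached_to_the_node() -> None:
    """Sketch: assumes @tag metadata lands in the variable's `.tags` mapping."""
    dr = driver.Builder().with_modules(decorated_functions).build()
    variables = {var.name: var for var in dr.list_available_variables()}
    assert variables["total_signups"].tags.get("owner") == "growth-team"
    assert variables["total_signups"].tags.get("pii") == "false"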
3. Integration-testing the DAG¶
For end-to-end tests, build a Driver from the module(s) under test and call
execute(...) with controlled inputs.
Two arguments are especially useful:
inputs= injects test data at the inputs of the DAG – the parameter names that aren’t produced by any function.
overrides= short-circuits an intermediate node by pinning its value. This is the integration-test sweet spot: instead of fabricating realistic raw inputs and re-deriving every intermediate, hand the DAG a known value for spend (or any other node) and assert on the downstream logic.
examples/testing/test_driver.py¶
"""Integration tests that exercise the full DAG via the Driver.
These show three patterns:
1. Build a Driver from one or more modules and assert on its outputs.
2. Use ``inputs=`` to inject test data at the DAG inputs.
3. Use ``overrides=`` to short-circuit an intermediate node, so you can test
downstream logic in isolation without recomputing upstream nodes.
"""
import my_functions
import pandas as pd
from hamilton import driver
def _build_driver() -> driver.Driver:
"""Helper: construct a Driver pointed at our module under test."""
return driver.Builder().with_modules(my_functions).build()
def test_driver_executes_full_pipeline() -> None:
dr = _build_driver()
inputs = {
"raw_signups": pd.Series([0, 1, 10, 50, 100, 200, 400]),
"raw_spend": pd.Series([0, 10, 10, 20, 40, 40, 50]),
}
result = dr.execute(["spend_per_signup", "spend_mean"], inputs=inputs)
# spend after dropping the header row: [10, 10, 20, 40, 40, 50] -> mean 28.333...
assert result["spend_mean"] == pd.Series([10, 10, 20, 40, 40, 50]).mean()
pd.testing.assert_series_equal(
result["spend_per_signup"],
pd.Series([10.0, 1.0, 0.4, 0.4, 0.2, 0.125]),
check_names=False,
)
def test_overrides_short_circuit_upstream_nodes() -> None:
"""`overrides=` lets us pin a node's value, so upstream code is skipped.
This is the integration-test sweet spot: instead of fabricating realistic
raw inputs and re-deriving every intermediate, we hand the DAG a known
`spend` and assert on the *downstream* arithmetic.
"""
dr = _build_driver()
result = dr.execute(
["spend_zero_mean_unit_variance"],
# No `inputs=` needed because every upstream dependency is overridden.
overrides={"spend": pd.Series([10.0, 20.0, 30.0])},
)
scaled = result["spend_zero_mean_unit_variance"]
# Standardised series: mean ~0, std ~1.
assert abs(scaled.mean()) < 1e-9
assert abs(scaled.std(ddof=1) - 1.0) < 1e-9
def test_what_is_upstream_of() -> None:
"""The Driver itself can be inspected -- handy for asserting graph shape."""
dr = _build_driver()
upstream_node_names = {n.name for n in dr.what_is_upstream_of("spend_per_signup")}
# `spend_per_signup` depends (transitively) on the raw inputs and on `spend`/`signups`.
assert {"spend", "signups", "raw_spend", "raw_signups"} <= upstream_node_names
Tip: Driver exposes a number of inspection methods –
what_is_upstream_of, what_is_downstream_of, list_available_variables
– that are handy for asserting on graph shape, not just values.
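For example, a sketch (not part of the example folder) that asserts on graph shape in both directions, assuming these methods return variables with a .name attribute:
import my_functions
from hamilton import driver
def test_graph_shape() -> None:
    """Sketch: assert on the DAG's structure rather than on computed values."""
    dr = driver.Builder().with_modules(my_functions).build()
    # Every node we expect should exist in the DAG.
    all_names = {var.name for var in dr.list_available_variables()}
    assert {"spend", "signups", "spend_per_signup", "spend_zero_mean_unit_variance"} <= all_names
    # Changing `spend` should propagate to the standard-scaled output.
    downstream_names = {n.name for n in dr.what_is_downstream_of("spend")}
    assert "spend_zero_mean_unit_variance" in downstream_names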
4. In-memory modules for self-contained tests¶
Sometimes you want a test that defines its own tiny Hamilton module inline
– to exercise a custom materializer, regression-test a data-quality bug,
or demonstrate a pattern in a doctest. You don’t need to create a new
.py file; hamilton.ad_hoc_utils.create_temporary_module packages
inline-defined functions into a real module that the Driver can consume:
examples/testing/test_ad_hoc_module.py¶
"""Testing patterns that don't need a separate module on disk.
Sometimes you want to define a tiny set of functions inside the test itself --
to exercise a custom materializer, a graph adapter, or a regression case --
without creating a whole new ``.py`` file. ``hamilton.ad_hoc_utils`` exposes
``create_temporary_module`` for exactly that.
"""
from hamilton import ad_hoc_utils, driver
def test_temporary_module_can_drive_a_dag() -> None:
"""Define functions inline, package them into a module, run the Driver."""
def base(value: int) -> int:
return value + 1
def squared(base: int) -> int:
return base * base
temp_module = ad_hoc_utils.create_temporary_module(base, squared)
dr = driver.Builder().with_modules(temp_module).build()
result = dr.execute(["squared"], inputs={"value": 4})
# base = 4 + 1 = 5; squared = 25
assert result["squared"] == 25
This is also how Hamilton itself tests several of its built-in materializers, so it scales up to fairly involved scenarios. See tests/test_ad_hoc_utils.py in the Hamilton source for more usage examples.
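Temporary modules also compose with your real ones: with_modules accepts several modules, so an inline regression node can sit alongside the production dataflow. A sketch (spend_doubled is a hypothetical node, not part of the example folder):
import my_functions
import pandas as pd
from hamilton import ad_hoc_utils, driver
def test_inline_node_alongside_real_module() -> None:
    def spend_doubled(spend: pd.Series) -> pd.Series:
        # Hypothetical regression node defined inside the test.
        return spend * 2

    temp_module = ad_hoc_utils.create_temporary_module(spend_doubled)
    dr = driver.Builder().with_modules(my_functions, temp_module).build()
    # `spend` is pinned via overrides, so no raw inputs are needed.
    result = dr.execute(["spend_doubled"], overrides={"spend": pd.Series([1.0, 2.0])})
    pd.testing.assert_series_equal(result["spend_doubled"], pd.Series([2.0, 4.0]))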
Where to go from here¶
Read the Code Organization page – the module structure it recommends is the same one that makes tests easy to write.
Browse the Hamilton test suite for ideas; the same patterns work for user code.
Have a testing pattern that isn’t covered here? Share it on Slack – we’d love to add it.