with_columns¶

We support the with_columns operation that appends the results as new columns to the original dataframe for several libraries:

Pandas¶

Reference Documentation

class hamilton.plugins.h_pandas.with_columns(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, config_required: List[str] = None)¶

Initializes a with_columns decorator for pandas. This allows you to efficiently run groups of map operations on a dataframe.

Here’s an example of calling it – if you’ve seen @subdag, you should be familiar with the concepts:

# my_module.py
def a(a_from_df: pd.Series) -> pd.Series:
    return _process(a)

def b(b_from_df: pd.Series) -> pd.Series:
    return _process(b)

def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
    return (a_from_df + b_from_df) / 2
# with_columns_module.py
def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + b


# the with_columns call
@with_columns(
    *[my_module], # Load from any module
    *[a_plus_b], # or list operations directly
    columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to
    # the subdag
    select=["a", "b", "a_plus_b", "a_b_average"], # The columns to select from the dataframe
)
def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
    # process, or just return unprocessed
    ...

In this instance the initial_df would get two columns added: a_plus_b and a_b_average.

The operations are applied in topological order. This allows you to express the operations individually, making it easy to unit-test and reuse.

Note that the operation is “append”, meaning that the columns that are selected are appended onto the dataframe.

If the function takes multiple dataframes, the dataframe input to process will always be the first argument. This will be passed to the subdag, transformed, and passed back to the function. This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code above, the dataframe that is passed to the subdag is initial_df. That is transformed by the subdag, and then returned as the final dataframe.

You can read it as:

“final_df is a function that transforms the upstream dataframe initial_df, running the transformations from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it.”

In case you need more flexibility you can alternatively use on_input, for example,

# with_columns_module.py
def a_from_df(initial_df: pd.Series) -> pd.Series:
    return initial_df["a_from_df"] / 100

def b_from_df(initial_df: pd.Series) -> pd.Series:
    return initial_df["b_from_df"] / 100


# the with_columns call
@with_columns(
    *[my_module],
    *[a_from_df],
    on_input="initial_df",
    select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
)
def final_df(initial_df: pd.DataFrame, ...) -> pd.DataFrame:
    # process, or just return unprocessed
    ...

the above would output a dataframe where the two columns a_from_df and b_from_df get overwritten.

__init__(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, config_required: List[str] = None)¶

Instantiates a @with_columns decorator.

Parameters:
  • load_from – The functions or modules that will be used to generate the group of map operations.

  • columns_to_pass – The initial schema of the dataframe. This is used to determine which upstream inputs should be taken from the dataframe, and which shouldn’t. Note that, if this is left empty (and external_inputs is as well), we will assume that all dependencies come from the dataframe. This cannot be used in conjunction with on_input.

  • on_input – The name of the dataframe that we’re modifying, as known to the subdag. If you pass this in, you are responsible for extracting columns out. If not provided, you have to pass columns_to_pass in, and we will extract the columns out on the first parameter for you.

  • select – The end nodes that represent columns to be appended to the original dataframe via with_columns. Existing columns will be overridden. The selected nodes need to have the corresponding column type, in this case pd.Series, to be appended to the original dataframe.

  • namespace – The namespace of the nodes, so they don’t clash with the global namespace and so this can be reused. If its left out, there will be no namespace (in which case you’ll want to be careful about repeating it/reusing the nodes in other parts of the DAG.)

  • config_required – the list of config keys that are required to resolve any functions. Pass in None if you want the functions/modules to have access to all possible config.

Polar (Eager)¶

Reference Documentation

class hamilton.plugins.h_polars.with_columns(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, config_required: List[str] = None)¶

Initializes a with_columns decorator for polars.

This allows you to efficiently run groups of map operations on a dataframe. We support both eager and lazy mode in polars. In case of using eager mode the type should be pl.DataFrame and the subsequent operations run on columns with type pl.Series.

Here’s an example of calling in eager mode – if you’ve seen @subdag, you should be familiar with the concepts:

# my_module.py
def a_b_average(a: pl.Series, b: pl.Series) -> pl.Series:
    return (a + b) / 2
# with_columns_module.py
def a_plus_b(a: pl.Series, b: pl.Series) -> pl.Series:
    return a + b


# the with_columns call
@with_columns(
    *[my_module], # Load from any module
    *[a_plus_b], # or list operations directly
    columns_to_pass=["a", "b"], # The columns to pass from the dataframe to
    # the subdag
    select=["a_plus_b", "a_b_average"], # The columns to append to the dataframe
)
def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
    # process, or just return unprocessed
    ...

In this instance the initial_df would get two columns added: a_plus_b and a_b_average.

Note that the operation is “append”, meaning that the columns that are selected are appended onto the dataframe.

If the function takes multiple dataframes, the dataframe input to process will always be the first argument. This will be passed to the subdag, transformed, and passed back to the function. This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code above, the dataframe that is passed to the subdag is initial_df. That is transformed by the subdag, and then returned as the final dataframe.

You can read it as:

“final_df is a function that transforms the upstream dataframe initial_df, running the transformations from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it.”

In case you need more flexibility you can alternatively use on_input, for example,

# with_columns_module.py
def a_from_df() -> pl.Expr:
    return pl.col(a).alias("a") / 100

def b_from_df() -> pl.Expr:
    return pl.col(b).alias("b") / 100


# the with_columns call
@with_columns(
    *[my_module],
    on_input="initial_df",
    select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
)
def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
    # process, or just return unprocessed
    ...

the above would output a dataframe where the two columns a and b get overwritten.

__init__(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, config_required: List[str] = None)¶

Instantiates a @with_columns decorator.

Parameters:
  • load_from – The functions or modules that will be used to generate the group of map operations.

  • columns_to_pass – The initial schema of the dataframe. This is used to determine which upstream inputs should be taken from the dataframe, and which shouldn’t. Note that, if this is left empty (and external_inputs is as well), we will assume that all dependencies come from the dataframe. This cannot be used in conjunction with on_input.

  • on_input – The name of the dataframe that we’re modifying, as known to the subdag. If you pass this in, you are responsible for extracting columns out. If not provided, you have to pass columns_to_pass in, and we will extract the columns out on the first parameter for you.

  • select – The end nodes that represent columns to be appended to the original dataframe via with_columns. Existing columns will be overridden. The selected nodes need to have the corresponding column type, in this case pl.Series, to be appended to the original dataframe.

  • namespace – The namespace of the nodes, so they don’t clash with the global namespace and so this can be reused. If its left out, there will be no namespace (in which case you’ll want to be careful about repeating it/reusing the nodes in other parts of the DAG.)

  • config_required – the list of config keys that are required to resolve any functions. Pass in None if you want the functions/modules to have access to all possible config.

Polars (Lazy)¶

Reference Documentation

class hamilton.plugins.h_polars_lazyframe.with_columns(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, config_required: List[str] = None)¶

Initializes a with_columns decorator for polars.

This allows you to efficiently run groups of map operations on a dataframe. We support both eager and lazy mode in polars. For lazy execution, use pl.LazyFrame and the subsequent operations should be typed as pl.Expr. See examples/polars/with_columns for a practical implementation in both variations.

The lazy execution would be:

# my_module.py
def a_b_average(a: pl.Expr, b: pl.Expr) -> pl.Expr:
    return (a + b) / 2
# with_columns_module.py
def a_plus_b(a: pl.Expr, b: pl.Expr) -> pl.Expr:
    return a + b


# the with_columns call
@with_columns(
    *[my_module], # Load from any module
    *[a_plus_b], # or list operations directly
    columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to
    # the subdag
    select=["a_plus_b", "a_b_average"], # The columns to append to the dataframe
)
def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
    # process, or just return unprocessed
    ...

Note that the operation is “append”, meaning that the columns that are selected are appended onto the dataframe.

If the function takes multiple dataframes, the dataframe input to process will always be the first argument. This will be passed to the subdag, transformed, and passed back to the function. This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code above, the dataframe that is passed to the subdag is initial_df. That is transformed by the subdag, and then returned as the final dataframe.

You can read it as:

“final_df is a function that transforms the upstream dataframe initial_df, running the transformations from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it.”

In case you need more flexibility you can alternatively use on_input, for example,

# with_columns_module.py
def a_from_df() -> pl.Expr:
    return pl.col(a).alias("a") / 100

def b_from_df() -> pd.Expr:
    return pl.col(a).alias("b") / 100


# the with_columns call
@with_columns(
    *[my_module],
    on_input="initial_df",
    select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
)
def final_df(initial_df: pl.LazyFrame) -> pl.LazyFrame:
    # process, or just return unprocessed
    ...

the above would output a dataframe where the two columns a and b get overwritten.

__init__(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, config_required: List[str] = None)¶

Instantiates a @with_columns decorator.

Parameters:
  • load_from – The functions or modules that will be used to generate the group of map operations.

  • columns_to_pass – The initial schema of the dataframe. This is used to determine which upstream inputs should be taken from the dataframe, and which shouldn’t. Note that, if this is left empty (and external_inputs is as well), we will assume that all dependencies come from the dataframe. This cannot be used in conjunction with on_input.

  • on_input – The name of the dataframe that we’re modifying, as known to the subdag. If you pass this in, you are responsible for extracting columns out. If not provided, you have to pass columns_to_pass in, and we will extract the columns out on the first parameter for you.

  • select – The end nodes that represent columns to be appended to the original dataframe via with_columns. Existing columns will be overridden. The selected nodes need to have the corresponding column type, in this case pl.Expr, to be appended to the original dataframe.

  • namespace – The namespace of the nodes, so they don’t clash with the global namespace and so this can be reused. If its left out, there will be no namespace (in which case you’ll want to be careful about repeating it/reusing the nodes in other parts of the DAG.)

  • config_required – the list of config keys that are required to resolve any functions. Pass in None if you want the functions/modules to have access to all possible config.

PySpark¶

This is part of the hamilton pyspark integration. To install, run:

pip install sf-hamilton[pyspark]

Reference Documentation

class hamilton.plugins.h_spark.with_columns(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, mode: str = 'append', config_required: List[str] = None)¶
__init__(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, on_input: str = None, select: List[str] = None, namespace: str = None, mode: str = 'append', config_required: List[str] = None)¶
Initializes a with_columns decorator for spark. This allows you to efficiently run

groups of map operations on a dataframe, represented as pandas/primitives UDFs. This effectively “linearizes” compute – meaning that a DAG of map operations can be run as a set of .withColumn operations on a single dataframe – ensuring that you don’t have to do a complex extract then join process on spark, which can be inefficient.

Here’s an example of calling it – if you’ve seen @subdag, you should be familiar with the concepts:

# my_module.py
def a(a_from_df: pd.Series) -> pd.Series:
    return _process(a)

def b(b_from_df: pd.Series) -> pd.Series:
    return _process(b)

def a_plus_b(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
    return a + b


# the with_columns call
@with_columns(
    load_from=[my_module], # Load from any module
    columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to
    # the subdag
    select=["a", "b", "a_plus_b"], # The columns to select from the dataframe
)
def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
    # process, or just return unprocessed
    ...

You can think of the above as a series of withColumn calls on the dataframe, where the operations are applied in topological order. This is significantly more efficient than extracting out the columns, applying the maps, then joining, but also allows you to express the operations individually, making it easy to unit-test and reuse.

Note that the operation is “append”, meaning that the columns that are selected are appended onto the dataframe. We will likely add an option to have this be either “select” or “append” mode.

If the function takes multiple dataframes, the dataframe input to process will always be the first one. This will be passed to the subdag, transformed, and passed back to the functions. This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code above, the dataframe that is passed to the subdag is initial_df. That is transformed by the subdag, and then returned as the final dataframe.

You can read it as:

“final_df is a function that transforms the upstream dataframe initial_df, running the transformations from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it.”

Parameters:
  • load_from – The functions that will be used to generate the group of map operations.

  • columns_to_pass – The initial schema of the dataframe. This is used to determine which upstream inputs should be taken from the dataframe, and which shouldn’t. Note that, if this is left empty (and external_inputs is as well), we will assume that all dependencies come from the dataframe. This cannot be used in conjunction with pass_dataframe_as.

  • pass_dataframe_as – The name of the dataframe that we’re modifying, as known to the subdag. If you pass this in, you are responsible for extracting columns out. If not provided, you have to pass columns_to_pass in, and we will extract the columns out for you.

  • select – Outputs to select from the subdag, i.e. functions/module passed int. If this is left blank it will add all possible columns from the subdag to the dataframe.

  • namespace – The namespace of the nodes, so they don’t clash with the global namespace and so this can be reused. If its left out, there will be no namespace (in which case you’ll want to be careful about repeating it/reusing the nodes in other parts of the DAG.)

  • mode – The mode of the operation. This can be either “append” or “select”. If it is “append”, it will keep all original columns in the dataframe, and append what’s in select. If it is “select”, it will do a global select of columns in the dataframe from the select parameter. Note that, if the select parameter is left blank, it will add all columns in the dataframe that are in the subdag. This defaults to append. If you’re using select, use the @select decorator instead.

  • config_required – the list of config keys that are required to resolve any functions. Pass in None if you want the functions/modules to have access to all possible config.