Source code for hamilton.experimental.decorators.parameterize_frame

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


import pandas as pd

from hamilton.function_modifiers import (
    UpstreamDependency,
    base,
    parameterize_extract_columns,
    source,
    value,
)
from hamilton.function_modifiers.expanders import ParameterizedExtract


def _get_dep_type(dep_type: str) -> UpstreamDependency:
    """Converts dependency type to the type known by function_modifier"""
    if dep_type == "out":
        return None
    if dep_type == "value":
        return value
    if dep_type == "source":
        return source
    raise ValueError(f"Invalid dep type: {dep_type}")


def _get_index_levels(index: pd.MultiIndex) -> list[list]:
    """Split an index into one list of labels per level.

    For a two-level index such as ``[("output1", "out"), ("input1", "source")]``
    this returns ``[["output1", "input1"], ["out", "source"]]``.

    The number of levels is read from the index itself rather than inferred
    from its first entry. An empty index therefore yields one empty list per
    level instead of raising ``IndexError``, and a flat (single-level) index
    yields a single list of its labels instead of being iterated label by
    label as if each label were a tuple.

    :param index: Index to decompose; normally the column ``MultiIndex`` of a
        parameterization dataframe.
    :return: A list with ``index.nlevels`` entries. Entry ``i`` holds the
        level-``i`` label of every position in ``index``, in positional order.
    """
    return [index.get_level_values(level).tolist() for level in range(index.nlevels)]


def _validate_df_parameterization(parameterization: pd.DataFrame):
    """Check that the frame's columns have the shape the decorator requires.

    The columns must be a two-level ``MultiIndex``: level 0 carries the
    function argument names (or labels for output columns) and level 1 carries
    the kind of each column. At least one column must be marked ``"out"``.

    The checks run on the column index directly, so a flat or empty column
    index is reported as an ``InvalidDecoratorException`` rather than failing
    with an unrelated error while the levels are being unpacked.

    :param parameterization: Dataframe passed to the decorator.
    :raises base.InvalidDecoratorException: If the columns are not a two-level
        ``MultiIndex`` or no column is marked ``"out"``.
    """
    columns = parameterization.columns
    is_double_index = isinstance(columns, pd.MultiIndex) and columns.nlevels == 2
    # Only look at level 1 once we know it exists.
    if not is_double_index or "out" not in columns.get_level_values(1).tolist():
        raise base.InvalidDecoratorException(
            "Decorator must have a double-index -- first index should be a "
            "list of arguments in your function (or labels for output columns). "
            "Second must be a list of {out, source, value} strs, with at least "
            "one 'out'."
        )


def _convert_params_from_df(parameterization: pd.DataFrame) -> list[ParameterizedExtract]:
    """Turn a parameterization dataframe into ``ParameterizedExtract`` specs.

    The frame is validated first. Each row then produces one
    ``ParameterizedExtract``: cells under ``"out"`` columns become the tuple of
    extracted column names, and every other cell is wrapped by the helper that
    ``_get_dep_type`` returns for its column and stored under that column's
    level-0 name.

    :param parameterization: Dataframe with a two-level column index
        (level 0: argument names, level 1: dependency kinds).
    :return: One ``ParameterizedExtract`` per row, in row order.
    """
    _validate_df_parameterization(parameterization)
    arg_names, kind_labels = _get_index_levels(parameterization.columns)
    wrappers = [_get_dep_type(label) for label in kind_labels]

    extracts = []
    for _, row in parameterization.iterrows():
        injected = {}
        output_names = []
        # Walk the row once, routing each cell by the kind of its column.
        for arg_name, label, wrapper, cell in zip(
            arg_names, kind_labels, wrappers, row, strict=False
        ):
            if wrapper is not None:
                injected[arg_name] = wrapper(cell)
            if label == "out":
                output_names.append(cell)
        extracts.append(ParameterizedExtract(tuple(output_names), injected))
    return extracts


class parameterize_frame(parameterize_extract_columns):
    """EXPERIMENTAL! Instantiates a parameterize_extract decorator from a dataframe.

    The dataframe specifies a set of extracts plus parameterizations. This is
    an experimental decorator and the API may change in the future; please
    provide feedback on whether this API does or does not work for you.

    :param parameterization: Parameterization dataframe. See below.

    The dataframe has a specific shape:

    1. Column index, level 0: list of parameter names.
    2. Column index, level 1: the kind of thing each column holds, one of:

       - ``"out"`` (meaning this is an output),
       - ``"value"`` (meaning this is a literal value),
       - ``"source"`` (meaning this node comes from an upstream value).

    3. Contents: each row supplies one cell per column and is converted into
       one parameterization with its own set of output nodes.

    Note your function has to take in the column-names and output a dataframe
    with those names -- we will likely change it so that's not the case, and
    it can just use the position of the columns.

    Example usage:

    .. code-block:: python

        from hamilton.experimental.decorators.parameterize_frame import parameterize_frame

        df = pd.DataFrame(
            [
                ["outseries1a", "outseries2a", "inseries1a", "inseries2a", 5.0],
                ["outseries1b", "outseries2b", "inseries1b", "inseries2b", 0.2],
            ],
            # specify column names corresponding to function arguments and
            # if outputting multiple columns, output dataframe columns.
            columns=[
                ["output1", "output2", "input1", "input2", "input3"],
                ["out", "out", "source", "source", "value"],
            ],
        )


        @parameterize_frame(df)
        def my_func(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame: ...
    """

    def __init__(self, parameterization: pd.DataFrame):
        """Convert the dataframe to extract specs and pass them to the parent decorator.

        :param parameterization: Dataframe describing outputs, sources and values.
        """
        extract_specs = _convert_params_from_df(parameterization)
        super().__init__(*extract_specs)
# Examples below
if __name__ == "__main__":
    # One row per parameterization: two output column names, two upstream
    # series names and one literal value.
    example_rows = [
        ["outseries1a", "outseries2a", "inseries1a", "inseries2a", 5.0],
        ["outseries1b", "outseries2b", "inseries1b", "inseries2b", 0.2],
        # ...
    ]
    # Level 0: column names (corresponding to function arguments and, if
    # outputting multiple columns, output dataframe columns).
    # Have to switch as indices have to be unique.
    name_level = ["output1", "output2", "input1", "input2", "input3"]
    # Level 1: whether the column is an input ("source", "value") or an
    # output ("out").
    kind_level = ["out", "out", "source", "source", "value"]
    df = pd.DataFrame(example_rows, columns=[name_level, kind_level])

    @parameterize_frame(df)
    def my_func(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame:
        # If there's a single column it could maybe just return a series
        # instead and pick up the name from the first column of the dataframe.
        return pd.DataFrame([input1 * input2 * input3, input1 + input2 + input3])

    # The same two parameterizations as ``df``, written out by hand.
    @parameterize_extract_columns(
        ParameterizedExtract(
            ("outseries1a", "outseries2a"),
            {"input1": source("inseries1a"), "input2": source("inseries2a"), "input3": value(5.0)},
        ),
        ParameterizedExtract(
            ("outseries1b", "outseries2b"),
            {"input1": source("inseries1b"), "input2": source("inseries2b"), "input3": value(0.2)},
        ),
    )
    def my_func_parameterized_extract(
        input1: pd.Series, input2: pd.Series, input3: float
    ) -> pd.DataFrame:
        print("running my_func_parameterized_extract")
        return pd.concat([input1 * input2 * input3, input1 + input2 + input3], axis=1)

    my_func_parameterized_extract.decorated = "false"

    from hamilton.ad_hoc_utils import create_temporary_module
    from hamilton.driver import Driver

    # Every driver call below requests the same outputs and feeds the same inputs.
    # To request a subset instead, e.g.: final_vars=["outseries1a", "outseries2a"]
    _FINAL_VARS = ("outseries1a", "outseries1b", "outseries2a", "outseries2b")

    def _example_inputs() -> dict:
        """Build a fresh dict of the four example input series."""
        return {
            "inseries1a": pd.Series([1, 2]),
            "inseries1b": pd.Series([2, 3]),
            "inseries2a": pd.Series([3, 4]),
            "inseries2b": pd.Series([4, 5]),
        }

    # Test by running the @parameterize_extract_columns version.
    dr = Driver({}, create_temporary_module(my_func_parameterized_extract))
    dr.visualize_execution(
        final_vars=list(_FINAL_VARS),
        output_file_path="./out1.pdf",
        render_kwargs={},
        inputs=_example_inputs(),
    )
    df_1 = dr.execute(
        final_vars=list(_FINAL_VARS),
        inputs=_example_inputs(),
    )
    print(df_1)

    # Test by running the @parameterize_frame version.
    dr = Driver({}, create_temporary_module(my_func))
    dr.visualize_execution(
        final_vars=list(_FINAL_VARS),
        output_file_path="./out2.pdf",
        render_kwargs={},
        inputs=_example_inputs(),
    )
    df_2 = dr.execute(
        final_vars=list(_FINAL_VARS),
        inputs=_example_inputs(),
    )
    print(df_2)