Source code for hamilton.plugins.h_narwhals

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Any

import narwhals as nw

from hamilton.lifecycle import api


class NarwhalsAdapter(api.NodeExecutionMethod):
    """Adapter to make it simpler to use narwhals with Hamilton.

    Each node's callable is wrapped with ``nw.narwhalify`` before it is
    invoked, so node functions can be written against the narwhals API.

    .. code-block:: python

        from hamilton import base, driver
        from hamilton.plugins import h_narwhals
        import example

        # pandas
        dr = (
            driver.Builder()
            .with_config({"load": "pandas"})
            .with_modules(example)
            .with_adapters(
                h_narwhals.NarwhalsAdapter(),
                h_narwhals.NarwhalsDataFrameResultBuilder(
                    base.PandasDataFrameResult()
                ),
            )
            .build()
        )
        result = dr.execute(
            [example.group_by_mean, example.example1],
            inputs={"col_name": "a"}
        )

    """

    def run_to_execute_node(
        self,
        *,
        node_name: str,
        node_tags: dict[str, Any],
        node_callable: Any,
        node_kwargs: dict[str, Any],
        task_id: str | None,
        **future_kwargs: Any,
    ) -> Any:
        """This method is responsible for executing the node and returning the result.

        It uses `nw_kwargs` from the node tags to know if any special flags
        should be passed to the narwhals decorator function. Every name listed
        under that tag is forwarded to ``nw.narwhalify`` as ``<name>=True``.
        The tag value may be an iterable of flag names or a single flag name
        given as a plain string.

        :param node_name: Name of the node.
        :param node_tags: Tags of the node.
        :param node_callable: Callable of the node.
        :param node_kwargs: Keyword arguments to pass to the node.
        :param task_id: The ID of the task, none if not in a task-based environment
        :param future_kwargs: Additional keyword arguments -- this is kept for backwards compatibility
        :return: The result of the node execution -- up to you to return this.
        """
        # A missing, empty or None tag means "no extra narwhalify flags".
        flag_names = node_tags.get("nw_kwargs") or ()
        if isinstance(flag_names, str):
            # A lone string must be treated as ONE flag name; iterating it
            # directly would yield its individual characters as flag names.
            flag_names = (flag_names,)
        nw_kwargs = {flag: True for flag in flag_names}
        nw_func = nw.narwhalify(node_callable, **nw_kwargs)
        return nw_func(**node_kwargs)
class NarwhalsDataFrameResultBuilder(api.ResultBuilder):
    """Result builder that strips narwhals wrappers before delegating.

    Any output that is a narwhals ``DataFrame`` or ``Series`` is converted
    back to its native object; everything is then handed to the result
    builder supplied at construction time.

    .. code-block:: python

        from hamilton import base, driver
        from hamilton.plugins import h_narwhals, h_polars
        import example

        # polars
        dr = (
            driver.Builder()
            .with_config({"load": "polars"})
            .with_modules(example)
            .with_adapters(
                h_narwhals.NarwhalsAdapter(),
                h_narwhals.NarwhalsDataFrameResultBuilder(
                    h_polars.PolarsDataFrameResult()
                ),
            )
            .build()
        )
        result = dr.execute(
            ["group_by_mean", "example1"],
            inputs={"col_name": "a"}
        )

    """

    def __init__(self, result_builder: api.ResultBuilder | api.LegacyResultMixin):
        """Store the result builder that the unwrapped outputs are delegated to.

        :param result_builder: the builder whose ``build_result`` and
            ``output_type`` this class forwards to.
        """
        self.result_builder = result_builder

    def build_result(self, **outputs: Any) -> Any:
        """Unwrap narwhals outputs and build the result with the wrapped builder.

        :param outputs: the outputs from the execution of the graph.
        :return: whatever the wrapped result builder returns for the
            unwrapped outputs.
        """
        narwhals_types = (nw.DataFrame, nw.Series)
        # Only narwhals DataFrame/Series values are converted; every other
        # value is passed through to the delegate untouched.
        native_outputs = {
            name: nw.to_native(obj) if isinstance(obj, narwhals_types) else obj
            for name, obj in outputs.items()
        }
        return self.result_builder.build_result(**native_outputs)

    def output_type(self) -> type:
        """Report the output type of the wrapped result builder.

        :return: the type returned by the wrapped builder's ``output_type()``.
        """
        return self.result_builder.output_type()