# Source code for hamilton.plugins.h_narwhals
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any
import narwhals as nw
from hamilton.lifecycle import api
[docs]
class NarwhalsAdapter(api.NodeExecutionMethod):
    """Adapter to make it simpler to use narwhals with Hamilton.

    Each node callable is wrapped with ``nw.narwhalify`` before being invoked
    with the node's keyword arguments.

    .. code-block:: python

        from hamilton import base, driver
        from hamilton.plugins import h_narwhals
        import example

        # pandas
        dr = (
            driver.Builder()
            .with_config({"load": "pandas"})
            .with_modules(example)
            .with_adapters(
                h_narwhals.NarwhalsAdapter(),
                h_narwhals.NarwhalsDataFrameResultBuilder(
                    base.PandasDataFrameResult()
                ),
            )
            .build()
        )
        result = dr.execute(
            [example.group_by_mean, example.example1],
            inputs={"col_name": "a"}
        )
    """

    def run_to_execute_node(
        self,
        *,
        node_name: str,
        node_tags: dict[str, Any],
        node_callable: Any,
        node_kwargs: dict[str, Any],
        task_id: str | None,
        **future_kwargs: Any,
    ) -> Any:
        """Execute the node through ``nw.narwhalify`` and return the result.

        The ``nw_kwargs`` entry of the node tags, when present, names the
        flags that are passed as ``True`` to ``nw.narwhalify``. The entry may
        be a single flag name (a string) or an iterable of flag names.

        :param node_name: Name of the node.
        :param node_tags: Tags of the node.
        :param node_callable: Callable of the node.
        :param node_kwargs: Keyword arguments to pass to the node.
        :param task_id: The ID of the task, none if not in a task-based environment
        :param future_kwargs: Additional keyword arguments -- this is kept for backwards compatibility
        :return: The result of calling the narwhalified node callable.
        """
        flag_names = node_tags.get("nw_kwargs", ())
        if isinstance(flag_names, str):
            # A lone string would otherwise be iterated character by
            # character, yielding one bogus keyword argument per letter.
            flag_names = (flag_names,)
        nw_kwargs = {flag: True for flag in flag_names}
        nw_func = nw.narwhalify(node_callable, **nw_kwargs)
        return nw_func(**node_kwargs)
[docs]
class NarwhalsDataFrameResultBuilder(api.ResultBuilder):
    """Builds the result. It unwraps the narwhals parts of it and delegates to the passed in result builder.

    .. code-block:: python

        from hamilton import base, driver
        from hamilton.plugins import h_narwhals, h_polars
        import example

        # polars
        dr = (
            driver.Builder()
            .with_config({"load": "polars"})
            .with_modules(example)
            .with_adapters(
                h_narwhals.NarwhalsAdapter(),
                h_narwhals.NarwhalsDataFrameResultBuilder(
                    h_polars.PolarsDataFrameResult()
                ),
            )
            .build()
        )
        result = dr.execute(
            ["group_by_mean", "example1"],
            inputs={"col_name": "a"}
        )
    """

    def __init__(self, result_builder: api.ResultBuilder | api.LegacyResultMixin):
        """Store the result builder that receives the unwrapped outputs.

        :param result_builder: the builder that ``build_result`` and
            ``output_type`` delegate to.
        """
        self.result_builder = result_builder

    def build_result(self, **outputs: Any) -> Any:
        """Given a set of outputs, build the result.

        Outputs that are narwhals ``DataFrame``, ``LazyFrame`` or ``Series``
        objects are converted with ``nw.to_native``; every other value is
        passed through unchanged. The converted outputs are then handed to
        the wrapped result builder.

        :param outputs: the outputs from the execution of the graph.
        :return: the result of the execution of the graph.
        """
        # LazyFrame is included so that lazy narwhals outputs are not handed
        # to the delegate still wrapped.
        narwhals_types = (nw.DataFrame, nw.LazyFrame, nw.Series)
        de_narwhaled_outputs = {
            key: nw.to_native(value) if isinstance(value, narwhals_types) else value
            for key, value in outputs.items()
        }
        return self.result_builder.build_result(**de_narwhaled_outputs)

    def output_type(self) -> type:
        """Returns the output type of this result builder

        :return: whatever the wrapped result builder reports as its output type.
        """
        return self.result_builder.output_type()