h_ray.RayGraphAdapter

The graph adapter that delegates execution of the individual nodes in an Apache Hamilton graph to Ray.

class hamilton.plugins.h_ray.RayGraphAdapter(result_builder: ResultMixin, ray_init_config: Dict[str, Any] = None, shutdown_ray_on_completion: bool = False)

Class representing what’s required to make Hamilton run on Ray.

This walks the graph and translates it to run on Ray.

Use pip install sf-hamilton[ray] to get the dependencies required to run this.
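
For example, here is a minimal usage sketch (the module name my_functions and the requested output my_output are placeholders for your own Hamilton code):

    from hamilton import base, driver
    from hamilton.plugins import h_ray

    import my_functions  # placeholder: a module defining your Hamilton functions

    # Delegate node execution to Ray; results are assembled into a pandas DataFrame.
    adapter = h_ray.RayGraphAdapter(result_builder=base.PandasDataFrameResult())

    dr = (
        driver.Builder()
        .with_modules(my_functions)
        .with_adapters(adapter)
        .build()
    )
    df = dr.execute(["my_output"])  # placeholder: the output node(s) you want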

Use this if:

  • you want to utilize multiple cores on a single machine, or you want to scale to larger dataset sizes with a Ray cluster that you can connect to.

    Note (1): you are still constrained by machine memory size with Ray; you can’t just scale to any dataset size.

    Note (2): serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

Notes on scaling:

  • Multi-core on single machine ✅

  • Distributed computation on a Ray cluster ✅

  • Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻.

Function return object types supported:

  • Works for any python object that can be serialized by the Ray framework. ✅

Pandas?

  • ⛔️ Ray DOES NOT do anything special about Pandas.

CAVEATS

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(result_builder: ResultMixin, ray_init_config: Dict[str, Any] = None, shutdown_ray_on_completion: bool = False)

Constructor

You can pass a ResultMixin object to the constructor to control the return type produced by running on Ray.

Parameters:
  • result_builder – Required. An implementation of base.ResultMixin.

  • ray_init_config – allows you to connect to an existing Ray cluster or start a new one with a custom configuration; see ray.init() (https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html) for the accepted options.

  • shutdown_ray_on_completion – by default we leave the Ray cluster running after graph execution; set this to True to shut it down when execution completes.
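
For example, a sketch of connecting to an existing cluster (the address below is illustrative; omit ray_init_config to start a local Ray instance with Ray’s defaults):

    from hamilton import base
    from hamilton.plugins import h_ray

    adapter = h_ray.RayGraphAdapter(
        result_builder=base.PandasDataFrameResult(),
        # Illustrative configuration for ray.init(); "address" uses the Ray Client scheme.
        ray_init_config={"address": "ray://localhost:10001"},
        shutdown_ray_on_completion=True,  # shut the Ray cluster down once the graph finishes
    )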

do_build_result(outputs: Dict[str, Any]) → Any

Builds the result and brings it back to this running process.

Parameters:

outputs – the dictionary of key -> Union[ray object reference | value]

Returns:

The type of object returned by self.result_builder.
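
Conceptually, this resolves any Ray object references and hands the materialized values to the result builder; a simplified sketch of that idea (not the library’s actual implementation):

    import ray

    def do_build_result(self, outputs):
        # Materialize any Ray object references so the builder sees plain Python values.
        resolved = {
            name: ray.get(value) if isinstance(value, ray.ObjectRef) else value
            for name, value in outputs.items()
        }
        # Delegate to the user-supplied ResultMixin to assemble the final result.
        return self.result_builder.build_result(**resolved)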

static do_check_edge_types_match(type_from: Type, type_to: Type) → bool

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Return bool:

Whether or not they are equivalent.

do_remote_execute(*, execute_lifecycle_for_node: Callable, node: Node, **kwargs: Dict[str, Any]) → Any

Function that is called as we walk the graph to determine how to execute a Hamilton function.

Parameters:
  • execute_lifecycle_for_node – wrapper function that executes lifecycle hooks and methods around the node’s execution.

  • node – the node to execute.

  • kwargs – the arguments that should be passed to it.

Returns:

A Ray object reference.
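
Conceptually, the node’s execution is submitted as a Ray task and the resulting object reference is returned without blocking, so downstream nodes receive references rather than materialized values. A simplified sketch of that idea (not the library’s actual implementation):

    import ray

    def do_remote_execute(self, *, execute_lifecycle_for_node, node, **kwargs):
        # Wrap the lifecycle-aware execution callable as a Ray remote task.
        remote_fn = ray.remote(execute_lifecycle_for_node)
        # Submit the task; Ray de-references any object references passed in kwargs
        # before the task runs, then returns a new object reference for the result.
        return remote_fn.remote(**kwargs)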

static do_validate_input(node_type: Type, input_value: Any) → bool

Method that checks whether an input value matches the expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

post_graph_execute(*args, **kwargs)

Shuts the Ray cluster down after graph execution if shutdown_ray_on_completion was set to True.