AsyncDriver¶
Use this driver in an async context. E.g. for use with FastAPI.
- class hamilton.async_driver.AsyncDriver(config, *modules, result_builder: ResultMixin | None = None, adapters: List[BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePostTaskGroup | BasePostTaskExpand | BasePreTaskSubmission | BasePostTaskReturn | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult] = None, allow_module_overrides: bool = False)¶
Async driver. This is a driver that uses the AsyncGraphAdapter to execute the graph.
dr = async_driver.AsyncDriver({}, async_module, result_builder=base.DictResult()) df = await dr.execute([...], inputs=...)
- __init__(config, *modules, result_builder: ResultMixin | None = None, adapters: List[BasePreDoAnythingHook | BaseDoCheckEdgeTypesMatch | BaseDoValidateInput | BaseValidateNode | BaseValidateGraph | BasePostGraphConstruct | BasePostGraphConstructAsync | BasePreGraphExecute | BasePreGraphExecuteAsync | BasePostTaskGroup | BasePostTaskExpand | BasePreTaskSubmission | BasePostTaskReturn | BasePreTaskExecute | BasePreTaskExecuteAsync | BasePreNodeExecute | BasePreNodeExecuteAsync | BaseDoNodeExecute | BaseDoNodeExecuteAsync | BasePostNodeExecute | BasePostNodeExecuteAsync | BasePostTaskExecute | BasePostTaskExecuteAsync | BasePostGraphExecute | BasePostGraphExecuteAsync | BaseDoBuildResult] = None, allow_module_overrides: bool = False)¶
Instantiates an asynchronous driver.
You will also need to call ainit to initialize the driver if you have any hooks/adapters.
Note that this is not the desired API – you should be using the
hamilton.async_driver.Builder
class to create the driver.This will only (currently) work properly with asynchronous lifecycle hooks, and does not support methods or validators. You can still pass in synchronous lifecycle hooks, but they may behave strangely.
- Parameters:
config – Config to build the graph
modules – Modules to crawl for fns/graph nodes
result_builder – Results mixin to compile the graph’s final results. TBD whether this should be included in the long run.
adapters – Adapters to use for lifecycle methods.
allow_module_overrides – Optional. Same named functions get overridden by later modules. The order of listing the modules is important, since later ones will overwrite the previous ones. This is a global call affecting all imported modules. See https://github.com/apache/hamilton/tree/main/examples/module_overrides for more info.
- async ainit() AsyncDriver ¶
Initializes the driver when using async. This only exists for backwards compatibility. In Hamilton 2.0, we will be using an asynchronous constructor. See https://dev.to/akarshan/asynchronous-python-magic-how-to-create-awaitable-constructors-with-asyncmixin-18j5.
- capture_constructor_telemetry(error: str | None, modules: Tuple[ModuleType], config: Dict[str, Any], adapter: HamiltonGraphAdapter)¶
Ensures we capture constructor telemetry the right way in an async context.
This is a simpler wrapper around what’s in the driver class.
- Parameters:
error – sanitized error string, if any.
modules – tuple of modules to build DAG from.
config – config to create the driver.
adapter – adapter class object.
- async execute(final_vars: List[str], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None) Any ¶
Executes computation.
- Parameters:
final_vars – the final list of variables we want to compute.
overrides – values that will override “nodes” in the DAG.
display_graph – DEPRECATED. Whether we want to display the graph being computed.
inputs – Runtime inputs to the DAG.
- Returns:
an object consisting of the variables requested, matching the type returned by the GraphAdapter. See constructor for how the GraphAdapter is initialized. The default one right now returns a pandas dataframe.
- async raw_execute(final_vars: List[str], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None, _fn_graph: FunctionGraph = None) Dict[str, Any] ¶
Executes the graph, returning a dictionary of strings (node keys) to final results.
- Parameters:
final_vars – Variables to execute (+ upstream)
overrides – Overrides for nodes
display_graph – whether or not to display graph – this is not supported.
inputs – Inputs for DAG runtime calculation
_fn_graph – Function graph for compatibility with superclass – unused
- Returns:
A dict of key -> result
Async Builder¶
Builds a driver in an async context – use await builder....build()
.
- class hamilton.async_driver.Builder¶
Builder for the async driver. This is equivalent to the standard builder, but has a more limited API. Note this does not support dynamic execution or materializers (for now).
Here is an example of how you might use it to get the tracker working:
from hamilton_sdk import tracker tracker_async = adapters.AsyncHamiltonTracker( project_id=1, username="elijah", dag_name="async_tracker", ) dr = ( await async_driver.Builder() .with_modules(async_module) .with_adapters(tracking_async) .build() )
- __init__()¶
Constructs a driver builder. No parameters as you call methods to set fields.
- async build()¶
Builds the async driver. This also initializes it, hence the async definition. If you don’t want to use async, you can use build_without_init and call ainit later, but we recommend using this in an asynchronous lifespan management function (E.G. in fastAPI), or something similar.
- Returns:
The fully
- build_without_init() AsyncDriver ¶
Allows you to build the async driver without initialization. Use this at your own risk – we highly recommend calling .ainit on the final result.
- Returns:
- enable_dynamic_execution(*, allow_experimental_mode: bool = False) Builder ¶
Enables the Parallelizable[] type, which in turn enables: 1. Grouped execution into tasks 2. Parallel execution :return: self
- with_adapter(adapter: HamiltonGraphAdapter) Builder ¶
Sets the adapter to use.
- Parameters:
adapter – Adapter to use.
- Returns:
self