Source code for hamilton.plugins.h_slack
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import traceback
from typing import Any
from slack_sdk import WebClient
from hamilton.execution.graph_functions import create_input_string
from hamilton.lifecycle import NodeExecutionHook
[docs]
class SlackNotifier(NodeExecutionHook):
"""This is a adapter that sends a message to a slack channel when a node is executed & fails.
Note: you need to have slack_sdk installed for this to work.
If you don't have it installed, you can install it with `pip install slack_sdk`
(or `pip install apache-hamilton[slack]` -- use quotes if you're using zsh).
.. code-block:: python
from hamilton.plugins import h_slack
dr = (
driver.Builder()
.with_config({})
.with_modules(some_modules)
.with_adapters(h_slack.SlackNotifier(api_key="YOUR_API_KEY", channel="YOUR_CHANNEL"))
.build()
)
# and then when you call .execute() or .materialize() you'll get a message in your slack channel!
"""
[docs]
def __init__(self, api_key: str, channel: str, **kwargs):
"""Constructor.
:param api_key: API key to use for sending messages.
:param channel: Channel to send messages to.
"""
self.slack_client = WebClient(api_key)
self.channel = channel
self.kwargs = kwargs
def _send_message(self, message: str):
"""Sends a message to the slack channel."""
if self.slack_client is not None:
self.slack_client.chat_postMessage(channel=self.channel, text=message)
[docs]
def run_before_node_execution(
self,
node_name: str,
node_tags: dict[str, Any],
node_kwargs: dict[str, Any],
node_return_type: type,
**future_kwargs: Any,
):
"""Placeholder required to subclass `NodeExecutionMethod`"""
pass
[docs]
def run_after_node_execution(
self,
node_name: str,
node_tags: dict[str, Any],
node_kwargs: dict[str, Any],
node_return_type: type,
result: Any,
error: Exception | None,
success: bool,
task_id: str | None,
run_id: str,
**future_kwargs: Any,
):
"""Sends a message to the slack channel after a node is executed."""
if error:
message = (
f"*Error Executing Node: `{node_name}`*\n"
f"> *Run ID:* {run_id}\n"
f"> *Task ID:* {task_id}\n"
f"> *Error:* {str(error)}\n"
f"> *Stack Trace:*\n> ```\n{''.join(traceback.format_exception(type(error), error, error.__traceback__))}```\n"
f"> *Node Tags:* `{node_tags}`\n"
f"> *Node Kwargs:* ```{create_input_string(node_kwargs)}```\n"
f"> *Return Type:* `{node_return_type}`\n"
)
self._send_message(message)