Alteryx IO Discussions

Customize and extend the power of Alteryx with SDKs, APIs, custom tools, and more.

Source code for the Python SDK Examples?

1npo
5 - Atom

Hello,

 

Where can I view the source code of the Python SDK Example Tools? I can load the tools themselves, but am struggling to find any part of the interface that will allow me to view the Python that these tools are built from.

 

Thanks,

Nick

3 REPLIES 3
PeterOtt
Alteryx
Alteryx

Hey Nick! Are you looking for the Python SDK example tools in Designer, or the Platform SDK Example tools here: https://help.alteryx.com/developer-help/ayx-python-sdk-v2-example-tools ?

1npo
5 - Atom

Hi Peter!

 

I'm new to Alteryx and not sure what the difference is between "Python SDK Example Tools in Designer" and the "Platform SDK Example Tools". The page you linked is listed under "Platform SDK", but it lists Example Tools that can be dragged onto a workflow in Designer.

 

All of our current pipelines are implemented in pure Python, and I'm working on migrating them to Alteryx. It would make sense for some steps in some of those pipelines to be implemented as Custom Python Tools that I can drag onto a workflow. So I'm looking for the Python source code that the Custom Tools in your link are implemented from, to help me get started.

 

Thanks,

mkhtran
9 - Comet

I've finally found the source code for these examples! They are found in the `ayx_python_sdk` package under `/examples`. This is usually

  • C:\Users\<YourUsername>\AppData\Local\Programs\Python\Python38\Lib\site-packages\ayx_python_sdk\examples

Here they are with comments removed:

 

AyxSdkInput

class AyxSdkInput(PluginV2):
    def __init__(self, provider: AMPProviderV2) -> None:
        self.provider = provider
        self.config_value = 0.42
        self.provider.io.info("Plugin initialized.")

    def on_incoming_connection_complete(self, anchor: namedtuple) -> None:
        raise NotImplementedError("Input tools don't receive batches.")

    def on_record_batch(self, batch: "Table", anchor: namedtuple) -> None:
        raise NotImplementedError("Input tools don't receive batches.")

    def on_complete(self) -> None:
        import pandas as pd
        import pyarrow as pa

        df = pd.DataFrame(
            {
                "x": [1, 2, 3],
                "y": ["hello", "world", "from ayx_python_sdk!"],
                "z": [self.config_value, self.config_value, self.config_value],
            }
        )

        packet = pa.Table.from_pandas(df)

        self.provider.write_to_anchor("Output", packet)
        self.provider.io.info("AyxSdkInput tool done.")

 

AyxSdkMultiConnectionsMultiOutputAnchor

class AyxSdkMultiConnectionsMultiOutputAnchor(PluginV2):
    def __init__(self, provider: "AMPProviderV2") -> None:
        self.provider = provider
        self.provider.io.info("Plugin initialized.")
        try:
            self.conn_to_output = self.get_conn_map()
        except Exception as e:
            self.provider.io.error(f"Failed to set conn map {repr(e)}")

    def get_conn_map(self):
        connections = self.provider.incoming_anchors["Input"].keys()
        conn_to_output = {}
        for name in connections:
            conn_num = int(name.strip("#"))
            if conn_num >= 5:
                conn_to_output[name] = f"Output5"
            else:
                conn_to_output[name] = f"Output{conn_num}"
        return conn_to_output

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_record_batch(self, table: "pa.Table", anchor: "Anchor") -> None:
        try:
            out_anchor = self.conn_to_output[anchor.connection]
            self.provider.write_to_anchor(out_anchor, table)
        except Exception as e:
            self.provider.io.warn(
                f"Failed to write batch to output anchor. \n{repr(e)}"
            )

    def on_complete(self) -> None:
        self.provider.io.info("AyxSdkMultiConnectionsMultiOutputAnchor tool done.")

 

AyxAdkMultipleInputAnchors 

class AyxSdkMultipleInputAnchors(PluginV2):
    def __init__(self, provider: "AMPProviderV2") -> None:
        self.provider = provider
        self.output_anchor_name = "Output"
        self.provider.io.info("Plugin initialized.")

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_record_batch(self, table: "pa.Table", anchor: "Anchor") -> None:
        try:
            self.provider.write_to_anchor(self.output_anchor_name, table)
        except Exception as e:
            self.provider.io.warn(
                f"Error Occured while writing to anchor {anchor.name} \n {repr(e)}"
            )

    def on_complete(self) -> None:
        self.provider.io.info("AyxSdkMultipleInputAnchors tool done.")

 

AyxSdkMultipleOutputAnchors

class AyxSdkMultipleOutputAnchors(PluginV2):
    def __init__(self, provider: "AMPProviderV2"):
        self.provider = provider
        self.provider.io.info("AyxSdkMultipleOutputAnchors tool started.")

    def on_record_batch(self, batch: "Table", anchor: "Anchor") -> None:
        metadata = batch.schema
        if not any([field_name == "Value" for field_name in metadata.names]):
            raise RuntimeError(
                "Incoming data must contain a column with the name 'Value'"
            )
        input_dataframe = batch.to_pandas()

        if not is_integer_dtype(input_dataframe["Value"]):
            raise RuntimeError("'Value' column must be of 'int' data type")

        grouped = input_dataframe.groupby("Value")
        odds = grouped.filter(lambda row: (row["Value"] % 2 == 1).any())
        evens = grouped.filter(lambda row: (row["Value"] % 2 == 0).any())

        odd_batch = RecordBatch.from_pandas(odds, preserve_index=False)
        even_batch = RecordBatch.from_pandas(evens, preserve_index=False)
        self.provider.write_to_anchor("Output1", odd_batch)
        self.provider.write_to_anchor("Output2", even_batch)

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_complete(self) -> None:
        self.provider.io.info("AyxSdkMultipleOutputAnchors tool done.")

 

AyxSdkOptionalInputAnchor

class AyxSdkOptionalInputAnchor(PluginV2):
    def __init__(self, provider: "AMPProviderV2") -> None:
        self.provider = provider
        self.config_value = 0.42
        self.output_anchor = self.provider.outgoing_anchors["Output"]
        self.input_anchor = self.provider.incoming_anchors["Input"]
        self.provider.io.info("Plugin initialized.")

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_record_batch(self, batch: "pa.Table", anchor: "Anchor") -> None:
        try:
            self.provider.write_to_anchor("Output", batch)
        except Exception as e:
            self.provider.io.warn(
                f"Error Occured while writing to anchor {anchor.name} \n {repr(e)}"
            )

    def on_complete(self) -> None:
        if len(self.input_anchor.keys()) == 0:
            import pandas as pd

            df = pd.DataFrame({"OptionalField": [self.config_value]})
            batch_to_send = pa.RecordBatch.from_pandas(df=df, preserve_index=False)
            self.provider.write_to_anchor("Output", batch_to_send)

        self.provider.io.info("AyxSdkOptionalInputAnchor tool done.")

 

AyxSdkOutput

class AyxSdkOutput(PluginV2):
    def __init__(self, provider: AMPProviderV2):
        self.provider = provider
        self.provider.io.info("AyxSdkOutput tool started")

    def on_record_batch(self, batch: "pa.Table", anchor: Anchor) -> None:
        self.provider.io.info(batch.to_string().replace("\n", " | "))

    def on_incoming_connection_complete(self, anchor: Anchor) -> None:
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_complete(self) -> None:
        self.provider.io.info(f"AyxSdkOutput tool done.")

 

AyxSdkPassThrough

class AyxSdkPassThrough(PluginV2):
    def __init__(self, provider: AMPProviderV2):
        self.name = "AyxSdkPassThrough"
        self.provider = provider
        self.provider.io.info(f"{self.name} tool started")

    def on_record_batch(self, batch: "pa.Table", anchor: Anchor) -> None:
        self.provider.write_to_anchor("Output", batch)

    def on_incoming_connection_complete(self, anchor: Anchor) -> None:
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_complete(self) -> None:
        self.provider.io.info(f"{self.name} tool done.")