Hello,
Where can I view the source code of the Python SDK Example Tools? I can load the tools themselves, but am struggling to find any part of the interface that will allow me to view the Python that these tools are built from.
Thanks,
Nick
Hey Nick! Are you looking for the Python SDK example tools in Designer, or the Platform SDK Example tools here: https://help.alteryx.com/developer-help/ayx-python-sdk-v2-example-tools ?
Hi Peter!
I'm new to Alteryx and not sure what the difference is between "Python SDK Example Tools in Designer" and the "Platform SDK Example Tools". The page you linked is listed under "Platform SDK", but it lists Example Tools that can be dragged onto a workflow in Designer.
All of our current pipelines are implemented in pure Python, and I'm working on migrating them to Alteryx. It would make sense for some steps in some of those pipelines to be implemented as Custom Python Tools that I can drag onto a workflow. So I'm looking for the Python source code that the Custom Tools in your link are implemented from, to help me get started.
I've finally found the source code for these examples! They are found in the `ayx_python_sdk` package under `/examples`. This is usually
Here they are with comments removed:
AyxSdkInput
class AyxSdkInput(PluginV2): def __init__(self, provider: AMPProviderV2) -> None: self.provider = provider self.config_value = 0.42 self.provider.io.info("Plugin initialized.") def on_incoming_connection_complete(self, anchor: namedtuple) -> None: raise NotImplementedError("Input tools don't receive batches.") def on_record_batch(self, batch: "Table", anchor: namedtuple) -> None: raise NotImplementedError("Input tools don't receive batches.") def on_complete(self) -> None: import pandas as pd import pyarrow as pa df = pd.DataFrame( { "x": [1, 2, 3], "y": ["hello", "world", "from ayx_python_sdk!"], "z": [self.config_value, self.config_value, self.config_value], } ) packet = pa.Table.from_pandas(df) self.provider.write_to_anchor("Output", packet) self.provider.io.info("AyxSdkInput tool done.")
AyxSdkMultiConnectionsMultiOutputAnchor
class AyxSdkMultiConnectionsMultiOutputAnchor(PluginV2): def __init__(self, provider: "AMPProviderV2") -> None: self.provider = provider self.provider.io.info("Plugin initialized.") try: self.conn_to_output = self.get_conn_map() except Exception as e: self.provider.io.error(f"Failed to set conn map {repr(e)}") def get_conn_map(self): connections = self.provider.incoming_anchors["Input"].keys() conn_to_output = {} for name in connections: conn_num = int(name.strip("#")) if conn_num >= 5: conn_to_output[name] = f"Output5" else: conn_to_output[name] = f"Output{conn_num}" return conn_to_output def on_incoming_connection_complete(self, anchor: "Anchor") -> None: self.provider.io.info( f"Received complete update from {anchor.name}:{anchor.connection}." ) def on_record_batch(self, table: "pa.Table", anchor: "Anchor") -> None: try: out_anchor = self.conn_to_output[anchor.connection] self.provider.write_to_anchor(out_anchor, table) except Exception as e: self.provider.io.warn( f"Failed to write batch to output anchor. \n{repr(e)}" ) def on_complete(self) -> None: self.provider.io.info("AyxSdkMultiConnectionsMultiOutputAnchor tool done.")
AyxAdkMultipleInputAnchors
class AyxSdkMultipleInputAnchors(PluginV2): def __init__(self, provider: "AMPProviderV2") -> None: self.provider = provider self.output_anchor_name = "Output" self.provider.io.info("Plugin initialized.") def on_incoming_connection_complete(self, anchor: "Anchor") -> None: self.provider.io.info( f"Received complete update from {anchor.name}:{anchor.connection}." ) def on_record_batch(self, table: "pa.Table", anchor: "Anchor") -> None: try: self.provider.write_to_anchor(self.output_anchor_name, table) except Exception as e: self.provider.io.warn( f"Error Occured while writing to anchor {anchor.name} \n {repr(e)}" ) def on_complete(self) -> None: self.provider.io.info("AyxSdkMultipleInputAnchors tool done.")
AyxSdkMultipleOutputAnchors
class AyxSdkMultipleOutputAnchors(PluginV2): def __init__(self, provider: "AMPProviderV2"): self.provider = provider self.provider.io.info("AyxSdkMultipleOutputAnchors tool started.") def on_record_batch(self, batch: "Table", anchor: "Anchor") -> None: metadata = batch.schema if not any([field_name == "Value" for field_name in metadata.names]): raise RuntimeError( "Incoming data must contain a column with the name 'Value'" ) input_dataframe = batch.to_pandas() if not is_integer_dtype(input_dataframe["Value"]): raise RuntimeError("'Value' column must be of 'int' data type") grouped = input_dataframe.groupby("Value") odds = grouped.filter(lambda row: (row["Value"] % 2 == 1).any()) evens = grouped.filter(lambda row: (row["Value"] % 2 == 0).any()) odd_batch = RecordBatch.from_pandas(odds, preserve_index=False) even_batch = RecordBatch.from_pandas(evens, preserve_index=False) self.provider.write_to_anchor("Output1", odd_batch) self.provider.write_to_anchor("Output2", even_batch) def on_incoming_connection_complete(self, anchor: "Anchor") -> None: self.provider.io.info( f"Received complete update from {anchor.name}:{anchor.connection}." ) def on_complete(self) -> None: self.provider.io.info("AyxSdkMultipleOutputAnchors tool done.")
AyxSdkOptionalInputAnchor
class AyxSdkOptionalInputAnchor(PluginV2): def __init__(self, provider: "AMPProviderV2") -> None: self.provider = provider self.config_value = 0.42 self.output_anchor = self.provider.outgoing_anchors["Output"] self.input_anchor = self.provider.incoming_anchors["Input"] self.provider.io.info("Plugin initialized.") def on_incoming_connection_complete(self, anchor: "Anchor") -> None: self.provider.io.info( f"Received complete update from {anchor.name}:{anchor.connection}." ) def on_record_batch(self, batch: "pa.Table", anchor: "Anchor") -> None: try: self.provider.write_to_anchor("Output", batch) except Exception as e: self.provider.io.warn( f"Error Occured while writing to anchor {anchor.name} \n {repr(e)}" ) def on_complete(self) -> None: if len(self.input_anchor.keys()) == 0: import pandas as pd df = pd.DataFrame({"OptionalField": [self.config_value]}) batch_to_send = pa.RecordBatch.from_pandas(df=df, preserve_index=False) self.provider.write_to_anchor("Output", batch_to_send) self.provider.io.info("AyxSdkOptionalInputAnchor tool done.")
AyxSdkOutput
class AyxSdkOutput(PluginV2): def __init__(self, provider: AMPProviderV2): self.provider = provider self.provider.io.info("AyxSdkOutput tool started") def on_record_batch(self, batch: "pa.Table", anchor: Anchor) -> None: self.provider.io.info(batch.to_string().replace("\n", " | ")) def on_incoming_connection_complete(self, anchor: Anchor) -> None: self.provider.io.info( f"Received complete update from {anchor.name}:{anchor.connection}." ) def on_complete(self) -> None: self.provider.io.info(f"AyxSdkOutput tool done.")
AyxSdkPassThrough
class AyxSdkPassThrough(PluginV2): def __init__(self, provider: AMPProviderV2): self.name = "AyxSdkPassThrough" self.provider = provider self.provider.io.info(f"{self.name} tool started") def on_record_batch(self, batch: "pa.Table", anchor: Anchor) -> None: self.provider.write_to_anchor("Output", batch) def on_incoming_connection_complete(self, anchor: Anchor) -> None: self.provider.io.info( f"Received complete update from {anchor.name}:{anchor.connection}." ) def on_complete(self) -> None: self.provider.io.info(f"{self.name} tool done.")