Source code for the Python SDK Examples?
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Notify Moderator
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Notify Moderator
Hey Nick! Are you looking for the Python SDK example tools in Designer, or the Platform SDK Example tools here: https://help.alteryx.com/developer-help/ayx-python-sdk-v2-example-tools ?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Notify Moderator
Hi Peter!
I'm new to Alteryx and not sure what the difference is between "Python SDK Example Tools in Designer" and the "Platform SDK Example Tools". The page you linked is listed under "Platform SDK", but it lists Example Tools that can be dragged onto a workflow in Designer.
All of our current pipelines are implemented in pure Python, and I'm working on migrating them to Alteryx. It would make sense for some steps in some of those pipelines to be implemented as Custom Python Tools that I can drag onto a workflow. So I'm looking for the Python source code that the Custom Tools in your link are implemented from, to help me get started.
Thanks,
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Notify Moderator
I've finally found the source code for these examples! They are in the `ayx_python_sdk` package under `/examples`. On Windows, this is usually located at:
- C:\Users\<YourUsername>\AppData\Local\Programs\Python\Python38\Lib\site-packages\ayx_python_sdk\examples
Here they are with comments removed:
AyxSdkInput
class AyxSdkInput(PluginV2):
    """Example input tool that emits one fixed three-row table on completion.

    Both incoming-data callbacks raise ``NotImplementedError``; all output
    is produced in ``on_complete``.
    """

    def __init__(self, provider: AMPProviderV2) -> None:
        """Keep the provider, set the constant used for column "z", and log start-up."""
        self.config_value = 0.42
        self.provider = provider
        self.provider.io.info("Plugin initialized.")

    def on_incoming_connection_complete(self, anchor: namedtuple) -> None:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Input tools don't receive batches.")

    def on_record_batch(self, batch: "Table", anchor: namedtuple) -> None:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Input tools don't receive batches.")

    def on_complete(self) -> None:
        """Build the sample table, write it to the "Output" anchor, and log completion."""
        import pandas as pd
        import pyarrow as pa

        z_value = self.config_value
        columns = {
            "x": [1, 2, 3],
            "y": ["hello", "world", "from ayx_python_sdk!"],
            # Same configured value repeated once per row.
            "z": [z_value] * 3,
        }
        frame = pd.DataFrame(columns)
        packet = pa.Table.from_pandas(frame)
        self.provider.write_to_anchor("Output", packet)
        self.provider.io.info("AyxSdkInput tool done.")
AyxSdkMultiConnectionsMultiOutputAnchor
class AyxSdkMultiConnectionsMultiOutputAnchor(PluginV2):
    """Example tool that routes each connection on "Input" to a numbered output.

    Connection names are parsed as integers after stripping ``#`` characters.
    Connection ``N`` is written to anchor ``"OutputN"``, and every connection
    numbered 5 or higher shares ``"Output5"``.
    """

    def __init__(self, provider: "AMPProviderV2") -> None:
        """Keep the provider and build the connection-to-output map.

        A failure while building the map is logged as an error rather than
        raised, so construction itself never fails for that reason.
        """
        self.provider = provider
        self.provider.io.info("Plugin initialized.")
        # Start with an empty map so the attribute always exists; otherwise a
        # failed get_conn_map() would leave it undefined and every later batch
        # would fail with an unrelated AttributeError.
        self.conn_to_output = {}
        try:
            self.conn_to_output = self.get_conn_map()
        except Exception as e:
            self.provider.io.error(f"Failed to set conn map {repr(e)}")

    def get_conn_map(self) -> dict:
        """Return a dict mapping each "Input" connection name to an output anchor name.

        Raises:
            KeyError: if the provider has no incoming anchor named "Input".
            ValueError: if a connection name is not an integer once ``#`` is stripped.
        """
        connections = self.provider.incoming_anchors["Input"].keys()
        conn_to_output = {}
        for name in connections:
            conn_num = int(name.strip("#"))
            # Connections numbered 5 and above all share the last output anchor.
            conn_to_output[name] = f"Output{min(conn_num, 5)}"
        return conn_to_output

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        """Log that a complete update was received for ``anchor``."""
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_record_batch(self, table: "pa.Table", anchor: "Anchor") -> None:
        """Write ``table`` to the output anchor mapped to ``anchor.connection``.

        Best effort: any failure (including an unmapped connection) is logged
        as a warning and the batch is dropped.
        """
        try:
            out_anchor = self.conn_to_output[anchor.connection]
            self.provider.write_to_anchor(out_anchor, table)
        except Exception as e:
            self.provider.io.warn(
                f"Failed to write batch to output anchor. \n{repr(e)}"
            )

    def on_complete(self) -> None:
        """Log that the tool has finished."""
        self.provider.io.info("AyxSdkMultiConnectionsMultiOutputAnchor tool done.")
AyxSdkMultipleInputAnchors
class AyxSdkMultipleInputAnchors(PluginV2):
    """Example tool that forwards every received batch to a single output anchor."""

    def __init__(self, provider: "AMPProviderV2") -> None:
        """Keep the provider, set the output anchor name, and log start-up."""
        self.output_anchor_name = "Output"
        self.provider = provider
        self.provider.io.info("Plugin initialized.")

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        """Log that a complete update was received for ``anchor``."""
        origin = f"{anchor.name}:{anchor.connection}"
        self.provider.io.info(f"Received complete update from {origin}.")

    def on_record_batch(self, table: "pa.Table", anchor: "Anchor") -> None:
        """Forward ``table`` to the output anchor; log a warning if the write fails."""
        try:
            self.provider.write_to_anchor(self.output_anchor_name, table)
        except Exception as e:
            warning = f"Error Occured while writing to anchor {anchor.name} \n {e!r}"
            self.provider.io.warn(warning)

    def on_complete(self) -> None:
        """Log that the tool has finished."""
        self.provider.io.info("AyxSdkMultipleInputAnchors tool done.")
AyxSdkMultipleOutputAnchors
class AyxSdkMultipleOutputAnchors(PluginV2):
    """Example tool that splits rows by the parity of an integer "Value" column.

    Rows with an odd "Value" go to "Output1"; rows with an even "Value" go to
    "Output2".
    """

    def __init__(self, provider: "AMPProviderV2"):
        """Keep the provider and log start-up."""
        self.provider = provider
        self.provider.io.info("AyxSdkMultipleOutputAnchors tool started.")

    def on_record_batch(self, batch: "Table", anchor: "Anchor") -> None:
        """Split ``batch`` into odd and even rows and write each to its anchor.

        Raises:
            RuntimeError: if ``batch`` has no "Value" column, or if that
                column is not of an integer dtype.
        """
        if "Value" not in batch.schema.names:
            raise RuntimeError(
                "Incoming data must contain a column with the name 'Value'"
            )

        input_dataframe = batch.to_pandas()
        values = input_dataframe["Value"]
        if not is_integer_dtype(values):
            raise RuntimeError("'Value' column must be of 'int' data type")

        # Boolean masks select the same rows, in the same order, as grouping by
        # value and filtering each group, without a Python call per distinct value.
        remainder = values % 2
        odds = input_dataframe[remainder == 1]
        evens = input_dataframe[remainder == 0]

        odd_batch = RecordBatch.from_pandas(odds, preserve_index=False)
        even_batch = RecordBatch.from_pandas(evens, preserve_index=False)
        self.provider.write_to_anchor("Output1", odd_batch)
        self.provider.write_to_anchor("Output2", even_batch)

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        """Log that a complete update was received for ``anchor``."""
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_complete(self) -> None:
        """Log that the tool has finished."""
        self.provider.io.info("AyxSdkMultipleOutputAnchors tool done.")
AyxSdkOptionalInputAnchor
class AyxSdkOptionalInputAnchor(PluginV2):
    """Example tool whose "Input" anchor may have no connections.

    Received batches are forwarded to "Output". If the "Input" anchor has no
    connections when the tool completes, a single-row default record is
    written instead.
    """

    def __init__(self, provider: "AMPProviderV2") -> None:
        """Keep the provider, look up both anchors, and log start-up."""
        self.provider = provider
        self.config_value = 0.42
        self.output_anchor = self.provider.outgoing_anchors["Output"]
        self.input_anchor = self.provider.incoming_anchors["Input"]
        self.provider.io.info("Plugin initialized.")

    def on_incoming_connection_complete(self, anchor: "Anchor") -> None:
        """Log that a complete update was received for ``anchor``."""
        self.provider.io.info(
            f"Received complete update from {anchor.name}:{anchor.connection}."
        )

    def on_record_batch(self, batch: "pa.Table", anchor: "Anchor") -> None:
        """Forward ``batch`` to "Output"; log a warning if the write fails."""
        try:
            self.provider.write_to_anchor("Output", batch)
        except Exception as e:
            self.provider.io.warn(
                f"Error Occured while writing to anchor {anchor.name} \n {repr(e)}"
            )

    def on_complete(self) -> None:
        """Write the default record if "Input" has no connections, then log completion."""
        if not self.input_anchor.keys():
            import pandas as pd
            # Imported locally, like pandas, so this method does not rely on a
            # module-level ``pa`` binding being available at runtime.
            import pyarrow as pa

            df = pd.DataFrame({"OptionalField": [self.config_value]})
            batch_to_send = pa.RecordBatch.from_pandas(df=df, preserve_index=False)
            self.provider.write_to_anchor("Output", batch_to_send)
        self.provider.io.info("AyxSdkOptionalInputAnchor tool done.")
AyxSdkOutput
class AyxSdkOutput(PluginV2):
    """Example output tool that logs each received batch as a single info line."""

    def __init__(self, provider: AMPProviderV2):
        """Keep the provider and log start-up."""
        self.provider = provider
        self.provider.io.info("AyxSdkOutput tool started")

    def on_record_batch(self, batch: "pa.Table", anchor: Anchor) -> None:
        """Log the batch's string form with newlines replaced by " | "."""
        rendered = batch.to_string()
        one_line = " | ".join(rendered.split("\n"))
        self.provider.io.info(one_line)

    def on_incoming_connection_complete(self, anchor: Anchor) -> None:
        """Log that a complete update was received for ``anchor``."""
        origin = f"{anchor.name}:{anchor.connection}"
        self.provider.io.info(f"Received complete update from {origin}.")

    def on_complete(self) -> None:
        """Log that the tool has finished."""
        self.provider.io.info("AyxSdkOutput tool done.")
AyxSdkPassThrough
class AyxSdkPassThrough(PluginV2):
    """Example tool that writes every received batch unchanged to "Output"."""

    def __init__(self, provider: AMPProviderV2):
        """Set the tool name, keep the provider, and log start-up."""
        self.name = "AyxSdkPassThrough"
        self.provider = provider
        started = "{} tool started".format(self.name)
        self.provider.io.info(started)

    def on_record_batch(self, batch: "pa.Table", anchor: Anchor) -> None:
        """Write ``batch`` to the "Output" anchor as-is."""
        self.provider.write_to_anchor("Output", batch)

    def on_incoming_connection_complete(self, anchor: Anchor) -> None:
        """Log that a complete update was received for ``anchor``."""
        origin = f"{anchor.name}:{anchor.connection}"
        self.provider.io.info(f"Received complete update from {origin}.")

    def on_complete(self) -> None:
        """Log that the tool has finished."""
        finished = "{} tool done.".format(self.name)
        self.provider.io.info(finished)