Engine Works

Under the hood of Alteryx: tips, tricks and how-tos.
SydneyF
Alteryx Alumni (Retired)

SnakePlane might be the best thing to ever happen to the Python SDK. Ask anyone who’s used it. We are very excited to announce its public release on GitHub, making the development of Python SDK tools easier and more accessible than ever before.

What is SnakePlane?

 

SnakePlane is an abstraction layer that simplifies using the Python SDK.

 

If you’ve ever worked with the Python SDK, you know that the experience can be… confusing. There are lines and lines of code with unfamiliar functions. A lot of these functions are wrappers for processes conducted in the Alteryx Engine… written in C++.

 

SnakePlane takes the SDK code and wraps it into concise utility functions that are accessed through decorators. With SnakePlane, using the Python SDK is a much more intuitive and streamlined experience.

 

For example, here is a tool that converts hexadecimal to Unicode, written with the original Python SDK:

import AlteryxPythonSDK as Sdk
import xml.etree.ElementTree as Et

class AyxPlugin:
	"""
	Implements the plugin interface methods, to be utilized by the Alteryx engine to communicate with a plugin.
	Prefixed with "pi", the Alteryx engine will expect the below five interface methods to be defined.
	"""

	def __init__(self, n_tool_id: int, alteryx_engine: object, output_anchor_mgr: object):
		"""
		Constructor is called whenever the Alteryx engine wants to instantiate an instance of this plugin.
		:param n_tool_id: The assigned unique identification for a tool instance.
		:param alteryx_engine: Provides an interface into the Alteryx engine.
		:param output_anchor_mgr: A helper that wraps the outgoing connections for a plugin.
		"""

		# Default properties
		self.n_tool_id = n_tool_id
		self.alteryx_engine = alteryx_engine
		self.output_anchor_mgr = output_anchor_mgr
		
		self.output = "unicode_conv"
		self.output_type = Sdk.FieldType.wstring
		self.output_size = 1000

	def pi_init(self, str_xml: str):
		"""
		Reads the selected field from the user configuration.
		Called when the Alteryx engine is ready to provide the tool configuration from the GUI.
		:param str_xml: The raw XML from the GUI.
		"""

		# Parse the configuration once and look up the selected field.
		field_select = Et.fromstring(str_xml).find('FieldSelect')
		if field_select is not None:
			self.field_selection = field_select.text
			self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.info, self.field_selection)
		else:
			# Define the attribute even when nothing is selected, so later reads don't raise AttributeError.
			self.field_selection = None
			self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.error, 'Please select field to analyze')

		self.output_anchor = self.output_anchor_mgr.get_output_anchor('Output')  # Getting the output anchor from the XML file.
		
	def pi_add_incoming_connection(self, str_type: str, str_name: str) -> object:
		"""
		The IncomingInterface objects are instantiated here, one object per incoming connection.
		Called when the Alteryx engine is attempting to add an incoming data connection.
		:param str_type: The name of the input connection anchor, defined in the Config.xml file.
		:param str_name: The name of the wire, defined by the workflow author.
		:return: The IncomingInterface object(s).
		"""

		self.single_input = IncomingInterface(self)
		return self.single_input

	def pi_add_outgoing_connection(self, str_name: str) -> bool:
		"""
		Called when the Alteryx engine is attempting to add an outgoing data connection.
		:param str_name: The name of the output connection anchor, defined in the Config.xml file.
		:return: True signifies that the connection is accepted.
		"""

		return True

	def pi_push_all_records(self, n_record_limit: int) -> bool:
		"""
		Called when a tool has no incoming data connection.
		:param n_record_limit: Set it to <0 for no limit, 0 for no records, and >0 to specify the number of records.
		:return: True for success, False for failure.
		"""

		self.alteryx_engine.output_message(self.n_tool_id, Sdk.EngineMessageType.error, 'Missing Incoming Connection')
		return False

	def pi_close(self, b_has_errors: bool):
		"""
		Called after all records have been processed.
		:param b_has_errors: Set to true to not do the final processing.
		"""

		self.output_anchor.assert_close()  # Checks whether connections were properly closed.	 
		
class IncomingInterface:
	"""
	This optional class is returned by pi_add_incoming_connection, and it implements the incoming interface methods, to
	be utilized by the Alteryx engine to communicate with a plugin when processing an incoming connection.
	Prefixed with "ii", the Alteryx engine will expect the below four interface methods to be defined.
	"""

	def __init__(self, parent: object):
		"""
		Constructor for IncomingInterface.
		:param parent: AyxPlugin
		"""

		# Default properties
		self.parent = parent
 
		# Custom properties
		self.record_copier = None
		self.record_creator = None

	def ii_init(self, record_info_in: object) -> bool:
		"""
		Called to report changes of the incoming connection's record metadata to the Alteryx engine.
		:param record_info_in: A RecordInfo object for the incoming connection's fields.
		:return: True for success, otherwise False.
		"""

		# Returns a new RecordInfo object with the same fields as record_info_in.
		record_info_out = record_info_in.clone()

		# Adds the output field to the record with the specified name, type, and size.
		record_info_out.add_field(self.parent.output, self.parent.output_type, self.parent.output_size)

		# Lets the downstream tools know what the outgoing record metadata will look like, based on record_info_out.
		self.parent.output_anchor.init(record_info_out)

		# Creating a new, empty record creator based on record_info_out's record layout.
		self.record_creator = record_info_out.construct_record_creator()

		# Instantiate a new instance of the RecordCopier class.
		self.record_copier = Sdk.RecordCopier(record_info_out, record_info_in)

		# Map each column of the input to where we want in the output.
		for index in range(record_info_in.num_fields):
			# Adding a field index mapping.
			self.record_copier.add(index, index)

		# Let record copier know that all field mappings have been added.
		self.record_copier.done_adding()

		# Grab our new field from the outgoing record layout, so we don't have to do a string lookup on every push_record.
		self.parent.output = record_info_out[record_info_out.get_field_num(self.parent.output)]

		# Grab our input field from the incoming record layout, since it is used to read from in_record.
		self.parent.input_field = record_info_in[record_info_in.get_field_num(self.parent.field_selection)]

		return True

	def ii_push_record(self, in_record: object) -> bool:
		"""
		Responsible for pushing records out
		Called when an input record is being sent to the plugin.
		:param in_record: The data for the incoming record.
		:return: False if method calling limit (record_cnt) is hit.
		"""
		# Copy the data from the incoming record into the outgoing record.
		self.record_creator.reset()
		self.record_copier.copy(self.record_creator, in_record)
		
		text = self.parent.input_field.get_as_string(in_record)
		if text is not None:

			def unicodeTrans(msg):
				new_msg = []
				for char in msg:
					try:
						char = chr(ord(char))
					except ValueError:
						char = '?'
					new_msg.append(char)
				return ''.join(new_msg)

			plz = bytes(text, 'utf-8').decode("unicode_escape")
			result = unicodeTrans(plz)
			self.parent.output.set_from_string(self.record_creator, result)
		else:
			# Nothing to convert: leave the new field null.
			self.parent.output.set_null(self.record_creator)

		# Finalize outside the if so out_record is always defined before it is pushed.
		out_record = self.record_creator.finalize_record()
		 
		# Push the record downstream and quit if there's a downstream error.
		if not self.parent.output_anchor.push_record(out_record):
			return False

		return True

	def ii_update_progress(self, d_percent: float):
		"""
		Called by the upstream tool to report what percentage of records have been pushed.
		:param d_percent: Value between 0.0 and 1.0.
		"""

		self.parent.alteryx_engine.output_tool_progress(self.parent.n_tool_id, d_percent)  # Inform the Alteryx engine of the tool's progress.
		self.parent.output_anchor.update_progress(d_percent)  # Inform the downstream tool of this tool's progress.

	def ii_close(self):
		"""
		Called when the incoming connection has finished passing all of its records.
		"""

		self.parent.output_anchor.output_record_count(True)  # True: Let Alteryx engine know that all records have been sent downstream.
		self.parent.output_anchor.close()  # Close outgoing connections.
		

With SnakePlane, the same tool becomes:

# Copyright (C) 2019 Alteryx, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# 3rd Party Libraries
import AlteryxPythonSDK as sdk
from snakeplane.plugin_factory import PluginFactory

# Initialization of the plugin factory, used for making the AyxPlugin class
factory = PluginFactory("HexToUni")

@factory.initialize_plugin
def init(input_mgr, user_data, logger):
    """Initialize the example tool."""

    # Get the selected value from the GUI and save it for later use in the user_data
    user_data.input = input_mgr.workflow_config["FieldSelect"]

    initSuccess = True

    if user_data.input is None:
        logger.display_error_msg("Please select field to analyze")
        initSuccess = False

    return initSuccess


@factory.process_data(mode="stream", input_type="dataframe")
def process_data(input_mgr, output_mgr, user_data, logger):
    """Convert the selected field from hexadecimal to Unicode and stream the results."""
    # Get the input anchor
    input_anchor = input_mgr["Input"][0]

    # Get the input data. Since we're streaming, this is only a single row
    data = input_anchor.data

    # Extract the hexadecimal text
    text = data.iloc[0][user_data.input]

    def unicodeTrans(msg):
        new_msg = []
        for char in msg:
            try:
                char = chr(ord(char))
            except ValueError:
                char = '?'
            new_msg.append(char)
        return ''.join(new_msg)

    plz = bytes(text, 'utf-8').decode("unicode_escape")

    # Append our value to the data
    data['unicode_conv'] = unicodeTrans(plz)

    # Assign to the output
    output_anchor = output_mgr["Output"]
    output_anchor.data = data


@factory.build_metadata
def build_metadata(input_mgr, output_mgr, user_data, logger):
    """Build metadata for this example tool."""
    # Set up the metadata, we're adding a wide-string column
    input_anchor = input_mgr["Input"][0]
    metadata = input_anchor.metadata

    # Add the new column
    metadata.add_column("unicode_conv", sdk.FieldType.wstring)

    # Assign the metadata to the output
    output_anchor = output_mgr["Output"]
    output_anchor.metadata = metadata

AyxPlugin = factory.generate_plugin() 
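
The conversion at the heart of both versions can be tried outside Designer. Here is a minimal sketch in plain Python, assuming the input field holds escape sequences such as \u0048 as literal text. The function name hex_to_unicode is made up for illustration and is not part of either SDK.

```python
def hex_to_unicode(text: str) -> str:
    """Turn literal escape sequences in text into the characters they name."""
    # Same call both versions of the tool use: encode to bytes,
    # then let the unicode_escape codec interpret the escapes.
    return bytes(text, "utf-8").decode("unicode_escape")


# Raw strings keep the backslashes literal, the way they would arrive in a field.
print(hex_to_unicode(r"\u0048\u0069"))
print(hex_to_unicode(r"caf\u00e9"))
```

One caveat worth knowing: the unicode_escape codec reads the bytes as Latin-1, so text that already contains non-ASCII characters may come out garbled.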

Why is it called SnakePlane?

 

Short answer: Because it’s funny.

 

Have you ever wondered how the movie Snakes on a Plane got approved, or how it kept its goofy name? The short answer is Samuel Jackson. He saw the script, wanted to be a part of it, and refused to do it unless the original title was kept.

 

Silly title. Silly movie. But clearly Samuel Jackson is awesome, and that’s what made it work.

 


 

SnakePlane is an abstraction layer wrapped around the Python SDK. Pythons are a type of snake. Planes are things that hold things inside of them (at least I am guessing that was the logical connection). SnakePlane holds the Python SDK inside of it. A + B + Samuel Jackson = SnakePlane. There you go. The only difference is that SnakePlane is not a silly thing; it just has a silly name.

 

How does SnakePlane work?

 

SnakePlane uses a framework similar to Flask. The developer builds their plugin with a PluginFactory class and, through the factory's interfaces, specifies their choice of options and custom functionality.

 

There are three functions that a developer must define when using SnakePlane: initialize_plugin defines the behavior that happens when the tool is initialized; process_data is where the central logic of the tool goes; and build_metadata is where you can define the schema of the tool’s output, allowing tools downstream to be configured in Designer without running the workflow.
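
If the Flask comparison is unfamiliar, the pattern is plain decorator registration. Here is a toy sketch of that idea. ToyFactory, its run method, and the simplified function signatures are all invented for illustration; this is not how SnakePlane's PluginFactory is implemented, and it never touches the Alteryx engine.

```python
class ToyFactory:
    """Toy stand-in for illustration only; not SnakePlane's PluginFactory."""

    def __init__(self, name):
        self.name = name
        self.hooks = {}
        self.options = {}

    def initialize_plugin(self, func):
        # Used bare: @factory.initialize_plugin
        self.hooks["init"] = func
        return func

    def process_data(self, mode, input_type):
        # Used with options: @factory.process_data(mode=..., input_type=...)
        def decorator(func):
            self.options = {"mode": mode, "input_type": input_type}
            self.hooks["process"] = func
            return func
        return decorator

    def build_metadata(self, func):
        # Used bare: @factory.build_metadata
        self.hooks["metadata"] = func
        return func

    def run(self, rows):
        """Call the registered functions in order (hypothetical driver)."""
        if not self.hooks["init"]():
            return None
        columns = self.hooks["metadata"]()
        return columns, [self.hooks["process"](row) for row in rows]


factory = ToyFactory("Shout")


@factory.initialize_plugin
def init():
    return True


@factory.process_data(mode="stream", input_type="list")
def process(row):
    # Append an upper-cased copy of the first value.
    return row + [row[0].upper()]


@factory.build_metadata
def metadata():
    return ["text", "shouted"]


result = factory.run([["hi"], ["there"]])
print(result)
```

The point of the design is that the developer only writes the three small functions; the factory decides when each one gets called.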

 

These functions allow you to specify the Python SDK’s behavior, including whether records are read in row-by-row and processed one at a time (stream mode) or read in and processed all at once (batch mode), and whether the input records are read in as a list or a pandas DataFrame.
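
To make the stream versus batch distinction concrete, here is a rough sketch in plain Python. The helpers run_stream and run_batch are hypothetical and only mimic the calling pattern; they are not SnakePlane functions, and the records are plain lists rather than a pandas DataFrame.

```python
def run_stream(rows, func):
    """Stream mode: func is called once per record."""
    return [func(row) for row in rows]


def run_batch(rows, func):
    """Batch mode: func is called once with every record."""
    return func(rows)


def double_value(row):
    # Needs only the current record, so streaming is enough.
    return [row[0] * 2]


def add_share_of_total(all_rows):
    # Needs the total across all records, so it has to see them all at once.
    total = sum(row[0] for row in all_rows)
    return [[row[0], row[0] / total] for row in all_rows]


rows = [[1], [1], [2]]
streamed = run_stream(rows, double_value)
batched = run_batch(rows, add_share_of_total)
print(streamed)
print(batched)
```

A per-record calculation like double_value fits stream mode, while anything that depends on the whole dataset, like a share of total, needs batch mode.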

 

Why would I want to build a Python SDK tool instead of a macro with a Python tool?

 

There are pros and cons to each approach. The benefits of creating a tool with the Python SDK include typically faster processing times and a more native-like tool experience for end-users. It is easy and straightforward to package a Python SDK tool as a .yxi to share with other users. Possible cons are that the Python SDK does require more familiarity with Python programming, and the pre-SnakePlane SDK was a little less than intuitive and not necessarily easy to use.

 

How do I get started using SnakePlane?

 

The first step to working with SnakePlane is to get SnakePlane Pilot (a development environment for SnakePlane) set up on your machine. A tutorial for getting SnakePlane Pilot installed on your machine can be found here.

 

Let’s get going already!

 

Alright, I hear you, you’re ready to go for it! To help you get started, there is a three-part tutorial on getting SnakePlane Pilot set up on your machine, building a tool with SnakePlane, and using SnakePlane Pilot to debug and package your tool.

Comments
cam_w
11 - Bolide

I like a funny name as much as the next guy ... I can't wait to try this out! :)

mark_oshea
7 - Meteor

Firstly, Snakeplane is great! It has made creating custom python tools so much faster for me, and the tutorials you made were extremely helpful. I do have one issue though, which I can't seem to find the answer to anywhere in the documentation:

 

If I use a workflow with multiple Alteryx tools and only 1 custom tool, it works. 

But if I use 2 custom tools together in the same workflow, they both produce an error.

If I use a blocking tool between 2 custom tools, they both work again.

 

It seems that the output of custom tools does not have any form of default built-in blocking. I was wondering if you are aware of this, and if there is an easy fix that I am missing? I could just tell my users to always place blocking tools after each custom tool, but I would love to have the tools take care of that themselves.

JPKa
Alteryx Alumni (Retired)

@mark_oshea 

 

What version of Alteryx are you using?  There is a known issue in 2019.1 with multiple Python SDK tools stepping on each other.  

 

This is not a problem in 2018.4 or the upcoming 2019.2 release.  

mark_oshea
7 - Meteor

That makes sense, I am on 2019.1. Good to know it will be fixed soon. I can use blocking tools and/or a previous release until 2019.2 comes out. Thanks for clearing that up!