I've built out an optimization algorithm in Python and I'm preparing to integrate it into the Python SDK in Alteryx. So far, the documentation I've seen seems to be geared towards row-by-row processing of data, but my algorithm processes data in a batch format (all data must be present first). Does Alteryx have any examples/best practice suggestions for how to handle this using the SDK?
@jchadwick @SteveA, what's your take on this?
Hi @jraad,
Great observation! The reference to each record only exists inside the ii_push_record method. The only way to preserve the incoming data is to store it in memory in a data structure. Then you can apply your algorithm to that data and push the new records to the output anchor in the ii_close() method. Hope this helps!
Ozzie has it right. As records come in via ii_push_record you can store them either in memory or in a temporary file. After all the records from an input have been sent through, ii_close will be called, and at that time you can then run your records through your batch process.
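To make that concrete, here is a rough sketch of what the ii_close() side can look like once the records have been buffered. It assumes the incoming interface holds a parent reference to the plugin and that the plugin has already set up its output_anchor the way the SDK sample tools do; those names (and the buffering deque self.records) are conventions borrowed from the samples, not requirements, so adapt them to your own plugin.

    def ii_close(self):
        # Every record has arrived by the time ii_close fires, so the batch
        # algorithm can run over the whole buffer before anything is pushed.
        finalized = [creator.finalize_record() for creator in self.records]
        # ... run the batch algorithm over `finalized` here ...

        # Push the records downstream, then close the anchor so downstream
        # tools know no more data is coming.
        for out_record in finalized:
            self.parent.output_anchor.push_record(out_record)
        self.parent.output_anchor.close()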
Does anyone have an example of writing/reading from a temp file in this context?
Thanks
Greg
@gbonnette I don't know that a temp file is a good solution. Writing to and reading from disk can be expensive in terms of processing time. It's best to use a collection object like a list or deque. Here's slightly more than pseudocode, reduced to just the relevant pieces for buffering input records:
import AlteryxPythonSDK as Sdk
from collections import deque


class IncomingInterface:
    def __init__(self, parent: object):
        self.parent = parent
        self.records = deque()
        self.record_info_in = None
        self.record_info_in_clone = None
        self.record_copier = None

    def ii_init(self, record_info_in: object) -> bool:
        self.record_info_in = record_info_in
        self.record_info_in_clone = record_info_in.clone()
        # Instantiate a new instance of the RecordCopier class.
        self.record_copier = Sdk.RecordCopier(self.record_info_in_clone, self.record_info_in)
        # Map each column of the input to where we want it in the output.
        for index in range(self.record_info_in.num_fields):
            # Add a field index mapping.
            self.record_copier.add(index, index)
        # Let the record copier know that all field mappings have been added.
        self.record_copier.done_adding()
        return True

    def ii_push_record(self, in_record: object) -> bool:
        # Create a new, empty record creator based on the cloned record layout.
        record_creator = self.record_info_in_clone.construct_record_creator()
        # Copy the data from the incoming record into the buffered record.
        record_creator.reset()
        self.record_copier.copy(record_creator, in_record)
        # Buffer the record creator; appendleft here plus pop() in ii_close
        # preserves arrival order.
        self.records.appendleft(record_creator)
        return True

    def ii_close(self):
        # All records have arrived; process the buffered records in a loop.
        while len(self.records) > 0:
            buffer_record = self.records.pop().finalize_record()
            # Do something with the record (e.g. feed it to the batch algorithm).
It is true that writing and reading from disk can be expensive. But keep in mind there is no real upper bound on the number of records that might pass through a tool. Trying to keep them all in memory may be prohibitive or even impossible. The ideal solution is to keep the records in memory up to a certain threshold and then start writing/reading from disk instead. We have some utilities internally that do all of this seamlessly, and I've spoken with @TashaA about making them available in the Python SDK.
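Until something like that is exposed, one way to sketch the threshold idea in plain Python is below. It keeps rows in a deque until a (made-up) limit is reached and then spills the overflow to a temp file with pickle. The rows need to be plain Python values (e.g. tuples of field values you pull out of each record), since the SDK record objects themselves can't be pickled; SpillBuffer and max_in_memory are hypothetical names, not part of the SDK.

    import pickle
    import tempfile
    from collections import deque


    class SpillBuffer:
        """Hold rows in memory up to a threshold, then spill the rest to a temp file."""

        def __init__(self, max_in_memory=100_000):
            self.max_in_memory = max_in_memory
            self.memory_rows = deque()
            self.spill_file = None      # created lazily on first overflow
            self.spilled_count = 0

        def add(self, row):
            # `row` should be plain Python data (e.g. a tuple of field values),
            # not an SDK record object, so that it can be pickled.
            if len(self.memory_rows) < self.max_in_memory:
                self.memory_rows.append(row)
                return
            if self.spill_file is None:
                self.spill_file = tempfile.TemporaryFile()
            pickle.dump(row, self.spill_file)
            self.spilled_count += 1

        def __iter__(self):
            # Yield the in-memory rows first, then replay anything spilled to disk.
            yield from self.memory_rows
            if self.spill_file is not None:
                self.spill_file.seek(0)
                for _ in range(self.spilled_count):
                    yield pickle.load(self.spill_file)

In ii_push_record you would call add() with the field values of each incoming record, and in ii_close you would iterate the buffer to feed your batch algorithm. The temporary file is cleaned up automatically when the buffer is closed or garbage collected.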
True enough, though disk is also a finite resource in most physical or virtual environments. Realizing "no real upper bound" requires other technology like Snowflake or Redshift Spectrum and set-based operations instead of the cursors we are effectively talking about here.
Exposing those utilities to the SDK would be awesome @MichaelCh! I say with 99% confidence that you guys will be able to handle memory management much better than we ever would...and that's not something I want to be good at, anyway.