RecordRef blob

tlarsen7572
11 - Bolide

Is there a fast way to obtain the underlying blob of a RecordRef object, and then re-create a RecordRef object from that single blob?

I am playing around with caching incoming records, but am running into issues with the I/O part of the process. My initial thought was to use the Field.get_as_* methods to obtain the underlying data, but these are proving to be very slow for the datasets I am working with (a prototype in C# was significantly faster than its Python counterpart: roughly 1 second vs. 21 seconds for 10,000 records). Probably something about crossing the barrier between C++ and Python?

I also tried creating a RecordInfo object with a single blob field, hoping that it would give me a blob of the entire record, but the field objects seem to be smart enough to know that the underlying data is broken up into multiple fields.

Ideally, I just want to take the record byte data, save it into a file, read it back, and push it to downstream tools. Is it possible to do this quickly in pure Python?
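
For reference, the file half of that round trip (save opaque record bytes, read them back) could look like the sketch below. It is only an illustration: the 4-byte little-endian length prefix and the helper names `write_blob`, `read_blobs`, and `roundtrip` are assumptions, not part of the Alteryx SDK, and the sketch does not address how to get the raw bytes out of a RecordRef in the first place.

```python
import io
import struct

# Assumed framing: each record is stored as a 4-byte little-endian length
# followed by the raw bytes. This is an illustrative choice, not an Alteryx format.
_LEN = struct.Struct("<I")


def write_blob(stream, blob: bytes) -> None:
    """Append one length-prefixed blob to a binary stream."""
    stream.write(_LEN.pack(len(blob)))
    stream.write(blob)


def read_blobs(stream):
    """Yield blobs back in the order they were written."""
    while True:
        header = stream.read(_LEN.size)
        if not header:
            return  # clean end of stream
        if len(header) < _LEN.size:
            raise EOFError("truncated length prefix")
        (size,) = _LEN.unpack(header)
        blob = stream.read(size)
        if len(blob) < size:
            raise EOFError("truncated record")
        yield blob


def roundtrip(blobs):
    """Write blobs to an in-memory stream and read them back."""
    buf = io.BytesIO()
    for blob in blobs:
        write_blob(buf, blob)
    buf.seek(0)
    return list(read_blobs(buf))
```

Swapping `io.BytesIO()` for a file opened in binary mode gives the on-disk version; the length prefix is what allows variable-sized records to be split apart again on the way back in.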

7 REPLIES
AshleyO
6 - Meteoroid

Hi @tlarsen7572, I've noticed something similar: crossing between Python and Alteryx is relatively slow.

I'm still investigating, but I've found that iterating a RecordInfo object to get the Fields is part of the problem.

As a workaround, cache the list of Fields in ii_init: self.record_info_in_list = list(record_info_in). Then, in ii_push_record, iterate over record_info_in_list rather than record_info_in.

Furthermore, every field.get_* and field.set_* method is actually a wrapper around an Alteryx get/set call, which may slow things down.

If you try to push data that requires a poor conversion, you get data conversion errors in the Alteryx message log.

There is actually a bug in these error messages: there isn't a limit on the number of messages (unlike most tools, which stop reporting after 10 messages), so they flood the console and slow down processing.
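
The field-caching workaround described above can be sketched as follows. To keep the snippet runnable outside Alteryx, `FakeField` and `FakeRecordInfo` are hypothetical stand-ins for the SDK's Field and RecordInfo objects (they only count iterations), and `IncomingInterfaceSketch` is a stripped-down illustration rather than a complete plugin interface.

```python
class FakeField:
    """Hypothetical stand-in for an SDK Field; only the name is modeled."""

    def __init__(self, name):
        self.name = name


class FakeRecordInfo:
    """Hypothetical stand-in for RecordInfo that counts how often it is iterated."""

    def __init__(self, names):
        self._fields = [FakeField(n) for n in names]
        self.iterations = 0

    def __iter__(self):
        self.iterations += 1
        return iter(self._fields)


class IncomingInterfaceSketch:
    def ii_init(self, record_info_in):
        # Iterate the RecordInfo once and keep the resulting Field objects.
        self.record_info_in_list = list(record_info_in)
        return True

    def ii_push_record(self, record):
        # Per-record work walks the cached list, never the RecordInfo itself.
        return [field.name for field in self.record_info_in_list]


info = FakeRecordInfo(["id", "amount", "label"])
iface = IncomingInterfaceSketch()
iface.ii_init(info)
for _ in range(1000):
    names = iface.ii_push_record(None)
```

After the 1,000 simulated pushes, `info.iterations` is still 1: the RecordInfo is only walked during initialization, which is the point of the workaround.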

tlarsen7572
11 - Bolide

Hi @AshleyO, thanks for sharing!

I really like the idea of caching the fields. I had a nagging feeling that looking up the fields and checking the field type was an expensive operation. I am curious to see if there is any improvement to be had by popping them into a list during initialization.

I haven't yet noticed any data conversion errors. But I get the feeling that all of the field.get_* methods are actually copying the data from C++ across the bridge to Python, rather than simply giving us a reference to the underlying data. That makes sense now that you mention conversion errors. But copying all of that data must be incredibly slow, not to mention the cost of conversion. I wish we could get a reference to the byte array/blob in C++ so that we could use the data without any conversions or transformations. I could then feed this reference to my I/O operations and let them operate without copying the data until it is written to disk.
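
As an illustration of what "a reference instead of a copy" looks like from the Python side, the sketch below maps an existing block of memory through ctypes and hands it to a writer without an intermediate bytes object. The buffer `native` is a stand-in created in Python; how to obtain the real address and size of a record from the Alteryx engine is not covered here and is not something the SDK is known to expose.

```python
import ctypes
import io

payload = b"record-bytes"

# Stand-in for memory owned by native code (an assumption for this demo).
native = ctypes.create_string_buffer(payload)
addr = ctypes.addressof(native)
size = len(payload)

# Map the address as a ctypes array and wrap it in a memoryview: no bytes are copied.
view = memoryview((ctypes.c_char * size).from_address(addr))

# The writer reads straight from the mapped memory; this is the only copy made.
sink = io.BytesIO()
sink.write(view)
written = sink.getvalue()

# Mutating the original buffer is visible through the view, showing it is a
# reference to the same memory rather than a snapshot.
native[0] = b"R"
seen_through_view = view.tobytes()
```

The view is only valid while the underlying memory is alive, so any real use would have to finish writing before the engine reuses or frees the record.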

AshleyO
6 - Meteoroid

I'm just going to leave this here. Good luck to anybody who finds it useful. Don't call me if it breaks.

import AlteryxPythonSDK as Sdk

class AyxPlugin:
    def __init__(self, n_tool_id: int, alteryx_engine: object, output_anchor_mgr: object):
        self.n_tool_id = n_tool_id
        self.alteryx_engine = alteryx_engine
        self.output_anchor_mgr = output_anchor_mgr
        
        self.is_initialized = True

    def pi_init(self, str_xml: str):
        self.output_anchor = self.output_anchor_mgr.get_output_anchor('Output')

    def pi_add_incoming_connection(self, str_type: str, str_name: str) -> object:
        self.single_input = IncomingInterface(self)
        return self.single_input

    def pi_add_outgoing_connection(self, str_name: str) -> bool:
        return True

    def pi_push_all_records(self, n_record_limit: int) -> bool:
        return False #must always have an input

    def pi_close(self, b_has_errors: bool):
        self.output_anchor.assert_close()

import ctypes

class PyVarObjectStruct(ctypes.Structure):
    _fields_ = [('ob_refcnt', ctypes.c_ssize_t),
                ('ob_type', ctypes.c_void_p),
                ('ob_size', ctypes.c_ssize_t)]
class RecordRefStruct(PyVarObjectStruct):
    _fields_ = [("n1", ctypes.c_void_p),#0
                ("n2", ctypes.c_void_p),#0
                ("p1", ctypes.c_void_p),#&p2
                ("p2", ctypes.c_void_p),
                ("n3", ctypes.c_void_p),#0
                ("p3", ctypes.c_void_p),#dig here
                ("p4", ctypes.c_void_p) ]
    def __repr__(self):
        return (f"RecordRefStruct(p1={self.p1},p2={self.p2},p3={self.p3},p4={self.p4} ,n1={self.n1},n2={self.n2},n3={self.n3})")
    @classmethod
    def wrap(cls, obj):
        assert isinstance(obj, Sdk.RecordRef)
        return cls.from_address(id(obj))

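class RecordRefWrapper():
    """Sequential reader over the raw record bytes behind a RecordRef (reverse-engineered layout)."""
    def __init__(self,record):
        record_struct = RecordRefStruct.wrap(record)
        # p3 holds the address of a pointer to the record data; dereference it once
        self.data_addr = ctypes.c_uint64.from_address(record_struct.p3).value
    
    def read_int64(self):
        # read as signed so that negative Int64 values survive the round trip
        v = ctypes.c_int64.from_address(self.data_addr).value
        self.data_addr += 8+1 #skip the 8 data bytes plus the 1 byte that follows them in this layout
        return v
    
    def read_string(self):
        # reads up to the first NUL byte, then steps past the terminator
        v = ctypes.cast(self.data_addr, ctypes.c_char_p).value
        self.data_addr += len(v)+1
        return v.decode("ascii") #assumes ASCII-only string data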
        
class IncomingInterface:
    def __init__(self, parent: object):
        self.parent = parent

        self.record_creator = None
        self.record_info_out = None

    def ii_init(self, record_info_in: object) -> bool:
        if not self.parent.is_initialized:
            return False

        self.record_info_in = list(record_info_in)
        record_info_out = Sdk.RecordInfo(self.parent.alteryx_engine)
        for idx,field in enumerate(self.record_info_in):
            record_info_out.add_field(field.name, field.type, field.size, field.scale)
        self.parent.output_anchor.init(record_info_out)
        self.record_creator = record_info_out.construct_record_creator()
        return True

    def ii_push_record(self, record: object) -> bool:
        if not self.parent.is_initialized:
            return False
        
        rc = self.record_creator
        rc.reset()
        ri = self.record_info_in
        
        r = RecordRefWrapper(record)
        ri[0].set_from_int64(rc,r.read_int64()) #assumes input record is int64,int64,int64,int64,string,string,string
        ri[1].set_from_int64(rc,r.read_int64()) #if your data format changes then good luck
        ri[2].set_from_int64(rc,r.read_int64())
        ri[3].set_from_int64(rc,r.read_int64())
        ri[4].set_from_string(rc,r.read_string())
        ri[5].set_from_string(rc,r.read_string())
        ri[6].set_from_string(rc,r.read_string())
        
        record_out = rc.finalize_record()
        
        return self.parent.output_anchor.push_record(record_out) #propagate error flag

    def ii_update_progress(self, d_percent: float):
        self.parent.alteryx_engine.output_tool_progress(self.parent.n_tool_id, d_percent)
        self.parent.output_anchor.update_progress(d_percent)

    def ii_close(self):
        self.parent.output_anchor.close()
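
The `from_address(id(obj))` call in `RecordRefStruct.wrap` relies on a CPython implementation detail: `id(obj)` is the object's memory address, and every variable-size object starts with the same three-field header. The sketch below shows the same trick on an ordinary tuple, where the result is easy to check. The class name `PyVarObjectHeader` is made up for this example, and the layout assumes a standard (non-debug, non-free-threaded) CPython build.

```python
import ctypes


class PyVarObjectHeader(ctypes.Structure):
    # Header shared by variable-size CPython objects on a standard build.
    _fields_ = [("ob_refcnt", ctypes.c_ssize_t),
                ("ob_type", ctypes.c_void_p),
                ("ob_size", ctypes.c_ssize_t)]


sample = (10, 20, 30)

# Overlay the struct on the tuple's own memory; nothing is copied.
header = PyVarObjectHeader.from_address(id(sample))

size_from_header = header.ob_size         # number of items in the tuple
type_addr_from_header = header.ob_type    # address of the tuple type object
```

For a RecordRef, the fields after this header are undocumented, which is why the listing above has to guess at them and can break with any SDK or interpreter update.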
tlarsen7572
11 - Bolide

Thanks for sharing, @AshleyO! That was one helluva rabbit hole you sent me down :)

I was able to get my caching logic working using your example. I even got it handling any combination of incoming fields (except blobs; those fields are a bit weird). Unfortunately, I am not seeing any performance improvement. There is a performance penalty at this barrier that even the ctypes wrapper does not help with, and I don't know enough about C++ or ctypes to take it much further.

I will await a more official caching solution. In the meantime, if anyone wants to look at the code I generated, it is available here. Maybe there is some low-hanging fruit I just cannot see.

cam_w
11 - Bolide

Upvoting this issue.

It would be good if we could cross the 'border' between Python data objects and the Alteryx engine faster. Perhaps, instead of using Python to map values onto Alteryx Engine Record objects, the Alteryx Engine could provide a method that maps directly from a Python data structure (e.g. a Python list in a specific format).

Ultimately, it would be best to somehow reference the memory location of the Python object, but I'm not sure whether this could be done.

cam_w
11 - Bolide

Perhaps Apache Arrow is the solution we need ... https://arrow.apache.org/

cam_w
11 - Bolide