Source code for wolframclient.serializers.encoders.pyarrow
from __future__ import absolute_import, print_function, unicode_literals
from wolframclient.utils.api import pyarrow
from wolframclient.utils.dispatch import Dispatch
encoder = Dispatch()
def _encode_batches(serializer, batches, schema):
buffer = pyarrow.BufferOutputStream()
stream = pyarrow.ipc.new_stream(buffer, schema)
for batch in batches or (
pyarrow.RecordBatch.from_arrays([[] for _ in schema], schema=schema),
):
stream.write_batch(batch)
return serializer.serialize_function(
serializer.serialize_symbol(b"ImportByteArray"),
(
serializer.serialize_bytes(buffer.getvalue()),
serializer.serialize_string("ArrowIPC"),
),
)
[docs]
@encoder.dispatch(pyarrow.RecordBatch)
def encoder_pyarrow_batch(serializer, batch):
return _encode_batches(serializer, (batch,), batch.schema)
[docs]
@encoder.dispatch(pyarrow.Table)
def encoder_pyarrow_table(serializer, table):
return _encode_batches(serializer, table.to_batches(), table.schema)