Source code for wolframclient.serializers.wxfencoder.streaming

# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

from wolframclient.utils import six
from wolframclient.utils.api import zlib
from wolframclient.utils.encoding import force_bytes


[docs]class ZipCompressedWriter(object): def __init__(self, writer): """ Write zip compressed data to a given buffer writer. """ self._compressor = zlib.compressobj() self._writer = writer
[docs] def flush(self): """ Must be called when closing or destroying the object.""" self._writer.write(self._compressor.flush())
[docs] def write(self, data): """ Write the compression of `data` to the underlying buffer writer. """ self._writer.write(self._compressor.compress(force_bytes(data)))
def __enter__(self): return self def __exit__(self, type, value, traceback): self.flush()
[docs]class ExactSizeReader(object): """ Read exactly the amount of bytes requested and fails otherwise.""" def __init__(self, reader): self._reader = reader
[docs] def read(self, size=-1): """Read from an underlying readable object. If size is negative the data is read until EOF. If it is 0 then b'' is returned. Otherwise the exact amount of bytes is read from the source. More than one call may be necessary. """ data = self._reader.read(size) # Negative values read until EOF and 0 returns b''. Both remain unchanged. # Also a fast path when the requested amount of bytes is returned in one go. if size <= 0 or len(data) == size: return data # need an intermediary buffer out_len = len(data) data = six.BytesIO(data) while out_len < size: chunk = self._reader.read(size - out_len) if chunk == b"": raise EOFError("Not enough data to read.") data.write(chunk) out_len = out_len + len(chunk) return data.getvalue()
[docs]class ZipCompressedReader(object): """A buffer implementation reading zip compressed data from a source buffer and returning uncompressed data. This class is instantiated from a reader, any object implementing a :meth:`~io.BufferedIOBase.read` method. """ CHUNK_SIZE = 8192 def __init__(self, reader): """ Read zip compressed data from a given buffer reader.""" self._compressor = zlib.decompressobj() self._reader = reader
[docs] def read(self, size=-1): """Read from a compressed stream of bytes and return the inflated byte sequence. Parameter `size` specifies the amount of bytes to return at most. If `size` is set to -1 then the reader is read until EOF is reached. """ if size is None or size < 0: chunk_size = -1 size = -1 else: chunk_size = ZipCompressedReader.CHUNK_SIZE out_data = six.BytesIO() out_len = 0 while True: # first step find try to find some data to uncompress. # sometimes some bytes are left over. We have to send them first to zlib. if self._compressor.unconsumed_tail != b"": data_in = self._compressor.unconsumed_tail else: # read more data from input reader. Read in chunk since we can't guess how # big the inflated result is. data_in = self._reader.read(chunk_size) # no more data is available. if data_in == b"": break # second step, decompress the new chunk if size > 0: chunk = self._compressor.decompress(data_in, size - out_len) else: chunk = self._compressor.decompress(data_in) # increment output len. out_len = out_len + len(chunk) # write to buffer out_data.write(chunk) # check requested size against output length. if size > 0 and out_len == size: break return out_data.getvalue()