Source code for wolframclient.serializers.wxfencoder.streaming
from __future__ import absolute_import, print_function, unicode_literals
from wolframclient.utils.api import zlib
from wolframclient.utils.decorators import decorate
from wolframclient.utils.encoding import concatenate_bytes, force_bytes
[docs]
class ZipCompressedWriter:
def __init__(self, writer):
"""Write zip compressed data to a given buffer writer."""
self._compressor = zlib.compressobj()
self._writer = writer
[docs]
def flush(self):
"""Must be called when closing or destroying the object."""
self._writer.write(self._compressor.flush())
[docs]
def write(self, data):
"""Write the compression of `data` to the underlying buffer writer."""
self._writer.write(self._compressor.compress(force_bytes(data)))
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.flush()
[docs]
class ExactSizeReader:
"""Read exactly the amount of bytes requested and fails otherwise."""
def __init__(self, reader):
self._reader = reader
[docs]
def read(self, size=-1):
"""Read from an underlying readable object.
If size is negative the data is read until EOF.
If it is 0 then b'' is returned.
Otherwise the exact amount of bytes is read from the source. More than
one call may be necessary.
"""
data = self._reader.read(size)
# Negative values read until EOF and 0 returns b''. Both remain unchanged.
# Also a fast path when the requested amount of bytes is returned in one go.
if size <= 0 or len(data) == size:
return data
return self._read_rest(data, size)
@decorate(concatenate_bytes)
def _read_rest(self, data, size=-1):
# need an intermediary buffer
out_len = len(data)
while out_len < size:
chunk = self._reader.read(size - out_len)
if not chunk:
raise EOFError("Not enough data to read.")
yield chunk
out_len += len(chunk)
[docs]
class ZipCompressedReader:
"""A buffer implementation reading zip compressed data from a source buffer and returning uncompressed data.
This class is instantiated from a reader, any object implementing a :meth:`~io.BufferedIOBase.read` method.
"""
CHUNK_SIZE = 8192
def __init__(self, reader):
"""Read zip compressed data from a given buffer reader."""
self._compressor = zlib.decompressobj()
self._reader = reader
[docs]
@decorate(concatenate_bytes)
def read(self, size=-1):
"""Read from a compressed stream of bytes and return the inflated byte sequence.
Parameter `size` specifies the amount of bytes to return at most. If `size` is set to -1
then the reader is read until EOF is reached.
"""
if size is None or size < 0:
chunk_size = -1
size = -1
else:
chunk_size = ZipCompressedReader.CHUNK_SIZE
out_len = 0
while True:
# first step find try to find some data to uncompress.
# sometimes some bytes are left over. We have to send them first to zlib.
if self._compressor.unconsumed_tail:
data_in = self._compressor.unconsumed_tail
else:
# read more data from input reader. Read in chunk since we can't guess how
# big the inflated result is.
data_in = self._reader.read(chunk_size)
# no more data is available.
if not data_in:
break
# second step, decompress the new chunk
if size > 0:
chunk = self._compressor.decompress(data_in, size - out_len)
else:
chunk = self._compressor.decompress(data_in)
# increment output len.
out_len += len(chunk)
# write to buffer
yield chunk
# check requested size against output length.
if size > 0 and out_len == size:
break