Internal Data Ingestion¶
Dask contains internal tools for extensible data ingestion in the ``dask.bytes`` package. These functions are developer-focused rather than intended for direct use; they power user-facing functions like ``dd.read_csv`` and ``db.read_text``, which are more useful for most users.
read_bytes(urlpath[, delimiter, not_zero, …])
    Given a path or paths, return delayed objects that read from those paths.
open_files(urlpath[, mode, compression, …])
    Given a path or paths, return a list of OpenFile objects.
These functions are extensible in their output formats (bytes, file objects), their input locations (file system, S3, HDFS), line delimiters, and compression formats.
These functions provide data as ``dask.delayed`` objects, which either point to blocks of bytes (``read_bytes``) or to open file objects (``open_files``). They can refer to data on different storage systems by prepending protocols like ``s3://`` or ``hdfs://``, and they handle the compression formats listed in the ``dask.bytes.compression`` module.
These functions are not used for all data sources. Some data sources like HDF5 are quite particular and receive custom treatment.
Delimiters¶
The ``read_bytes`` function takes a path (or globstring of paths) and produces a sample of the first file and a list of delayed objects for each file. If passed a delimiter such as ``delimiter=b'\n'``, it ensures that each block of bytes starts directly after a delimiter and ends directly before one. This allows other functions, like ``pd.read_csv``, to operate on these delayed values with expected behavior.
These delimiters are useful both for typical line-based formats (log files, CSV, JSON) and for other delimited formats like Avro, which may separate logical chunks by a complex sentinel string.
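For example, the sample can supply the column names while each delayed block is parsed on its own. A minimal sketch, assuming a local CSV globstring and that the first file is large enough to contain more than one block:

>>> from io import BytesIO
>>> import pandas as pd
>>> from dask.bytes import read_bytes
>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n')
>>> columns = pd.read_csv(BytesIO(sample)).columns
>>> # blocks[0][1] is the second block of the first file; it begins right after
>>> # a newline, so it can be parsed independently once the column names are known
>>> df = pd.read_csv(BytesIO(blocks[0][1].compute()), names=columns)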
Locations¶
These functions dispatch to other functions that handle different storage backends, like S3 and HDFS. These storage backends register themselves with protocols and so are called whenever the path is prepended with a string like the following:
s3://bucket/keys-*.csv
The various backends accept optional extra keywords detailing authentication and other parameters; see the documentation on remote data services.
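Backend-specific options are passed straight through as extra keyword arguments. A minimal sketch, assuming a hypothetical public bucket and the s3fs ``anon`` option for anonymous access:

>>> from dask.bytes import open_files
>>> # the extra keyword is handed to the S3 backend (s3fs), not interpreted by Dask itself
>>> files = open_files('s3://my-public-bucket/2015-*-*.csv', anon=True)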
Compression¶
These functions support widely available compression technologies like ``gzip``, ``bz2``, ``xz``, ``snappy``, and ``lz4``. More compression formats can easily be added by inserting functions into the dictionaries available in the ``dask.bytes.compression`` module. This can be done at runtime and need not be added directly to the codebase.
However, not all compression technologies are available for all functions. In particular, compression technologies like ``gzip`` do not support efficient random access and so are useful for streaming ``open_files`` but not for ``read_bytes``, which splits files at various points.
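As a sketch of such runtime registration, assuming the module exposes ``files``, ``compress``, and ``decompress`` dictionaries (names may vary between Dask versions), one could register the standard-library LZMA handlers under a custom name:

>>> import lzma
>>> from dask.bytes import compression
>>> # whole-file wrapper used when streaming through open_files
>>> compression.files['my-xz'] = lzma.LZMAFile
>>> # in-memory helpers for compressing and decompressing blocks of bytes
>>> compression.compress['my-xz'] = lzma.compress
>>> compression.decompress['my-xz'] = lzma.decompress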
Functions¶
dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=134217728, sample=True, compression=None, **kwargs)¶

Given a path or paths, return delayed objects that read from those paths.
The path may be a filename like ``'2015-01-01.csv'`` or a globstring like ``'2015-*-*.csv'``.

The path may be preceded by a protocol, like ``s3://`` or ``hdfs://``, if those libraries are installed.

This cleanly breaks data by a delimiter if given, so that block boundaries start directly after a delimiter and end on the delimiter.
Parameters:
    urlpath : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://`` to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.
    delimiter : bytes
        An optional delimiter, like ``b'\n'``, on which to split blocks of bytes.
    not_zero : bool
        Force seek of start-of-file delimiter, discarding header.
    blocksize : int (=128MB)
        Chunk size in bytes.
    compression : string or None
        String like 'gzip' or 'xz'. Must support efficient random access.
    sample : bool or int
        Whether or not to return a header sample. If an integer is given it is used as the sample size, otherwise the default sample size is 10kB.
    **kwargs : dict
        Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.
Returns:
    sample : bytes
        The sample header
    blocks : list of lists of ``dask.Delayed``
        Each list corresponds to a file, and each delayed object computes to a block of bytes from that file.
Examples
>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n')
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')
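The delayed blocks are ordinary ``dask.delayed`` objects and can be materialized with ``dask.compute``; a small sketch continuing from the examples above:

>>> import dask
>>> # compute every block of the first file into a tuple of byte strings
>>> first_file_blocks = dask.compute(*blocks[0])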
dask.bytes.open_files(urlpath, mode='rb', compression=None, encoding='utf8', errors=None, name_function=None, num=1, **kwargs)¶

Given a path or paths, return a list of ``OpenFile`` objects.
Parameters:
    urlpath : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://`` to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.
    mode : 'rb', 'wt', etc.
    compression : string
        Compression to use. See ``dask.bytes.compression.files`` for options.
    encoding : str
        For text mode only
    errors : None or str
        Passed to TextIOWrapper in text mode
    name_function : function or None
        If opening a set of files for writing, those files do not yet exist, so we need to generate their names by formatting the urlpath for each sequence number.
    num : int [1]
        If in writing mode, the number of files we expect to create (passed to name_function).
    **kwargs : dict
        Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.
Returns:
    List of ``OpenFile`` objects.

Examples
>>> files = open_files('2015-*-*.csv')
>>> files = open_files('s3://bucket/2015-*-*.csv.gz', compression='gzip')
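``OpenFile`` objects are lazy: the underlying file is only opened inside a context manager. A minimal sketch, with an illustrative path:

>>> from dask.bytes import open_files
>>> files = open_files('2015-*-*.csv', mode='rt')
>>> with files[0] as f:  # the file is actually opened here and closed on exit
...     header = f.readline()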