Collections API¶
Collections¶
- Streaming: Superclass for streaming collections
  - Streaming.map_partitions: Map a function across all batch elements of this stream
  - Streaming.accumulate_partitions: Accumulate a function with state across batch elements
  - Streaming.verify: Verify elements that pass through this stream
Batch¶
- Batch: A Stream of tuples or lists
  - Batch.filter: Filter elements by a predicate
  - Batch.map: Map a function across all elements
  - Batch.pluck: Pick a field out of all elements
  - Batch.to_dataframe: Convert to a streaming dataframe
  - Batch.to_stream: Concatenate batches and return base Stream
Dataframes¶
- DataFrame: A Streaming Dataframe
  - DataFrame.groupby: Groupby aggregations
  - DataFrame.rolling: Compute rolling aggregations
  - DataFrame.assign: Assign new columns to this dataframe
  - DataFrame.sum: Sum frame
  - DataFrame.mean: Average frame
  - DataFrame.cumsum: Cumulative sum
  - DataFrame.cumprod: Cumulative product
  - DataFrame.cummin: Cumulative minimum
  - DataFrame.cummax: Cumulative maximum
- GroupBy: Groupby aggregations on streaming dataframes
  - GroupBy.count: Groupby-count
  - GroupBy.mean: Groupby-mean
  - GroupBy.size: Groupby-size
  - GroupBy.std: Groupby-std
  - GroupBy.sum: Groupby-sum
  - GroupBy.var: Groupby-variance
- Rolling: Rolling aggregations
  - Rolling.aggregate: Rolling aggregation
  - Rolling.count: Rolling count
  - Rolling.max: Rolling maximum
  - Rolling.mean: Rolling mean
  - Rolling.median: Rolling median
  - Rolling.min: Rolling minimum
  - Rolling.quantile: Rolling quantile
  - Rolling.std: Rolling standard deviation
  - Rolling.sum: Rolling sum
  - Rolling.var: Rolling variance
- Window: Sliding window operations
  - Window.apply: Apply an arbitrary function over each window of data
  - Window.count: Count elements within window
  - Window.groupby: Groupby-aggregations within window
  - Window.sum: Sum elements within window
  - Window.size: Number of elements within window
  - Window.std: Compute standard deviation of elements within window
  - Window.var: Compute variance of elements within window
- PeriodicDataFrame: A streaming dataframe using the asyncio ioloop to poll a callback fn
- Random: PeriodicDataFrame providing random values by default
Details¶
- class streamz.collection.Streaming(stream=None, example=None, stream_type=None)¶
Superclass for streaming collections
Do not create this class directly, use one of the subclasses instead.
- Parameters
- stream: streamz.Stream
- example: object
An object to represent an example element of this stream
See also
streamz.dataframe.StreamingDataFrame
streamz.dataframe.StreamingBatch
- Attributes
- current_value
Methods
accumulate_partitions
(func, *args, **kwargs)Accumulate a function with state across batch elements
map_partitions
(func, *args, **kwargs)Map a function across all batch elements of this stream
register_api
([modifier, attribute_name])Add callable to Stream API
verify
(x)Verify elements that pass through this stream
emit
register_plugin_entry_point
start
stop
- accumulate_partitions(func, *args, **kwargs)¶
Accumulate a function with state across batch elements
See also
Streaming.map_partitions
- static map_partitions(func, *args, **kwargs)¶
Map a function across all batch elements of this stream
The output stream type will be determined by the action of that function on the example
See also
Streaming.accumulate_partitions
- verify(x)¶
Verify elements that pass through this stream
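The state-passing contract of accumulate_partitions can be illustrated without streamz at all: the accumulator receives the previous state plus a batch and returns (new_state, emitted_value). A minimal pure-Python sketch (the function and data below are invented for illustration):

```python
# Sketch of accumulate-style semantics: each batch updates a running
# state, and the new state is also the value emitted downstream.
def running_sum(state, batch):
    new_state = state + sum(batch)
    return new_state, new_state  # (state, emitted value) coincide here

state = 0
emitted = []
for batch in [[1, 2], [3, 4], [5]]:
    state, out = running_sum(state, batch)
    emitted.append(out)
print(emitted)  # [3, 10, 15]
```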
- class streamz.batch.Batch(stream=None, example=None)¶
A Stream of tuples or lists
This streaming collection manages batches of Python objects such as lists of text or dictionaries. By batching many elements together we reduce overhead from Python.
This library is typically used at the early stages of data ingestion before handing off to streaming dataframes
Examples
>>> text = Streaming.from_file(myfile)
>>> b = text.partition(100).map(json.loads)
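The overhead-reduction idea behind Batch can be sketched in plain Python: group individual records into lists and process each list at once, rather than paying per-element dispatch costs. A minimal illustration (the data is made up; streamz itself is not needed here):

```python
import json

# Group raw JSON lines into batches of 2, then parse each batch as a
# unit -- the same shape of work that text.partition(100).map(json.loads)
# performs on a stream.
lines = ['{"x": 1}', '{"x": 2}', '{"x": 3}', '{"x": 4}']
batch_size = 2
batches = [lines[i:i + batch_size] for i in range(0, len(lines), batch_size)]
parsed = [[json.loads(line) for line in batch] for batch in batches]
print(parsed)  # [[{'x': 1}, {'x': 2}], [{'x': 3}, {'x': 4}]]
```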
- Attributes
- current_value
Methods
accumulate_partitions
(func, *args, **kwargs)Accumulate a function with state across batch elements
filter
(predicate)Filter elements by a predicate
map
(func, **kwargs)Map a function across all elements
map_partitions
(func, *args, **kwargs)Map a function across all batch elements of this stream
pluck
(ind)Pick a field out of all elements
register_api
([modifier, attribute_name])Add callable to Stream API
sum
()Sum elements
to_dataframe
()Convert to a streaming dataframe
to_stream
()Concatenate batches and return base Stream
verify
(x)Verify elements that pass through this stream
emit
register_plugin_entry_point
start
stop
- accumulate_partitions(func, *args, **kwargs)¶
Accumulate a function with state across batch elements
See also
Streaming.map_partitions
- filter(predicate)¶
Filter elements by a predicate
- map(func, **kwargs)¶
Map a function across all elements
- static map_partitions(func, *args, **kwargs)¶
Map a function across all batch elements of this stream
The output stream type will be determined by the action of that function on the example
See also
Streaming.accumulate_partitions
- pluck(ind)¶
Pick a field out of all elements
- classmethod register_api(modifier=<function identity>, attribute_name=None)¶
Add callable to Stream API
This allows you to register a new method onto this class. You can use it as a decorator:
>>> @Stream.register_api()
... class foo(Stream):
...     ...
>>> Stream().foo(...)  # this works now
It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).
By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class, as in the following case where we construct a staticmethod:
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # foo operates as a static method
You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as:
>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...
>>> Stream().bar(...)  # foo was actually attached as bar
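The mechanism can be sketched in a few lines of plain Python. This is a simplified stand-in, not the actual streamz implementation (which also accepts classes and plugin entry points), but it shows how attaching a callable to the class respects inheritance:

```python
# Simplified sketch of a register_api-style decorator: setattr the
# callable onto the class so instances (and subclass instances) see it
# as a normal method.
class Stream:
    @classmethod
    def register_api(cls, modifier=lambda f: f, attribute_name=None):
        def decorator(func):
            name = attribute_name or func.__name__
            setattr(cls, name, modifier(func))
            return func
        return decorator

@Stream.register_api()
def double(self, x):
    return 2 * x

class SubStream(Stream):  # subclasses inherit the registered method
    pass

print(SubStream().double(21))  # 42
```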
- sum()¶
Sum elements
- to_dataframe()¶
Convert to a streaming dataframe
This calls pd.DataFrame on all list-elements of this stream
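The per-batch conversion is easy to illustrate with pandas alone: each list of records becomes one DataFrame. The records below are invented for illustration:

```python
import pandas as pd

# One batch of the stream -- a list of dicts -- converted the way
# to_dataframe converts each batch: by calling pd.DataFrame on it.
batch = [{"name": "a", "amount": 1}, {"name": "b", "amount": 2}]
df = pd.DataFrame(batch)
print(list(df.columns))  # ['name', 'amount']
```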
- to_stream()¶
Concatenate batches and return base Stream
Returned stream will be composed of single elements
- verify(x)¶
Verify elements that pass through this stream
- class streamz.dataframe.DataFrame(*args, **kwargs)¶
A Streaming Dataframe
This is a logical collection over a stream of Pandas dataframes. Operations on this object will translate to the appropriate operations on the underlying Pandas dataframes.
See also
Series
- Attributes
- columns
- current_value
- dtypes
- index
- plot
size
size of frame
Methods
accumulate_partitions
(func, *args, **kwargs)Accumulate a function with state across batch elements
assign
(**kwargs)Assign new columns to this dataframe
count
([start])Count of frame
cummax
()Cumulative maximum
cummin
()Cumulative minimum
cumprod
()Cumulative product
cumsum
()Cumulative sum
groupby
(other)Groupby aggregations
map_partitions
(func, *args, **kwargs)Map a function across all batch elements of this stream
mean
([start])Average frame
register_api
([modifier, attribute_name])Add callable to Stream API
reset_index
()Reset Index
rolling
(window[, min_periods, with_state, start])Compute rolling aggregations
round
([decimals])Round elements in frame
set_index
(index, **kwargs)Set Index
sum
([start])Sum frame.
tail
([n])Return the last n rows
to_frame
()Convert to a streaming dataframe
verify
(x)Verify consistency of elements that pass through this stream
window
([n, value, with_state, start])Sliding window operations
aggregate
astype
emit
ewm
expanding
map
query
register_plugin_entry_point
start
stop
- accumulate_partitions(func, *args, **kwargs)¶
Accumulate a function with state across batch elements
See also
Streaming.map_partitions
- assign(**kwargs)¶
Assign new columns to this dataframe
Alternatively use setitem syntax
Examples
>>> sdf = sdf.assign(z=sdf.x + sdf.y)
>>> sdf['z'] = sdf.x + sdf.y
- count(start=None)¶
Count of frame
- Parameters
- start: None or resulting Python object type from the operation
Accepts a valid start state.
- cummax()¶
Cumulative maximum
- cummin()¶
Cumulative minimum
- cumprod()¶
Cumulative product
- cumsum()¶
Cumulative sum
- from_periodic = <function PeriodicDataFrame>¶
- groupby(other)¶
Groupby aggregations
- static map_partitions(func, *args, **kwargs)¶
Map a function across all batch elements of this stream
The output stream type will be determined by the action of that function on the example
See also
Streaming.accumulate_partitions
- mean(start=None)¶
Average frame
- Parameters
- start: None or resulting Python object type from the operation
Accepts a valid start state.
- random = <function Random>¶
- classmethod register_api(modifier=<function identity>, attribute_name=None)¶
Add callable to Stream API
This allows you to register a new method onto this class. You can use it as a decorator:
>>> @Stream.register_api()
... class foo(Stream):
...     ...
>>> Stream().foo(...)  # this works now
It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).
By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class, as in the following case where we construct a staticmethod:
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # foo operates as a static method
You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as:
>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...
>>> Stream().bar(...)  # foo was actually attached as bar
- reset_index()¶
Reset Index
- rolling(window, min_periods=1, with_state=False, start=())¶
Compute rolling aggregations
When followed by an aggregation method like sum, mean, or std this produces a new Streaming dataframe whose values are aggregated over that window.
The window parameter can be either a number of rows or a timedelta like "2 minutes", in which case the index should be a datetime index.
This operates by keeping enough of a backlog of records to maintain an accurate stream. It performs a copy at every added dataframe. Because of this it may be slow if the rolling window is much larger than the average stream element.
- Parameters
- window: int or timedelta
Window over which to roll
- with_state: bool (False)
Whether to return the state along with the result as a tuple (state, result). State may be needed downstream for a number of reasons like checkpointing.
- start: () or resulting Python object type from the operation
Accepts a valid start state.
- Returns
- Rolling object
See also
DataFrame.window
more generic window operations
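The target semantics are those of pandas itself: the streaming rolling aggregation should agree with what pandas computes on the concatenated batches. A plain-pandas illustration (the data is invented):

```python
import pandas as pd

# Reference behavior that a streaming rolling sum mirrors: a window of
# 2 rows with min_periods=1, so early partial windows still emit values.
df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0]})
rolled = df["x"].rolling(window=2, min_periods=1).sum()
print(rolled.tolist())  # [1.0, 3.0, 5.0, 7.0]
```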
- round(decimals=0)¶
Round elements in frame
- set_index(index, **kwargs)¶
Set Index
- property size¶
size of frame
- sum(start=None)¶
Sum frame.
- Parameters
- start: None or resulting Python object type from the operation
Accepts a valid start state.
- tail(n=5)¶
Return the last n rows
- to_frame()¶
Convert to a streaming dataframe
- verify(x)¶
Verify consistency of elements that pass through this stream
- window(n=None, value=None, with_state=False, start=None)¶
Sliding window operations
Windowed operations are defined over a sliding window of data, either with a fixed number of elements:
>>> df.window(n=10).sum() # sum of the last ten elements
or over an index value range (index must be monotonic):
>>> df.window(value='2h').mean() # average over the last two hours
Windowed dataframes support all normal arithmetic, aggregations, and groupby-aggregations.
- Parameters
- n: int
Window of number of elements over which to roll
- value: str
Window of time over which to roll
- with_state: bool (False)
Whether to return the state along with the result as a tuple (state, result). State may be needed downstream for a number of reasons like checkpointing.
- start: None or resulting Python object type from the operation
Accepts a valid start state.
See also
DataFrame.rolling
mimics Pandas rolling aggregations
Examples
>>> df.window(n=10).std()
>>> df.window(value='2h').count()
>>> w = df.window(n=100)
>>> w.groupby(w.name).amount.sum()
>>> w.groupby(w.x % 10).y.var()
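The fixed-n case can be sketched with a bounded deque: each arriving element enters the window and, once the window is full, evicts the oldest. A pure-Python illustration of the sliding-window shape (values are made up):

```python
from collections import deque

# Sliding window of the last 3 elements: deque(maxlen=3) drops the
# oldest element automatically, so sum(window) is always the windowed sum.
window = deque(maxlen=3)
sums = []
for x in [1, 2, 3, 4, 5]:
    window.append(x)
    sums.append(sum(window))
print(sums)  # [1, 3, 6, 9, 12]
```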
- class streamz.dataframe.Rolling(sdf, window, min_periods, with_state, start)¶
Rolling aggregations
This intermediate class enables rolling aggregations across either a fixed number of rows or a time window.
Examples
>>> sdf.rolling(10).x.mean()
>>> sdf.rolling('100ms').x.mean()
Methods
aggregate
(*args, **kwargs)Rolling aggregation
count
(*args, **kwargs)Rolling count
max
()Rolling maximum
mean
()Rolling mean
median
()Rolling median
min
()Rolling minimum
quantile
(*args, **kwargs)Rolling quantile
std
(*args, **kwargs)Rolling standard deviation
sum
()Rolling sum
var
(*args, **kwargs)Rolling variance
- aggregate(*args, **kwargs)¶
Rolling aggregation
- count(*args, **kwargs)¶
Rolling count
- max()¶
Rolling maximum
- mean()¶
Rolling mean
- median()¶
Rolling median
- min()¶
Rolling minimum
- quantile(*args, **kwargs)¶
Rolling quantile
- std(*args, **kwargs)¶
Rolling standard deviation
- sum()¶
Rolling sum
- var(*args, **kwargs)¶
Rolling variance
- class streamz.dataframe.Window(sdf, n=None, value=None, with_state=False, start=None)¶
Windowed aggregations
This provides a set of aggregations that can be applied over a sliding window of data.
See also
DataFrame.window
contains full docstring
- Attributes
- columns
- dtypes
- example
- index
size
Number of elements within window
Methods
apply
(func)Apply an arbitrary function over each window of data
count
()Count elements within window
groupby
(other)Groupby-aggregations within window
mean
()Average elements within window
std
([ddof])Compute standard deviation of elements within window
sum
()Sum elements within window
value_counts
()Count groups of elements within window
var
([ddof])Compute variance of elements within window
aggregate
full
map_partitions
reset_index
- apply(func)¶
Apply an arbitrary function over each window of data
- count()¶
Count elements within window
- groupby(other)¶
Groupby-aggregations within window
- mean()¶
Average elements within window
- property size¶
Number of elements within window
- std(ddof=1)¶
Compute standard deviation of elements within window
- sum()¶
Sum elements within window
- value_counts()¶
Count groups of elements within window
- var(ddof=1)¶
Compute variance of elements within window
- class streamz.dataframe.GroupBy(root, grouper, index=None)¶
Groupby aggregations on streaming dataframes
Methods
count
([start])Groupby-count
mean
([with_state, start])Groupby-mean
size
()Groupby-size
std
([ddof])Groupby-std
sum
([start])Groupby-sum
var
([ddof])Groupby-variance
- count(start=None)¶
Groupby-count
- Parameters
- start: None or resulting Python object type from the operation
Accepts a valid start state.
- mean(with_state=False, start=None)¶
Groupby-mean
- Parameters
- start: None or resulting Python object type from the operation
Accepts a valid start state.
- size()¶
Groupby-size
- std(ddof=1)¶
Groupby-std
- sum(start=None)¶
Groupby-sum
- Parameters
- start: None or resulting Python object type from the operation
Accepts a valid start state.
- var(ddof=1)¶
Groupby-variance
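Streaming groupby aggregations such as mean can be maintained incrementally by keeping a running (sum, count) per key, so each new batch updates the result without storing the full history. A minimal pure-Python sketch of that idea (keys and values invented):

```python
# Incremental groupby-mean: per-key running sums and counts are the only
# state; each batch folds into them and the current means are derived.
sums = {}
counts = {}

def update(batch):
    for key, value in batch:
        sums[key] = sums.get(key, 0.0) + value
        counts[key] = counts.get(key, 0) + 1
    return {k: sums[k] / counts[k] for k in sums}

update([("a", 1.0), ("b", 3.0)])
means = update([("a", 3.0)])
print(means)  # {'a': 2.0, 'b': 3.0}
```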
- class streamz.dataframe.Random(freq='100ms', interval='500ms', dask=False, start=True, datafn=<function random_datablock>)¶
PeriodicDataFrame providing random values by default
Accepts same parameters as PeriodicDataFrame, plus freq, a string that will be converted to a pd.Timedelta and passed to the ‘datafn’.
Useful mainly for examples and docs.
- Attributes
- columns
- current_value
- dtypes
- index
- plot
size
size of frame
Methods
accumulate_partitions
(func, *args, **kwargs)Accumulate a function with state across batch elements
assign
(**kwargs)Assign new columns to this dataframe
count
([start])Count of frame
cummax
()Cumulative maximum
cummin
()Cumulative minimum
cumprod
()Cumulative product
cumsum
()Cumulative sum
from_periodic
groupby
(other)Groupby aggregations
map_partitions
(func, *args, **kwargs)Map a function across all batch elements of this stream
mean
([start])Average frame
random
register_api
([modifier, attribute_name])Add callable to Stream API
reset_index
()Reset Index
rolling
(window[, min_periods, with_state, start])Compute rolling aggregations
round
([decimals])Round elements in frame
set_index
(index, **kwargs)Set Index
sum
([start])Sum frame.
tail
([n])Return the last n rows
to_frame
()Convert to a streaming dataframe
verify
(x)Verify consistency of elements that pass through this stream
window
([n, value, with_state, start])Sliding window operations
aggregate
astype
emit
ewm
expanding
map
query
register_plugin_entry_point
start
stop