In our first blog post about streaming, we discussed how streaming libraries strengthen types to catch more errors. However, when we strengthen types, we need to be careful not to hinder program composition. After all, precise type information can make it more onerous to convince the compiler that a particular program is well formed. In this post, we show that streaming libraries handle this issue well, as they allow programs to be composed conveniently. To illustrate this point, we will discuss the most important composition forms for streaming programs and demonstrate each concept with four concrete streaming libraries, namely pipes, conduit, streaming, and io-streams. This ensures that the discussed concepts are general in nature and not specific to just one particular library.
Composition forms
Generally speaking, streaming libraries offer two flavors of composition: appending and pipelining. When appending, the values produced by one program are followed by the values produced by another. When pipelining, some values produced by one streaming program are processed by another while the first program still has more values to produce.
Different libraries implement these forms of composition in different ways. Some streaming libraries have a first-class notion of stream processors, not streams. Generally, a stream processor SP i o m r gets a stream of values of type i, yields a stream of values of type o, and finally yields a value of type r when it terminates. The parameter m represents the monad in which the effects of the processors are sequenced. For example, we have
conduit: ConduitM i o m r
pipes: Pipe i o m r
pipes and conduit define the monad bind operation (>>=) as an appending composition. In p >>= \r -> q, the input stream is first passed to p, and when p completes, the unconsumed elements from the input stream are passed to q. As a result, the output stream of p is followed by the output stream of q.
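To make the appending behavior concrete, here is a minimal sketch in conduit; the names oneTwo, threeFour and collect are ours, invented for this illustration.
import Data.Conduit
import qualified Data.Conduit.List as CL
oneTwo, threeFour :: Monad m => ConduitM i Int m ()
oneTwo    = yield 1 >> yield 2
threeFour = yield 3 >> yield 4
-- Monadic sequencing appends the output streams, so the
-- downstream consumer sees [1,2,3,4].
collect :: Monad m => m [Int]
collect = runConduit ((oneTwo >> threeFour) .| CL.consume)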
In contrast to pipes and conduit, the streaming and io-streams packages manipulate streams directly. Consequently, there is no notion of an input stream. Still, monadic composition works in a similar manner in the case of the streaming package. The stream p >>= \r -> q starts with the output stream of values produced by p, followed by the output stream of values produced by q. The involved types are
streaming: Stream (Of a) m r
io-streams: InputStream a
Values of type Stream (Of a) m r are streams of values of type a, which might be produced by performing effectful computations in the monad m. When there are no more elements of type a, a value of type r is produced. Values of type InputStream a are also streams of values of type a, whose values might be produced by performing computations in the monad IO.
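The same appending behavior can be observed directly with streaming; a minimal sketch (the name appended is ours):
import qualified Streaming.Prelude as S
-- (>>) appends: the elements of the first stream are followed
-- by those of the second, so S.toList_ returns [1,2,3].
appended :: IO [Int]
appended = S.toList_ (S.yield 1 >> S.each [2, 3])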
Although the io-streams package does not define a monad instance for InputStream (and hence does not overload (>>=)), it provides a separate function for the purpose of concatenating streams:
appendInputStream :: InputStream a -> InputStream a -> IO (InputStream a)
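For instance, a small usage sketch, assuming the list helpers fromList and toList from System.IO.Streams:
import qualified System.IO.Streams as Streams
-- Read both streams in order: the result is [1,2,3,4].
appendExample :: IO [Int]
appendExample = do
  s1 <- Streams.fromList [1, 2]
  s2 <- Streams.fromList [3, 4]
  s  <- Streams.appendInputStream s1 s2
  Streams.toList s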
The second form of composition, namely pipelining, is realised by the following two combinators in the libraries based on stream processors:
conduit: (.|) :: Monad m => ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
pipes: (>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
In p .| q, the output stream of p is fed into q as its input stream, and the same happens in p >-> q. But there is a difference in the types of the left operand.
In conduit, the left operand yields () when it terminates. The right operand is free to continue producing values in the output stream for as long as it wants. Not so in pipes, where the termination of the left operand produces a value of type r and causes the composition to terminate and yield the same value.
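As a quick illustration of pipelining in pipes, here is a minimal sketch that chains a producer, a transformer and a consumer:
import Pipes
import qualified Pipes.Prelude as P
-- each [1..3] produces values, P.map doubles them, and P.print
-- consumes them, printing 2, 4 and 6.
main :: IO ()
main = runEffect (each [1 .. 3 :: Int] >-> P.map (* 2) >-> P.print)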
In streaming and io-streams, pipelines are constructed by composing functions which transform streams. For instance,
streaming: map :: (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
io-streams: decompress :: InputStream ByteString -> IO (InputStream ByteString)
The function decompress takes an input stream of bytestrings carrying compressed data in the zlib format and outputs a stream of bytestrings carrying the result of decompression.
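In this style, a pipeline is just ordinary function composition. A minimal sketch with streaming (the name pipeline is ours):
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S
-- Compose stream transformers as plain functions: double each
-- value, then keep only the results greater than 4.
pipeline :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
pipeline = S.filter (> 4) . S.map (* 2)
-- S.print (pipeline (S.each [1 .. 5])) prints 6, 8 and 10.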
Leftovers and parsing
Most streaming libraries offer the ability to push back a value onto the input. The values pushed back are sometimes called leftovers. This facilitates looking ahead in the input stream to peek at the elements needed to decide between alternative behaviors of the program.
To illustrate the need for looking ahead, consider the following conduit program, which acts as a parser. It yields the number of 'a' characters in the input followed by the number of 'b' characters.
abConduit :: Monad m => Consumer Char m (Int, Int)
abConduit =
  (,) <$> (takeWhileC (== 'a') .| lengthC)
      <*> (takeWhileC (== 'b') .| lengthC)
As conduit implements leftovers, the first takeWhileC can return an element to the input stream upon discovering that it isn’t an 'a' character. Then the following takeWhileC has the opportunity to examine that same element again.
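As a usage sketch, feeding the characters of "aaabb" to abConduit should produce (3, 2); sourceList is from Data.Conduit.List:
import Data.Conduit
import qualified Data.Conduit.List as CL
-- Expected result: (3, 2).
runAB :: Monad m => m (Int, Int)
runAB = runConduit (CL.sourceList "aaabb" .| abConduit)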
However, it is important to note that pipelining composition only allows the left operand to put values back. That is, in p .| q, only p can return values to the input stream; q’s leftovers are ignored. This is a constraint stemming from the type of the composition:
(.|) :: Monad m => ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
The input stream provides values of type a, but the leftovers of the right operand are of type b. To address this, conduit offers the following composition operator, which resolves the type mismatch with a conversion function.
fuseLeftovers :: Monad m => ([b] -> [a]) -> ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
In contrast to conduit, the pipes package does not implement leftovers, but there is an additional package, pipes-parse, which defines Parsers in terms of the core abstraction of Pipes. Parsers in turn use lenses over the input to put values back. This is what an implementation with pipes-parse looks like for our example.
abPipes :: Monad m => Pipes.Parse.Parser Text m (Int, Int)
abPipes =
  (,) <$> zoom (Pipes.Text.span (== 'a')) lengthP
      <*> zoom (Pipes.Text.span (== 'b')) lengthP
lengthP :: (Monad m, Num n) => Pipes.Parse.Parser Text m n
lengthP =
  Pipes.Parse.foldAll (\n txt -> n + fromIntegral (Text.length txt)) 0 id
The function zoom is a primitive offered by packages implementing lenses. It changes the state of a stateful computation in a given scope, where a lens explains how the state is changed.
zoom :: Monad m => Lens' a b -> StateT b m c -> StateT a m c
In the case of zoom (Pipes.Text.span (== 'a')) p, it changes the input fed to the parser p by giving it only the longest prefix consisting of 'a' characters. The rest of the input, including the first character failing the predicate, is left for the next parser.
The parser lengthP is defined to count the number of characters in the input. Thus
zoom (Pipes.Text.span (== 'a')) lengthP :: Parser Text m Int
stands for the same computation as
takeWhileC (== 'a') .| lengthC :: Consumer Char m Int
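To run abPipes, one supplies the underlying producer as the parser state; a hedged usage sketch, assuming the standard pipes-parse idiom of running parsers with evalStateT:
import Control.Monad.Trans.State.Strict (evalStateT)
import Pipes (each)
-- Expected result: (3, 2), counting the 'a's and 'b's of "aaabb".
runABPipes :: IO (Int, Int)
runABPipes = evalStateT abPipes (each [Text.pack "aaabb"])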
Turning our attention to the packages directly manipulating streams, Streams from streaming provide a function cons to push values back into a stream. The package io-streams provides unRead for the same purpose. While the cons function just constructs a new stream, the function unRead modifies the input stream. We have
cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r
unRead :: a -> InputStream a -> IO ()
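To make the pushback idiom concrete, here is a minimal sketch of a lookahead helper for streaming; the name peek is ours, and it is built only from next and cons of Streaming.Prelude:
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S
-- Inspect the next element without consuming it: pop it with
-- S.next and, if present, push it back with S.cons.
peek :: Monad m => Stream (Of a) m r -> m (Maybe a, Stream (Of a) m r)
peek xs = do
  e <- S.next xs
  case e of
    Left r          -> pure (Nothing, pure r)
    Right (a, rest) -> pure (Just a, S.cons a rest)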
More generally, you may wonder whether streaming libraries should try to do the job of parsers, for which the Haskell ecosystem already provides plenty of alternatives. To answer this question, consider that parsing libraries like parsec or attoparsec do not place any bounds on their memory consumption. In particular, the parsers of neither library return a result value before the entire input has been parsed. For instance, if a parser is parsing a list, it can’t yield the first elements of the list as they are discovered. Instead, the list is handed back to the caller only when all of it resides in memory. The attoparsec package is even more problematic, as it retains all of the input, no matter what the parser does.
More sophisticated parsing libraries can do better; uu-parsinglib, for example, returns parser results lazily. However, it is then a challenge to test whether specific parts of the result are available without blocking the program. Overall, this means that to parse unbounded streams of input in a bounded amount of memory, we do actually require the streaming libraries to provide the discussed parser functionality. After all, they provide a crucial resource usage guarantee that we cannot obtain from conventional parser libraries.
Folds and non-linear uses of streams
Finally, we need to discuss reductions, where we consume a stream to compute a non-stream result. For example, consider the following naive and incorrect attempt at computing the average of a stream of values:
average0 :: Stream (Of Double) IO () -> IO Double
average0 xs = (/) <$> Streaming.Prelude.sum_ xs
                  <*> (fromIntegral <$> Streaming.Prelude.length_ xs)
The problem with this program is that the same stream is traversed twice. Even if the stream is not effectful, this program does not run in bounded memory. If the stream is effectful, however, the situation is even worse, as each traversal produces different results, leading to an inconsistent computation. To fix these problems, we need to compute the sum and the length in a single pass.
average1 :: Stream (Of Double) IO () -> IO Double
average1 xs = uncurry (/) <$>
  Streaming.Prelude.fold_ (\(!s, !c) d -> (s + d, c + 1)) (0, 0) id xs
With conduit and pipes, both the mistake and the required fix are similar. Thus, streaming libraries seem to require that the programmer take care to consume streaming sources only in a linear manner, that is, at most once in a program.
Interestingly, conduit offers an alternative solution, where existing stream processors can be reused and combined using ZipSinks instead of explicitly writing a fold. A ZipSink is a combination of stream processors, all of which are fed the same input stream, with a final result produced from the results of all the combined processors. This yields the following solution to our example, which is quite close to the original naive attempt at solving the problem:
average2 :: Monad m => Consumer Double m Double
average2 = toConsumer $
  getZipSink ((/) <$> ZipSink sumC <*> fmap fromIntegral (ZipSink lengthC))
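A usage sketch: since average2 is an ordinary consumer, it can be fed from any source, for example
import Data.Conduit
import qualified Data.Conduit.List as CL
-- Expected result: 2.5.
runAverage :: Monad m => m Double
runAverage = runConduit (CL.sourceList [1, 2, 3, 4] .| average2)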
Facilities such as this are one of the advantages of libraries based on stream processors over those manipulating streams directly. ZipSinks are an incarnation of the ideas in the foldl package for stream processors, while the ecosystem of pipes offers the pipes-transduce package.
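For comparison, the foldl package achieves the same single-pass sharing for streams manipulated directly; a minimal sketch combining Control.Foldl with Streaming.Prelude.fold_ via purely (the name averageF is ours):
import qualified Control.Foldl as L
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S
-- The applicative combination of L.sum and L.genericLength
-- traverses the stream only once.
averageF :: Monad m => Stream (Of Double) m () -> m Double
averageF = L.purely S.fold_ ((/) <$> L.sum <*> L.genericLength)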
Summary
While strengthening types to catch more errors, streaming libraries are still sufficiently expressive to allow for many data flow patterns of streaming programs. The basic compositional forms are complemented by additional concepts to cover even more patterns. Leftovers, for instance, support parsing challenges that cannot easily be delegated to standard parsing libraries while still keeping an upper bound on the consumed memory. In turn, ZipSinks simplify the sharing of a single input stream by multiple stream processors.