Tweag
Technical groups
Dropdown arrow
Open source
Careers
Research
Blog
Contact
Consulting services
Technical groups
Dropdown arrow
Open source
Careers
Research
Blog
Contact
Consulting services

Functional data pipelines with funflow2

23 September 2021 — by Dorran Howell, Guillaume Desforges, , Vince Reuter

As the data science and machine learning fields have grown over the past decade, so has the number of data pipelining frameworks. What started out largely as a set of tools for extract-transform-load (ETL) processes has expanded into a diverse ecosystem of libraries, all of which aim to provide data teams with the ability to move and process their data. Apache Airflow, Beam, Luigi, Azkaban — the list goes on and on.

As users of several of the above frameworks, Tweag has a special interest in data pipelines. While working with them, we have observed a few common shortcomings which make writing and debugging data pipelines more complex than it needs to be. These shortcomings include limited composability of pipeline components, as well as minimal support for static analysis of pipelines. This second issue can be especially annoying when executing pipelines in a machine learning context, where pipelines are often defined as a Directed Acyclic Graph (DAG) of long-running tasks (e.g. training a model). In these situations, early identification of a pipeline doomed to fail due problems like configuration errors can spare great waste of compute time and resources.

Additionally, while many data pipeline frameworks provide some support for choosing the execution environment of the pipeline (e.g. running locally vs. on a Spark or Kubernetes cluster), not all provide control over the execution logic of the tasks themselves. This is a common limitation of workflow-oriented frameworks, like Apache Airflow, that couple a task’s definition to its execution logic by combining them in a single class. This lack of modularity makes it difficult to do things like write integration tests for pipeline components.

Enter: funflow2

funflow2 is the successor of funflow, a Haskell library for writing workflows in a functional style. funflow2 makes use of kernmantle to offer several advantages over the original funflow including greater extensibility, additional type-level controls, and a more feature-rich interpretation layer for analyzing pipelines before execution.

An ICFP Haskell Symposium 2020 paper provides a deep dive into kernmantle’s design and features.

funflow2 aims to address the limitations we have observed in other data pipeline frameworks while providing a simple, high-level API to express pipelines in a functional style. Let’s take a closer look.

Composability

In funflow2, a pipeline is represented by a Flow data type. Flows have the nice property of being composable, which allows pipeline subcomponents to be recombined with other components to create new pipelines.

To illustrate this point, we will compare two simple Apache Airflow and funflow2 pipelines, each with three sequential tasks that execute a function. In Airflow, this can be written as follows.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
   "example1",
   schedule_interval=None,
   default_args={"start_date": datetime(2021, 10, 1)},
)

gen_data = PythonOperator(
   python_callable=lambda: 1,
   op_args=None,
   task_id="task-gen-data",
   dag=dag,
)

add_one = PythonOperator(
   python_callable=lambda x: x + 1,
   op_args=[gen_data.output],
   task_id="task-add-one",
   dag=dag,
)

print_result = PythonOperator(
   python_callable=lambda x: print(x),
   op_args=[add_one.output],
   task_id="task-print-result",
   dag=dag,
)

In Airflow it’s simple to define a DAG of tasks, but it’s tricky to re-use it among pipelines. Airflow provides support for a SubDagOperator to trigger other DAGs, but in the end we end up with two completely separate DAG objects, forcing us to manage them individually.

With Funflow2 reusability is much simpler. Since Flows are designed to be composable with Arrows, we can just write:

{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}

import Funflow (pureFlow, ioFlow)
import Control.Arrow ((>>>))

-- Note: the >>> operator is used to chain together two Flows sequentially
genData = pureFlow (const 1)
addOne = pureFlow (\x -> x + 1)
printResult = ioFlow (\x -> print x)
example1 = genData >>> addOne >>> printResult

anotherFlow = example1 >>> pureFlow (\_ -> "anotherFlow")

It’s as simple as that!

Identifying errors early

One of the most frustrating things that can happen when executing a pipeline of long-running tasks is to discover that one of the tasks towards the end of the pipeline was misconfigured - leading to wasted developer time and compute resources. Our original funflow package sought to alleviate this pain point by emphasizing resumability. Resumability was achieved through caching results as pipeline execution proceeded. funflow2 supports resumability using the same caching approach as the original funflow.

The potential time and resource savings provided by caching are limited, however, in at least two ways. First, since pipeline tasks are structured as atomic units, a failed pipeline cannot recover the work that was successfully completed within a failing task before the failure occurred. Perhaps even more importantly, with resumability a failed pipeline may still cause lost efficiency due to mental context switching. For instance, maybe you start a pipeline run, switch to another project (perhaps over multiple days), and then find that your pipeline has failed. You might find yourself asking the question, “what was I trying to achieve in the task that failed?” Earlier discovery of errors is less wasteful with mental resources and less stressful for a pipeline user.

As a pipeline author it is useful to identify misconfigurations early in the development process, either through compilation errors or through early failures at run-time. While many pipelining frameworks divide a program’s lifecycle into compile-time and run-time, kernmantle (and thus funflow) distinguishes three phases: compile-time, config-time and run-time. In the intermediate config-time phase, the pipeline is interpreted and configured which allow for static analysis. As we’ll see below, funflow2 makes use of both compile- and execution-time phases to ensure early failure.

Type errors, detectable at compilation time

There are a number of ways in which a pipeline author can accidentally misconfigure a pipeline. One example of misconfiguration is when there’s a mismatch between a task’s input type and the type of argument passed by an upstream task. This kind of issue can plague pipelines built with a library written in a dynamically typed language like Python (e.g. Airflow). It’s less common of an issue, though, for pipelines written in statically typed languages like Java or Scala (e.g. scio). Since funflow2 pipelines are written in Haskell, the language’s static type checking allows us to catch issues like this at compile time.

For example, the following pipeline will fail to compile since flow3 attempts to pass the string output of flow1 as an input to flow2, which expects an integer.

-- Note: `const` is a built in function which just returns the
-- specified value
flow1 :: Flow () String
flow1 = pureFlow (const "42")

flow2 :: Flow Integer Integer
flow2 = pureFlow (+1)
flow3 = flow1 >>> flow2
Couldn't match type ‘Integer’ with ‘String’

Value errors, detectable at config time

A more subtle and less easily detectable kind of misconfiguration occurs when a task is configured in a way that passes compile-time type checks but is guaranteed to be invalid at runtime. Funflow2 is built on top of kernmantle, which provides us with a convenient layer for extracting static information from tasks in a pre-execution phase called config-time, after the pipeline has been loaded but before any task has run. This layer can be used for early detection of errors in a pipeline and ensuring early failure if something is awry. For example, let’s try running a Flow which attempts to run a Docker container with an image tag that does not exist. This pipeline can be compiled without complaint but is doomed to fail at run-time.

{-# LANGUAGE OverloadedStrings #-}

import Funflow (dockerFlow)
import Funflow.Tasks.Docker (DockerTaskConfig (..), DockerTaskInput (..))


failingFlow = dockerFlow $
  DockerTaskConfig {
    image = "alpine:this-tag-does-not-exist",
    command = "echo",
    args = ["hello"]
  }

-- Flows created with dockerFlow expect a DockerTaskInput,
-- and we can just create an empty value for the sake of this
-- example.
emptyInput = DockerTaskInput {inputBindings = [], argsVals = mempty}


flow = ioFlow (\_ -> print "start of flow")
  >>> pureFlow (const emptyInput)          -- Constructs dockerFlow input
  >>> failingFlow
  >>> ioFlow (\_ -> print "end of flow")

Attempting to run this pipeline gives us the following error.

runFlow flow ()
Found docker images, pulling...
Pulling docker image: alpine:this-tag-does-not-exist
ImagePullError "Request failed with status 404: \"Not Found\" …"

Note that “start of flow” is never printed; the first task in the pipeline, which would print that, never actually executes. This is because funflow2’s default pipeline runner extracts the identifiers of all images required to run a pipeline at config-time and attempts to pull them before starting the actual pipeline run.

For another example of this fail-fast behavior when a pipeline is misconfigured, have a look at the configuration tutorial.

Aside: workflows and build systems

The connection between build systems and the kinds of task-based workflows / pipelines mentioned here is no secret and is a topic we have mentioned in previous blog posts. Despite serving different domains, workflow tools like funflow2 and build tools like Bazel or mill all seek to provide users with the ability to execute a graph of tasks in a repeatable way. Here we briefly consider a couple of key parallels between build systems and funflow2.

Early cutoff is a common property of build systems and refers to a build system’s ability to halt once it has determined that no dependency of a downstream task has changed since the most recent build1. While funflow2 is not a build system, it can be used to mimic one and even exhibits early cutoff, owing to its caching strategy using content-addressable storage. If the hash determined by the hash of a task and its inputs has not changed since a previous pipeline run, work need not be repeated.

Another property which distinguishes build systems is whether or not they support dynamic dependencies, or those for which the relationships themselves may vary with task output(s). This property depends on the build systems’ approach to modeling graphs of tasks, and in a functional paradigm is determined by whether or not tasks are modeled using monadic effects1. funflow2 uses arrows and not monads for composing its task graph and therefore falls within the camp of tools which do not support dynamic dependencies such as Bazel. The use of a static task graph is the key to pre-execution dependency collection and is what allows funflow2 to support the execution-free interpretation of tasks to detect invalid configuration. If tasks were linked in a monadic fashion, this would be impossible.

Modularity

In addition to limited composability and fail-late behavior, tight coupling between a task and its execution logic is another common weakness among existing task-based data pipelining frameworks: the data defining a task is inseparably bound to the specific logic for how the task is executed. In a closely coupled context, altering task execution logic requires entire redefinition of the task itself, e.g. by subclassing. This confounds granular testing and nudges pipeline validation toward an end-to-end style. The resulting coarse resolution view of a test failure complicates diagnosis and remedy of a bug.

In funflow2, an interpreter defines task execution logic and is separate from the task definition itself. Effectively, an interpreter transforms a task, which is treated as data, into an executable action. Separating concerns this way allows development of custom interpreters for integration testing, project-specific execution requirements, and more. Using interpreters, you can do things like flip between test and deployment execution contexts while working with the same pipeline definition. Furthermore, this modularity allows for a greater static analysis, yielding enhanced fail-fast behavior.

While an example implementation is out of scope for this post, an example of custom tasks and interpreters is available in the extensibility tutorial on the funflow2 project website.

Similarities with funflow

funflow2 uses a completely new backend (kernmantle) and therefore has major API differences from the original funflow. Nonetheless, many of the useful features of funflow are available in funflow2:

Arrow syntax

The most immediately evident similarity between the original funflow and funflow2 is syntactic. Since funflow2 still models flows using arrows, Arrow syntax can still be used to define and compose flows.

Caching

Like its predecessor, the funflow2 project uses the cas-store package which provides caching functionality using content-addressable storage.

Compatibility

While funflow2’s API differs significantly from funflow, it provides a module with aliases for some of the core functions from funflow such as step and stepIO to help simplify migration. Many of the core example from funflow have also been ported to funflow2 such as the C compilation tutorial or custom make example discussed in an earlier blog post.

Next steps

Interested in trying out some hands-on examples? We’ve prepared a set of tutorials on the funflow2 website. Alternatively, each tutorial notebook can be run locally using the provided Nix shell. You may also initialize a funflow2 project using our cookiecutter template.

funflow2 is still in its early stages, and so far most of the development on it has focused on building the Flow API, along with a few tasks and interpreters. One area in which other data pipeline frameworks excel is in providing a high level of interoperability through a wide range of predefined tasks for common operations, including running database queries and uploading data to cloud storage. We would welcome external contributions to provide these and other common task types, so please open an issue or pull request if this interests you.

Thanks for reading, and stay tuned for future updates on funflow2!


  1. Refer to the paper “Build Systems a la Carte” for much more discussion of minimality, early cutoff, and other properties of build systems and pipelines.

About the authors

Dorran Howell

Guillaume Desforges

Guillaume is a versatile engineer based in Paris, with fluency in machine learning, data engineering, web development and functional programming.

Vince Reuter

Vince works on software development projects, some internal and some for clients. Initially self-taught in basic Java programming, Vince was led by course of study and general curiosity to programming in other languages. Eventually he was hooked on Scala and interested by Haskell, which of course naturally led him to Tweag. At Tweag Vince enjoys interacting with colleagues who come from broad range of places of origin and current residence, and the array of perspectives that follow naturally from that.

If you enjoyed this article, you might be interested in joining the Tweag team.

This article is licensed under a Creative Commons Attribution 4.0 International license.

Company

AboutOpen SourceCareersContact Us

Connect with us

© 2024 Modus Create, LLC

Privacy PolicySitemap