Creating Data Pipelines With Elixir

Semaphore
Mar 14, 2023

Data pipelines are a series of processes that move data from one stage of processing to the next. These processes can include collecting, cleaning, transforming, and storing or analyzing data.

The goal of a data pipeline is to efficiently and effectively move data through these stages to make it available for further analysis or use.

As developers, being aware of how to use and implement data pipelines can be invaluable in today’s ecosystem; for example:

  • Data pipelines are an effective way to manage and process large amounts of data.
  • Data pipelines are essential for building data-driven applications. Many applications need to implement things like personalization, analytics, etc.
  • Data pipelines are a key component in distributed systems and are becoming increasingly common as the industry adopts this architecture more frequently.

When implementing data pipelines, there are many options available to developers. This article doesn’t aim to be a full guide to all of them. Instead, it focuses on how Elixir and its ecosystem can be used to implement data pipelines directly, without relying on complex systems like Kafka.

Why Elixir?

Elixir is a good language for implementing data pipelines because it is designed for concurrent and parallel processing. This means that it can handle multiple tasks simultaneously, making it well-suited for handling large amounts of data.

Additionally, Elixir has built-in support for functional programming, which allows for clear and expressive code that is easy to reason about and test. The language also has a robust ecosystem of libraries and frameworks that can be leveraged for data processing and pipeline management tasks.

Elixir also runs on the Erlang VM, which is known for its reliability and fault tolerance, making it a good fit for data pipelines that require high availability and performance. Its message-passing concurrency model makes it easier to distribute and scale data pipelines so that they can handle large amounts of data and traffic.
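The concurrency described above can be seen in miniature with the standard library alone. The following is a minimal sketch, not part of the tutorial project: it uses Task.async_stream/3 to run a function over a range in separate processes, and the squaring function is only a stand-in for real work.

```elixir
# Run the function for each element in its own process, with at most
# one concurrent task per available scheduler (usually one per CPU core).
results =
  1..5
  |> Task.async_stream(fn n -> n * n end, max_concurrency: System.schedulers_online())
  |> Enum.map(fn {:ok, value} -> value end)

# Task.async_stream/3 keeps the input order by default.
IO.inspect(results)
# prints [1, 4, 9, 16, 25]
```

Flow builds on the same process model but adds batching, partitioning, and back-pressure, which is why the rest of this tutorial uses it instead of raw tasks.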

In this tutorial, we will show you how to use the Flow library in Elixir to build a data pipeline. We will cover the following topics:

  • Setting up the environment and installing the necessary dependencies
  • Creating a new Elixir project and configuring it for data processing
  • Defining the stages of the pipeline using the Flow library
  • Building and running the pipeline
  • Testing and debugging the pipeline

Setting up a data pipeline

Before we begin, it is assumed that you have a basic understanding of Elixir and functional programming concepts. If you are new to Elixir, it is recommended that you familiarize yourself with the basics before proceeding.

The complete code for this tutorial can be found in the GitHub repository.

Setting up the environment

To use the Flow library, you will need to have Elixir and Erlang installed on your machine. The official Elixir website has installation instructions that cover both.

Once you have Elixir and Erlang installed, you can create a new Elixir project by running the following command in your terminal:

$ mix new catalog_pipeline

This will create a new directory called “catalog_pipeline” that contains the basic structure of an Elixir project.

Next, you will need to add the Flow library, plus a few extra libraries to handle web requests and JSON decoding, as dependencies of your project. You can do this by ensuring the deps function of your mix.exs file looks like:

defp deps do
  [
    {:flow, "~> 1.0"},
    {:httpoison, "~> 1.7"},
    {:poison, "~> 4.0"}
  ]
end

Then, run the following command in your terminal to install the dependencies:

$ mix deps.get

Defining the stages of the pipeline

Now that we have the Flow library installed, we can start defining the stages of our pipeline. The Flow library provides a set of pre-built stages for common data processing tasks such as filtering, mapping, and reducing. These stages can be used out of the box, or developers can create their own custom stages for more specific tasks.
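To make filtering, mapping, and reducing concrete, here is a minimal sketch that totals prices per category. It is written as a standalone script (it fetches Flow with Mix.install/1, which needs Elixir 1.12 or later); inside the catalog_pipeline project you would drop that line and run the rest from iex -S mix. The sample products and the category field are made up for illustration.

```elixir
# Standalone script: fetch Flow when running outside a Mix project.
Mix.install([{:flow, "~> 1.0"}])

# Hypothetical sample data, not taken from dummyjson.com.
products = [
  %{category: "phones", price: 549},
  %{category: "laptops", price: 1749},
  %{category: "phones", price: 899},
  %{category: "phones", price: 0}
]

totals =
  products
  |> Flow.from_enumerable()
  # Filtering: drop products without a positive price.
  |> Flow.filter(fn p -> p.price > 0 end)
  # Mapping: keep only what the reducer needs.
  |> Flow.map(fn p -> {p.category, p.price} end)
  # Route tuples with the same category (element 0) to the same stage,
  # so each category is summed in exactly one place.
  |> Flow.partition(key: {:elem, 0})
  # Reducing: accumulate a category => total map per partition.
  |> Flow.reduce(fn -> %{} end, fn {category, price}, acc ->
    Map.update(acc, category, price, &(&1 + price))
  end)
  # Enumerating the flow runs it; sorting makes the order deterministic.
  |> Enum.sort()

IO.inspect(totals)
```

The partition step matters: without a key, events for the same category could land in different reducer stages and produce several partial totals.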

For the purposes of this tutorial, we will use the Flow library to build a data pipeline that extracts product data from dummyjson.com, transforms it into a tuple with description, title, and price, validates certain fields, and finally prepares the data to be loaded into a database or a CSV file. The pipeline will consist of several stages, as detailed in the following sections.

Data Extraction

The first step in building a data pipeline using Elixir and the Flow library is to extract the data that we want to process. This can involve a variety of tasks, such as fetching data from a database, web service, or file system.

One way to extract data using Flow is to use the Flow.from_enumerable/2 function, which allows you to create a flow from any Enumerable data structure, such as a list or map. For example, you could extract data from a database table by querying it and then passing the results to Flow.from_enumerable/2.
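As a minimal sketch of that idea, the script below builds a flow from an in-memory list that stands in for the rows a database query might return. The rows and their fields are hypothetical. It is written as a standalone script using Mix.install/1 (Elixir 1.12 or later); inside the tutorial project, omit that line.

```elixir
# Standalone script: fetch Flow when running outside a Mix project.
Mix.install([{:flow, "~> 1.0"}])

# Hypothetical rows standing in for the result of a database query.
rows = [
  %{"title" => "Keyboard", "price" => 30},
  %{"title" => "Mouse", "price" => 15}
]

# The :stages option sets how many processes consume the enumerable.
flow = Flow.from_enumerable(rows, stages: 2)

# A flow is lazy: the work only happens once it is enumerated.
titles = flow |> Flow.map(& &1["title"]) |> Enum.sort()

IO.inspect(titles)
# prints ["Keyboard", "Mouse"]
```

The result is sorted because Flow processes items concurrently and does not promise to emit them in their original order.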

Let’s define the first stage and load product data from dummyjson.com by overwriting your lib/catalog_pipeline.ex file to look like:

defmodule CatalogPipeline do
  use Flow

  # Returns a Flow that wraps the list of product maps.
  def get_products do
    extract_product_data("https://dummyjson.com/products")
  end

  # Fetches the JSON payload and turns its "products" list into a Flow.
  defp extract_product_data(url) do
    HTTPoison.get!(url)
    |> Map.get(:body)
    |> Poison.decode!()
    |> Map.get("products")
    |> Flow.from_enumerable()
  end
end

The first step is to define a module that will contain the pipeline; we will call it CatalogPipeline. Next, we define a function called get_products that returns the product data. This function calls the extract_product_data function, which fetches the data from the dummyjson.com API and wraps it in a flow.

The extract_product_data function will use the HTTPoison library to make a GET request to the dummyjson.com API and then use the Poison library to decode the JSON response. The decoded data will be passed to the Flow.from_enumerable/2 function, which will create a flow from the data.

You can already see the products within the Flow struct by running $ mix compile and then $ iex -S mix. At the interactive prompt, run iex(1)> CatalogPipeline.get_products. To get back to the shell, press Ctrl-C and then choose (a)bort.

Data Transformation

The next step in building a data pipeline is to transform the data that we’ve extracted. This can involve a variety of tasks, such as cleaning, filtering, or aggregating the data.

One way to transform data using Flow is to use the Flow.map/2 function, which allows you to apply a function to each item in a flow and return a new flow with the results. For example, you could clean up the data by removing any fields that are not needed.
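For instance, dropping unneeded fields could look like the sketch below. The sample records and the choice of fields to keep are assumptions made for illustration. As before, this is a standalone script that uses Mix.install/1; inside the project, omit that line.

```elixir
# Standalone script: fetch Flow when running outside a Mix project.
Mix.install([{:flow, "~> 1.0"}])

# Hypothetical records with more fields than we need.
raw = [
  %{"id" => 1, "title" => "iPhone 9", "price" => 549, "stock" => 94},
  %{"id" => 2, "title" => "iPhone X", "price" => 899, "stock" => 34}
]

cleaned =
  raw
  |> Flow.from_enumerable()
  # Keep only the title and price of each record.
  |> Flow.map(&Map.take(&1, ["title", "price"]))
  # Sort so the result does not depend on processing order.
  |> Enum.sort_by(& &1["title"])

IO.inspect(cleaned)
```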

To continue our example, we will define a new function called transform_product_data that will transform the data into a tuple with a description, title, and price:

defmodule CatalogPipeline do
  use Flow

  def get_products do
    extract_product_data("https://dummyjson.com/products")
    |> transform_product_data()
  end

  # Fetches the JSON payload and turns its "products" list into a Flow.
  defp extract_product_data(url) do
    HTTPoison.get!(url)
    |> Map.get(:body)
    |> Poison.decode!()
    |> Map.get("products")
    |> Flow.from_enumerable()
  end

  # Reduces each product map to a {description, title, price} tuple.
  defp transform_product_data(extracted_data) do
    extracted_data
    |> Flow.map(fn product ->
      {product["description"], product["title"], product["price"]}
    end)
  end
end

This stage of the pipeline uses Flow.map to pull the description, title, and price out of each product and return them as a tuple.

Data Quality Assurance

Before the data is loaded into the target system, it is important to check its quality. With Flow, we can use Flow.filter to drop invalid or incomplete records, for example rows that do not have a valid price and description.

defmodule CatalogPipeline do
  use Flow

  def get_products do
    extract_product_data("https://dummyjson.com/products")
    |> transform_product_data()
    |> validate_product_data()
  end

  # Fetches the JSON payload and turns its "products" list into a Flow.
  defp extract_product_data(url) do
    HTTPoison.get!(url)
    |> Map.get(:body)
    |> Poison.decode!()
    |> Map.get("products")
    |> Flow.from_enumerable()
  end

  # Reduces each product map to a {description, title, price} tuple.
  defp transform_product_data(extracted_data) do
    extracted_data
    |> Flow.map(fn product ->
      {product["description"], product["title"], product["price"]}
    end)
  end

  # Keeps only products with a non-empty description and a numeric price.
  # Missing JSON fields decode to nil, so we check types rather than comparing to "".
  defp validate_product_data(transformed_data) do
    transformed_data
    |> Flow.filter(fn {description, _title, price} ->
      is_binary(description) and description != "" and is_number(price)
    end)
  end
end

This filtering stage drops records whose description or price is missing, so the final data is clean and usable and less likely to cause errors downstream.
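When the rules grow, it can help to move the check into a named function that is easy to test on its own. The sketch below is one possible shape, not part of the tutorial module: the ProductCheck module, the valid?/1 function, and the rule that a price must be positive are all assumptions. It uses Enum.filter so that it runs without any dependencies; the same function could be passed to Flow.filter.

```elixir
defmodule ProductCheck do
  # Hypothetical helper: accepts a {description, title, price} tuple.
  # A product is valid when it has a non-blank description and a positive
  # numeric price (the "positive" rule is an assumption for this sketch).
  def valid?({description, _title, price}) do
    is_binary(description) and String.trim(description) != "" and
      is_number(price) and price > 0
  end
end

rows = [
  {"An apple mobile which is nothing like apple", "iPhone 9", 549},
  {"", "No description", 10},
  {"No price", "Mystery", nil}
]

valid = Enum.filter(rows, &ProductCheck.valid?/1)

IO.inspect(valid)
# prints [{"An apple mobile which is nothing like apple", "iPhone 9", 549}]
```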

Data Loading

The final step in our pipeline is to load the transformed data into a target system. This can involve a variety of tasks, such as inserting the data into a database, writing it to a file, or publishing it to a message queue.

For this last part of the example, we will convert the flow of product tuples into a list of maps that we can either insert into a database or write to a file.

defmodule CatalogPipeline do
  use Flow

  def get_products do
    extract_product_data("https://dummyjson.com/products")
    |> transform_product_data()
    |> validate_product_data()
    |> load_product_data()
    |> IO.inspect()
  end

  # Fetches the JSON payload and turns its "products" list into a Flow.
  defp extract_product_data(url) do
    HTTPoison.get!(url)
    |> Map.get(:body)
    |> Poison.decode!()
    |> Map.get("products")
    |> Flow.from_enumerable()
  end

  # Reduces each product map to a {description, title, price} tuple.
  defp transform_product_data(extracted_data) do
    extracted_data
    |> Flow.map(fn product ->
      {product["description"], product["title"], product["price"]}
    end)
  end

  # Keeps only products with a non-empty description and a numeric price.
  # Missing JSON fields decode to nil, so we check types rather than comparing to "".
  defp validate_product_data(transformed_data) do
    transformed_data
    |> Flow.filter(fn {description, _title, price} ->
      is_binary(description) and description != "" and is_number(price)
    end)
  end

  # Runs the flow and collects the results into a list of maps.
  defp load_product_data(validated_data) do
    validated_data
    |> Flow.map(fn {description, title, price} ->
      %{title: title, price: price, description: description}
    end)
    |> Enum.to_list()
  end
end

With this last stage, we collect the data into a list which, depending on the application, can then be inserted into a database or written to a file.

If everything goes well, you should see a list of products printed on the console.

[
  %{
    description: "An apple mobile which is nothing like apple",
    price: 549,
    title: "iPhone 9"
  },
  %{
    description: "SIM-Free, Model A19211 6.5-inch Super Retina HD display with OLED technology A12 Bionic chip with ...",
    price: 899,
    title: "iPhone X"
  },
  %{
    description: "Samsung's new variant which goes beyond Galaxy to the Universe",
    price: 1249,
    title: "Samsung Universe 9"
  },
  %{
    description: "OPPO F19 is officially announced on April 2021.",
    price: 280,
    title: "OPPOF19"
  },
  %{
    description: "Huawei’s re-badged P30 Pro New Edition was officially unveiled yesterday in Germany and now the device has made its way to the UK.",
    price: 499,
    title: "Huawei P30"
  },
  ...
]
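As one example of a target, a list of product maps like the one above could be written to a CSV file using only the standard library. This is a minimal sketch under a few assumptions: the CsvSketch module, the column order, and the output path in the system temp directory are all made up for illustration, and a dedicated CSV library may be a better choice for real data.

```elixir
defmodule CsvSketch do
  # Hypothetical helper module, not part of the tutorial project.
  @headers ["title", "price", "description"]

  # Builds the whole CSV document as a string, header row first.
  def to_csv(products) do
    rows =
      Enum.map(products, fn p ->
        Enum.map_join([p.title, p.price, p.description], ",", &escape/1)
      end)

    Enum.join([Enum.join(@headers, ",") | rows], "\n") <> "\n"
  end

  # Quotes every field and doubles embedded quotes, so commas and
  # quotes inside descriptions do not break the columns.
  defp escape(value) do
    "\"" <> String.replace(to_string(value), "\"", "\"\"") <> "\""
  end
end

products = [
  %{title: "iPhone 9", price: 549, description: "An apple mobile which is nothing like apple"}
]

# Write to the system temp directory to avoid touching project files.
path = Path.join(System.tmp_dir!(), "products.csv")
File.write!(path, CsvSketch.to_csv(products))
```

In the tutorial module, the same helper could be called on the list returned by load_product_data instead of passing it to IO.inspect.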

Conclusion

This article taught you how to build a data pipeline using Elixir and Flow. You have seen how to extract data from an API, transform it, validate it, and load it into a target system. We have also seen how to use Flow to build a pipeline that can be easily extended and reused.

However, we have only scratched the surface of what is possible with Elixir and Flow; there are many more features and capabilities than can be covered in this article. If you want to learn more, the official Elixir and Flow documentation is a good next step.

Finally, as you continue on your journey to use Elixir and Flow to build data pipelines, it is important to keep in mind that there are some key differences between data pipelines and ETL (Extract, Transform, Load) pipelines:

  • "Data pipeline" is an umbrella term, and ETL pipelines are just one kind of data pipeline.
  • An ETL pipeline will always involve transforming the data, while a data pipeline may not.
  • Data pipelines often run continuously, while ETL pipelines typically run once or in scheduled batches.

Originally published at https://semaphoreci.com on March 14, 2023.
