Understanding Pipelines

Pipelines provide a user-friendly way to take in input data, transform it, and return a cleaned output. Whenever an input changes, the data flows through the pipeline again and the output is updated.
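The idea maps nicely onto Elixir's own pipe operator: data enters on the left and flows through a chain of transformations. Here is a tiny illustration of the concept (this is just the mental model, not how MiPasa implements pipelines):

```elixir
# Data flows left-to-right through each transformation step,
# just like inputs flow through a pipeline's steps.
"  Hello, World  "
|> String.trim()
|> String.downcase()
# => "hello, world"
```

If the input string changes, re-running the chain produces a fresh output, which is exactly what a pipeline does automatically when one of its data sources updates.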

Theoretical Example:

Let’s say we want to build a model that determines the effect of Elon Musk’s tweets on the price of Bitcoin. Further, we want the model’s data to always be up to date.

We found a dataset online that lists all of Elon Musk’s tweets and when he tweeted them. MiPasa has the Bitcoin Price Index.

We can then build a pipeline that takes these two datasets as inputs. Then, we can use the Pipeline UI to combine these datasets and output a combined version of the data. The output can be saved to an Amazon S3 bucket and/or to MiPasa. The benefit of saving it to MiPasa is that we can then seamlessly write a notebook against the data.

Whenever one of the inputs is updated (say, Elon Musk writes a new tweet), the data will flow through the pipeline again and update the S3 and MiPasa datasets. This way, we will always have up-to-date, cleaned data, and any notebook we write against it will be working with the latest data!

Real Example:

Here is a link to the pipeline in case you want to follow along: https://app.mipasa.com/pipelines/fe670684-d2fd-4159-9778-186e2c951675

Let’s say we want to analyze a few popular cryptocurrencies: Bitcoin, Ethereum and Dogecoin. Since these assets are volatile, it is important that our data stays up to date, and it will be helpful to have a CSV that contains the daily closing price of each coin.

Here are download links to the data (the content of the links updates daily):

Import Step

Let’s begin by adding Bitcoin as an input. For this, we will use the HTTP step.

Click Add data source

Then: HTTP

Copy and paste the Bitcoin link in.

Click Save changes, Run pipeline and then the Preview button that appears under the step. You should see:

Select Step

We want to compare the coins’ closing prices, so let’s select just the Date and Close columns.

Exit the preview. Click Add step and then the Select step.

In the Select step, we need to list the columns that we are selecting, so we should write:

Date,Close

When you save and run the pipeline and then click preview on the select step, you should see only the two columns selected:

Since we will end up with multiple closing-price columns, let’s rename the Close column to btc_close.

Add a Rename Column step. Set old_column_name to Close and new_column_name to btc_close.

Notice that if you save, run and then preview, the column name has changed!

Merge Step

Let’s repeat this process for ETH and DOGE.

Whenever we merge/join steps, it is easiest to give the inputs custom titles.

Click here:

Title the inputs Cleaned BTC, Cleaned ETH and Cleaned DOGE.

Our next step is to join these three inputs into a single one. The join should take place along the Date column. We can only binary join two inputs at a time, so let’s first join BTC and ETH.
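Conceptually, a binary join on Date matches each row from one input with the row in the other input that has the same Date, and merges the two row maps. Here is a rough sketch of that idea, assuming each input is a list of row maps keyed by column name (illustration only, not MiPasa's actual join code — the column values shown are made up):

```elixir
# Sketch of a binary join: for each left row, find the right row with the
# same value in the join column and merge them into one combined row.
join_on = fn left, right, key ->
  Enum.flat_map(left, fn l_row ->
    case Enum.find(right, fn r_row -> r_row[key] == l_row[key] end) do
      nil -> []                       # rows with no match are dropped
      r_row -> [Map.merge(l_row, r_row)]
    end
  end)
end

btc = [%{"Date" => "05/03/2021", "btc_close" => "57,000.00"}]
eth = [%{"Date" => "05/03/2021", "eth_close" => "3,400.00"}]

join_on.(btc, eth, "Date")
# one combined row containing Date, btc_close and eth_close
```

The Binary join step does this for us; we only have to tell it which column to match on via left_id and right_id.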

Click Join steps:

Then select the two inputs that you intend to join.

Click Join steps and select Binary join.

We want both left_id and right_id to be Date. Let’s also title the step BTC&ETH.

If you save, run and preview the new step, you should see:

Let’s now combine this step with the DOGE.

Click Join steps, select Cleaned DOGE and BTC&ETH, and then click Binary join.

We want both the left_id and right_id to be Date.

If you save, run and preview the new step, you should see:

Custom Code Step

For the sake of consistency with the rest of our MiPasa data, let’s rename the Date column from Date to date.

Our dates currently have the format 05/03/2021. It is easier to work with date objects, so let’s convert the dates.

# `files` holds the pipeline inputs; each file is a list of row maps
[file] = files

# Convert each "MM/DD/YYYY" string into an Elixir Date struct
file =
  file
  |> Enum.map(fn row ->
    [month, day, year] = String.split(row["date"], "/")
    date = year <> "-" <> month <> "-" <> day <> " 00:00:00"

    date_time =
      date
      |> NaiveDateTime.from_iso8601!()
      |> NaiveDateTime.to_date()

    Map.put(row, "date", date_time)
  end)

[file]

Click Apply changes. If Apply changes is not green, add a space after

[file] 

on the last line.

If you save, run and preview the new step, you should see:

What if we also want to know the daily price differences for each coin? Let’s add to the code.

[file] = files

# Convert each "MM/DD/YYYY" string into an Elixir Date struct
file =
  file
  |> Enum.map(fn row ->
    [month, day, year] = String.split(row["date"], "/")
    date = year <> "-" <> month <> "-" <> day <> " 00:00:00"

    date_time =
      date
      |> NaiveDateTime.from_iso8601!()
      |> NaiveDateTime.to_date()

    Map.put(row, "date", date_time)
  end)

# For each row, find the next day's row and record the price differences.
# Days with no following day are skipped; note that prepending to the
# accumulator leaves the result in reverse date order.
file =
  file
  |> Enum.reduce([], fn row, acc ->
    today = row["date"]
    tomorrow = Date.add(today, 1)
    tomorrow_row = Enum.find(file, fn t_row -> t_row["date"] == tomorrow end)

    if tomorrow_row do
      # Prices arrive as strings like "57,000.00"; strip the commas and parse
      {btc, _} =
        row["btc_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {eth, _} =
        row["eth_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {doge, _} =
        row["doge_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {t_btc, _} =
        tomorrow_row["btc_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {t_eth, _} =
        tomorrow_row["eth_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {t_doge, _} =
        tomorrow_row["doge_close"]
        |> String.replace(",", "")
        |> Float.parse()

      tomorrow_row =
        tomorrow_row
        |> Map.put("btc_diff", t_btc - btc)
        |> Map.put("eth_diff", t_eth - eth)
        |> Map.put("doge_diff", t_doge - doge)

      [tomorrow_row | acc]
    else
      acc
    end
  end)

[file]

The new preview should show:

We can even calculate the percentage difference between days by adding the following lines to the Map.put chain:

|> Map.put("btc_percent_diff", (t_btc - btc)/btc)
|> Map.put("eth_percent_diff", (t_eth - eth)/eth)
|> Map.put("doge_percent_diff", (t_doge - doge)/doge)
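To see what the formula computes, here is a quick check with hypothetical closing prices (the numbers are made up for illustration):

```elixir
# Hypothetical closing prices to illustrate the percentage-difference formula
btc = 100.0    # today's close
t_btc = 110.0  # tomorrow's close

(t_btc - btc) / btc
# => 0.1, i.e. the price rose 10% day over day
```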

so that the code becomes:

[file] = files

# Convert each "MM/DD/YYYY" string into an Elixir Date struct
file =
  file
  |> Enum.map(fn row ->
    [month, day, year] = String.split(row["date"], "/")
    date = year <> "-" <> month <> "-" <> day <> " 00:00:00"

    date_time =
      date
      |> NaiveDateTime.from_iso8601!()
      |> NaiveDateTime.to_date()

    Map.put(row, "date", date_time)
  end)

# For each row, find the next day's row and record both the absolute and
# percentage price differences. Days with no following day are skipped.
file =
  file
  |> Enum.reduce([], fn row, acc ->
    today = row["date"]
    tomorrow = Date.add(today, 1)
    tomorrow_row = Enum.find(file, fn t_row -> t_row["date"] == tomorrow end)

    if tomorrow_row do
      # Prices arrive as strings like "57,000.00"; strip the commas and parse
      {btc, _} =
        row["btc_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {eth, _} =
        row["eth_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {doge, _} =
        row["doge_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {t_btc, _} =
        tomorrow_row["btc_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {t_eth, _} =
        tomorrow_row["eth_close"]
        |> String.replace(",", "")
        |> Float.parse()

      {t_doge, _} =
        tomorrow_row["doge_close"]
        |> String.replace(",", "")
        |> Float.parse()

      tomorrow_row =
        tomorrow_row
        |> Map.put("btc_diff", t_btc - btc)
        |> Map.put("eth_diff", t_eth - eth)
        |> Map.put("doge_diff", t_doge - doge)
        |> Map.put("btc_percent_diff", (t_btc - btc) / btc)
        |> Map.put("eth_percent_diff", (t_eth - eth) / eth)
        |> Map.put("doge_percent_diff", (t_doge - doge) / doge)

      [tomorrow_row | acc]
    else
      acc
    end
  end)
[file]

Here is the new preview:

Output Step

Let’s output this file to MiPasa.

Create a Dataset called Crypto in a new tab.

Now add a Dataset Output:

Select the Dataset Crypto and title the file BTC_ETH_and_DOGE.csv

When the pipeline runs, this file will be added to the Dataset. You can then build models using the file in notebooks!

Here is what the file will look like in Datasets:

Here is a more complicated example using the same technique for those interested: https://app.mipasa.com/pipelines/81a970cd-2261-4f1b-99e4-811e28c1abb.