Advanced Techniques for Architecting Flow in Elixir
In the last post we explored how we can use either |> or with to model how data flows through our program.
There is, however, a third concept to model flow in your applications: to hand down a “token” during the execution of your program. This token contains all the information necessary for your program to fulfil its use-case.
In Elixir, this token is usually a struct. Let’s look at two examples.
Plug.Conn
The most famous example of this in the Elixir ecosystem can be found in Plug.
A “plug” is basically a function that takes a Plug.Conn struct as its first argument and returns a (modified) Plug.Conn struct.
Each web request is processed by a Plug pipeline, a series of plugs that get invoked one after another.
The Plug.Conn struct contains all information received in the web request and all information to be sent in the server’s response.
defmodule MyPlugPipeline do
  use Plug.Builder

  # You can plug modules, which implement the Plug behaviour
  plug Plug.Logger

  # You can plug local functions that take a conn and options (function plugs)
  plug :hello, my_param: 42

  def hello(conn, opts) do
    if opts[:my_param] == 42 do
      send_resp(conn, 200, "The answer to all questions!")
    else
      send_resp(conn, 200, "Options are optional!")
    end
  end

  # You can even plug functions from other modules,
  # as long as they are imported into the current module
  import SomeOtherModule, only: [my_other_plug: 2]
  plug :my_other_plug
end
In each plug, we can modify the Plug.Conn struct, e.g. set the response’s content, add additional HTTP response headers or halt the connection, which causes all the remaining plugs in the pipeline to be skipped.
def hello(conn, _opts) do
  case prepare_response() do
    {timing_in_ms, body} ->
      conn
      |> put_resp_content_type("text/plain")
      # Plug recommends lowercase header names
      |> put_resp_header("server-timing", "total;dur=#{timing_in_ms}")
      |> send_resp(200, body)

    _ ->
      halt(conn)
  end
end
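To make the halting behaviour more concrete, here is a minimal sketch of a pipeline runner that stops calling steps as soon as one of them halts the token. It only illustrates the idea and is not how Plug.Builder is implemented: MiniConn, MiniPipeline and MiniSteps are hypothetical names invented for this example.

```elixir
defmodule MiniConn do
  # Hypothetical stand-in for Plug.Conn with just enough fields for this sketch.
  defstruct status: nil, body: nil, halted: false, log: []

  def new, do: %__MODULE__{}

  # Marks the token as halted, similar in spirit to Plug.Conn.halt/1.
  def halt(%__MODULE__{} = conn), do: %__MODULE__{conn | halted: true}
end

defmodule MiniPipeline do
  # Calls each step in order and stops as soon as a step returns a halted token.
  def run(%MiniConn{} = conn, steps) do
    Enum.reduce_while(steps, conn, fn step, acc ->
      case step.(acc) do
        %MiniConn{halted: true} = halted -> {:halt, halted}
        %MiniConn{} = next -> {:cont, next}
      end
    end)
  end
end

defmodule MiniSteps do
  # Rejects every request and halts, so later steps never run.
  def reject(%MiniConn{} = conn) do
    rejected = %MiniConn{conn | status: 401, body: "Unauthorized", log: conn.log ++ [:reject]}
    MiniConn.halt(rejected)
  end

  # Would render a response, but is skipped when an earlier step halted.
  def render(%MiniConn{} = conn) do
    %MiniConn{conn | status: 200, body: "Hello", log: conn.log ++ [:render]}
  end
end

conn = MiniPipeline.run(MiniConn.new(), [&MiniSteps.reject/1, &MiniSteps.render/1])
IO.inspect({conn.status, conn.log})
# prints {401, [:reject]}
```

The log field exists only to show which steps actually ran; the render step never appears in it because the token was halted first.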
Another example of a “token” that is handed down through a business process is the changeset in Ecto.
Ecto.Changeset
Ecto.Changeset structs are used to apply filters, validations and other constraints during the manipulation of structs.
import Ecto.Changeset

user = %User{}

user
|> cast(params, [:name, :email, :age])
|> validate_required([:name, :email])
|> validate_format(:email, ~r/@/)
|> validate_inclusion(:age, 18..100)
|> unique_constraint(:email)
Just like Plug.Conn before, the Ecto.Changeset struct in this example flows through a pipeline of transformations and provides a binding interface for all functions involved in its use-case, i.e. filtering, casting, validating and constraining the manipulation of data.
Unlike Plug.Conn, the scope of a changeset is not necessarily tied to any kind of request life cycle.
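The way a changeset accumulates changes and errors while it flows through a pipeline can be sketched with a plain struct. This is a heavily simplified illustration and not Ecto's implementation: MiniChangeset and its two functions are hypothetical and only mimic the shape of cast/3 and validate_required/2.

```elixir
defmodule MiniChangeset do
  # Hypothetical, heavily simplified imitation of Ecto.Changeset.
  defstruct data: %{}, changes: %{}, errors: [], valid?: true

  # Keeps only the permitted keys from params as pending changes.
  def cast(data, params, permitted) when is_map(data) and is_map(params) do
    %__MODULE__{data: data, changes: Map.take(params, permitted)}
  end

  # Adds an error for every required field that is blank in both changes and data.
  def validate_required(%__MODULE__{} = changeset, fields) do
    Enum.reduce(fields, changeset, fn field, acc ->
      value = Map.get(acc.changes, field, Map.get(acc.data, field))

      if value in [nil, ""] do
        %__MODULE__{acc | errors: acc.errors ++ [{field, "can't be blank"}], valid?: false}
      else
        acc
      end
    end)
  end
end

changeset =
  %{name: nil, email: nil}
  |> MiniChangeset.cast(%{name: "Ada", admin: true}, [:name, :email])
  |> MiniChangeset.validate_required([:name, :email])

IO.inspect({changeset.valid?, changeset.errors})
```

In this sketch the unpermitted admin key is dropped by cast/3, and the missing email is recorded as an error instead of raising, so later steps can still inspect the token.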
Let’s build our own!
Let’s use these insights to adapt this concept to an application of our own. We will stick with our example of converting images with a Mix task:
First, we introduce a struct to help us handle the given command-line arguments (green tasks in the image above).
We will call this struct Options:
defmodule Converter.Options do
  defstruct argv: nil,
            glob: nil,
            target_dir: nil,
            format: nil
end
We will use this to convert the given command-line arguments into a structured form and validate them.
Later, we will prepare the conversion process using Options.
defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  alias Converter.Options

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    validation =
      %Options{argv: argv}
      |> parse_options()
      |> validate_options()

    case validation do
      {:ok, options} ->
        filenames = prepare_conversion(options)
        results = convert_images(filenames, options.target_dir, options.format)
        report_results(options.target_dir, results)

      {:error, error} ->
        report_error(error)
    end
  end

  # Each stage of the conversion process takes the `Options` as argument ...
  defp parse_options(%Options{argv: argv} = options) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    %Options{options | glob: glob, target_dir: target_dir, format: format}
  end

  # ... we pattern match on the fields that are relevant to the current step!
  defp validate_options(%Options{glob: glob, format: format} = options) do
    filenames = Path.wildcard(glob)

    cond do
      Enum.empty?(filenames) ->
        {:error, "No images found."}

      !Enum.member?(~w[jpg png], format) ->
        {:error, "Unrecognized format: #{format}"}

      true ->
        {:ok, options}
    end
  end

  defp prepare_conversion(%Options{glob: glob, target_dir: target_dir}) do
    File.mkdir_p!(target_dir)
    Path.wildcard(glob)
  end

  defp convert_images(filenames, target_dir, format) do
    Enum.map(filenames, fn filename ->
      Converter.convert_image(filename, target_dir, format)
    end)
  end

  defp report_results(target_dir, results) do
    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end

  defp report_error(error) do
    IO.puts("[error] #{error}")
  end
end
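To see what the parsing step works with, here is a small sketch of how OptionParser.parse/2 splits a sample argument list and how the defaults fill in missing values. ArgsDemo and the sample arguments are hypothetical and exist only for this illustration; the switches and default values are the ones used in the Mix task.

```elixir
defmodule ArgsDemo do
  # Hypothetical helper mirroring the defaults used in the Mix task.
  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def parse(argv) do
    # `opts` holds the recognised switches, `args` the remaining positional arguments.
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    %{
      glob: List.first(args) || @default_glob,
      target_dir: opts[:target_dir] || @default_target_dir,
      format: opts[:format] || @default_format
    }
  end
end

# The switch --target-dir is mapped to the key :target_dir.
IO.inspect(ArgsDemo.parse(["./uploads/*.png", "--target-dir", "./out", "--format", "png"]))

# Without any arguments, every field falls back to its default.
IO.inspect(ArgsDemo.parse([]))
```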
Our new Options struct takes a role similar to the one Ecto.Changeset plays: it helps us to fulfil a specific task (parsing and validating options for the conversion process).
To achieve this, our run/1 function had to be restructured:
validation =
  %Options{argv: argv}
  |> parse_options()
  |> validate_options()

case validation do
  {:ok, options} ->
    filenames = prepare_conversion(options)
    results = convert_images(filenames, options.target_dir, options.format)
    report_results(options.target_dir, results)

  {:error, error} ->
    report_error(error)
end
While that does not look overly complex, the same function from our first article looked like this:
argv
|> parse_options()
|> validate_options()
|> prepare_conversion()
|> convert_images()
|> report_results()
Let’s try to gain back some of that clarity …
One Token to Rule Them All!
We can gain back clarity by using a single token for the fulfilment of our use-case from start to finish.
This is what Plug does with Plug.Conn: each request and its response are represented as a single token, which accompanies the whole business process of answering a web request, from getting the original request all the way to sending out the response.
What would this look like for our example?
We are getting a “request” to convert images in a given directory to a given format and answer this “request” by returning the converted images and printing their names on the terminal (or presenting an error message if the “request” was malformed).
We will call our new struct Token and let it flow through our program (green tasks):
defmodule Converter.Token do
  defstruct argv: nil,
            glob: nil,
            target_dir: nil,
            format: nil,
            filenames: nil,
            errors: nil,
            halted: nil,
            results: nil
end
Now we can pass a Token in at the “top of the pipe” in our run/1 function.
defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  alias Converter.Token

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    %Token{argv: argv}
    |> parse_options()
    |> validate_options()
    |> prepare_conversion()
    |> convert_images()
    |> report_results()
  end

  # Each stage of the conversion process takes the `Token` as argument ...
  defp parse_options(%Token{argv: argv} = token) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    %Token{token | glob: glob, target_dir: target_dir, format: format}
  end

  # ... we pattern match on the fields that are relevant to the current step ...
  defp validate_options(%Token{glob: glob, format: format} = token) do
    filenames = Path.wildcard(glob)

    # `if` without `else` returns nil, so we drop the checks that passed.
    errors =
      Enum.reject(
        [
          if(Enum.empty?(filenames), do: "No images found."),
          if(!Enum.member?(~w[jpg png], format), do: "Unrecognized format: #{format}")
        ],
        &is_nil/1
      )

    %Token{token | errors: errors, halted: errors != []}
  end

  # ... we skip steps by matching on `halted: true` ...
  defp prepare_conversion(%Token{halted: true} = token), do: token

  # ... we put in new information gathered at the current stage ...
  defp prepare_conversion(%Token{glob: glob, target_dir: target_dir} = token) do
    File.mkdir_p!(target_dir)
    filenames = Path.wildcard(glob)

    %Token{token | filenames: filenames}
  end

  # ... we can skip steps by matching on `halted: true` ...
  defp convert_images(%Token{halted: true} = token), do: token

  # ... also, we don't have to pattern match on the `Token` necessarily ...
  defp convert_images(token) do
    results =
      Enum.map(token.filenames, fn filename ->
        Converter.convert_image(filename, token.target_dir, token.format)
      end)

    %Token{token | results: results}
  end

  # ... and at the end we can report errors by matching on `halted: true` ...
  defp report_results(%Token{halted: true, errors: errors} = token) do
    Enum.each(errors, fn error ->
      IO.puts("- #{error}")
    end)

    token
  end

  # ... or report the results of the successful program execution.
  defp report_results(token) do
    IO.puts("Wrote #{Enum.count(token.results)} images to #{token.target_dir}.")
    token
  end
end
Our new Token behaves very much like Plug.Conn does: it is handed down from function to function during the execution of our business process.
Let’s summarize the properties of this approach:
- Each step of the program’s execution is a function that takes the Token struct as its first argument, and we pattern match on the fields that are relevant to the current step.
- We can easily skip steps by matching on halted: true.
- We can put in new information gathered at the current stage by modifying the Token before returning it.
- At the end, we can report errors by matching on halted: true or report the results of the successful program execution.
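If the repeated halted: true clauses become noisy, the skipping logic could be collected in one helper. The following is a minimal sketch under stated assumptions, not part of the converter: DemoToken, DemoFlow, put_error/2 and step/2 are hypothetical names, and the token is trimmed down to a single value field.

```elixir
defmodule DemoToken do
  # Trimmed-down, hypothetical variant of a token with explicit defaults.
  defstruct value: nil, errors: [], halted: false

  # Records an error and halts the token in one place.
  def put_error(%__MODULE__{} = token, message) do
    %__MODULE__{token | errors: token.errors ++ [message], halted: true}
  end

  # Runs `fun` only if the token is not halted; otherwise passes it through.
  def step(%__MODULE__{halted: true} = token, _fun), do: token
  def step(%__MODULE__{} = token, fun) when is_function(fun, 1), do: fun.(token)
end

defmodule DemoFlow do
  def run(value) do
    %DemoToken{value: value}
    |> DemoToken.step(&validate/1)
    |> DemoToken.step(&double/1)
  end

  defp validate(%DemoToken{value: value} = token) when is_integer(value), do: token

  defp validate(%DemoToken{} = token) do
    DemoToken.put_error(token, "Not an integer: #{inspect(token.value)}")
  end

  # Never called for halted tokens, so it can assume a valid value.
  defp double(%DemoToken{value: value} = token), do: %DemoToken{token | value: value * 2}
end

IO.inspect(DemoFlow.run(21).value)
# prints 42
```

The trade-off in this sketch is that the pipeline in run/1 gets slightly more verbose, while each step function no longer needs its own pass-through clause.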
Comparison of the three approaches
Let’s look at the properties of each approach once again:
# Approach 1: using `|>`
def run(argv) do
  argv
  |> parse_options()
  |> validate_options()
  |> prepare_conversion()
  |> convert_images()
  |> report_results()
end
The |> approach forces the code to adopt Elixir’s idiomatic style of putting the to-be-transformed data as the first argument in any function. This provides a kind of natural “interface” or “contract”.
# Approach 2: using `with`
def run(argv) do
  with {glob, target_dir, format} <- parse_options(argv),
       :ok <- validate_options(glob, format),
       filenames <- prepare_conversion(glob, target_dir),
       results <- convert_images(filenames, target_dir, format) do
    report_results(results, target_dir)
  end
end
The with approach shines when collaborating in a fast-moving environment: you might not want to be that dependent on another programmer’s return values early on and with provides more flexibility in dealing with the called function’s result.
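The flexibility mentioned above can be sketched with an else block, which lets the caller translate whatever the called functions return. The module and its functions below are hypothetical and deliberately use different return shapes; they are not taken from the converter example.

```elixir
defmodule WithDemo do
  # Hypothetical steps with deliberately different return shapes.
  def parse(input) do
    case Integer.parse(input) do
      {number, ""} -> {:ok, number}
      _ -> :error
    end
  end

  def validate(number) when number > 0, do: :ok
  def validate(_number), do: {:error, :not_positive}

  def run(input) do
    with {:ok, number} <- parse(input),
         :ok <- validate(number) do
      {:ok, number * 2}
    else
      # Each unmatched return value can be handled (or translated) here.
      :error -> {:error, "Not a number: #{input}"}
      {:error, :not_positive} -> {:error, "Number must be positive"}
    end
  end
end

IO.inspect(WithDemo.run("21"))
IO.inspect(WithDemo.run("abc"))
```

If one of the steps later changes its return value, only the matching clause in run/1 has to be adapted, which is the kind of loose coupling the with approach allows.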
# Approach 3: using a `Token`
def run(argv) do
  %Token{argv: argv}
  |> parse_options()
  |> validate_options()
  |> prepare_conversion()
  |> convert_images()
  |> report_results()
end
Finally, the Token approach can combine the benefits of both worlds, if applied in the right place:
If you have a well-defined, narrow use-case, where you want to provide a unified interface and data structure (like Ecto.Changeset), you might find this approach beneficial.
It is also a great idea for the top-level flow of your program, where your use-case is the very purpose of your program and you might want to establish an explicit data contract between different interacting parts of your app (like Plug.Conn does for web apps).
In these situations, the Token combines the benefits of the first two approaches, since it provides a binding contract between all parts of your app, but also allows teams to work independently.
We can also do “flowy things” like skipping steps by matching on the Token:
# skip this step since execution is halted
defp prepare_conversion(%Token{halted: true} = token) do
  token
end

# fulfil this step since execution is not halted
defp prepare_conversion(%Token{target_dir: target_dir} = token) do
  File.mkdir_p!(target_dir)
  token
end
We will take a more detailed look at these “flowy things” as well as the pros and cons of the Token approach in future articles.