Performing Bulk Updates with Ecto And Elixir

Updating one database row in Ecto is straightforward. But figuring out how to bulk update rows using Ecto without updating one record at a time?

Now that is a pain.

What if there were a way to bulk update rows of data in your database using only ONE update call?

Well, there is, and in this post, I’ll show you how you can do it using Ecto without having to write a single line of SQL.

bulk update ecto

Ecto update/2 And Changesets

You can, of course, simply create changesets or structs for each row of data, group them together in something like a list, and then pass them all one-by-one using Enum.each/2 to Ecto.Repo.update/2.

defmodule Example.User do
  use Ecto.Schema
  import Ecto.Changeset

  def changeset(user, params) do
    cast(user, params, [:name, :deactivated])
  end

  schema "users" do
    field :name, :string
    field :deactivated, :boolean
    field :deactivated_on, :date

    timestamps()
  end
end

iex(1)> Enum.each(users, fn(user) ->
...(1)> changeset = Example.User.changeset(user, %{deactivated: true})
...(1)> Example.Repo.update(changeset)
...(1)> end)

...[debug] QUERY OK db=33.5ms
UPDATE "users" SET "deactivated" = $1, \
"updated_at" = $2 WHERE "id" = $3 [true, \
{{2018, 7, 10}, {1, 20, 19, 922659}}, 10]

...[debug] QUERY OK db=4.9ms
UPDATE "users" SET "deactivated" = $1, \
"updated_at" = $2 WHERE "id" = $3 [true, \
{{2018, 7, 10}, {1, 20, 19, 957723}}, 11]

...[debug] QUERY OK db=1.8ms
UPDATE "users" SET "deactivated" = $1, \
"updated_at" = $2 WHERE "id" = $3 [true, \
{{2018, 7, 10}, {1, 20, 19, 963006}}, 12]

...[debug] QUERY OK db=3.8ms
UPDATE "users" SET "deactivated" = $1, \
"updated_at" = $2 WHERE "id" = $3 [true, \
{{2018, 7, 10}, {1, 20, 19, 965138}}, 13]

...[debug] QUERY OK db=8.6ms queue=0.1ms
UPDATE "users" SET "deactivated" = $1, \
"updated_at" = $2 WHERE "id" = $3 [true, \
{{2018, 7, 10}, {1, 20, 19, 969666}}, 14]
:ok

Ecto and update_all

Obviously, performing individual updates can be time-prohibitive if you are performing a lot of updates. Ecto knows this, which is why it also provides the Ecto.Repo.update_all/3 function.

update_all/3 will actually do a one SQL UPDATE call on each of the rows and columns that you provide to it.

Here’s the catch:

The update_all/3 function works a bit differently than update/2.

“update_all” requires you to pass in a “queryable”. That is, something that implements the Queryable protocol.

Update_all and Schemas

Adding use Ecto.Schema and the schema/2 function to a module, automatically converts the current module into a queryable. Like the User example we saw above:

defmodule Example.User do
  use Ecto.Schema 
  import Ecto.Changeset

  def changeset(user, params) do
    cast(user, params, [:name, :deactivated])
  end

  # Queryable
  schema "users" do
    field :name, :string
    field :deactivated, :boolean
    field :deactivated_on, :date

    timestamps()
  end
end

Because Example.User implements uses the schema/2 macro, we can now pass it to update_all/3 as the first argument.

Ecto.Repo.update_all(Example.User,
  set: [deactivated_on: Date.utc_today()]
)

...[debug] QUERY OK source="users" db=21.0ms queue=0.1ms
UPDATE "users" AS u0 SET "deactivated_on" = $1 [{2018, 7, 10}]
{10, nil}

As you can see, Ecto does the update as one single UPDATE call. However, this example updates every single one of our Users.

What if we want to bulk update a sub-section of our Users?

Ecto and Queries

The other option to use with update_all/3 is to build a query directly using Ecto.Query and the from/2 function. Elixir calls these keyword-based queries.

import Ecto.Query

query = from(u in User, where: u.deactivated == false)

Ecto.Repo.update_all(query,
  set: [
    deactivated: true,
    deactivated_on: Date.utc_today()
  ]
)

However, there are a couple of issues with this approach.

First, update_all does not allow all options in queries the way that other query functions do, such as, Repo.all/1.

For example, you cannot use joins in your query or other modifiers like order_by. Ecto will throw an error if you try.

Second, there’s the issue of delay between queries.

Let’s take our example above. Let’s say you wanted to query a bunch of Users in your database, take some action on them (like send an email), and then update them in the database. Now, let’s say this could be 100s or 1000s of users.

You can build a query to grab the users from the database as structs which you can then use to do whatever-you-need-to-do. You could then use that same query you built before and pass that into the update_all/2 function (provided that query doesn’t have any exceptions noted above).

Here’s an example:

import Ecto.Query

query = from u in User, where: u.deactivated == false
users = Ecto.Repo.all(query)

do_something(users)

Ecto.Repo.update_all(query, 
  set: [
    deactivated: true, 
    deactivated_on: Date.utc_today()
  ]
)

The problem with this approach is, in the time it took us to query the database, take some actions, and update the database, what if another record was inserted which qualifies for our query? It will now get updated with the update_all/2 function without any actions being taken on it.

Use IDs for updating

If you wanted to use a complicated query for your updates, you can try this approach. Build a complicated query and retrieve the records you want using Ecto.

import Ecto.Query

query =
  from(u in User,
    join:
      p(
        from(Post,
          where: p.author_id = u.id,
          where: u.deactivated == false
        )
      )
   )

users = Ecto.Repo.all(query)

do_something(users)

Now, instead of passing the same query we used before (and we can’t since it contains a join clause), we create a list of the user ids that we retrieved.

import Ecto.Query

query =
  from(u in User,
    join:
      p(
        from(Post,
          where: p.author_id = u.id,
          where: u.deactivated == false,
          select: u.id
        )
      )
  )

users = Ecto.Repo.all(query)

do_something(users)

users_id = Enum.map(users, &(&1.id))

new_query = from u in User, where: id in ^users_id

Ecto.Repo.update_all(new_query, 
  set: [
    deactivated: true, 
    deactivated_on: Date.utc_today()
  ]
)

So, instead of update_all updating every user, it will only update the user with the ids that we passed into the query.

Problem with update_all

As Elixir points out in the documentation, using update_all/2 with a queryable means that certain autogenerated columns, such as inserted_at and updated_at, will not be updated when using update_all/2 as they would be if you used update.

There is one more way that you can do a bulk update using Ecto.

Upserts with insert_all

The insert_all/3 callback of Ecto has an interesting option called on_conflict. If you specify the on_conflict AND you provide a list of structs with ids of database rows that already exist in your database, a single UPDATE call will be made passing the ids provided your list.

Let’s take our use example before. We do the same thing as before: build a complicated query, pass the structs to a function, and then map over the structs taking their ids.

Finally, we pass that list into insert_all/3 with the on_conflict option. We also pass in [set: [deactivated: true, deactived_on: Date.utc_today()]].

import Ecto.Query

query =
  from(u in User,
    join:
      p(
        from(Post,
          where: p.author_id = u.id,
          where: u.deactivated == false,
          select: u.id
        )
      )
  )

users = Ecto.Repo.all(query)

do_something(users)

new_users = Enum.map(users, &(&1.id))

Example.Repo.insert_all(
  Example.User,
  new_users,
  on_conflict: [
    set: [
      deactivated: true,
      deactivated_on: DateTime.utc_today()
    ]
  ],
  conflict_target: [:id]
)

The database will see the “on conflict” option and perform an UPDATE call with the set option we passed in. It will update the database rows in one call instead of one for each row.

The Problem with insert_all/3

Unfortunately, insert_all/3 is not without its flaws either. It also will not update any autogenerated columns, such as timestamps. Not only that, but if the row has a column that is not provided by your structs, in our case the list ids, then insert_all will replace those values with NULL in those columns.

A solution to this problem would be to provide these columns with values in your struct before you pass them to insert_all.

now = DateTime.utc_now()

new_users =
  users
  |> Enum.map(& &1.id)
  |> Enum.map(fn id -> struct(id, %{inserted_at: now, updated_at: now}) end)

Example.Repo.insert_all(
  Example.User,
  new_users,
  on_conflict: [
    set: [
      deactivated: true,
      deactivated_on: DateTime.utc_today()
    ]
  ],
  conflict_target: [:id]
)

As stated in the documentation, insert_all/3 can return some cryptic values depending on the database/persistence layer you choose to use. Some will return the amount of rows updated (postgresql), others will return the amount of rows attempted to be updated or inserted (mysql), etc.

Hope this information was helpful. If you have any questions or have another tip, feel free to leave a comment below.

This post is part of my series of Elixir Tutorials


Freelance Elixir & Ruby on Rails Developer Hey, I’m Adam. I’m guessing you just read this post from somewhere on the interwebs. Hope you enjoyed it.

You can also follow me on the Twitters at: @DeLongShot

4 thoughts on “Performing Bulk Updates with Ecto And Elixir

Comments are closed.