利用 Mnesia 设计业务计数器

前言


有关「计数器」的概念我在本博客的功能计划上有过介绍,主要是对数据的数量统计进行缓存和同步持久化。之所以要独立计数是因为要面对不同的「可见性」问题,例如前/后台。缓存自然是为了避免大量重复的即时查询。

通常来讲,这类计数会使用 Redis 之类的独立 KV 数据库,因为它们有很高的性能并且多节点共享。但本博客毕竟是使用 Elixir 实现的,运行于 Erlang 虚拟机之上。物尽其用,为何不选择 Mnesia 呢?

Mnesia


Mnesia 是 Erlang 自带的嵌入式数据库管理系统,支持分布式/多节点。Mnesia 的后端可以是基于内存的 ETS 或基于磁盘的 DETS。嗯…… ETS 和 DETS 也是 Erlang 自带的数据库,只不过后两个类似于 KV 储存并非 DBMS。

Mnesia 之于 ETS/DETS 的关系有些类似于 MySQL 之于 InnoDB(储存引擎)。所以虽然都是 Erlang 自带的储存方案,但它们并不是谁替代谁,而是在互补的同时还相互依赖。

需求


要实现独立的计数缓存,至少要满足以下两点:

  1. 支持分布式/多节点
  2. 具备很高的性能

第一点是因为要让数据集中化储存,而不是分散到每一个节点或每个节点一份副本。不然直接在应用内存中缓存就行了,不需要 Reids 什么的(这是一句废话解释)。

Mnesia 虽然不能算真正意义上的内存数据库,但在 RAM-Only 下也具有出色的性能,所以 Mnesia 基本满足了以上两点。更重要的是 Mnesia 是自带的,能与应用无缝集成,更加强大且不增加部署负担。

设计


我们需要实现两个不同但相关联的层:首先是基于 Mnesia 的键值存储服务器,然后是基于 Ecto 的持久化计数表。

我们示例的计数目标是“阅读数量”。对于「阅读数量」而言,应区分“有效阅读次数”和“全部阅读次数”。虽然阅读次数的累加是集成在访问业务中的,但是不应该影响到业务功能,所以阅读次数的累加还应该是「异步」执行的。类似的,大多数类型的计数更新实际都不应该影响外部业务功能,这是我将它设计成一个 C/S 模型的主要原因。

缓存计数

上文介绍过,计数的缓存是 C/S 模型的,所以需要实现一个 OTP 中最常见的 GenServer 行为作为调用的基础架构。

创建一个 Blog.Counter 模块,实现纯缓存的键值服务器:

defmodule Blog.Counter do
  use GenServer

  alias :mnesia, as: Mnesia

  def start_link(default) when is_list(default) do
    state = default[:state] || %{}
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def init(state) do

    Mnesia.create_schema([node()])
    Mnesia.start()
    Mnesia.create_table(Counter, attributes: [:key, :val])
    Mnesia.wait_for_tables([Counter], 5000)

    {:ok, state}
  end

  def read(key) do
    GenServer.call(__MODULE__, {:read, key})
  end

  def inc(key) do
    GenServer.cast(__MODULE__, {:inc, key})
  end

  def handle_call({:read, key}, _from, state) do
    case Mnesia.dirty_read({Counter, key}) do
      [{Counter, _, val}] -> {:reply, val, state}
      [] -> {:reply, 0, state}
    end
  end

  def handle_cast({:inc, key}, state) do
    Mnesia.dirty_update_counter(Counter, key, 1)
    {:noreply, state}
  end
end

初始化

我们在 init/1 中初始化了 Mnesia,创建一个仅存在 keyval 两个字段的 Counter 表。毕竟我们的计数只需要简单的 KV 储存模型就满足了。因为是本机运行的示例代码,所以传递的节点是 [node()](本机),大家明白 Mnesia 能支持远程节点即可。

核心实现

上述 GenServer 实现的核心在于 read/1inc/1 这两个客户端调用函数。因为我们无需手动输入计数的数值,只是不断的 +1,所以不需要 write/2,提供 inc/1 即可。inc 即 Increment 的简写,意为自增。

注意 Mnesia.dirty_update_counter(Counter, key, 1) 调用,这是一个很重要的函数,通过它我们能支持并发自增而无需理睬事务,这比自己维护计数数值的更新要简单和高效得多。

至于 read/1 函数基本没什么好介绍的了,就是简单的按主键(key)查询,查不到返回 0 而已。但要注意,不同的是 read/1 是同步调用(call),而 inc/1 是异步调用(cast)。在上文中解释过为什么自增是异步执行的。

应用计数

对计数的应用非常简单,并且不会因为后续增加的缓存同步而受到任何调用方式的影响。通常来讲,获取计数的调用发生在 View 层,而自增计数的调用发生在 Controller 中。

显示计数

当我们需要显示计数的时候,直接使用 Blog.Counter 模块发起客户端调用即可。例如在 eex/leex 模板中:

<span>
   <%= Blog.Counter.read("article:#{@article.id}:all_views") %>次阅读
</span>

这里我们以 article:#{id}:all_views 的格式作为键,假设要显示有效阅读次数,可以以 article:#{id}:effective_views 作为键。

自增计数

在访问文章的 Phoenix 控制器函数(或其它类似概念的 Web 路由函数)中,我们调用 inc/1 来自增计数:

def show(conn, %{"id" => id}) do
  with {:ok, article} <- Business.get_article(id) do
    Blog.Counter.inc("article:#{article.id}:all_views") 
    render(conn, "show.json", article: article)
  end
end

因为计数的累加是异步的,并不会成为文章访问的瓶颈,即便自增出现了异常也不会影响用户访问文章。

至于“有效阅读次数”,在自增上并无区别,只是需要在自增前通过有效访问的判断逻辑。例如:15 分钟内来自同一个 IP 的访问不进行自增。

就是如此简单,完善且高效的阅读次数的计数已经可用了。

持久化计数

就像 Redis 之类的方案可以配置自动同步到数据表一样,我们的计数器也应该有这样的能力。所以这一小节需要创建基于 Ecto 的数据操作层。然后在计数器服务器中定时回调同步操作,将缓存写入数据库中。当然,我们在应用刚启动的同时,也需要执行一次将持久化的计数数据读取到缓存中的反向操作。

定义数据层模型

首先我们创建相关 Schema 和迁移:

mix ecto.gen.migration create_counters 

修改所生成 Migration 文件,创建自定义主键的特殊数据表:

defmodule Blog.Repo.Migrations.CreateCounters do
  use Blog.Migration

  def change do
    create table(:counters, primary_key: false) do
      add :key, :string, primary_key: true
      add :val, :integer

      timestamps()
    end
  end
end

如上,我们也仅需要 key 和 val 字段而已,其中 key 是手动维护的自定义主键。根据 Migration 创建对应 Schema:

defmodule Blog.Schemas.Counter do
  use Blog.Schema

  @required_fields ~w(key val)a
  @optional_fields ~w()a

  @primary_key {:key, :string, []}
  schema "counters" do
    field :val, :integer

    timestamps()
  end

  @doc false
  def changeset(%__MODULE__{} = counter, attrs) do
    counter
    |> cast(attrs, @required_fields ++ @optional_fields)
    |> validate_required(@required_fields)
  end
end

使用 @primary_key 模块属性自定义主键方案,排除主键只剩一个需要指定的 val 字段了。然后创建一个最基本的 changeset/2 函数。在这里我并没有使用唯一约束,因为主键是强制唯一的。但实际上这样不好,正确的做法还是需要在 changeset/2 函数中验证约束。本文的后续更新可能会改进。

提供数据同步层 API

上述模型定义好了,接下来就是正式的读/写库操作了。因为计数器表是为计数器缓存服务器服务的,不是为用户服务的,我们无需提供一整套增删改查。所以这里我定义了仅需的三个函数:

def val(key) do
  if c = Repo.get(Counter, key) do
    c.val
  else
    0
  end
end

def sync(key, val) do
  c = Counter |> Repo.get(key)

  cond do
    c == nil ->
      %Counter{} |> Counter.changeset(%{key: key, val: val}) |> Repo.insert()

    c.val < val ->
      c |> Counter.changeset(%{key: key, val: val}) |> Repo.update()

    true ->
      {:ok, :unchanged}
  end
end

def all do
  Counter |> Repo.all()
end

它们的功能分别是:

  • val/1 根据 key 获取计数值
  • sync/2 指定键值同步到数据库中
  • all/1 查询出数据库中所有的计数键值列表

特别注意 sync/2 函数的实现,它智能的判断了应该执行更新还是新增。假设缓存中同步过来的计数值比数据库持久化的值还要小,那一定是有问题。例如初次启动时缓存没有正确获取到持久化的值,所以这时候我们不应该更新它(没有自增反而变小的说法)。

至此,数据层的 API 已经提供完成了。

自动同步缓存

最后一步就是在计数缓存服务器的实现中添加同步代码了。我们先定义两个函数,分别是将缓存同步到数据库和从数据库同步到缓存:

defp sync_from_db do
  all = Counter.all()

  all
  |> Enum.each(fn c ->
    Mnesia.dirty_write({Counter, c.key, c.val})
  end)
end

defp sync_to_db do
  data_select = fn -> Mnesia.select(Counter, [{:_, [], [:"$_"]}]) end
  {:atomic, counters} = Mnesia.transaction(data_select)

  counters
  |> Enum.each(fn {Counter, key, val} ->
    Counter.sync(key, val)
  end)
end

我们需要在服务器的 init/1 函数中中调用 sync_from_db/0 初始化缓存:

def init(state) do
  # ...
  sync_from_db()
  # ...
end

我们还需要定期将缓存同步到数据库,也就是定时调用 sync_to_db/0 函数:

@sync_time 1000 * 60 * 60 * 15
defp schedule_sync_to_db(is_first \\ false) do
  unless is_first do
    sync_to_db()
  end

  Process.send_after(self(), :sync, @sync_time)
end

def handle_info(:sync, state) do
  state = state |> Map.put(:sync_time, DateTime.utc_now())
  schedule_sync_to_db()
  {:noreply, state}
end

我们通过 schedule_sync_to_db/1 和对应的 handle_info/2 函数的相互调用实现了循环调度,延迟时间为 15 分钟。其中 is_first 参数是为了避免启动调度之时立即同步到数据库,因为我们在 init/1 启动调度,所以应该主动传递一个 true:

def init(state) do
  # ...
  sync_from_db()
  schedule_sync_to_db(true)
  # ...
end

现在计数服务器已经真正的实现了,它会将缓存中的计数数据自动同步到数据库或在初始化时从数据库同步数据过来。

退出时触发同步

虽然我们 15 分钟就会持久化一次计数数据,但也无法避免应用重启之时丢失最近 15 分钟内新增的计数值。所以我们还需要在应用退出时立即同步,这样就能保证持久化的数据最终总是和缓存是一致的,不会丢失。

让进程捕获退出:

def init(state) do
  Process.flag(:trap_exit, true)
  #...
end

在退出时强制同步:

def handle_info({:EXIT, _from, :shutdown}, state) do
  sync_to_db()
  {:noreply, state}
end

def terminate(_reason, _state) do
  sync_to_db()
  :normal
end

现在重启应用也不会丢失最新的缓存数据了。

这些就是本博客计数器的核心实现。注意到我在调度同步的函数中给 state 写入了一个值为当前时间的 sync_time 字段,这是因为我的博客后台会显示这个字段。并且我还提供了手动同步的 API,当我后台发现同步时间不对时,可以手动强制同步。

结束语


本文对计数器的实现仅是初步可用,但是这个思路没有问题。对 Mnesia 和 GenServer 的应用也值得大家参考。当我后续有对计数器进行比较重要的优化和改进后也会更新本文。

Elixir ❤️ Mnesia 太舒服了。