Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recovery after temporary DB connection close #38

Open
DohanKim opened this issue Jan 6, 2024 · 26 comments
Open

recovery after temporary DB connection close #38

DohanKim opened this issue Jan 6, 2024 · 26 comments
Labels
bug Something isn't working

Comments

@DohanKim
Copy link
Contributor

DohanKim commented Jan 6, 2024

It happens that sometimes the DB connection is closed.
Even though other processes are restarted and reconnected shortly after the temporary connection issue is resolved,
Walex just stopped working.

Can you give me some ideas and how to implement them?
(ex: reconnecting after exponential backoff)
@cpursley

@DohanKim DohanKim changed the title recovery after temporary DB close recovery after temporary DB connection close Jan 6, 2024
@DohanKim
Copy link
Contributor Author

DohanKim commented Jan 6, 2024

I'm looking into supabase realtime codes to get some hints but can't figure out for now

@DohanKim
Copy link
Contributor Author

DohanKim commented Jan 7, 2024

Even though it is happening in my prod app once a week,
(DB connection closed temporarily -> DB connection recovered -> all processes start working again except WalEx event)
I can't manage to reproduce it in the test or dev environment.

Currently closing the issue while migrating my app from v2.3.0 to the latest version of WalEx.

Will reopen the issue when it happens again in the latest version.

@DohanKim DohanKim closed this as completed Jan 7, 2024
@DohanKim
Copy link
Contributor Author

FYI

it works really well on the latest version.

@cpursley
Copy link
Owner

Super! Thank you for your help on this.

I also pushed up a few other changes (mainly, supervisor strategies): e2a2c0f

But haven't cut a release for it yet.

@DohanKim DohanKim reopened this Jan 15, 2024
@DohanKim
Copy link
Contributor Author

DohanKim commented Jan 15, 2024

It happened again today with the latest version.
Again, the symptom is that after a short DB connection error, all other DB-related processes work again except WalEx.
It's tough to debug the problem as there is no error from WalEx at all.
I suspect that reconnecting or re-initiating to the WAL slot is not working properly.

@cpursley can you please give me some advice as this is a really critical issue on my prod service?

@cpursley
Copy link
Owner

cpursley commented Jan 15, 2024

Hum, sorry to hear that. Can you try the last master (reference the github repo instead of last release)? Supervisors are set to restart more often. And maybe fork latest master and add some logging in various places?

@cpursley cpursley added the bug Something isn't working label Jan 15, 2024
@DohanKim
Copy link
Contributor Author

will try that. thanks.

@DohanKim
Copy link
Contributor Author

DohanKim commented Jan 16, 2024

Happened today with the latest master branch.

Same symptom.
WAL slot connected successfully when I restart elixir application with :init.restart

@cpursley
Copy link
Owner

@DohanKim Is there anything showing up in logs or your error reporting system (it sounds like no, but thought I'd ask). I'd like to help but not sure how to set up a scenario that reproduces the issue.

@DohanKim
Copy link
Contributor Author

I am only getting this error from Ecto. (DBConnection.ConnectionError tcp recv: closed)
There are no other error messages, unfortunately.

I spent a couple of days reproducing the issue but was not successful.

The scenario I suspect is
WalEx connects to a WAL slot -> DB restarts -> WalEx tries to reconnect to the WAL slot which is closed already

I will try more and share the results here.

@cpursley
Copy link
Owner

Thank you for the update and sorry for the trouble.

Perhaps we could write some additional tests here to test the scenario of a restarting db: https://github.com/cpursley/walex/blob/master/test/walex/database_test.exs#L36

@DohanKim
Copy link
Contributor Author

DohanKim commented Jan 31, 2024

happened again with v3.8.0. now investigating 🥲

@cpursley
Copy link
Owner

cpursley commented Jan 31, 2024

I wonder if we should instead of creating a slot with "walex_temp_slot_" <> Integer.to_string(:rand.uniform(9_999)) type of naming, we should create a more consistent name and try to connect to that or recreate if it does not exist?

https://github.com/cpursley/walex/blob/master/lib/walex/replication/server.ex#L41

@DohanKim
Copy link
Contributor Author

DohanKim commented Feb 2, 2024

@cpursley that would be a good idea to create a slot name with app_name. let me first write a test case reproducing the error.

@cpursley
Copy link
Owner

cpursley commented Feb 2, 2024

@DohanKim could you create a PR with the experiment you are doing? That way I could also pull down and investigate. Thank you for you help on this!

@DohanKim
Copy link
Contributor Author

DohanKim commented Feb 2, 2024

@cpursley This is the test code I'm currently working on. You can just replace it with database_test.exs

defmodule WalEx.DatabaseTest do
  use ExUnit.Case, async: false
  import WalEx.Support.TestHelpers
  alias WalEx.Supervisor, as: WalExSupervisor

  require Logger

  @hostname "localhost"
  @username "postgres"
  @password "postgres"
  @database "todos_test"

  @base_configs [
    name: :todos,
    hostname: @hostname,
    username: @username,
    password: @password,
    database: @database,
    port: 5432,
    subscriptions: ["user", "todo"],
    publication: "events"
  ]

  describe "logical replication" do
    setup do
      {:ok, database_pid} = start_database()

      %{database_pid: database_pid}
    end

    test "should have logical replication set up", %{database_pid: pid} do
      show_wall_level = "SHOW wal_level;"

      assert is_pid(pid)
      assert [%{"wal_level" => "logical"}] == query(pid, show_wall_level)
    end

    test "should start replication slot", %{database_pid: database_pid} do
      assert {:ok, replication_pid} = WalExSupervisor.start_link(@base_configs)
      assert is_pid(replication_pid)

      pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"

      assert [
               %{"active" => true, "slot_name" => slot_name, "slot_type" => "logical"}
               | _replication_slots
             ] = query(database_pid, pg_replication_slots)

      assert String.contains?(slot_name, "walex_temp_slot")
    end

    test "should re-initiate after DB reconnection", %{database_pid: database_pid} do
      {:ok, supervisor_pid} = TestSupervisor.start_link()

      database_pid =
        Supervisor.which_children(supervisor_pid)
        |> tap(&Logger.debug("Children" <> inspect(&1)))
        |> Enum.find(&match?({DBConnection.ConnectionPool, _, _, _}, &1))
        |> elem(1)
        |> tap(&Logger.debug("Database pid" <> inspect(&1)))

      pg_replication_slots = "SELECT slot_name, slot_type, active FROM \"pg_replication_slots\";"

      query(database_pid, pg_replication_slots)
      |> tap(&Logger.debug("Replication slots" <> inspect(&1)))

      name =
        WalEx.Config.Registry.set_name(:set_gen_server, WalEx.Replication.Server, :todos)
        |> tap(&Logger.debug("Server name" <> inspect(&1)))

      replication_server_pid =
        GenServer.whereis(name)
        |> tap(&Logger.debug("Server pid" <> inspect(&1)))

      # {output, exit_code} = System.cmd("sudo", ["service", "postgresql", "restart"])

      Process.info(database_pid) |> tap(&Logger.debug("Database pid" <> inspect(&1)))

      update_user(database_pid)

      :timer.sleep(3000)

      Supervisor.terminate_child(supervisor_pid, DBConnection.ConnectionPool)
      |> tap(&Logger.debug("Terminated" <> inspect(&1)))

      Process.info(database_pid) |> tap(&Logger.debug("Database pid" <> inspect(&1)))

      Supervisor.which_children(supervisor_pid)
      |> tap(&Logger.debug("Children" <> inspect(&1)))

      # Process.exit(database_pid, :kill)

      Logger.debug("waiting")
      :timer.sleep(3000)

      Logger.debug("done waiting")

      Supervisor.restart_child(supervisor_pid, DBConnection.ConnectionPool)
      |> tap(&Logger.debug("Restarted" <> inspect(&1)))

      Supervisor.which_children(supervisor_pid)
      |> tap(&Logger.debug("Children" <> inspect(&1)))

      database_pid =
        Supervisor.which_children(supervisor_pid)
        |> tap(&Logger.debug("Children" <> inspect(&1)))
        |> Enum.find(&match?({DBConnection.ConnectionPool, _, _, _}, &1))
        |> elem(1)
        |> tap(&Logger.debug("Database pid" <> inspect(&1)))

      update_user(database_pid)

      database_pid =
        Supervisor.which_children(supervisor_pid)
        |> tap(&Logger.debug("Children" <> inspect(&1)))
        |> Enum.find(&match?({DBConnection.ConnectionPool, _, _, _}, &1))
        |> elem(1)

      query(database_pid, pg_replication_slots)
      |> tap(&Logger.debug("Replication slots" <> inspect(&1)))

      # assert [
      #          %{"active" => true, "slot_name" => slot_name, "slot_type" => "logical"}
      #          | _replication_slots
      #        ] = query(database_pid, pg_replication_slots)

      # assert String.contains?(slot_name, "walex_temp_slot")
    end
  end

  def start_database do
    Postgrex.start_link(
      hostname: @hostname,
      username: @username,
      password: @password,
      database: @database
    )
  end

  def query(pid, query) do
    pid
    |> Postgrex.query!(query, [])
    |> map_rows_to_columns()
  end

  def map_rows_to_columns(%Postgrex.Result{columns: columns, rows: rows}) do
    Enum.map(rows, fn row -> Enum.zip(columns, row) |> Map.new() end)
  end

  def map_rows_to_columns(_result), do: []
end

defmodule TestSupervisor do
  use Supervisor

  @hostname "localhost"
  @username "postgres"
  @password "postgres"
  @database "todos_test"

  @base_configs [
    name: :todos,
    hostname: @hostname,
    username: @username,
    password: @password,
    database: @database,
    port: 5432,
    subscriptions: ["user", "todo"],
    publication: "events",
    destinations: [
      modules: [TestModule]
    ]
  ]

  def start_link do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    children = [
      {Postgrex,
       [hostname: @hostname, username: @username, password: @password, database: @database]},
      {WalEx.Supervisor, @base_configs}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule TestModule do
  require Logger
  use WalEx.Event, name: :test_app

  on_event(
    :all,
    fn events -> Logger.info("on_event event occurred: #{inspect(events, pretty: true)}") end
  )

  on_update(
    :user,
    [],
    fn events -> Logger.info("on_update event occurred: #{inspect(events, pretty: true)}") end
  )
end

@cpursley
Copy link
Owner

cpursley commented Feb 3, 2024

@DohanKim I took your idea and created a test branch here: #46

I also set slot name to the app name: https://github.com/cpursley/walex/pull/46/files#diff-f7aa5bafef0b9d259456d1b5344450f3ae79ce730a61d65d6e0cae665592ad4cR43

Please let me know what if this covers the situation you've been experiencing. Feel free to make your own changes. I want to be sure we cover all possible connection cases.

Thanks!

@cpursley
Copy link
Owner

cpursley commented Feb 4, 2024

I added another test case that attempts to stop Postgres via command line. It seems to work on MacOS where Postgres was installed via Postgres.app. Postgres on MacOS via homebrew is also covered but untested.

Also, linux (Debian) is covered but I haven't tested locally. It does not work on the Github Workflow due to no sudo access (and that it's in docker and I don't believe can actually be started/stopped in the test runner).

I'm not sure what type of local machine you use, but I would appreciate you testing this and reporting back. Thanks!

assert :ok == pg_restart()

@DohanKim
Copy link
Contributor Author

DohanKim commented Feb 5, 2024

@cpursley Thanks for the effort!

I'm using M1 Mac (Apple Silicon) locally (and Supabase on prod).
After adding some codes as homebrew installs Postgres in "/opt/homebrew/Cellar/postgresql", the tests working well.

But the test cases are still passing with the random slot names, meaning that the test cases are not covering the case my prod server is experiencing.

I'll try to write a test case covering my error case.

@DohanKim
Copy link
Contributor Author

DohanKim commented Feb 5, 2024

I've spent roughly a week attempting to replicate the issue, but haven't succeeded 🥲.
I plan to switch the approach and implement thorough logging within the library.
I could either tailor the logging only for my use or make it a pull request for the community.
Which option would you prefer?

@cpursley
Copy link
Owner

cpursley commented Feb 5, 2024

Feel free to put in logging! We can always remove later when the issue is resolved.

@cpursley
Copy link
Owner

cpursley commented Feb 5, 2024

Also, could you submit a change to this branch with your homebrew related changes? But please modify so that the version number is dynamic instead of hard coded.

@cpursley
Copy link
Owner

Hi @DohanKim ~ any thoughts on my previous comments?

@DohanKim
Copy link
Contributor Author

@cpursley sorry for the late reply. I'm running late on my service upgrade 🥲.

Anyway, I had put a couple of loggings inside my app instead of WalEx and found some clues.
After DB reconnection, the temporary replication slot WalEx is using is closed and a new one is not opened when DB recovers from the error.

I'll spend at most a couple of days reproducing the error again in the test code, and if it's not successful,
again, I'll start putting logging and creating PR.
Sounds okay?

@DohanKim
Copy link
Contributor Author

Also, I'll submit PR with homebrew related codes.

@cpursley
Copy link
Owner

@DohanKim Thanks for the changes. I went ahead and merged them to make it easer (so we can start new branches as needed).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants