From 2fafe653b2706e947646a5ac523b0a69327909e2 Mon Sep 17 00:00:00 2001 From: Claire Date: Mon, 12 Feb 2024 13:29:36 +0100 Subject: [PATCH] Improve performance of deleting OAuth tokens --- app/lib/application_extension.rb | 9 +++++++-- app/models/user.rb | 7 +++++-- spec/models/user_spec.rb | 6 ++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/app/lib/application_extension.rb b/app/lib/application_extension.rb index 9245e00c14..400c51a023 100644 --- a/app/lib/application_extension.rb +++ b/app/lib/application_extension.rb @@ -25,8 +25,13 @@ module ApplicationExtension def push_to_streaming_api # TODO: #28793 Combine into a single topic - access_tokens.in_batches.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + access_tokens.in_batches do |tokens| + redis.pipelined do |pipeline| + tokens.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end + end end end end diff --git a/app/models/user.rb b/app/models/user.rb index 39f7b3b0cf..388be31fab 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -346,8 +346,11 @@ class User < ApplicationRecord # Revoke each access token for the Streaming API, since `update_all`` # doesn't trigger ActiveRecord Callbacks: # TODO: #28793 Combine into a single topic - batch.each do |token| - redis.publish("timeline:access_token:#{token.id}", Oj.dump(event: :kill)) + payload = Oj.dump(event: :kill) + redis.pipelined do |pipeline| + batch.ids.each do |id| + pipeline.publish("timeline:access_token:#{id}", payload) + end end end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index 051e920454..3cc804af43 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -438,8 +438,10 @@ RSpec.describe User do let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) } let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) } + let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } + before do - allow(redis).to receive_messages(publish: nil) + allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) user.reset_password! end @@ -457,7 +459,7 @@ RSpec.describe User do end it 'revokes streaming access for all access tokens' do - expect(redis).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once end it 'removes push subscriptions' do