Fix unbounded recursion in account discovery (v3.5 backport) (#22026)
* Fix trying to fetch posts from other users when fetching featured posts * Rate-limit discovery of new subdomains * Put a limit on recursively discovering new accounts
This commit is contained in:
parent
696f7b3608
commit
ee66f5790f
1
Gemfile
1
Gemfile
|
@ -66,6 +66,7 @@ gem 'oj', '~> 3.13'
|
||||||
gem 'ox', '~> 2.14'
|
gem 'ox', '~> 2.14'
|
||||||
gem 'parslet'
|
gem 'parslet'
|
||||||
gem 'posix-spawn'
|
gem 'posix-spawn'
|
||||||
|
gem 'public_suffix', '~> 4.0.7'
|
||||||
gem 'pundit', '~> 2.2'
|
gem 'pundit', '~> 2.2'
|
||||||
gem 'premailer-rails'
|
gem 'premailer-rails'
|
||||||
gem 'rack-attack', '~> 6.6'
|
gem 'rack-attack', '~> 6.6'
|
||||||
|
|
|
@ -803,6 +803,7 @@ DEPENDENCIES
|
||||||
private_address_check (~> 0.5)
|
private_address_check (~> 0.5)
|
||||||
pry-byebug (~> 3.9)
|
pry-byebug (~> 3.9)
|
||||||
pry-rails (~> 0.3)
|
pry-rails (~> 0.3)
|
||||||
|
public_suffix (~> 4.0.7)
|
||||||
puma (~> 5.6)
|
puma (~> 5.6)
|
||||||
pundit (~> 2.2)
|
pundit (~> 2.2)
|
||||||
rack (~> 2.2.3)
|
rack (~> 2.2.3)
|
||||||
|
|
|
@ -222,7 +222,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
|
||||||
return if tag['href'].blank?
|
return if tag['href'].blank?
|
||||||
|
|
||||||
account = account_from_uri(tag['href'])
|
account = account_from_uri(tag['href'])
|
||||||
account = ActivityPub::FetchRemoteAccountService.new.call(tag['href']) if account.nil?
|
account = ActivityPub::FetchRemoteAccountService.new.call(tag['href'], request_id: @options[:request_id]) if account.nil?
|
||||||
|
|
||||||
return if account.nil?
|
return if account.nil?
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ class ActivityPub::Activity::Update < ActivityPub::Activity
|
||||||
def update_account
|
def update_account
|
||||||
return reject_payload! if @account.uri != object_uri
|
return reject_payload! if @account.uri != object_uri
|
||||||
|
|
||||||
ActivityPub::ProcessAccountService.new.call(@account.username, @account.domain, @object, signed_with_known_key: true)
|
ActivityPub::ProcessAccountService.new.call(@account.username, @account.domain, @object, signed_with_known_key: true, request_id: @options[:request_id])
|
||||||
end
|
end
|
||||||
|
|
||||||
def update_status
|
def update_status
|
||||||
|
@ -28,6 +28,6 @@ class ActivityPub::Activity::Update < ActivityPub::Activity
|
||||||
|
|
||||||
return if @status.nil?
|
return if @status.nil?
|
||||||
|
|
||||||
ActivityPub::ProcessStatusUpdateService.new.call(@status, @object)
|
ActivityPub::ProcessStatusUpdateService.new.call(@status, @object, request_id: @options[:request_id])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,11 +3,24 @@
|
||||||
module DomainMaterializable
|
module DomainMaterializable
|
||||||
extend ActiveSupport::Concern
|
extend ActiveSupport::Concern
|
||||||
|
|
||||||
|
include Redisable
|
||||||
|
|
||||||
included do
|
included do
|
||||||
after_create_commit :refresh_instances_view
|
after_create_commit :refresh_instances_view
|
||||||
end
|
end
|
||||||
|
|
||||||
def refresh_instances_view
|
def refresh_instances_view
|
||||||
Instance.refresh unless domain.nil? || Instance.where(domain: domain).exists?
|
return if domain.nil? || Instance.exists?(domain: domain)
|
||||||
|
|
||||||
|
Instance.refresh
|
||||||
|
count_unique_subdomains!
|
||||||
|
end
|
||||||
|
|
||||||
|
def count_unique_subdomains!
|
||||||
|
second_and_top_level_domain = PublicSuffix.domain(domain, ignore_private: true)
|
||||||
|
with_redis do |redis|
|
||||||
|
redis.pfadd("unique_subdomains_for:#{second_and_top_level_domain}", domain)
|
||||||
|
redis.expire("unique_subdomains_for:#{second_and_top_level_domain}", 1.minute.seconds)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,10 +3,11 @@
|
||||||
class ActivityPub::FetchFeaturedCollectionService < BaseService
|
class ActivityPub::FetchFeaturedCollectionService < BaseService
|
||||||
include JsonLdHelper
|
include JsonLdHelper
|
||||||
|
|
||||||
def call(account)
|
def call(account, **options)
|
||||||
return if account.featured_collection_url.blank? || account.suspended? || account.local?
|
return if account.featured_collection_url.blank? || account.suspended? || account.local?
|
||||||
|
|
||||||
@account = account
|
@account = account
|
||||||
|
@options = options
|
||||||
@json = fetch_resource(@account.featured_collection_url, true, local_follower)
|
@json = fetch_resource(@account.featured_collection_url, true, local_follower)
|
||||||
|
|
||||||
return unless supported_context?(@json)
|
return unless supported_context?(@json)
|
||||||
|
@ -38,9 +39,9 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService
|
||||||
def process_items(items)
|
def process_items(items)
|
||||||
status_ids = items.filter_map do |item|
|
status_ids = items.filter_map do |item|
|
||||||
uri = value_or_id(item)
|
uri = value_or_id(item)
|
||||||
next if ActivityPub::TagManager.instance.local_uri?(uri)
|
next if ActivityPub::TagManager.instance.local_uri?(uri) || invalid_origin?(uri)
|
||||||
|
|
||||||
status = ActivityPub::FetchRemoteStatusService.new.call(uri, on_behalf_of: local_follower)
|
status = ActivityPub::FetchRemoteStatusService.new.call(uri, on_behalf_of: local_follower, expected_actor_uri: @account.uri, request_id: @options[:request_id])
|
||||||
next unless status&.account_id == @account.id
|
next unless status&.account_id == @account.id
|
||||||
|
|
||||||
status.id
|
status.id
|
||||||
|
|
|
@ -8,7 +8,7 @@ class ActivityPub::FetchRemoteAccountService < BaseService
|
||||||
SUPPORTED_TYPES = %w(Application Group Organization Person Service).freeze
|
SUPPORTED_TYPES = %w(Application Group Organization Person Service).freeze
|
||||||
|
|
||||||
# Does a WebFinger roundtrip on each call, unless `only_key` is true
|
# Does a WebFinger roundtrip on each call, unless `only_key` is true
|
||||||
def call(uri, id: true, prefetched_body: nil, break_on_redirect: false, only_key: false)
|
def call(uri, id: true, prefetched_body: nil, break_on_redirect: false, only_key: false, request_id: nil)
|
||||||
return if domain_not_allowed?(uri)
|
return if domain_not_allowed?(uri)
|
||||||
return ActivityPub::TagManager.instance.uri_to_resource(uri, Account) if ActivityPub::TagManager.instance.local_uri?(uri)
|
return ActivityPub::TagManager.instance.uri_to_resource(uri, Account) if ActivityPub::TagManager.instance.local_uri?(uri)
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ class ActivityPub::FetchRemoteAccountService < BaseService
|
||||||
|
|
||||||
return unless only_key || verified_webfinger?
|
return unless only_key || verified_webfinger?
|
||||||
|
|
||||||
ActivityPub::ProcessAccountService.new.call(@username, @domain, @json, only_key: only_key, verified_webfinger: !only_key)
|
ActivityPub::ProcessAccountService.new.call(@username, @domain, @json, only_key: only_key, verified_webfinger: !only_key, request_id: request_id)
|
||||||
rescue Oj::ParseError
|
rescue Oj::ParseError
|
||||||
nil
|
nil
|
||||||
end
|
end
|
||||||
|
|
|
@ -4,7 +4,8 @@ class ActivityPub::FetchRemoteStatusService < BaseService
|
||||||
include JsonLdHelper
|
include JsonLdHelper
|
||||||
|
|
||||||
# Should be called when uri has already been checked for locality
|
# Should be called when uri has already been checked for locality
|
||||||
def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil)
|
def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil, expected_actor_uri: nil, request_id: nil)
|
||||||
|
@request_id = request_id
|
||||||
@json = begin
|
@json = begin
|
||||||
if prefetched_body.nil?
|
if prefetched_body.nil?
|
||||||
fetch_resource(uri, id, on_behalf_of)
|
fetch_resource(uri, id, on_behalf_of)
|
||||||
|
@ -30,6 +31,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService
|
||||||
end
|
end
|
||||||
|
|
||||||
return if activity_json.nil? || object_uri.nil? || !trustworthy_attribution?(@json['id'], actor_uri)
|
return if activity_json.nil? || object_uri.nil? || !trustworthy_attribution?(@json['id'], actor_uri)
|
||||||
|
return if expected_actor_uri.present? && actor_uri != expected_actor_uri
|
||||||
return ActivityPub::TagManager.instance.uri_to_resource(object_uri, Status) if ActivityPub::TagManager.instance.local_uri?(object_uri)
|
return ActivityPub::TagManager.instance.uri_to_resource(object_uri, Status) if ActivityPub::TagManager.instance.local_uri?(object_uri)
|
||||||
|
|
||||||
actor = account_from_uri(actor_uri)
|
actor = account_from_uri(actor_uri)
|
||||||
|
@ -40,7 +42,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService
|
||||||
# activity as an update rather than create
|
# activity as an update rather than create
|
||||||
activity_json['type'] = 'Update' if equals_or_includes_any?(activity_json['type'], %w(Create)) && Status.where(uri: object_uri, account_id: actor.id).exists?
|
activity_json['type'] = 'Update' if equals_or_includes_any?(activity_json['type'], %w(Create)) && Status.where(uri: object_uri, account_id: actor.id).exists?
|
||||||
|
|
||||||
ActivityPub::Activity.factory(activity_json, actor).perform
|
ActivityPub::Activity.factory(activity_json, actor, request_id: request_id).perform
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -52,7 +54,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService
|
||||||
|
|
||||||
def account_from_uri(uri)
|
def account_from_uri(uri)
|
||||||
actor = ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
|
actor = ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
|
||||||
actor = ActivityPub::FetchRemoteAccountService.new.call(uri, id: true) if actor.nil? || actor.possibly_stale?
|
actor = ActivityPub::FetchRemoteAccountService.new.call(uri, id: true, request_id: @request_id) if actor.nil? || actor.possibly_stale?
|
||||||
actor
|
actor
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,9 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||||
include Redisable
|
include Redisable
|
||||||
include Lockable
|
include Lockable
|
||||||
|
|
||||||
|
SUBDOMAINS_RATELIMIT = 10
|
||||||
|
DISCOVERIES_PER_REQUEST = 400
|
||||||
|
|
||||||
# Should be called with confirmed valid JSON
|
# Should be called with confirmed valid JSON
|
||||||
# and WebFinger-resolved username and domain
|
# and WebFinger-resolved username and domain
|
||||||
def call(username, domain, json, options = {})
|
def call(username, domain, json, options = {})
|
||||||
|
@ -15,9 +18,12 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||||
@json = json
|
@json = json
|
||||||
@uri = @json['id']
|
@uri = @json['id']
|
||||||
@username = username
|
@username = username
|
||||||
@domain = domain
|
@domain = TagManager.instance.normalize_domain(domain)
|
||||||
@collections = {}
|
@collections = {}
|
||||||
|
|
||||||
|
# The key does not need to be unguessable, it just needs to be somewhat unique
|
||||||
|
@options[:request_id] ||= "#{Time.now.utc.to_i}-#{username}@#{domain}"
|
||||||
|
|
||||||
with_lock("process_account:#{@uri}") do
|
with_lock("process_account:#{@uri}") do
|
||||||
@account = Account.remote.find_by(uri: @uri) if @options[:only_key]
|
@account = Account.remote.find_by(uri: @uri) if @options[:only_key]
|
||||||
@account ||= Account.find_remote(@username, @domain)
|
@account ||= Account.find_remote(@username, @domain)
|
||||||
|
@ -25,7 +31,18 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||||
@old_protocol = @account&.protocol
|
@old_protocol = @account&.protocol
|
||||||
@suspension_changed = false
|
@suspension_changed = false
|
||||||
|
|
||||||
create_account if @account.nil?
|
if @account.nil?
|
||||||
|
with_redis do |redis|
|
||||||
|
return nil if redis.pfcount("unique_subdomains_for:#{PublicSuffix.domain(@domain, ignore_private: true)}") >= SUBDOMAINS_RATELIMIT
|
||||||
|
|
||||||
|
discoveries = redis.incr("discovery_per_request:#{@options[:request_id]}")
|
||||||
|
redis.expire("discovery_per_request:#{@options[:request_id]}", 5.minutes.seconds)
|
||||||
|
return nil if discoveries > DISCOVERIES_PER_REQUEST
|
||||||
|
end
|
||||||
|
|
||||||
|
create_account
|
||||||
|
end
|
||||||
|
|
||||||
update_account
|
update_account
|
||||||
process_tags
|
process_tags
|
||||||
|
|
||||||
|
@ -149,7 +166,7 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_featured_collection!
|
def check_featured_collection!
|
||||||
ActivityPub::SynchronizeFeaturedCollectionWorker.perform_async(@account.id)
|
ActivityPub::SynchronizeFeaturedCollectionWorker.perform_async(@account.id, { 'request_id' => @options[:request_id] })
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_links!
|
def check_links!
|
||||||
|
@ -249,7 +266,7 @@ class ActivityPub::ProcessAccountService < BaseService
|
||||||
|
|
||||||
def moved_account
|
def moved_account
|
||||||
account = ActivityPub::TagManager.instance.uri_to_resource(@json['movedTo'], Account)
|
account = ActivityPub::TagManager.instance.uri_to_resource(@json['movedTo'], Account)
|
||||||
account ||= ActivityPub::FetchRemoteAccountService.new.call(@json['movedTo'], id: true, break_on_redirect: true)
|
account ||= ActivityPub::FetchRemoteAccountService.new.call(@json['movedTo'], id: true, break_on_redirect: true, request_id: @options[:request_id])
|
||||||
account
|
account
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
|
||||||
include Redisable
|
include Redisable
|
||||||
include Lockable
|
include Lockable
|
||||||
|
|
||||||
def call(status, json)
|
def call(status, json, request_id: nil)
|
||||||
raise ArgumentError, 'Status has unsaved changes' if status.changed?
|
raise ArgumentError, 'Status has unsaved changes' if status.changed?
|
||||||
|
|
||||||
@json = json
|
@json = json
|
||||||
|
@ -15,6 +15,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
|
||||||
@account = status.account
|
@account = status.account
|
||||||
@media_attachments_changed = false
|
@media_attachments_changed = false
|
||||||
@poll_changed = false
|
@poll_changed = false
|
||||||
|
@request_id = request_id
|
||||||
|
|
||||||
# Only native types can be updated at the moment
|
# Only native types can be updated at the moment
|
||||||
return @status if !expected_type? || already_updated_more_recently?
|
return @status if !expected_type? || already_updated_more_recently?
|
||||||
|
@ -185,7 +186,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
|
||||||
next if href.blank?
|
next if href.blank?
|
||||||
|
|
||||||
account = ActivityPub::TagManager.instance.uri_to_resource(href, Account)
|
account = ActivityPub::TagManager.instance.uri_to_resource(href, Account)
|
||||||
account ||= ActivityPub::FetchRemoteAccountService.new.call(href)
|
account ||= ActivityPub::FetchRemoteAccountService.new.call(href, request_id: @request_id)
|
||||||
|
|
||||||
next if account.nil?
|
next if account.nil?
|
||||||
|
|
||||||
|
|
|
@ -5,8 +5,10 @@ class ActivityPub::SynchronizeFeaturedCollectionWorker
|
||||||
|
|
||||||
sidekiq_options queue: 'pull', lock: :until_executed
|
sidekiq_options queue: 'pull', lock: :until_executed
|
||||||
|
|
||||||
def perform(account_id)
|
def perform(account_id, options = {})
|
||||||
ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id))
|
options = { note: true, hashtag: false }.deep_merge(options.deep_symbolize_keys)
|
||||||
|
|
||||||
|
ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id), **options)
|
||||||
rescue ActiveRecord::RecordNotFound
|
rescue ActiveRecord::RecordNotFound
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
|
@ -109,4 +109,98 @@ RSpec.describe ActivityPub::ProcessAccountService, type: :service do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'discovering many subdomains in a short timeframe' do
|
||||||
|
before do
|
||||||
|
stub_const 'ActivityPub::ProcessAccountService::SUBDOMAINS_RATELIMIT', 5
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:subject) do
|
||||||
|
8.times do |i|
|
||||||
|
domain = "test#{i}.testdomain.com"
|
||||||
|
json = {
|
||||||
|
id: "https://#{domain}/users/1",
|
||||||
|
type: 'Actor',
|
||||||
|
inbox: "https://#{domain}/inbox",
|
||||||
|
}.with_indifferent_access
|
||||||
|
described_class.new.call('alice', domain, json)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'creates at least some accounts' do
|
||||||
|
expect { subject }.to change { Account.remote.count }.by_at_least(2)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'creates no more account than the limit allows' do
|
||||||
|
expect { subject }.to change { Account.remote.count }.by_at_most(5)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'accounts referencing other accounts' do
|
||||||
|
before do
|
||||||
|
stub_const 'ActivityPub::ProcessAccountService::DISCOVERIES_PER_REQUEST', 5
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:payload) do
|
||||||
|
{
|
||||||
|
'@context': ['https://www.w3.org/ns/activitystreams'],
|
||||||
|
id: 'https://foo.test/users/1',
|
||||||
|
type: 'Person',
|
||||||
|
inbox: 'https://foo.test/inbox',
|
||||||
|
featured: 'https://foo.test/users/1/featured',
|
||||||
|
preferredUsername: 'user1',
|
||||||
|
}.with_indifferent_access
|
||||||
|
end
|
||||||
|
|
||||||
|
before do
|
||||||
|
8.times do |i|
|
||||||
|
actor_json = {
|
||||||
|
'@context': ['https://www.w3.org/ns/activitystreams'],
|
||||||
|
id: "https://foo.test/users/#{i}",
|
||||||
|
type: 'Person',
|
||||||
|
inbox: 'https://foo.test/inbox',
|
||||||
|
featured: "https://foo.test/users/#{i}/featured",
|
||||||
|
preferredUsername: "user#{i}",
|
||||||
|
}.with_indifferent_access
|
||||||
|
status_json = {
|
||||||
|
'@context': ['https://www.w3.org/ns/activitystreams'],
|
||||||
|
id: "https://foo.test/users/#{i}/status",
|
||||||
|
attributedTo: "https://foo.test/users/#{i}",
|
||||||
|
type: 'Note',
|
||||||
|
content: "@user#{i + 1} test",
|
||||||
|
tag: [
|
||||||
|
{
|
||||||
|
type: 'Mention',
|
||||||
|
href: "https://foo.test/users/#{i + 1}",
|
||||||
|
name: "@user#{i + 1 }",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
to: [ 'as:Public', "https://foo.test/users/#{i + 1}" ]
|
||||||
|
}.with_indifferent_access
|
||||||
|
featured_json = {
|
||||||
|
'@context': ['https://www.w3.org/ns/activitystreams'],
|
||||||
|
id: "https://foo.test/users/#{i}/featured",
|
||||||
|
type: 'OrderedCollection',
|
||||||
|
totelItems: 1,
|
||||||
|
orderedItems: [status_json],
|
||||||
|
}.with_indifferent_access
|
||||||
|
webfinger = {
|
||||||
|
subject: "acct:user#{i}@foo.test",
|
||||||
|
links: [{ rel: 'self', href: "https://foo.test/users/#{i}" }],
|
||||||
|
}.with_indifferent_access
|
||||||
|
stub_request(:get, "https://foo.test/users/#{i}").to_return(status: 200, body: actor_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
|
||||||
|
stub_request(:get, "https://foo.test/users/#{i}/featured").to_return(status: 200, body: featured_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
|
||||||
|
stub_request(:get, "https://foo.test/users/#{i}/status").to_return(status: 200, body: status_json.to_json, headers: { 'Content-Type': 'application/activity+json' })
|
||||||
|
stub_request(:get, "https://foo.test/.well-known/webfinger?resource=acct:user#{i}@foo.test").to_return(body: webfinger.to_json, headers: { 'Content-Type': 'application/jrd+json' })
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'creates at least some accounts' do
|
||||||
|
expect { subject.call('user1', 'foo.test', payload) }.to change { Account.remote.count }.by_at_least(2)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'creates no more account than the limit allows' do
|
||||||
|
expect { subject.call('user1', 'foo.test', payload) }.to change { Account.remote.count }.by_at_most(5)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue