rails find_each方法源码分析

在项目中我们经常会使用如下代码

1
2
3
Product.all.each do |product|
do_something
end

在实际应该中如果Product表太大,一次读取会把内存占满,Rails为了解决这个问题提供了两个方法,find_each和find_in_batches方法,把记录分成几个批次,因为find_each其实最终就是调用的find_in_batches,所以这里我们以find_in_batches为例。

1
2
3
Product.all.find_in_batches(start: 2000, batch_size: 5000,include: :infos) do |products|
do_something
end

:start 选项指定批次的起始 ID
:batch_size 每次传入的批次数量
:include 可以让指定的关联和模型一同加载。避免n+1查询
find_in_batches 方法和 find_each 类似,是二者的不同点是,find_in_batches 把整批记录作为一个数组传入代码块,而不是单独传入各记录。

到了这里我们就基本了解find_each还有find_in_batches的用法了,但是实际使用中发现了一个问题,就是如果想对Product排序,是不起作用的,比如说我想降序的方式来执行,例:

1
2
3
Product.all.order('id desc').find_each do |product|
do_something
end

这个时候会发现,终端输出一段提醒,并且排序没有起到效果

1
Scoped order and limit are ignored, it's forced to be batch order and batch size

接下来我们来分析下find_each的源码,看看究竟它是如何操作的。

1
2
3
4
5
6
7
8
9
10
11
def find_each(options = {})
if block_given?
find_in_batches(options) do |records|
records.each { |record| yield record }
end
else
enum_for :find_each, options do
options[:start] ? where(table[primary_key].gteq(options[:start])).size : size
end
end
end

我们可以看到如果有传入代码块,就会直接调用find_in_batches方法,如果没有传入代码块会全部加载并转为Enumerator类.简单介绍一下Enumerable

Enumerable提供了一系列的方法,如each,collect,map,sort之类的。include了Enumerable的类只需要实现each就可以获得所有如collect, map, sort这样的方法。

这里我们主要来看find_in_batches方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def find_in_batches(options = {})
options.assert_valid_keys(:start, :batch_size)
relation = self
start = options[:start]
batch_size = options[:batch_size] || 1000
unless block_given?
return to_enum(:find_in_batches, options) do
total = start ? where(table[primary_key].gteq(start)).size : size
(total - 1).div(batch_size) + 1
end
end
if logger && (arel.orders.present? || arel.taken.present?)
logger.warn("Scoped order and limit are ignored, it's forced to be batch order and batch size")
end
relation = relation.reorder(batch_order).limit(batch_size)
records = start ? relation.where(table[primary_key].gteq(start)).to_a : relation.to_a
while records.any?
records_size = records.size
primary_key_offset = records.last.id
raise "Primary key not included in the custom select clause" unless primary_key_offset
yield records
break if records_size < batch_size
records = relation.where(table[primary_key].gt(primary_key_offset)).to_a
end
end
private
def batch_order
"#{quoted_table_name}.#{quoted_primary_key} ASC"
end

可以看出核心的语句在19行以下,我们来看

1
relation = relation.reorder(batch_order).limit(batch_size)

reorder返回会重新定义了传入的排序,改为了以主键升序排序。并且限制了获取的数量limit batch_size.

1
records = start ? relation.where(table[primary_key].gteq(start)).to_a : relation.to_a

第20行,判断是否传入了start,如果传入了,从主键大于start来开始查找。到这里就已经获取了第一批的数据。

接下来while循环判断如果这一批数据不为空,获取到此次记录的size,以及最后一条数据的id,yield 加载执行传入的 do_something 代码块,当此次记录的size小于上次记录的size的时候,判定为最后一批记录,执行完成跳出,否则,通过每次最后一条数据id,来重新获取最新的一批records,直到全部用执行完成。

ps:还没有想好如何覆写这个方法,有时间看下如何覆写这个方法,或者自己重新定义一个新的方法来实现排序并分批次执行。