使用 Python 进行可靠的文件 IO

看到一篇描述关于 Python IO 的英文文章,翻译分享出来。

程序经常需要读写更新文件。虽然大多数的程序员知道在 IO 过程中会出现异常,但是很多人写的 IO 代码的方式都非常原始。在本文章中,我会分享一些python 提高 IO 可靠性的内部细节。

请看下面的代码片段,这段代码从一个文件中读取数据,经过处理后又写入了该文件。

1
2
3
4
5
with open(filename) as f:
input = f.read()
output = do_something(input)
with open(filename, 'w') as f:
f.write(output)

相当简单吧?但是使用 Python 进行 IO 并不像 Python 语言看起来那样简单。我在生产环境的服务器上调试应用程序经常会遇到以下几个故障模型:

  • 失控的服务器进程产生大量日志并把磁盘空间占满。此时再次调用write()函数会抛出异常,导致文件截断,造成文件内容丢失;
  • 多个并行运行的程序同时对一个文件进行 IO,导致文件内容错乱;
  • 服务器掉电导致写文件未刷入磁盘,进而操作程序数据不一致。

下面引入的内容都不是什么新鲜玩意。我的目的就是向一些缺乏系统编程经验的 Python 开发者分析一些通用的方法和技术。我将对讲的每个方法都提供代码样例,以便开发者可以把这些方法应用在自己的程序中。

“可靠性”到底意味着什么

从最广泛的意义上讲,可靠性意味着一个操作可以在所有条件下都能完成其功能的执行。对于普通文件来说,功能就是是创建、替换或扩展文件的内容。最为数据库的起源,我们可以根据数据库理论中经典交易模型的ACID原则作为提高文件更新可靠性的指导原则。

让我们用ACID原则的四个方面对上面文件 IO 的代码片段进行分析:

  • 原子性(Atomicity):一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被恢复(Rollback)到事务开始前的状态,就像这个事务从来没有执行过一样。在上面例子中,磁盘满可能会导致一次写操作只写入一部分。此外,如果其他程序在写入文件时读取该文件,即使写操作没有错误,程序也可能只读到文件一部分内容。
  • 一致性(Consistency):表示更新必须使系统从一个有效状态到另一个有效状态。一致性可以细分为内部一致性和外部一致性:内部一致性意味着文件的数据结构是一致的(完整的)。外部一致性意味着文件的内容与其相关的其他数据一致(比如文件内容和程序内存中内容一致)。在前面的例子中,由于缺少对应用程序的描述,我们很难推断出是否会出现不一致的问题。但由于一致性需要原子性,我们至少可以说内部一致性得不到保证。
  • 隔离性(Isolation):允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以防止多个事务并发执行时由于交叉执行而导致数据的不一致。很明显,上面的代码无法防止更新丢失或其他隔离故障。
  • 持久性(Durability):事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。在我们向用户返回成功之前,我们必须确保我们的数据已经落盘而不是仅仅写入缓存。也许写上面的代码的人会在自己脑海中假设调用write()函数时会立即发生磁盘IO。但事实上POSIX语义不保证这种假设。

请使用数据库

如果可以满足 ACID 的四个基本要素,那我们的代码可以说就很稳定了。但是这要求相当的代码量去实现。事实上,大部分数据库系统都满足 ACID 事务,我们为什么要去重复造轮子呢。

可靠的数据存储是一件已经解决的问题。如果需要可靠稳定的存储,请使用数据库。因为你自己写的代码很大可能没有在这方面工作了几十年的专家写的好。如果你不想去部署一个“巨型”数据库服务器,你可以使用sqlit。这个数据库满足 ACID,足够小巧,并且免费,最重要的是 Python 标准库内置了对它的支持。

看到这里,这篇文章看起来可以结束了。但是,还是有许多很充分的理由不去使用数据库。这些理由经常和文件格式文件位置相关。这两个因素都不容易使用数据库系统进行控制。原因如下:

  • 我们有时候必须处理其他应用程序生成的文件,而这些文件可能有固定的格式和位置;
  • 我们必须写文件来供其他应用程序使用(约束同上);
  • 我们的文件必须是人类可读可编辑的

除此之外,还有各种原因,你懂得。

如果我们打算自己实现可靠的文件更新,则需要考虑使用一些编程技巧。 在下文中,我将介绍文件更新的四种常见模式。然后,我将讨论可以采取哪些来为每个文件更新模式建立ACID属性。

文件更新模式

文件更新有许多中方式,但是有四种通用的模式。这四种模式是后文的基础。

模式1:截断-写入

与上面的例子一样,下面代码的模式可能是最基本的文件更新模式了。在下面的例子中,程序从文件中读数据,利用数据进行一些计算,再以写模式重新打开这个文件。

1
2
3
4
5
with open(filename, 'r') as f:
model.read(f)
model.process()
with open(filename, 'w') as f:
model.write(f)

截断-写模式对上面的例子做一个变体。以读写模式打开文件(Python中的“+”模式),在写操作时先seek 到文件头进行读取,然后显式调用truncate(),最后重写内容:

1
2
3
4
5
6
7
with open(filename, 'a+') as f:
f.seek(0)
model.input(f.read())
model.compute()
f.seek(0)
f.truncate()
f.write(model.output())

这种变体的优势是只用打开文件一次并且一直保持文件打开。这样可以简化加锁操作。

模式2:写入-复制

另一种被广泛使用的模式是把新的内容写入一个临时文件,再用临时文件对原文进行替换。

1
2
3
4
5
with tempfile.NamedTemporaryFile(
'w', dir=os.path.dirname(filename), delete=False) as tf:
tf.write(model.output())
tempname = tf.name
os.rename(tempname, filename)

这种方法比截断-写模式对错误更具备鲁棒性,具体看下面对原子性和一致性的讨论。这种模式被许多应用使用(Emacs、word等)。

模式1和模式2非常通用,以至于Linux 内核中的 ext4 文件系统甚至可以检测到这两种模式并自动修复一些隐藏缺陷,如下图所示。但是不要依赖于这两种模式,因为你并不能保证你的程序总跑在 ext4上,并且管理员有可能关闭这个功能。

屏幕快照 2018-10-19 11.17.57

模式3:追加写

模式3是追加新的数据到已有文件:

1
2
with open(filename, 'a') as f:
f.write(model.output())

这种模式被用来写日志文件或其他记录数据的任务。从技术上讲,模式3的特点是极其简洁。对模式3有一种有趣的扩展,就是是在常规的文件更新操作期间仅进行追加操作,同时定期将文件重新组织为更紧凑的形式来节省空间。

模式4:spooldir

在这里,我们将目录视为逻辑数据存储,并为每条记录创建一个新的唯一命名文件:

1
2
with open(unique_filename(), 'w') as f:
f.write(model.output())

此模式与追加模式共享其累积性质。 一个很大的优点是我们可以将少量元数据放入文件名中。 例如,用于传达有关处理状态的信息。 spooldir模式的一个特别聪明的实现是maildir格式。 maildirs使用带有其他子目录的命名方案以可靠且无锁的方式执行更新操作。mdgocept.filestore 库为maildir操作提供了方便的封装。

如果生成的文件名无法保证是唯一的,那么至少保证文件是新的。使用底层os.open()调用时要使用合适的参数:

1
2
3
fd = os.open(filename, os.O_WRONLY | os.O_CREAT| os.O_EXCL, 0o666)
with os.fdopen(fd, 'w') as f:
f.write(...)

当使用 O_EXCL 打开文件时,我们使用 os.fdopen把只读的文件描述符转化为一个Python 的普通文件对象。

应用 ACID 到文件更新中

在下文中,我将尝试依次满足每个ACID属性来增强文件更新的模式。 因为我们不打算编写完整的数据库系统,所以我将尽可能简单地实现这些内容。 下文内容不是尽善尽美的,只用作抛砖引玉。

原子性

模式2写入-替换提供了原子性,因为底层os.rename()是原子的。这意味着,在任何时间点,任何进程要么看到旧文件要么看到新文件。这个模式对写操作错误具有鲁棒性:如果写操作抛出异常,那么替换操作永远都不会执行,因此不会出现已损坏新文件替换无损坏旧文件这种危险操作。

模式3追加本身不是原子的,因为追加写有不完全写的风险。但是有一个小技巧可以让追加写看起来是原子的:使用校验和来注释每个写入的记录。稍后读取日志时,丢弃所有没有有效校验和的记录。这样,只会处理完整的记录。在以下示例中,应用程序进行定期测量,并每次将一行JSON记录附加到日志中。 我们计算记录的字节表示的CRC32校验和,并将其附加到同一行:

1
2
3
4
5
6
with open(logfile, 'ab') as f:
for i in range(3):
measure = {'timestamp': time.time(), 'value': random.random()}
record = json.dumps(measure).encode()
checksum = '{:8x}'.format(zlib.crc32(record)).encode()
f.write(record + b' ' + checksum + b'\n')

示例代码每秒创建一个随机数模拟测量值。

1
2
3
4
$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.232021160265939} d229d937

处理日志时,我们逐行读取日志文件,从每条日志中抽出校验和,并和记录进行比较:

1
2
3
4
5
6
7
with open(logfile, 'rb') as f:
for line in f:
record, checksum = line.strip().rsplit(b' ', 1)
if checksum.decode() == '{:8x}'.format(zlib.crc32(record)):
print('read measure: {}'.format(json.loads(record.decode())))
else:
print('checksum error for record {}'.format(record))

现在我们模拟一个被截断的写操作:

1
2
3
4
$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.23202

在读取日志时,最后一行不完整的内容被拒绝了:

1
2
3
4
$ read_checksummed_log.py log
read measure: {'timestamp': 1373396987.258189, 'value': 0.9360123151217828}
read measure: {'timestamp': 1373396987.25825, 'value': 0.40429005476999424}
checksum error for record b'{"timestamp": 1373396987.258291, "value":'

校验和日志被用在许多应用程序中,包括许多数据库系统。

模式4 spooldir中的各个文件同样可以在每个文件中包含校验和。 另一种可能更容易的方法是从模式2写入中借用:首先将文件写入临时文件,然后将其移动到最终位置。设计一个命名方案,保护正在进行中的文件不被消费者处理。在以下示例中,读取器将忽略以.tmp结尾的所有文件名,因此在写入操作期间可以安全使用:

1
2
3
4
newfile = generate_id()
with open(newfile + '.tmp', 'w') as f:
f.write(model.output())
os.rename(newfile + '.tmp', newfile)

最后,模式1 截断-写入是非原子的。在执行截断操作之后,文件内容被清除并且尚未写入新内容。如果并发程序现在读取文件,会读到空文件。更糟糕的是,当并发程序在读取/写入时发生异常并且中止,我们不仅会丢失旧版本文件内容,也会丢失新版本文件内容。

一致性

关于原子性的大多数方法也可以应用于一致性。 事实上,原子更新是内部一致性的先决条件。外部一致性意味着同步更新多个文件或应用程序。

要保证一致性,可以通过锁定文件来确保读写访问不会干扰。 考虑一个目录,其中所有的文件都需要相互一致。 常见的模式是指定一个文件锁,使用该锁来控制整个目录的访问。

写操作示例代码如下:

1
2
3
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
fcntl.flock(lockfile, fcntl.LOCK_EX)
model.update(dirname)

读操作示例代码如下:

1
2
3
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
fcntl.flock(lockfile, fcntl.LOCK_SH)
model.readall(dirname)

需要注意,此方法只有在我们可以控制所有读者时才有效。 由于同一时间只允许一个进程进行写操作(独占锁阻止所有共享锁),因此该方法的可伸缩性受到限制。

更进一步,我们可以将模式2应用于整个目录。 这需要到为每次版本更新创建一个新目录,并在更新完成后更改符号链接的指向。 例如,一个镜像应用程序维护了一个目录,目录中有一些压缩包和一个索引文件。其中,索引文件列出文件名,文件大小和校验和。当上游镜像更新时,仅为每个压缩包和索引文件单独实现原子文件更新是不够的。 相反,我们需要同时翻转压缩包和索引文件以避免校验和不匹配。 为了解决这个问题,我们为每次更新维护一个子目录,并为使用符号链接指向当前版本:

1
2
3
4
5
6
7
8
9
10
11
mirror
|-- 483
| |-- a.tgz
| |-- b.tgz
| `-- index.json
|-- 484
| |-- a.tgz
| |-- b.tgz
| |-- c.tgz
| `-- index.json
`-- current -> 483

如上所示,新版本484正在更新中。当所有的压缩包都同步完成并且索引文件是最新状态时,我们可以用一个原子调用os.symlink()来修改符号current的指向。其他应用程序始终可以看到完整的旧版和新版内容。很重要的一点是,使用者需要调用os.chdir()进入到current目录,而不用通过路径访问压缩文件或索引文件。因为这里有一个隐藏的竞争条件:如果使用者先通过current/index.json访问索引文件,同时current链接目标发生改变,使用者再次打开current/a.tgz会访问到不同版本下的内容。

隔离性

隔离意味着对同一文件的并发更新是可序列化的,即对同一文件的并发更新需要一个序列调度,该调度会返回与实际并行调度执行相同的结果。 “真正的”数据库系统使用MVCC等高级技术来保持可串行化,同时允许很大程度的并行性。 回到文件更新上来,我们可以使用锁来序列化文件更新。

在模式1中通过加锁实现隔离是很容易实现的,仅需要在所有文件操作之前获取独占锁即可。下面的示例代码从文件中读一个整数,再对整数加1,写回文件。

1
2
3
4
5
6
7
8
def update():
with open(filename, 'r+') as f:
fcntl.flock(f, fcntl.LOCK_EX)
n = int(f.read())
n += 1
f.seek(0)
f.truncate()
f.write('{}\n'.format(n))

在模式2中加锁一个小陷阱:如果使用模式1相同的方法会导致文件更新冲突。一个简单的实现如下所示:

1
2
3
4
5
6
7
8
9
10
def update():
with open(filename) as f:
fcntl.flock(f, fcntl.LOCK_EX)
n = int(f.read())
n += 1
with tempfile.NamedTemporaryFile(
'w', dir=os.path.dirname(filename), delete=False) as tf:
tf.write('{}\n'.format(n))
tempname = tf.name
os.rename(tempname, filename)

想象一下两个进程同时更新一个文件。进程1首先打开文件获得了锁,进程2打开了文件阻塞在fcntl.flock()调用。当进程1替换完文件并释放了锁之后,进程2打开的文件描述符指向一个旧内容的文件。为了避免这种冲突,我们必须在获取锁后检查打开的文件是否被修改了。改进代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class LockedOpen(object):

def __init__(self, filename, *args, **kwargs):
self.filename = filename
self.open_args = args
self.open_kwargs = kwargs
self.fileobj = None

def __enter__(self):
f = open(self.filename, *self.open_args, **self.open_kwargs)
while True:
fcntl.flock(f, fcntl.LOCK_EX)
fnew = open(self.filename, *self.open_args, **self.open_kwargs)
if os.path.sameopenfile(f.fileno(), fnew.fileno()):
fnew.close()
break
else:
f.close()
f = fnew
self.fileobj = f
return f

def __exit__(self, _exc_type, _exc_value, _traceback):
self.fileobj.close()
def update(self):
with LockedOpen(filename, 'r+') as f:
n = int(f.read())
n += 1
with tempfile.NamedTemporaryFile(
'w', dir=os.path.dirname(filename), delete=False) as tf:
tf.write('{}\n'.format(n))
tempname = tf.name
os.rename(tempname, filename)

模式3的使用方法和模式1相同。需要注意的是,在长时间运行的进程中,需要在更新的间隔中释放锁让其他进程使用该文件。

模式4的方式更为优雅。模式4满足隔离性不需要使用任何锁,它依赖于一个设计良好的命名规范和一个具有鲁棒性的唯一文件名生成器。maildir是模式4的良好设计模板。该方法可以被移植到其他场景中。

持久性

持久性相较于其他三个特性有点特殊,因为它不仅取决于应用程序,还取决于操作系统和硬件配置。 理论上,我们可以假设os.fsync()os.fdatasync()调用在数据保存在磁盘或其他非易失存储之前不会返回。在实践中,我们可能遇到类似存在缺陷的fsync实现或磁盘控制器配置,导致底层不能提供任何持久性保证。但是这些不在本文的讨论范围内,我们假设底层存储可以提供正确的持久存储。

因为,在应用层下面(操作系统、硬件)往往会有其他写缓存。在模式1中,我们不得不在关闭文件描述符之前调用fsync。例如,glibc 中的缓存buffer会持有写数据知道传递进内核。为了将glibc 中的缓存内容也刷入磁盘,我们需要在 fsync 调用flush()

1
2
3
4
with open(filename, 'w') as f:
model.write(f)
f.flush()
os.fdatasync(f)

或者在执行 python 程序是使用-u强制使用无缓存的写操作。

在大多数情况下,我喜欢使用os.fdatasync()而非os.fsync()。因为前者可以避免元数据(属主、大小、修改时间等)的同步更新。元数据更新会导致磁盘 IO 寻道时间延长,从而引起 IO 缓慢。

在模式2中,上面的示例代码只做了一半的事情。按照上面的示例,我们可以保证新写入的文件被存入非易失性存储,但是我们无法保证替换操作时目录数据的更新。目录更新可以讨论很久,但是在我们的示例中(新旧文件在同一个目录中)解决方法比较简单:

1
2
3
4
os.rename(tempname, filename)
dirfd = os.open(os.path.dirname(filename), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)

我们使用底层调用os.open()打开一个目录,同时调用os.fsync()同步目录数据。需要注意,Python 内建的open()函数不支持打开目录。

模式3和模式1使用方法一致。

模式4与模式2都存在目录数据更新的问题,我们可以使用同样的方法解决:先同步文件数据,再同步目录数据。

结论

通过满足所有ACID原则,可靠的文件更新是可以实现的。在有些场景下不需要满足ACID所有原则,有时候只需要一个或几个,可以根据具体问题选择最符合需求的编程技巧。