func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) |
初次启动raftNode时调用WAL.Create方法。创建WAL对象用于记录追加 :判断是否存在dirpath路径,如果已存在则不是初次启动raftNode,返回os.ErrExist。创建临时目录和初始上锁的wal文件—walName(seq=0 & index=0),seek到文件末尾(why?),预分配该wal文件大小(SegmentSizeBytes=64MB,优化追加速度),创建WAL对象并设定路径、 metadata(NodeID和ClusterID)、编码器,将上锁的WAL文件追加到锁表内,然后依次写入crc、metadata和空snapshot,重命名临时目录,同步临时目录的父目录(fsync)使得重命名持久化。 |
func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) |
移除w.dir目录及目录下所有文件和文件夹,调用os.Rename(tmpdirpath, w.dir)将Create方法内创建的临时目录重命名,创建FilePipeline和dirFile *os.File,dirFile is a fd for the wal directory for syncing on Rename |
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error |
检验snapshot是否合法(Since etcd>=3.5.0),pb序列化snapshot得到data字段,加锁,调用w.encoder.encode方法写入record,更新w.enti如果snapshot index > 原w.enti,解锁。 |
func (w *WAL) saveCrc(prevCrc uint32) error |
写入crcType的记录 |
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error |
加锁,如果hardstate和entries为空,返回。判断是否需要sync(entries长度不为0||vote改变||term改变),写入entries和hardstate,判断文件当前位置是否小于SegmentSizeBytes(默认64M),如果小于判断是否需要sync数据,如果不小于,返回cut操作结果,解锁。 |
func (w *WAL) saveEntry(e *raftpb.Entry) error |
写入一条entry记录并更新WAL对象的enti值 |
func (w *WAL) saveState(s *raftpb.HardState) error |
判断是否为空,不为空写入HardState并更新WAL对象的state值 |
func (w *WAL) cut() error |
关闭当前文件并创建一个新的文件用于追加记录:首先移动到锁表最后一个wal文件的当前位置截断文件然后执行sync,调用FilePipeline对象的Open方法创建一个新文件并加入锁表,首先保存旧的encoder的crc,然后创建新的encoder对象替换旧的encoder对象,保存头信息crc、metadata和hardstate,原子重命名文件之前先执行sync和保存当前位置偏移,重命名后对WAL对象的dirFile执行fsync持久化wal目录的变化。关闭文件重新以LockFile方式打开文件并seek到文件末尾,替换锁表尾文件,再次进行新旧encoder替换。 |
func (w *WAL) tail() *fileutil.LockedFile |
WAL对象的锁表如果不为空,返回最后一个上锁文件,否则返回空。 |
func (w *WAL) sync() error |
如果encoder存在,则将encoder pageWriter缓冲区的数据写入,锁表尾文件执行fdatasync,将fdatasync延时上报监控prometheus。 |
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) |
寻找所有wal文件中snapshot条目,有效的snapshot条目index必须小于等于最新的hardstate。步骤:找到目录下所有带有合法名称的wal文件名,以只读模式打开这些wal文件,根据以读模式打开的这些wal文件创建decoder,循环解码每个文件的record:若为snapshotType,追加到snaps中;若为stateType,更新hardstate;若为crcType(wal文件开头),验证是否和decoder.crc相同(上一个文件末尾的crc)。返回所有index小于最新hardstate.Commit的walpb.snap条目。 |
func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) |
从指定目录读取所有wal文件name,并检查name合法性(.wal结尾) |
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) |
根据write标志选择以读模式还是写模式打开文件,步骤:从nameIndex指定的索引开始打开文件。若写模式:打开上锁的wal文件,将该文件添加到锁表、文件关闭表、读文件表;若读模式:以os.O_RDONLY打开文件,将该文件添加到文件关闭表、读文件表,添加nil到锁表(锁表只在写模式下用到)。 |
func Open(lg zap.Logger, dirpath string, snap walpb.Snapshot) (WAL, error) |
写模式调用openAtIndex。Open opens the WAL at the given snap,The returned WAL is ready to read and the first record will be the one after the given snap. The WAL cannot be appended to before reading out all of its previous records. |
func OpenForRead(lg zap.Logger, dirpath string, snap walpb.Snapshot) (WAL, error) |
读模式调用openAtIndex。 |
func openAtIndex(lg zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (WAL, error) |
遍历wal目录,找到snap所在位置index,依次打开index后续所有的wal文件并加锁(调用openWALFiles),创建WAL对象并设定解码器、readClose(调用closeAll关闭已打开文件)和锁表,若写模式:readClose置空(写模式下还要继续对wal文件进行append操作,等到读完后不用进行关闭操作),测试锁表最后一个上锁文件是否是合法wal文件(通过是否符合命名规范判断),如果不合法,关闭所有文件返回错误,否则,设定FilePipeline对象(大小超过64M时用于截断并切换到新文件),返回WAL对象 。 |
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) |
读取WAL对象的所有记录 ,对于不同类型记录做不同处理: 判断锁表尾文件如为空(读模式):如果不是读到EOF或ErrUnexpectedEOF则重置state返回;对于写模式,如果err不是EOF,重置状态返回,然后锁表尾文件Seek到lastOffset位置, 将后续内容清零(目的是处理遇到0记录后接非0记录时,非0记录又没有被全部重写,再次打开的时候会出现 CRC错误,由于数据从不会一开始就完全同步到磁盘,因此进行清零操作是安全的 ?暂时没懂),然后判断snapshot是否匹配,关闭decoder实现禁读,重置WAL对象的start为一个空snapshot对象,创建encoder并将decoder设空,返回metadata,state,ents和err。 |