// diskQueue implements a filesystem backed FIFO queuetypediskQueuestruct{// 64bit atomic vars need to be first for proper alignment on 32bit platforms// run-time state (also persisted to disk)readPosint64// 读的位置writePosint64// 写的位置readFileNumint64// 读文件的编号writeFileNumint64// 写文件的编号depthint64// 读写文件的距离 (用于标识队列的长度)sync.RWMutex// instantiation time metadatanamestring// 标识队列名称,用于落地文件名的前缀 dataPathstring// 落地文件的路径maxBytesPerFileint64// 每个文件最大字节数minMsgSizeint32// 单条消息的最小大小maxMsgSizeint32// 单挑消息的最大大小syncEveryint64// 每写多少次刷盘一次syncTimeouttime.Duration// 至少多久会刷盘一次exitFlagint32// 退出标识needSyncbool// 如果 needSync 为true, 则需要fsync刷新metadata 数据// keeps track of the position where we have read// (but not yet sent over readChan)nextReadPosint64// 下一次读的位置nextReadFileNumint64// 下一次读的文件numberreadFile*os.File// 读 fdwriteFile*os.File// 写 fdreader*bufio.Reader// 读 bufferwriteBufbytes.Buffer// 写 buffer// exposed via ReadChan()readChanchan[]byte// 读channel// internal channelswriteChanchan[]byte// 写 channelwriteResponseChanchanerror// 同步写完之后的 responseemptyChanchanint// 清空文件的channelemptyResponseChanchanerror// 同步清空文件后的channelexitChanchanint// 退出channelexitSyncChanchanint// 退出命令同步等待channellogfAppLogFunc// 日志句柄}
funcNew(namestring,dataPathstring,maxBytesPerFileint64,minMsgSizeint32,maxMsgSizeint32,syncEveryint64,syncTimeouttime.Duration,logfAppLogFunc)Interface{d:=diskQueue{name:name,dataPath:dataPath,maxBytesPerFile:maxBytesPerFile,minMsgSize:minMsgSize,maxMsgSize:maxMsgSize,readChan:make(chan[]byte),writeChan:make(chan[]byte),writeResponseChan:make(chanerror),emptyChan:make(chanint),emptyResponseChan:make(chanerror),exitChan:make(chanint),exitSyncChan:make(chanint),syncEvery:syncEvery,syncTimeout:syncTimeout,logf:logf,}// no need to lock here, nothing else could possibly be touching this instanceerr:=d.retrieveMetaData()iferr!=nil&&!os.IsNotExist(err){d.logf(ERROR,"DISKQUEUE(%s) failed to retrieveMetaData - %s",d.name,err)}god.ioLoop()return&d}
可以看出，队列中的 channel 均为无缓冲（unbuffered）channel，发送方与接收方必须同时就绪，消息只能同步阻塞地处理。
d.retrieveMetaData() 是从文件中恢复元数据。
d.ioLoop() 是队列的事件处理循环，后文会详细说明。
消息的读写
文件格式
文件名格式为 `name + ".diskqueue.%06d.dat"`，其中 %06d 是补零到 6 位的文件编号，name 为 topic 名，或由 topic 与 channel 名组合而成。
数据以二进制形式存储，每条消息按「消息大小 + 消息体（body）」的格式写入。
func(d*diskQueue)ioLoop(){vardataRead[]bytevarerrerrorvarcountint64varrchan[]byte// 定时器的设置syncTicker:=time.NewTicker(d.syncTimeout)for{// 若到达刷盘频次,标记等待刷盘ifcount==d.syncEvery{d.needSync=true}ifd.needSync{err=d.sync()iferr!=nil{d.logf(ERROR,"DISKQUEUE(%s) failed to sync - %s",d.name,err)}count=0}// 有可读数据,并且当前读chan的数据已经被读走,则读取下一条数据if(d.readFileNum<d.writeFileNum)||(d.readPos<d.writePos){ifd.nextReadPos==d.readPos{dataRead,err=d.readOne()iferr!=nil{d.logf(ERROR,"DISKQUEUE(%s) reading at %d of %s - %s",d.name,d.readPos,d.fileName(d.readFileNum),err)d.handleReadError()continue}}r=d.readChan}else{// 如果无可读数据,那么设置 r 为nil, 防止将dataRead数据重复传入readChan中r=nil}select{// the Go channel spec dictates that nil channel operations (read or write)// in a select are skipped, we set r to d.readChan only when there is data to readcaser<-dataRead:count++// moveForward sets needSync flag if a file is removed// 如果读chan 被写入成功,则会修改读的偏移d.moveForward()case<-d.emptyChan:// 清空所有文件,并返回empty的结果d.emptyResponseChan<-d.deleteAllFiles()count=0casedataWrite:=<-d.writeChan:// 写msgcount++d.writeResponseChan<-d.writeOne(dataWrite)case<-syncTicker.C:// 到刷盘时间,则修改needSync = trueifcount==0{// avoid sync when there's no activitycontinue}d.needSync=truecase<-d.exitChan:gotoexit}}exit:d.logf(INFO,"DISKQUEUE(%s): closing ... ioLoop",d.name)syncTicker.Stop()d.exitSyncChan<-1}
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
现象：nsq 在定义 struct 时，很多地方都会出现类似的注释。
原因：相关说明见 Go 源码 sync/atomic/doc.go：
// On ARM, x86-32, and 32-bit MIPS,
// it is the caller's responsibility to arrange for 64-bit
// alignment of 64-bit words accessed atomically. The first word in a
// variable or in an allocated struct, array, or slice can be relied upon to be
// 64-bit aligned.

因此，把需要原子访问的 int64 字段连续地放在 struct 的最前面，它们的偏移量都是 8 的倍数，即可保证在 32 位平台上也是 64 位对齐的。