Kestrel中的Journal.scala类详解

本文是Scala代码实例之Kestrel的第六部分,讲述PersistentQueue中的Journal.scala类。

我们提供的服务有:成都网站设计、做网站、微信公众号开发、网站优化、网站认证、昌邑ssl等。为千余家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的昌邑网站制作公司

在PersistentQueue之下,有一个Journal.scala的类,支撑了消息队列的存储问题。这是Kestrel提供的另外一个特性:通过文件系统保存消息队列,避免服务重启的时候,Kestrel的队列丢失。

通过前面一段时间的阅读,我们对Scala的语法已经有一个基本的把握,所以在阅读Journal的时候,我们就更注重实现的方式,而不是语法细节了。当然Journal也没有太多的语法细节需要讲的了。当然出了还没有详细说过的case class和case object。

在Journal.scala的开始部分就定义了一个abstract class类JournalItem,并且定义了它的许多子类,这些子类是用来和PersistentQueue进行消息传递的。case class/case object是一种特殊的class/object,其功能是在对象里面增加了几个功能

1. 把所有的建构函数的var变成val,也就是变成了不可变的常量

2.自动实现了equal, hashCode和toString三个方法

3.当对象出现在case之后的时候,会自动apply出一个对象,对象的值和创建的时候一样,这个功能保证了可以和match…case语法可以写得很简练。

关于Case class的具体说明可以参考:CaseClasses和MatchingOnCaseClasses。关于第三条特性,还可以参考CompanionObjects。

简单的理解,我们就把case object/case class当作消息传递中需要使用的对象类型就可以了。

Journal使用了noi的FileChannel,来处理文件的读取和存储。核心的算法,可以只看readJournalEntry和replay两个方法。readJournalEntry的功能是从文件中读取数据,并且根据格式组成各种case class/case object,并且同时返回字节数。而在上层的方法,比如replay,则根据得到的不同数据类型,调用更上层的函数f(case class/case object)。

我们回到PersistentQueue中看replayJournal的时候,发现它将调用replay后得到的一系列的case class转义成为在PersistentQueue中需要执行的各种命令——所以这个方法的名字叫做replay!就是回放的意思。

当系统重启的时候,打开每个queue之前都需要一段回放的时间,把文件系统中记录的当时的整个存取过程重新回放一次,通过回放来重建内存中的队列。回过来再看Journal.scala的时候,我们就更清晰的知道,文件存储的不是当时的队列状态,而是每一次系统执行的轨迹。所以,Journal对整个Kestrel消息队列的开销才会很小。

但是另外一个问题随之而来,如果记录所有的操作过程,那么这个文件不是只会增大,不会缩小么?为了解决这个问题,Journal.scala实现了一个叫做roll的机制。从PersisitentQueue中的add方法中,我们可以看到这样的代码:

 
 
 
  1. if (keepJournal() && !journal.inReadBehind) {  
  2.      if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) {  
  3.        // force re-creation of the journal.  
  4.        log.info("Rolling journal file for '%s' (qsize=%d)", name, queueSize)  
  5.        journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue)  
  6.      }  
  7.      if (queueSize >= maxMemorySize()) {  
  8.        log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize)  
  9.        journal.startReadBehind  
  10.      }  
  11.    }  

如果发生了Journal文件的尺寸太大,但是实际的Queue尺寸也没有满的时候,就启动roll进程来重新建立一个Journal文件。处理的方法也很简单,就是把当前内存中的队列直接写入到Journal对应的文件中,变成一连串的add。这么做,是不是一个很好的做法?只有一种意外的情况,那就是现存在消息队列里面的数据很多,那么重建Journal的时间就需要很多。作者也考虑到了这个问题,所有要求queue实际的size必须小于Journal所能存储的量的时候,才会做roll的操作,也就是说,当队列里面有很多的事件没有处理的时候,就算硬盘占用得再多,也不会启动roll方法。而解决的方法是,当内存中的Queue太大,大到超过了***的内存使用限制的时候,启动readBehind模式。

当readBehind模式启动之后,会对文件增加一个read句柄,每次从内存里面remove掉消息的时候,就会尝试从文件中读取消息放到内存里面。在这样的模式下,内存就一致保持着最满的队列。更多的消息就先直接存储到文件中,直到文件中的read指针和write指针重合,也就是说所有在文件系统中的消息都已经被处理完毕了,系统就会重新切换回正常的模式。

在这种模式下,只要硬盘的数量足够大,我们基本上可以把这个消息队列理解为无限长……但是在readbehind模式下,是不会进行roll的操作的。所以——大家需要注意的是,在配置中,maxJournalSize必须要小于maxMemorySize,否则这两个机制就会打架了。而maxJournalSize这个数值也不应该很大,这样就能保证每次roll的效率会很快(因为roll的效率是取决于事件占用内存的数量,也就是maxJournalSize的),超过这个值,系统就不会roll。

决定是否roll。还有一个数值就是maxJournalOverflow,这是一个很好的设计,相当于Journal文件的利用率,比如说Overflow设置为5,表示现在有效的消息队列数据,只有整个 Journal 文件大小的 1/5。

假设我们平均每个消息的数据占有1K,那么其他的指令信息基本可以被忽略(因为都只有几个字节而已),所以Overflow的比例相当于总消息数量 / 还没有处理过的消息数量。所以这个数值的上限取决于replay的效率。也就是读取文件的速度,比如说,我们觉得启动的时候,读取100M的文件,大概需要10s,是我们可以接受的,而每个消息字节平均是1K,roll一次100个消息需要100ms,是可以接受的。那么 maxJournalOverflow = 100M / 100 * 1K = 1000。不过实际情况可能是roll的次数会更多一些,因为当内存中的消息队列只有10个的时候,硬盘超过10M,就会触发roll操作了。

但是设置1000的目的是,在最坏的情况可以接受,而不是在一半的情况下效率更高。这是需要注意的。

【相关阅读】

  1. 从Java走进Scala(Scala经典读物)
  2. A Scala Tutorial for Java programmers
  3. 专题:Scala编程语言
  4. 从Scala看canEqual与正确的的equals实现
  5. Scala快速入门:从下载安装到定义方法

当前题目:Kestrel中的Journal.scala类详解
分享路径:http://www.mswzjz.cn/qtweb/news7/141607.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能