Python如何仅用5000行代码,实现强大的logging模块?

Python 的 logging 模块实现了灵活的日志系统。整个模块仅仅 3 个类,不到 5000 行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:

  • logging 简介

  • logging API 设计

  • 记录器对象 Logger

  • 日志记录对象 LogRecord

  • 处理器对象 Hander

  • 格式器对象 Formatter

  • 滚动日志文件处理器

  • 小结

  • 小技巧

logging 简介

本次代码使用的是 python 3.8.5 的版本,官方中文文档 3.8.8 。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。

功能
logging-module logging的API
Logger 日志记录器对象类,可以创建一个对象用来记录日志
LogRecord 日志记录对象,每条日志记录都封装成一个日志记录对象
Hander 处理器对象,负责日志输出到流/文件的控制
Formatter 格式器,负责日志记录的格式化
RotatingFileHandler 按大小滚动的日志文件记录器
TimedRotatingFileHandler 按时间滚动的日志文件处理器

我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。

logging API 设计

先看看日志使用:

 
 
 
 
  1. import logging
  2. logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s')
  3. lang = {"name": "python", "age":20}
  4. logging.info('This is a info message %s', lang)
  5. logging.debug('This is a debug message')
  6. logging.warning('This is a warning message')
  7. logger = logging.getLogger(__name__)
  8. logger.warning('This is a warning')

输出内容如下:

 
 
 
 
  1. INFO     root       2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20}
  2. WARNING  root       2021-03-04 00:03:53,473 This is a warning message
  3. WARNING  __main__   2021-03-04 00:03:53,473 This is a warning

可以看到 logging 的使用非常方便,模块直接提供了一组API。

 
 
 
 
  1. root = RootLogger(WARNING)  # 默认提供的logger
  2. Logger.root = root
  3. Logger.manager = Manager(Logger.root)
  4. def debug(msg, *args, **kwargs): # info,warning等api类似
  5.     if len(root.handlers) == 0:
  6.         basicConfig()  # 默认配置
  7.     root.debug(msg, *args, **kwargs)
  8. def getLogger(name=None):
  9.     if name:
  10.         return Logger.manager.getLogger(name)  # 创建特定的logger
  11.     else:
  12.         return root  # 返回默认的logger

这种API的提供方式,我们在 requests 中也有看到。api中很重要的设置config的方式:

 
 
 
 
  1. def basicConfig(**kwargs):
  2.     ...
  3.     if handlers is None:
  4.         filename = kwargs.pop("filename", None)
  5.         mode = kwargs.pop("filemode", 'a')
  6.         if filename:
  7.             h = FileHandler(filename, mode)
  8.         else:
  9.             stream = kwargs.pop("stream", None)
  10.             h = StreamHandler(stream)  # 默认的handler
  11.         handlers = [h]
  12.     dfs = kwargs.pop("datefmt", None)
  13.     style = kwargs.pop("style", '%')
  14.     fs = kwargs.pop("format", _STYLES[style][1])
  15.     fmt = Formatter(fs, dfs, style)  # 生成formatter
  16.     for h in handlers:
  17.         if h.formatter is None:
  18.             h.setFormatter(fmt)
  19.         root.addHandler(h)  # 设置root的handler
  20.     level = kwargs.pop("level", None)
  21.     if level is not None:
  22.         root.setLevel(level)  # 设置日志级别

可以看到,日志的配置主要包括下面几项:

  • level 日志级别

  • format 信息格式化模版

  • filename 输出到文件

  • datefmt %Y-%m-%d %H:%M:%S,uuu 时间的格式模版
  • style [ % , { ,$] 格式样板

演示代码输出中,可以看到debug日志没有显示,是因为 debug < info :

 
 
 
 
  1. CRITICAL = 50
  2. FATAL = CRITICAL
  3. ERROR = 40
  4. WARNING = 30
  5. WARN = WARNING
  6. INFO = 20
  7. DEBUG = 10
  8. NOTSET = 0

记录器对象 Logger

查看Logger之前,先看logger对象的管理类Manager

 
 
 
 
  1. _loggerClass = Logger
  2. class Manager(object):
  3.     def __init__(self, rootnode):
  4.         self.root = rootnode
  5.         self.disable = 0
  6.         self.loggerDict = {}  # 所有日志记录对象的字典
  7.     ...
  8.     def getLogger(self, name):
  9.         rv = None
  10.         if name in self.loggerDict:
  11.             rv = self.loggerDict[name]  # 获取已经创建过的同名logger
  12.             ...
  13.         else:
  14.             rv = (self.loggerClass or _loggerClass)(name)  # 创建新的logger
  15.             rv.manager = self
  16.             self.loggerDict[name] = rv
  17.             ...
  18.         return rv

日志过滤器

 
 
 
 
  1. class Filterer(object):
  2.     def __init__(self):
  3.         self.filters = []
  4.     def addFilter(self, filter):
  5.         self.filters.append(filter)
  6.     def removeFilter(self, filter):
  7.         self.filters.remove(filter)
  8.     def filter(self, record):
  9.         rv = True
  10.         for f in self.filters:  # 过滤日志
  11.             if hasattr(f, 'filter'):
  12.                 result = f.filter(record)
  13.             else:
  14.                 result = f(record) # assume callable - will raise if not
  15.             if not result:
  16.                 rv = False
  17.                 break
  18.         return r

核心的 Logger 实际上只是一个控制中心:

 
 
 
 
  1. class Logger(Filterer):  # logger可以过滤日志
  2.     def __init__(self, name, level=NOTSET):
  3.         Filterer.__init__(self)
  4.         self.name = name
  5.         self.level = _checkLevel(level)
  6.         self.parent = None  # 日志可以有层级
  7.         self.propagate = True
  8.         self.handlers = []  # 可以输出到多个handler
  9.         self.disabled = False  # 可以关闭
  10.         self._cache = {}
  11.     
  12.     def debug(self, msg, *args, **kwargs):  # 输出debug日志
  13.         if self.isEnabledFor(DEBUG):
  14.             self._log(DEBUG, msg, args, **kwargs)

logger可以判断日志级别:

 
 
 
 
  1. def isEnabledFor(self, level):
  2.     if self.disabled:
  3.         return False
  4.     try:
  5.         return self._cache[level]
  6.     except KeyError:
  7.         try:
  8.             if self.manager.disable >= level:
  9.                 is_enabled = self._cache[level] = False
  10.             else:
  11.                 is_enabled = self._cache[level] = (
  12.                     level >= self.getEffectiveLevel()
  13.                 )
  14.         return is_enabled
  15. def getEffectiveLevel(self):
  16.     logger = self
  17.     while logger:
  18.         if logger.level:
  19.             return logger.level
  20.         logger = logger.parent
  21.     return NOTSET

日志输出:

 
 
 
 
  1. def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,
  2.          stacklevel=1):
  3.     ...
  4.     fn, lno, func = "(unknown file)", 0, "(unknown function)"
  5.     ...
  6.     # 生成日志记录
  7.     record = self.makeRecord(self.name, level, fn, lno, msg, args,
  8.                              exc_info, func, extra, sinfo)
  9.     # 使用handler处理日志
  10.     self.handle(record)

日志记录的生产,就是创建一个LogRecord对象:

 
 
 
 
  1. _logRecordFactory = LogRecord
  2. def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
  3.                func=None, extra=None, sinfo=None):
  4.     ...
  5.     rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
  6.                          sinfo)
  7.     ...
  8.     return rv

使用logger对象的所有handler处理日志:

 
 
 
 
  1. def handle(self, record):
  2.     c = self
  3.     found = 0
  4.     while c:
  5.         for hdlr in c.handlers:  # 使用所有的handler处理日志
  6.             found = found + 1
  7.             if record.levelno >= hdlr.level:
  8.                 hdlr.handle(record)

root-logger的handler是在config中配置的:

 
 
 
 
  1. def basicConfig(**kwargs):
  2.     ...
  3.     root.addHandler(h)  # 设置root的handler

日志记录对象 LogRecord

日志记录对象非常简单:

 
 
 
 
  1. class LogRecord(object):
  2.     def __init__(self, name, level, pathname, lineno,
  3.                  msg, args, exc_info, func=None, sinfo=None, **kwargs):
  4.         ct = time.time()
  5.         self.name = name  # logger名称
  6.         self.msg = msg  # 日志标识信息
  7.         ...
  8.         self.args = args  # 变量
  9.         self.levelname = getLevelName(level)
  10.         ...
  11.     
  12.     def getMessage(self):
  13.         msg = str(self.msg)
  14.         if self.args:
  15.             msg = msg % self.args  # 格式化消息
  16.         return msg

处理器对象 Hander

顶级Handler定义了Handler的模版方法

 
 
 
 
  1. class Handler(Filterer):  # 处理器也可以过滤日志
  2.     def __init__(self, level=NOTSET):
  3.         Filterer.__init__(self)
  4.         self._name = None
  5.         self.level = _checkLevel(level)  # handler也有日志级别
  6.         self.formatter = None
  7.         _addHandlerRef(self)
  8.         self.createLock()
  9.         
  10.     def handle(self, record):  # 处理日志
  11.         rv = self.filter(record)  # 过滤日志
  12.         if rv:
  13.             self.acquire()  # 申请锁
  14.             try:
  15.                 self.emit(record)  # 提交记录,由不同子类实现 
  16.             finally:
  17.                 self.release()  # 释放锁
  18.         return rv

默认的console流 StreamHandler

 
 
 
 
  1. class StreamHandler(Handler):
  2.     terminator = '\n'  # 自动换行
  3.     def __init__(self, stream=None):
  4.         Handler.__init__(self)
  5.         if stream is None:
  6.             stream = sys.stderr  # 默认使用stderr输出
  7.         self.stream = stream
  8.     
  9.     def emit(self, record):
  10.         try:
  11.             msg = self.format(record)  # 格式化日志记录
  12.             stream = self.stream
  13.             stream.write(msg + self.terminator)  # 写日志
  14.             self.flush()  # 刷新写缓存
  15.         except Exception:
  16.             ...
  17.     
  18.     def format(self, record):
  19.         if self.formatter:
  20.             fmt = self.formatter
  21.         else:
  22.             fmt = _defaultFormatter
  23.         return fmt.format(record)  # 使用格式化器格式化日志记录

为什么使用stderr,可以看下面的测试中的输出都是到console:

 
 
 
 
  1. print("haha")
  2. print("fatal error", file=sys.stderr)
  3. sys.stderr.write("fatal error\n")

格式器对象 Formatter

格式化器主要使用Formatter和Style实现

 
 
 
 
  1. class Formatter(object):
  2.     def __init__(self, fmt=None, datefmt=None, style='%', validate=True):
  3.         self._style = _STYLES[style][0](fmt)
  4.         self._fmt = self._style._fmt
  5.         self.datefmt = datefmt
  6.     
  7.     def format(self, record):
  8.         record.message = record.getMessage()
  9.         s = self.formatMessage(record)
  10.         return s
  11.         
  12.     def formatMessage(self, record):
  13.         return self._style.format(record)  # 格式化

Style类

 
 
 
 
  1. class PercentStyle(object):
  2.     default_format = '%(message)s'
  3.     asctime_format = '%(asctime)s'
  4.     asctime_search = '%(asctime)'
  5.     validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I)
  6.     def __init__(self, fmt):
  7.         self._fmt = fmt or self.default_format
  8.     def usesTime(self):
  9.         return self._fmt.find(self.asctime_search) >= 0
  10.     def validate(self):
  11.         """Validate the input format, ensure it matches the correct style"""
  12.         if not self.validation_pattern.search(self._fmt):
  13.             raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0]))
  14.     def _format(self, record):
  15.         return self._fmt % record.__dict__  # 格式化日志记录对象
  16.     def format(self, record):
  17.         try:
  18.             return self._format(record)
  19.         except KeyError as e:
  20.             raise ValueError('Formatting field not found in record: %s' % e)

滚动日志文件处理器

线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:

 
 
 
 
  1. class BaseRotatingHandler(logging.FileHandler):
  2.     def emit(self, record):
  3.         try:
  4.             if self.shouldRollover(record): # 判断是否需要滚动
  5.                 self.doRollover()  # 滚动日志
  6.             logging.FileHandler.emit(self, record)  # 输出日志
  7.         except Exception:
  8.             self.handleError(record)
  9.     
  10.     def rotate(self, source, dest):
  11.         if not callable(self.rotator):
  12.             if os.path.exists(source):
  13.                 os.rename(source, dest)  # 重命名日志文件
  14.         else:
  15.             self.rotator(source, dest)

按大小滚动 RotatingFileHandler

按照文件大小滚动的处理器:

 
 
 
 
  1. class RotatingFileHandler(BaseRotatingHandler):
  2.     def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
  3.         if maxBytes > 0:
  4.             mode = 'a'
  5.         BaseRotatingHandler.__init__(self, filename, mode, encoding, delay)
  6.         self.maxBytes = maxBytes  # 单个文件大小上限
  7.         self.backupCount = backupCount  # 日志备份数量
  8.         
  9.     def doRollover(self):  # 执行滚动
  10.         if self.stream:
  11.             self.stream.close()  # 关闭当前的流
  12.             self.stream = None
  13.         if self.backupCount > 0:
  14.             for i in range(self.backupCount - 1, 0, -1):
  15.                 sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
  16.                 dfn = self.rotation_filename("%s.%d" % (self.baseFilename,
  17.                                                         i + 1))
  18.                 if os.path.exists(sfn):
  19.                     if os.path.exists(dfn):
  20.                         os.remove(dfn)
  21.                     os.rename(sfn, dfn)
  22.             dfn = self.rotation_filename(self.baseFilename + ".1")
  23.             if os.path.exists(dfn):
  24.                 os.remove(dfn)
  25.             self.rotate(self.baseFilename, dfn)  # 重命名文件
  26.         if not self.delay:
  27.             self.stream = self._open()  # 如果shouldRollover延迟,可以打开新的流
  28.     def shouldRollover(self, record):  # 判断是否需要滚动
  29.         if self.stream is None:  # 立即打开流
  30.             self.stream = self._open()
  31.         if self.maxBytes > 0:   
  32.             msg = "%s\n" % self.format(record)
  33.             self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature
  34.             if self.stream.tell() + len(msg) >= self.maxBytes:  # 判断大小
  35.                 return 1
  36.         return 0

文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。

按照日期滚动 TimedRotatingFileHandler

按照日期滚动的处理器:

 
 
 
 
  1. class TimedRotatingFileHandler(BaseRotatingHandler):
  2.     def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):
  3.         BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
  4.         self.when = when.upper()
  5.         self.backupCount = backupCount
  6.         self.utc = utc
  7.         self.atTime = atTime
  8.         # 日期设置,支持多种方式
  9.         if self.when == 'S':
  10.             self.interval = 1 # one second
  11.             self.suffix = "%Y-%m-%d_%H-%M-%S"
  12.             self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
  13.         ...
  14.         self.extMatch = re.compile(self.extMatch, re.ASCII)
  15.         self.interval = self.interval * interval # multiply by units requested
  16.         filename = self.baseFilename
  17.         if os.path.exists(filename):
  18.             t = os.stat(filename)[ST_MTIME]  # 最后修改时间
  19.         else:
  20.             t = int(time.time())
  21.         self.rolloverAt = self.computeRollover(t)  # 提前计算终止时间
  22.     def computeRollover(self, currentTime):
  23.         # 判断的方法还是很长很复杂的,先pass
  24.     def shouldRollover(self, record):
  25.         t = int(time.time())
  26.         if t >= self.rolloverAt:  # 判断是否到期
  27.             return 1
  28.         return 0
  29.     def doRollover(self):
  30.         ...
  31.         dfn = self.rotation_filename(self.baseFilename + "." +
  32.                                      time.strftime(self.suffix, timeTuple))
  33.         #  滚动日志文件
  34.         if os.path.exists(dfn):
  35.             os.remove(dfn)
  36.         self.rotate(self.baseFilename, dfn)
  37.         if self.backupCount > 0:
  38.             for s in self.getFilesToDelete():
  39.                 os.remove(s)
  40.         ...
  41.         # 计算下一个时间点
  42.         newRolloverAt = self.computeRollover(currentTime)
  43.         ...
  44.         self.rolloverAt = newRolloverAt

日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。

小结

logging的处理逻辑大概是这样的:

  • 创建Logger对象,提供API,用来接收应用程序日志

  • Logger对象包括多个Handler

  • 每个Handler有一个Formatter对象

  • 每条日志都会生成一个LogRecord对象

  • 使用不同的Handler对象将LogRecored对象提交到不同的流

  • 每个日志对象通过Formatter格式化输出

  • 可以使用按日期/文件大小的方式进行日志文件的滚动记录

小技巧

覆盖对象的 __reduce__ 方法,让对象支持 reduce 函数:

 
 
 
 
  1. class RootLogger(Logger):
  2.     def __init__(self, level):
  3.         Logger.__init__(self, "root", level)
  4.     def __reduce__(self):
  5.         return getLogger, ()

线程锁的创建和释放:

 
 
 
 
  1. _lock = threading.RLock()
  2. def _acquireLock():
  3.     if _lock:
  4.         _lock.acquire()
  5. def _releaseLock():
  6.     if _lock:
  7.         _lock.release()

线程锁的使用:

 
 
 
 
  1. def addHandler(self, hdlr):
  2.     _acquireLock()
  3.     try:
  4.         self.handlers.append(hdlr)
  5.     finally:
  6.         _releaseLock()
  7. def removeHandler(self, hdlr):
  8.     _acquireLock()
  9.     try:
  10.         self.handlers.remove(hdlr)
  11.     finally:
  12.         _releaseLock()

参考链接

  • Logging in Python https://realpython.com/python-logging/

  • 日志操作手册 https://docs.python.org/zh-cn/3.8/howto/logging-cookbook.html#cookbook-rotator-namer

  • Python 的日志记录工具 https://docs.python.org/zh-cn/3.8/library/logging.html

标题名称:Python如何仅用5000行代码,实现强大的logging模块?
URL链接:http://www.mswzjz.cn/qtweb/news14/428914.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能