mtail 学习

mtail 介绍

extract internal monitoring data from application logs for collection in a timeseries database

mtail :从应用程序日志中提取指标以导出到时间序列数据库

它是一个google开发的日志提取工具,用途就是:

  1. 实时读取应用程序的日志、
  2. 再通过自己编写的脚本进行分析、
  3. 最终生成时间序列指标

mtail使用流式读取日志,通过正则表达式匹配的方式从日志中提取metrics指标,

这种方式可以利用目标机器的算力,不过如果量太大,可能会影响目标机器上的业务程序

运行 mtail

-progs string
    Name of the directory containing mtail programs

通过 --progs 参数指定一个目录,这个目录里放置一堆的*.mtail文件,

-logs value
    List of log files to monitor, separated by commas.  This flag may be specified multiple times.

每个mtail文件就是描述的正则提取规则,通过 --logs 参数来指定要监控的日志目录,
可以写通配符,--logs 可以写多次

-one_shot
    Compile the programs, then read the contents of the provided logs from start until EOF, print the values of the metrics store in the given format and exit. This is a debugging flag only, not for production use.
-one_shot_format string
    Format to use with -one_shot. This is a debugging flag only, not for production use. Supported formats: json, prometheus. (default "json")

-one_shot 只显示打印出来一次指标数据,测试的时候非常有用。-one_shot_format 用来指定打印出来的数据的格式,默认是json 格式。 可以指定为prometheus格式的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

$ /tmp/___mtail -progs ./conf/input.mtail -logs /var/log/kern.log -one_shot -one_shot_format prometheus -logtostderr

I1108 11:01:06.610898 20788 main.go:119] mtail version invalid:-use-make-to-build git revision invalid:-use-make-to-build go version go1.18.6 go arch amd64 go os linux
I1108 11:01:06.610923 20788 main.go:120] Commandline: ["/tmp/___mtail" "-progs" "/home/mamh/work/github/flashcatcloud-categraf/conf/input.mtail" "-logs" "/var/log/kern.log.1" "-one_shot" "-one_shot_format" "prometheus" "-logtostderr"]
I1108 11:01:06.611064 20788 store.go:189] Starting metric store expiry loop every 1h0m0s
I1108 11:01:06.611209 20788 runtime.go:84] unmarking mtail.toml
I1108 11:01:06.611229 20788 logstream.go:61] Parsed url as /var/log/kern.log.1
I1108 11:01:06.611295 20788 filestream.go:278] signalling stop at next EOF
I1108 11:01:06.611312 20788 tail.go:287] Tailing /var/log/kern.log.1
I1108 11:01:06.611388 20788 tail.go:343] No polling loop in oneshot mode.
I1108 11:01:06.611388 20788 tail.go:315] No gc loop in oneshot mode.
I1108 11:01:06.611414 20788 mtail.go:132] Listening on [::]:3903
I1108 11:01:06.621508 20788 runtime.go:288] END OF LINE



去掉 -one_shot 执行 就会阻塞到那里了,然后就可以浏览器 访问 3903 端口 这个 地址啦。

mtail 命令 源码分析

前提背景:

0. 为什么要看这个mtail源码呢? 最开始是 categraf 监控里面加入了这个 mtail插件,然后
我试着执行了一下 但是没有执行出来想要的结果,也就是 用 mtail 文件 加上一个 日志文件 去执行没有出来
想要的结果。 但是呢 我直接同样的文件 用 mtail 命令 就能得出想要的结果了。 所以想分析分析 mtail 里面
的源码都是一步一步怎么执行的。

通过源码分析 我们想探讨的几个问题:

1. 在哪里 读取 日志文件的???  这个答案已经出来了。 参考 下面  Runtime  New 方法 相关的 分析。
2. 在哪里 使用 mtail 文件的 ??? 这个知道 是在  Runtime  New 方法 里面去加载了, 同时 这个里面也
                                 加载 日志文件内容。那么 这2个结合去处理 估计也是在这个 Runtime 里面了

mtail.New 初始化

首先 是 根据 命令行的一些参数 去 初始 mtail 了。

在 main.go 中 有 mtail.New() 方法调用

1
mtail.New(ctx, store, opts...)

需要用到 3个 参数

ctx 是 context.WithCancel(context.Background()) 
store 是 metrics.NewStore() 生成出来的
opts 就是对应命令行上的 选项了,把命令行选项 转换了一下,mtail.Option切片了。

主要就是 初始化 type Server struct 这个

1
2
3
4
5
6
7
8
m := &Server{
ctx: ctx,
store: store,

lines: make(chan *logline.LogLine),

reg: prometheus.NewRegistry(),
}

在 New() 方法后续的 还调用了 其他的 一些个 初始化 方法

if err := m.initExporter(); err != nil {

if err := m.initRuntime(); err != nil {

if err := m.initTailer(); err != nil {

if err := m.initHTTPServer(); err != nil {

以上 四个 init 方法 里面 又各自 初始化了 其他类型的 结构体了。 主要有下面几个。

type Exporter struct

type Runtime struct // 这里面有 compiler.New() 初始化

type Tailer struct

type Server struct   net http 包下面的, 直接提供 http 服务的。

当然 这些 结构体 在 调用 New() 方法的里面 也会有 另外的 结构体的 初始化。

Runtime New 方法。

1
2
3
4
5
// initRuntime constructs a new runtime and performs the initial load of program files in the program directory.
func (m *Server) initRuntime() (err error) {
m.r, err = runtime.New(m.lines, &m.wg, m.programPath, m.store, m.rOpts...)
return
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func New(lines <-chan *logline.LogLine, wg *sync.WaitGroup, programPath string, store *metrics.Store, options ...Option) (*Runtime, error) {
if store == nil {
return nil, errors.New("loader needs a store")
}

r := &Runtime{
ms: store,
programPath: programPath,
handles: make(map[string]*vmHandle),
programErrors: make(map[string]error),
signalQuit: make(chan struct{}),
}

initDone := make(chan struct{})
defer close(initDone)

var err error
if err = r.SetOption(options...); err != nil {
return nil, err
}

if r.c, err = compiler.New(r.cOpts...); err != nil {
return nil, err
}

// Defer shutdown handling to avoid a race on r.wg.
wg.Add(1)
defer func() {
go func() {
defer wg.Done()
<-initDone
r.wg.Wait() // 下面启动了2个 goroutine, 这里等待 就是 r.wg 的????
}()
}()

// This goroutine is the main consumer/producer loop.
r.wg.Add(1)
go func() {
defer r.wg.Done() // signal to owner we're done
<-initDone
for line := range lines {
LineCount.Add(1)
r.handleMu.RLock()
for prog := range r.handles {
r.handles[prog].lines <- line
}
r.handleMu.RUnlock()
}
glog.Info("END OF LINE")
close(r.signalQuit)
r.handleMu.Lock()
for prog := range r.handles {
close(r.handles[prog].lines)
delete(r.handles, prog)
}
r.handleMu.Unlock()
}()

if r.programPath == "" {
glog.Info("No program path specified, no programs will be loaded.")
return r, nil
}

// Create one goroutine that handles reload signals.
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-initDone
if r.programPath == "" {
glog.Info("no program reload on SIGHUP without programPath")
return
}
n := make(chan os.Signal, 1)
signal.Notify(n, syscall.SIGHUP)
defer signal.Stop(n)
for {
select {
case <-r.signalQuit:
return
case <-n:
if err := r.LoadAllPrograms(); err != nil {
glog.Info(err)
}
}
}
}()

// Guarantee all existing programmes get loaded before we leave.
if err := r.LoadAllPrograms(); err != nil {
return nil, err
}

return r, nil
}



这个New 主要就是用来初始化 &Runtime 这个的。

里面用到了 一个小技巧。

initDone := make(chan struct{})  新建一个 通道
defer close(initDone)  这个会在 New 方法结束时候 去关闭这个通道。

我们先来 研究 handles 怎么初始化的?,首选一开始 就新建了个空的 map,这个时候 map 里面 还没有 去
初始化 vmHandle 对象的。

在 New 方法里面 有 3个地方比较重要, 里面启动了 几个 routine。

这第一个重要点

启动了一个 routine, 里面for 循环去遍历 这个 lines,这是一个 通道,就是 日志文件的每一行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// This goroutine is the main consumer/producer loop.
r.wg.Add(1)
go func() {
defer r.wg.Done() // signal to owner we're done
<-initDone
for line := range lines { // for 循环 一个 通道, 会阻塞到这里,一直到通道关闭。通道关闭 是在 tail.go 里面的 New() 方法的 最后有个 close(t.lines)
LineCount.Add(1)
r.handleMu.RLock()
for prog := range r.handles {
r.handles[prog].lines <- line
}
r.handleMu.RUnlock()
}
glog.Info("END OF LINE")
close(r.signalQuit) // 用户 发送 停止 信号 就会 退出上面的 for line := range lines 循环,这里就会 关闭 signalQuit 通道,然后 就会 到 下面 select case <-r.signalQuit: 这里 执行了。。。。。
r.handleMu.Lock()
for prog := range r.handles {
close(r.handles[prog].lines)
delete(r.handles, prog)
}
r.handleMu.Unlock()
}()

这第 2 个 重要点 也是 启动了一个 routine,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Create one goroutine that handles reload signals.
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-initDone
if r.programPath == "" {
glog.Info("no program reload on SIGHUP without programPath")
return
}
n := make(chan os.Signal, 1)
signal.Notify(n, syscall.SIGHUP) // 注册个 sighup 信号的出来,
defer signal.Stop(n)
for {
select {
case <-r.signalQuit: // 这个就是 退出的时候执行到这里
return
case <-n: // 这里 等待 接收 用户 发送 的 sighup 信号, 这个 感觉 发送这个型号 就会 重新 执行 LoadAllPrograms() 方法,感觉像是 重新 加载 配置文件的作用????
if err := r.LoadAllPrograms(); err != nil {
glog.Info(err)
}
}
}
}()

第三个重要点

LoadAllPrograms 加载 .mtail 文件的吧

1
2
3
4
5
6
7
// Guarantee all existing programmes get loaded before we leave.
if err := r.LoadAllPrograms(); err != nil {
return nil, err
}



从表明上 看 先 执行 第一个地方, 然后 第二个 地方,最后是 第三个这个地方。

如果真的是 这样 handles 怎么初始化的? 这个 就会有问题,没有初始化 怎么 能 直接 这么使用呢? r.handles[prog].lines <- line

所以 这段代码 并不是 表明看到的那个顺序 去执行的。

这2个 goroutine 里面 都有一个 等待 接收 initDone 通过的 动作。 <-initDone 都是一上来就先执行这句。
这个是个 只读的 通道,不能往里写入的,也的确没看到哪个地方往里写入。那么调用这个不就会一直阻塞的吗,确实会阻塞到这里,但是当 关闭通道时候
这个就会继续往下执行了。。。 这里面正式 用到了这个技巧 导致 上面 3个 点 执行顺序 不一样了。

总结 Runtime New

下面 总结一下 这个 执行顺序:

1. 先  `initDone := make(chan struct{})` 新建通道
2. 注册 并且 立即 执行 第1个 地方的 goroutine
3. 注册 并且 立即 执行 第2个 地方的 goroutine
4. 执行 第三个 地方的代码 LoadAllPrograms(), 这一步里面 会去 初始化 handles 里面的 对象。这个初始化好之后 就能 ` r.handles[prog].lines ` 这样来使用了
5. 此时 New() 方法 结束了。要执行 defer 注册的 方法了,其中 有一个 就是 关闭 close(initDone) 通道的。
6. 通道关闭之后, 第一个 goroutine  会继续 <-initDone 之后代码的执行。
7. 同样的, 第2个 goroutine  也会继续 <-initDone 之后代码的执行。

mtail 代码 里面 多处 用到 了这样的 技巧。

上面提到的 第一个 goroutine 是 用来接收 从log文件读取的每一行内容的。 从 lines 通道接收。

在哪里 往这个通道放数据呢?

是在 internal/tailer/logstream/filestream.go 中的 stream() 方法,

这里面有个 for 循环 去不停的读文件。读到文件结尾就会 sleep了。后面文件有新写入会继续读取。

用到了 waker.Wake() 这个。是从这里初始化的 logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)

在 for 循环里面 有个 func decodeAndSend 方法, 在里面 有个 func sendLine 就是这个方法 往通道里面发送每一行数据的。

1
2
3
4
5
6
func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) {
glog.V(2).Infof("sendline")
logLines.Add(pathname, 1)
lines <- logline.New(ctx, pathname, partial.String())
partial.Reset()
}

Runtime 中的 LoadAllPrograms 方法

调用 顺序 LoadAllPrograms -> LoadProgram -> CompileAndRun -> Compile(name, &buf) -> VM 中的 Run()

  1. LoadAllPrograms 里面 循环 -prog 选项 后面的目录,遍历 这个 目录下面的 所有的 mtail 结尾的文件。

    这个里面 还会 对 一些 已经 删除掉的 mtail 文件 进行处理, 会关闭 其对应的 handle.lines 和 从 handles map 中删除。

  2. 如果 -prog 选项 跟着的是一个单独的文件 那就 只 遍历 这个一个文件了,

  3. 其实 就是 循环 每个 mtail 文件,对其调用 LoadProgram 方法了. 这个方法就接收一个 mtail文件路径作为参数。

Runtime 中的 LoadProgram 方法

  1. 判断 是否 点 开头的文件,隐藏文件会忽略的。
  2. 判断是否 .mtail 后缀的文件,不是也会忽略的。
  3. 打开这个文件,打开失败会直接返回,也会加个错误标记 到 ProgLoadErrors.Add(name, 1), 这个是用来 计数 加载 mtail 失败次数的。里面都有 锁保护的。
  4. 打开文件成功,然后 注册 defer 方法 去要关闭这个文件的。
  5. 调用 CompileAndRun 方法, 返回值 会存放到 r.programErrors 字典里面,字典key就是 mtail 文件名,
    当然 往这个字典里面 加数据 前后也有锁 保护的。
  6. 如果 r.programErrors[name] 不是nil ,并且 r.errorsAbort 设置了,就会返回 这个错误状态了。这是个 error 类型的。

Runtime 中的 CompileAndRun 方法

这个方法 接收 2个参数 ,一个是 mtail 文件名, 一个就是 打开的 mtail 文件 的 io.Reader

  1. 计算 mtail 文件内容的 sha256 值, 利用了 TeeReader 来读取,减少内存消耗。

  2. 加锁保护,读取map中数据, vh, ok := r.handles[name] 的好 handles 中的 vh,这个是 vmHandle 类型的。

     第一次读取,ok=false,肯定是没有值的,因为我们还没有 初始化呢,
     只是前边吧 handles 这个 map 给新建出来了,还没往里放东西呢。
     
     后续再次读取就能 得到值,如果的到了,就会 进行 比较了`  bytes.Equal(vh.contentHash, contentHash)`
     vh里面存放的 mtail 文件内容的 hash 值 和 前面那一步算出来的 是否一致,一致说明 mtail 文件没有变化呀,直接return了。
     有变化 才会往下执行,往下执行 肯定是 重新 New 出来一个 新的 vmHandle 复制给 字典里面 r.handles[name] 拉。
    
  3. 执行 obj, errs := r.c.Compile(name, &buf) 把 mtail 文件名传入, mtail 文件 内容传入。返回 *code.Object 类 型,来自 internal/runtime/code/object.go 文件中

  4. 如果第三步 执行 有 err, 或者 返回 的 obj 是 nil 就会 记录在 ProgLoadErrors 里面,就会给这个 里面的计数 加一了。 这个在 (“Runtime 中的 LoadProgram 方法”) 第 6 步的 时候也有介绍。

  5. 重头出来了,vm的初始化,也就是 vmHandle 中的 vm的 初始化。 调用 vm.New() 方法

  6. 接着 重头: vmHandle 的初始化,&vmHandle{contentHash: contentHash, vm: v, lines: lines}

  7. 调用 go v.Run(lines, &r.wg), 还是放到一个 goroutine 里面执行的。在这个里面就会出来 lines 通道。就是处理每一行日志文件的内容了。

VM 中的 Run 方法

1
2
3
4
5
6
7
8
9
10

func (v *VM) Run(lines <-chan *logline.LogLine, wg *sync.WaitGroup) {
defer wg.Done()
glog.V(1).Infof("started VM %q", v.name)
ctx := context.TODO()
for line := range lines {
v.ProcessLogLine(ctx, line)
}
glog.Infof("VM %q finished", v.name)
}

这个 run 方法 就是 循环 处理 来自 通道 lines 的每一行数据的,没有新的数据来 回等待那里。

VM 中的 ProcessLogLine 方法

真正去 处理 每一行日志数据的。每一行 就是一个 字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (v *VM) ProcessLogLine(ctx context.Context, line *logline.LogLine) {
start := time.Now()
defer func() {
LineProcessingDurations.WithLabelValues(v.name).Observe(time.Since(start).Seconds())
}()
t := new(thread)
t.matched = false
v.t = t
v.input = line
t.stack = make([]interface{}, 0)
t.matches = make(map[int][]string, len(v.re))
for {
if t.pc >= len(v.prog) {
return
}
if v.trace != nil {
v.trace = append(v.trace, t.pc)
}
i := v.prog[t.pc]
t.pc++

v.execute(t, i)

if v.terminate {
// Terminate only stops this invocation on this line of input; reset the terminate flag.
v.terminate = false
return
}
}
}

这个方法里面 就是 对 一行数据 进行 处理的。 主要的就是 v.execute(t, i) t 是一个 thread, i 是 Instr,这个应该是mtail文件编译后得到的。

详细 分析一下 Compile(name, &buf) 方法

这个方法的 返回值 是 在 新建VM实例的时候用到的,需要传入的一个参数, vm.New() 时候用到的。

这个方法 就是 把 mtail 文件内容 解析转换成 对应的 code.Object 类型。

1
2
3
4
5
6
ast, err = parser.Parse(name, input)  通过 parse 解析, 得到 ast

name 是 mtail 文件名称,不带路径的
input 就是 mtail 文件整体内容。注释的行 也包含在里面的。

ast 抽象 语法树
1
obj, err = codegen.CodeGen(name, ast) 通过 ast 转换为 obj

这个解析 感觉 又 进入了到了 另外一个 复杂的领域了。。。 yacc, lexer 等等。

parser.Parse 方法,里面有调用 newParser() 然后就调用 mtailParse(p) 了。调用这个方法 就不能断点调试了。。。

1
2
3
4
5
6
7
8
func Parse(name string, input io.Reader) (ast.Node, error) {
p := newParser(name, input)
r := mtailParse(p)
if r != 0 || p.errors != nil {
return nil, p.errors
}
return p.root, nil
}
1
2
3
4
counter lines_total
/$/ {
lines_total++
}

这样的一个 mtail 文件 编译后 到的 Object.Program 里面 有7个 Instr。 这是一个切片。

1
2
3
4
5
6
7
0 = {github.com/google/mtail/internal/runtime/code.Instr} 
1 = {github.com/google/mtail/internal/runtime/code.Instr}
2 = {github.com/google/mtail/internal/runtime/code.Instr}
3 = {github.com/google/mtail/internal/runtime/code.Instr}
4 = {github.com/google/mtail/internal/runtime/code.Instr}
5 = {github.com/google/mtail/internal/runtime/code.Instr}
6 = {github.com/google/mtail/internal/runtime/code.Instr}

同时 Object.Strings 是 nil

Object.Regexps 是 正则 $

Object.Metrics 里面有 1 个