jenkins中的PingThread介绍

问题引出

异地jenkins节点总是断开链接,然后发出的通知有如下报错

1
2
3
Connection was broken: java.util.concurrent.TimeoutException: Ping started at 1667335510986 hasn't completed by 1667335750987
at hudson.remoting.PingThread.ping(PingThread.java:134)
at hudson.remoting.PingThread.run(PingThread.java:90)

上面报错 行的代码如下:

1
2
onDead(new TimeoutException("Ping started at "+start+" hasn't completed by "+System.currentTimeMillis()));//.initCause(e)

通过计算 1667335750987 和 1667335510986 数字差值在 240001 这个,大致就是 240 秒的时间。

PingThread 类

PingThread 类在 hudson.remoting.PingThread 这里,源码地址在 https://github.com/jenkinsci/remoting.git

PingThread 类 继承 java 的 Thread 类,可见这个应该是 单独 起了一个线程在执行的。

1
2
public abstract class PingThread extends Thread {

通过这个类的注释说明

1
2
3
4
5
6
7
8
* Periodically perform a ping.
*
* Useful when a connection needs to be kept alive by sending data,
* or when the disconnection is not properly detected.
*
* {@link #onDead()} method needs to be overridden to define
* what to do when a connection appears to be dead.

可知这个类是用来:

定期执行ping操作。当然不是真的执行ping命令的,应该是模拟ping命令类似的效果吧。
当连接需要通过发送数据来保持活跃时很有用,
或当没有正确检测到断开时。
onDead()}方法需要重写来定义当连接出现故障时该怎么办。
这是一个抽象类,子类要继承,然后重新实现 onDead()}方法。
这个可以有效防止长时间没有数据发送而网络断开,这样能周期性的发送数据保持在线。

这个类有2个成员 timeout 和 interval

1
2
3
4
5
6
7
8
9
10
11
12
private final Channel channel;

/**
* Time out in milliseconds.
* If the response doesn't come back by then, the channel is considered dead.
*/
private final long timeout;

/**
* Performs a check every this milliseconds.
*/
private final long interval;

timeout就是超时时间, 我们开头那个报错的 240 秒 就是设置到这里的,通过代码发现 jenkins 里面默认设置的超时 就是 240 秒。

这个默认超时直接 就是通过 实例化 PingThread 时候传入的,通过搜索代码可以发现 没有几个地方 调用这个类的构造方法,

第一个就是 hudson.slaves.ChannelPinger 里面。
第二个是 hudson.remoting.Launcher

interval 是执行ping的间隔时间,也就是 每隔多久执行一下。jenkins 里面默认是 300 秒。是 hudson.slaves.ChannelPinger 里面设置的。

ChannelPinger 类

1
2
3
4
public class ChannelPinger extends ComputerListener {
static final int PING_TIMEOUT_SECONDS_DEFAULT = 4 * 60; // 默认的超时时间 4 分钟
static final int PING_INTERVAL_SECONDS_DEFAULT = 5 * 60; // 默认的 周期执行间隔时间 5 分钟

ChannelPinger 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static final Logger LOGGER = Logger.getLogger(ChannelPinger.class.getName());
private static final String TIMEOUT_SECONDS_PROPERTY = ChannelPinger.class.getName() + ".pingTimeoutSeconds";
private static final String INTERVAL_MINUTES_PROPERTY_DEPRECATED = ChannelPinger.class.getName() + ".pingInterval"; // 这个属性 已经不再建议使用了
private static final String INTERVAL_SECONDS_PROPERTY = ChannelPinger.class.getName() + ".pingIntervalSeconds";

/**
* Timeout for the ping in seconds.
*/
private int pingTimeoutSeconds = SystemProperties.getInteger(TIMEOUT_SECONDS_PROPERTY, PING_TIMEOUT_SECONDS_DEFAULT, Level.WARNING);

/**
* Interval for the ping in seconds.
*/
private int pingIntervalSeconds = PING_INTERVAL_SECONDS_DEFAULT;

public ChannelPinger() { // 构造方法,主要就是用来 初始化 pingIntervalSeconds, 超时 pingTimeoutSeconds 那个已经在 类定义时候 设置 初始值了。

Integer interval = SystemProperties.getInteger(INTERVAL_SECONDS_PROPERTY, null, Level.WARNING);

// if interval wasn't set we read the deprecated property in minutes
if (interval == null) {
interval = SystemProperties.getInteger(INTERVAL_MINUTES_PROPERTY_DEPRECATED,null, Level.WARNING);
if (interval != null) {
LOGGER.warning(INTERVAL_MINUTES_PROPERTY_DEPRECATED + " property is deprecated, " + INTERVAL_SECONDS_PROPERTY + " should be used");
interval *= 60; //to seconds
}
}

if (interval != null) {
pingIntervalSeconds = interval;
}
}

这个类作用是:

jenkins的节点启动时候 执行 new PingThread 并且启动这个 线程的, 在 setUpPingForChannel 方法里面

这个类继承 ComputerListener 类,这个就会有一些 节点生命周期方法,例如 节点上线了 执行 某个方法preOnline,
节点将要上线 执行 某个方法onOnline,节点离线了执行 某个方法onOffline。等等。

pingTimeoutSeconds 和 pingIntervalSeconds 可以通过 java 启动时候来设置,修改默认值。

1
2
java -Dhudson.slaves.ChannelPinger.pingTimeoutSeconds=xxx -Dhudson.slaves.ChannelPinger.pingIntervalSeconds=xxx  

特别注意:

1. 要设置在 jenkins master 端的,而且必须重启后才能生效的。
2. 如果设置0就会禁用 这个 PingThread 功能
3. 通过在 master 的 脚本命令行 里面 设置是不生效的,这个我试过,因为 ChannelPinger 类的初始化是在 jenkins master 启动的时候设置的,后续就不会在被设置了。触发用反射??
4. 可以通过 脚本命令行 面 去查看这个值 是否设置了。
1
2
3
4
5
6
7
8
9
10
11
12
// 设置这个 ping thread 的时间

import jenkins.util.SystemProperties;

System.setProperty("hudson.slaves.ChannelPinger.pingTimeoutSeconds","3600")
System.setProperty("hudson.slaves.ChannelPinger.pingIntervalSeconds","600")

def a = SystemProperties.getInteger("hudson.slaves.ChannelPinger.pingTimeoutSeconds", 0)
def b = SystemProperties.getInteger("hudson.slaves.ChannelPinger.pingIntervalSeconds", 0)

println(a)
println(b)

ChannelPinger 里面 的 preOnline 方法

这里 ChannelPinger 选择了 preOnline 方法, 这个方法好像只有 从节点 上线才会执行,master节点 不算的。

也就是 节点 将要上线的时候,会把 PingThread 初始化好,然后调用。
1
2
3
4
5
6
7
8
@Override
public void preOnline(Computer c, Channel channel, FilePath root, TaskListener listener) {
SlaveComputer slaveComputer = null;
if (c instanceof SlaveComputer) {
slaveComputer = (SlaveComputer) c;
}
install(channel, slaveComputer); // 这里调用了 install 方法
}

ChannelPinger 里面 的 install 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@VisibleForTesting
/*package*/ void install(Channel channel, @CheckForNull SlaveComputer c) {
if (pingTimeoutSeconds < 1 || pingIntervalSeconds < 1) {
LOGGER.warning("Agent ping is disabled");
return;
}

// set up ping from both directions, so that in case of a router dropping a connection,
// both sides can notice it and take compensation actions.
try {
channel.call(new SetUpRemotePing(pingTimeoutSeconds, pingIntervalSeconds));
LOGGER.fine("Set up a remote ping for " + channel.getName());
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed to set up a ping for " + channel.getName(), e);
}

setUpPingForChannel(channel, c, pingTimeoutSeconds, pingIntervalSeconds, true);
}

这个方法的主要作用:

1. 判断 pingTimeoutSeconds 和 pingIntervalSeconds, 如果小于 1 就表示 禁用 PingThread了。
上面有说 怎么 设置 这个2个属性值。
2. 第二个作用,也是最主要的作用: 调用 setUpPingForChannel, 用例 实例化 PingThread,并启动这个线程的。
通过代码可以发现 发现 调用了 2次 setUpPingForChannel。
2.1 第一次是  channel.call(new SetUpRemotePing(pingTimeoutSeconds, pingIntervalSeconds));
通过一个 类 SetUpRemotePing 去调用的, 这个类比较特殊 集成了 MasterToSlaveCallable 类。 然后使用了 channel.call() 去调用的,这个表示 让这个 线程
去 channel 链接的那个节点 上执行,也就是 不在master 机器上执行,在从节点上执行。
2.2 第二次 调用是 setUpPingForChannel(channel, c, pingTimeoutSeconds, pingIntervalSeconds, true); 调用,这个是在 master 节点上执行的。
所以通过 日志可以发现 会有多个打印的。日志可以通过 master 页面上  日志记录器 来设置,然后日志等级设置个 all 就会打印出来了。

master 节点的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
十一月 02, 2022 1:51:18 下午 详细 hudson.slaves.ChannelPinger
Set up a remote ping for JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028 // 这个就是 install方法里面 调用完 channel.call() 之后 打印的
十一月 02, 2022 1:51:18 下午 详细 hudson.slaves.ChannelPinger
setting up ping on JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028 with a 300 seconds interval and 240 seconds timeout // 这个是 setUpPingForChannel() 方法的 第一句 打印。

十一月 02, 2022 1:51:18 下午 详细 hudson.slaves.ChannelPinger
Ping thread started for hudson.remoting.Channel@3055f449:JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028 with a 300 seconds interval and a 240 seconds timeout // 这个是 setUpPingForChannel() 方法的 最后 句 打印。

十一月 02, 2022 1:51:18 下午 详细 hudson.remoting.PingThread
pinging JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028
十一月 02, 2022 1:51:18 下午 详细 hudson.remoting.PingThread
waiting 239s on JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028
十一月 02, 2022 1:51:18 下午 详细 hudson.remoting.PingThread
ping succeeded on JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028

node 节点上的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
十一月 02, 2022 1:51:03 下午 详细 hudson.slaves.ChannelPinger
setting up ping on JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743 with a 300 seconds interval and 240 seconds timeout // 这个是 setUpPingForChannel() 方法的 第一句 打印。

十一月 02, 2022 1:51:03 下午 警告 hudson.remoting.PingThread
public PingThread(Channel channel, long timeout, long interval) // PingThread 构造方法里面打印的,自己加的 打印。。。。

十一月 02, 2022 1:51:03 下午 详细 hudson.slaves.ChannelPinger
Ping thread started for hudson.remoting.Channel@401bc256:JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743 with a 300 seconds interval and a 240 seconds timeout // 这个是 setUpPingForChannel() 方法的 最后 句 打印。

十一月 02, 2022 1:51:03 下午 警告 hudson.remoting.PingThread ping
pinging JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743
十一月 02, 2022 1:51:03 下午 警告 hudson.remoting.PingThread ping
waiting 239s on JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743
十一月 02, 2022 1:51:04 下午 警告 hudson.remoting.PingThread ping
ping succeeded on JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743
十一月 02, 2022 1:51:04 下午 警告 hudson.remoting.PingThread$Ping call
class Ping pool-1-thread-6 for JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743 id=7, 47, bright.ma, /tmp

分析:

3. 特别注意的是 class Ping() 一次是在 master 上调用,一次是在 slave 上调用因为 这个类没有继承 MasterToSlaveCallable。 虽然 调用的 都是 channel.callAsync(new Ping())。 之前就是因为看错了 以为这2次都是在slave节点执行呢。
 slave上调用顺序是 channel.call(new SetUpRemotePing(pingTimeoutSeconds, pingIntervalSeconds)); -> PingThread-> run() -> ping() 这个 一个是在 master 上执行,一个是在 slave 上执行的。
 master 上是 PingThread-> run() -> ping() 。 
1
2
3
4
5
class Ping Computer.threadPoolForRemoting [#3] for JNLP4-connect connection from 10.0.12.116/10.0.12.116:37260 id=22, 388,  mamh,             /home/mamh/work/github/jenkins
class Ping pool-1-thread-5 for JNLP4-connect connection to 10.0.61.75/10.0.61.75:37743 id=8, 28, bright.ma, /tmp

日志最后 打印 拼接 的 有 System.getenv("USER") 和 System.getenv("PWD")

ChannelPinger 里面 的 setUpPingForChannel 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@VisibleForTesting
@Restricted(NoExternalUse.class)
public static void setUpPingForChannel(final Channel channel, final SlaveComputer computer, int timeoutSeconds, int intervalSeconds, final boolean analysis) {
LOGGER.log(Level.FINE, "setting up ping on {0} with a {1} seconds interval and {2} seconds timeout", new Object[] {channel.getName(), intervalSeconds, timeoutSeconds});

final AtomicBoolean isInClosed = new AtomicBoolean(false);

final PingThread t = new PingThread(channel, TimeUnit.SECONDS.toMillis(timeoutSeconds), TimeUnit.SECONDS.toMillis(intervalSeconds)) { // 实例化 PingThread 类,
。。。。 省略 这些
};

channel.addListener(new Channel.Listener() { // channel 里面加个 Listener,
。。。。 省略 这些
});

t.start(); // 启动 PingThread 线程

}

这个方法 主要作用:

1. 实例化 PingThread 类,
2. channel 里面加个 Listener, 当 channel 关闭的时候 要去 停止 这个pingthread 线程,同时还设置了 标记 isInClosed,
3. 启动 PingThread 线程 

其他几个知识点:

1. TimeUnit.SECONDS.toMillis 作用, 把 秒 转换 为 毫秒的。 Seconds 表示秒。Millis 表示毫秒。Micros 表示微妙。Nanos 表示纳秒。
PingThread 类 定义 构造方法 使用的 是 毫秒。当然 在 这里面 又 有转换 成 纳秒来使用的,真是醉了。。。。。。
2. AtomicBoolean, 原子操作 布尔 类型的吧。因为是多线程了吧。来自 package java.util.concurrent.atomic; 作者: Doug Lea

ChannelPinger 里面 的 SetUpRemotePing 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

@VisibleForTesting
@Restricted(NoExternalUse.class)
public static class SetUpRemotePing extends MasterToSlaveCallable<Void, IOException> {
private final int pingTimeoutSeconds;
private final int pingIntervalSeconds;

public SetUpRemotePing(int pingTimeoutSeconds, int pingIntervalSeconds) {
this.pingTimeoutSeconds = pingTimeoutSeconds;
this.pingIntervalSeconds = pingIntervalSeconds;
}

@Override
public Void call() throws IOException {
setUpPingForChannel(getOpenChannelOrFail(), null, pingTimeoutSeconds, pingIntervalSeconds, false);
return null;
}


分析:

1. 继承了 MasterToSlaveCallable 类,这样 将会在 从节点上执行。
2. 只有 重写 call() 方法 就行。这个方法里面 调用了 setUpPingForChannel() 方法。

PingThread 的 run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void run() {
try {
while(true) {
long nextCheck = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(interval);

ping();

// wait until the next check
long diff;
while((diff = nextCheck - System.nanoTime()) > 0) {
TimeUnit.NANOSECONDS.sleep(diff);
}
}
} catch (ChannelClosedException e) {
LOGGER.fine(getName()+" is closed. Terminating");
} catch (IOException e) {
onDead(e);
} catch (InterruptedException e) {
// use interruption as a way to terminate the ping thread.
LOGGER.fine(getName()+" is interrupted. Terminating");
}
}

分析:

1. 这个就是 线程 主要执行的代码了。java 中实现 线程 就是 继承个 Thread 类,然后重新 里面的 run 方法的。
2. 这里面有个 while 的死循环。
3.nextCheck = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(interval); 开始 先 计算一下 下一次的时间点,这里 用到了interval,这里类似的就是 每隔 interval 的时候 执行一下 下面的 ping() 方法。
4. ping(); 调用执行,这个真正执行 ping的 方法。
5. 执行完 ping 然后 计算时间差,和 当前时间点 和先前的 nextCheck 计算差值,如果 时间大于0 表示 ping() 方法执行的 很快,我们这里就还需要再次 sleep 一些剩下的 时间,来保证 周期 interval 时间段的完整。
6. 继续 while的死循环。触发有异常抛出,就是 被 catche 到,IOException 异常会调用 onDead(e); 不过我们最开始那个抛出异常的地点不是这里

PingThread 的 ping 方法

下面来看 真正的 ping 动作是怎么 执行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

private void ping() throws IOException, InterruptedException {
LOGGER.log(Level.WARNING, "pinging {0}", channel.getName());
Future<?> f = channel.callAsync(new Ping());
long start = System.currentTimeMillis();

long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
long remaining = end - System.nanoTime();

do {
LOGGER.log(Level.WARNING, "waiting {0}s on {1}", new Object[] {TimeUnit.NANOSECONDS.toSeconds(remaining), channel.getName()});
try {
f.get(Math.max(1,remaining),TimeUnit.NANOSECONDS);
LOGGER.log(Level.WARNING, "ping succeeded on {0}", channel.getName());
return;
} catch (ExecutionException e) {
if (e.getCause() instanceof RequestAbortedException)
return; // connection has shut down orderly.
onDead(e);
return;
} catch (TimeoutException e) {
// get method waits "at most the amount specified in the timeout",
// so let's make sure that it really waited enough
}
remaining = end - System.nanoTime();
} while(remaining>0);

onDead(new TimeoutException("Ping started at "+start+" hasn't completed by "+System.currentTimeMillis()));//.initCause(e) 我们最开始的那个报错 就是 从这里 onDead 抛出的。
}

分析:

1.  LOGGER.log(Level.WARNING, "pinging {0}", channel.getName()); 打印日志, 上面我 贴过一段日志,
pinging JNLP4-connect connection from 192.168.1.116/192.168.1.116:35028 masater 节点打印的
pinging JNLP4-connect connection to 192.168.1.75/192.168.1.75:37743     slave 节点打印的。    之前 分析过 是启动 了 2个 ping,主从节点 各一个。
2. 调用一下 channel.callAsync(new Ping()); 异步调用 执行  class Ping 里面的  call() 方法,这里面是空的。
3. 计算end时间,当前时间 加一个 超时 时间段 timeout。 计算 remaining, 这个第一次 应该是 timeout 这个时间长度。
4.  do while 循环, 判断 remaining 是否大于 0 。
5. f.get(Math.max(1,remaining),TimeUnit.NANOSECONDS); 从 之前的  channel.callAsync 获取结果, 设定超时时间单位是 NANOSECONDS 纳秒。 超时时间是 remaining 也是个纳秒。
这个主要作用就是 获取 那个 异步任务 执行结束了没有,如果 很快执行 结束就不会 超时,如果执行的比较慢 就会 抛出异常。执行正常 就直接 return拉。
6. 抛出 TimeoutException 异常 ,会执行 下面的 remaining = end - System.nanoTime();, 重新计算一下 remaining 的值, 大于 0 就 退出 do while 循环了。然后 抛异常了 。

结论:

我们最开始的那个报错,就是因为  f.get(Math.max(1,remaining),TimeUnit.NANOSECONDS); 这个超时了。报了个 超时的错误。