Linux: shell实现多线程, Forking / Multi-Threaded Processes | Bash, shell 线程池

简单介绍

按照shell语法,后一个前台命令必须等待前一个前台命令执行完毕才能进行,这就是所谓的单线程程序。

如果两条命令之间有依赖性还好,否则后一条命令就白白浪费了等待的时间了。

网上查了一遍,shell并没有真正意义上的多进程。而最简单的节省时间,

达到“多线程”效果的办法,是将前台命令变成后台进程,这样一来就可以跳过前台命令的限制了。

全前台进程:

1
2
3
4
5
6
7
8
9
10
#!/bin/bash
#filename:simple.sh
starttime=$(date +%s)
for ((i=0;i<5;i++));do
{
sleep 3;echo 1>>aa && endtime=$(date +%s) && echo "我是$i,运行了3秒,整个脚本执行了$(expr $endtime - $starttime)秒"
}
done
cat aa|wc -l
rm aa

运行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ time bash simple.sh 
我是0,运行了3秒,整个脚本执行了3秒
我是1,运行了3秒,整个脚本执行了6秒
我是2,运行了3秒,整个脚本执行了9秒
我是3,运行了3秒,整个脚本执行了12秒
我是4,运行了3秒,整个脚本执行了15秒
5

real 0m15.117s
user 0m0.022s
sys 0m0.023s
"我是X,运行了3秒" 就规规矩矩的顺序输出了。


方案1:使用”&”使命令后台运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

#!/bin/bash
#filename:multithreading.sh
starttime=$(date +%s)
for ((i=0;i<5;i++));do
{
sleep 3;echo 1>>aa && endtime=$(date +%s) && echo "我是$i,运行了3秒,整个脚本执行了$(expr $endtime - $starttime)秒"
}&
done
wait
cat aa|wc -l
rm aa


运行测试:
$ time bash multithreading.sh
我是0,运行了3秒,整个脚本执行了3秒
我是3,运行了3秒,整个脚本执行了3秒
我是4,运行了3秒,整个脚本执行了3秒
我是1,运行了3秒,整个脚本执行了3秒
我是2,运行了3秒,整个脚本执行了3秒
5

real 0m3.030s
user 0m0.041s
sys 0m0.025s

运行很快,而且很不老实(顺序都乱了,大概是因为expr运算所花时间不同)

解析:这一个脚本的变化是在命令后面增加了&标记,意思是将进程扔到后台。

在shell中,后台命令之间是不区分先来后到关系的。所以各后台子进程会抢夺资源进行运算。

wait命令:

1
2
wait  [n]
n 表示当前shell中某个执行的后台命令的pid,wait命令会等待该后台进程执行完毕才允许下一个shell语句执行;如果没指定则代表当前shell后台执行的语句,wait会等待到所有的后台程序执行完毕为止。

如果没有wait,后面的shell语句是不会等待后台进程的,

一些对前面后台进程有依赖关系的命令执行就不正确了。

例如wc命令就会提示aa不存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ time bash multithreading.sh 
cat: aa: No such file or directory
0
rm: cannot remove 'aa': No such file or directory

real 0m0.011s
user 0m0.011s
sys 0m0.003s
buildsrv-ci@buildsrv-219:/tmp$ 我是0,运行了3秒,整个脚本执行了3秒
我是1,运行了3秒,整个脚本执行了3秒
我是2,运行了3秒,整个脚本执行了3秒
我是3,运行了3秒,整个脚本执行了3秒
我是4,运行了3秒,整个脚本执行了3秒


在linux中,在命令的末尾加上&符号,则表示该命令将在后台执行,

这样后面的命令不用等待前面的命令执行完就可以开始执行了。

示例中的循环体内有多条命令,则可以以{}括起来,在大括号后面添加&符号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#/bin/bash
all_num=10

a=$(date +%H%M%S)

for num in `seq 1 ${all_num}`
do
{
sleep 1
echo ${num}
} &
done

b=$(date +%H%M%S)

echo -e "startTime:\t$a"
echo -e "endTime:\t$b"

运行结果:
$ bash 1.sh
startTime: 115639
endTime: 115639
$ 9
6
1
2
3
7
8
10
5
4

通过结果可知,程序没有先打印数字,而是直接输出了开始和结束时间,

然后显示出了命令提示符$ (出现命令提示符表示脚本已运行完毕),

然后才是数字的输出。这是因为循环体内的命令全部进入后台,

所以均在sleep了1秒以后输出了数字。开始和结束时间相同,

即循环体的执行时间不到1秒钟,这是由于循环体在后台执行,没有占用脚本主进程的时间。

方案2:命令后台运行 + wait 命令

解决上面的问题,只需要在上述循环体的done语句后面加上wait命令,

该命令等待当前脚本进程下的子进程结束,再运行后面的语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#/bin/bash

all_num=10

a=$(date +%H%M%S)

for num in `seq 1 ${all_num}`
do
{
sleep 1
echo ${num}
} &
done

wait

b=$(date +%H%M%S)

echo -e "startTime:\t$a"
echo -e "endTime:\t$b"



运行结果:
$ bash 2.sh
1
2
3
4
5
6
7
8
9
10
startTime: 115936
endTime: 115937

但这样依然存在一个问题:

因为&使得所有循环体内的命令全部进入后台运行,那么倘若循环的次数很多,

会使操作系统在瞬间创建出所有的子进程,

这会非常消耗系统的资源。如果循环体内的命令又很消耗系统资源,则结果可想而知。

如果并行执行的任务数量是可控的,可以简单使用这种方式。如果并行任务很多,就不适合了。

方案3:使用文件描述符控制并发数

最好的方法是并发的进程是可配置的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#/bin/bash

all_num=10

thread_num=5

a=$(date +%H%M%S)

tempfifo="my_temp_fifo"
mkfifo ${tempfifo}

exec 6<>${tempfifo}
rm -f ${tempfifo}

for ((i=1;i<=${thread_num};i++))
do
{
echo
}
done >&6

for num in `seq 1 ${all_num}`
do
{
read -u6
{
sleep 1
echo ${num}
echo "" >&6
} &
}
done

wait

exec 6>&-

b=$(date +%H%M%S)

echo -e "startTime:\t$a"
echo -e "endTime:\t$b"

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ bash 3.sh 
1
2
3
4
5
6
7
8
9
10
startTime: 120248
endTime: 120254

这个方案有时通不过 mkfifo ${tempfifo} ,那么就才用更简单的方法来控制:

1
2
3
4
5
6
7
8
for ARG in  $*; do
command $ARG &
NPROC=$(($NPROC+1))
if [ "$NPROC" -ge 4 ]; then
wait
NPROC=0
fi
done

4 是最大线程,command 是你要执行的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/bin/sh
function a_sub {
sleep 3;
echo 1>>aa && endtime=$(date +%s) && echo "我是$i,运行了3秒,整个脚本执行了$(expr $endtime - $starttime)秒" && echo
}

starttime=$(date +%s)
export starttime
tmp_fifofile="/tmp/$.fifo"
echo $tmp_fifofile
mkfifo $tmp_fifofile
exec 6<>$tmp_fifofile
rm $tmp_fifofile
thread=3

for ((i=0;i<$thread;i++));
do
echo
done >&6

for ((i=0;i<10;i++))
do
read -u6
{
a_sub || {echo "a_sub is failed"}
echo >&6
} &
done

wait
exec 6>&-
wc -l aa
rm -f aa
exit 0

执行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ time bash multi.sh 
/tmp/$.fifo
我是0,运行了3秒,整个脚本执行了3秒

我是1,运行了3秒,整个脚本执行了3秒

我是2,运行了3秒,整个脚本执行了3秒

我是3,运行了3秒,整个脚本执行了6秒

我是4,运行了3秒,整个脚本执行了6秒

我是5,运行了3秒,整个脚本执行了6秒

我是6,运行了3秒,整个脚本执行了9秒

我是7,运行了3秒,整个脚本执行了9秒

我是8,运行了3秒,整个脚本执行了9秒

我是9,运行了3秒,整个脚本执行了12秒

10 aa

real 0m12.047s
user 0m0.057s
sys 0m0.052s

可以看到同一时间只有4个进程(其中一个是shell本身(pid=34389)),子进程数量得到了控制。

1
2
3
4
5
6
7
8
9
10
11
thread=3 其中3个子进程
│ │ └─sshd,44944
│ │ ├─bash,33614
│ │ │ └─bash,34389 multi.sh
│ │ │ ├─bash,34393 multi.sh
│ │ │ │ └─sleep,34396 3
│ │ │ ├─bash,34394 multi.sh
│ │ │ │ └─sleep,34397 3
│ │ │ └─bash,34395 multi.sh
│ │ │ └─sleep,34398 3

此例子说明了一种用wait、read命令模拟多线程的一种技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/bin/bash

function a_sub { # 此处定义一个函数,作为一个线程(子进程)
sleep 3 # 线程的作用是sleep 3s
}


tmp_fifofile="/tmp/$$.fifo"
mkfifo $tmp_fifofile # 新建一个fifo类型的文件
exec 6<>$tmp_fifofile # 将fd6指向fifo类型
rm $tmp_fifofile

thread=15 # 此处定义线程数
for ((i=0;i<$thread;i++));do
echo
done >&6 # 事实上就是在fd6中放置了 $thread 个回车符


for ((i=0;i<50;i++));do # 50次循环,可以理解为50个主机,或其他
read -u6
# 一个read -u6命令执行一次,就从fd6中减去一个回车符,然后向下执行,
# fd6中没有回车符的时候,就停在这了,从而实现了线程数量控制

{ # 此处子进程开始执行,被放到后台
a_sub && { # 此处可以用来判断子进程的逻辑
echo "a_sub is finished"
} || {
echo "sub error"
}
echo >&6 # 当进程结束以后,再向fd6中加上一个回车符,即补上了read -u6减去的那个
} &
done

wait # 等待所有的后台子进程结束
exec 6>&- # 关闭df6

exit 0

此程序中的命令 mkfifo tmpfile 和linux中的命令 mknod tmpfile p效果相同。

区别是mkfifo为POSIX标准,因此推荐使用它。该命令创建了一个先入先出的管道文件,

并为其分配文件标志符6。管道文件是进程之 间通信的一种方式,
注意这一句很重要 exec 6<>$tmp_fifofile # 将fd6指向fifo类型

如果没有这句,在向文件$tmp_fifofile或者&6写入数据时,程序会被阻塞,

直到有read读出了管道文件中的数据为止。而执行了上面这一句后就可以在程序运行期间不断向fifo类

型的文件写入数据而不会阻塞,并且数据会被保存下来以供read程序读出。

方案4:使用xargs -P控制并发数

xargs命令有一个-P参数,表示支持的最大进程数,默认为1。为0时表示尽可能地大,即方案2的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#/bin/bash

all_num=10
thread_num=5

a=$(date +%H%M%S)

seq 1 ${all_num} | xargs -n 1 -I {} -P ${thread_num} sh -c "sleep 1;echo {}"

b=$(date +%H%M%S)

echo -e "startTime:\t$a"
echo -e "endTime:\t$b"


运行结果:
$ bash 4.sh
2
1
3
4
5
6
7
8
9
10
startTime: 120537
endTime: 120539

工作中实际用到的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 对 某个目录 下面的所有仓库执行 git fetch操作
# 参数1 目录名称
function find_exec_fetch() {
local L_ROOT_PATH="${1}"
(
[[ -n "${L_ROOT_PATH}" ]] || return 1
cd "${L_ROOT_PATH}" || return 1 # 切换到本的路径下面

if ! declare -F __git_main > /dev/null 2>&1 ; then # 没有定义这个函数,这里才去定义一个默认的
function __git_main(){
local LOCAL_GIT="${1}"
printinfo "[git fetch][$LOCAL_GIT][$USER] "
git -C ${LOCAL_GIT} fetch --all --force --no-tags 2>&1 | awk "{print \"\033[1;32m[ info]\033[0m[git fetch] \", \$0 }"
}
fi # end check declare function __git_main

export -f printinfo __git_main

find . -name "*.git" -type d -prune -print0 | shuf -z | xargs -r -0 -P4 -n1 -I% bash -c '
__git_main "$@"
' "_" "%"
# end bash -c '' 循环对每个 find 出来的 路径进行 遍历的

)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 对 某个目录 下面的所有仓库执行 git gc操作
# 参数1 目录名称
function find_exec_gitgc() {
local L_ROOT_PATH="${1}"
(
[[ -n "${L_ROOT_PATH}" ]] || return 1
cd "${L_ROOT_PATH}" || return 1 # 切换到本的路径下面

if ! declare -F __git_clean > /dev/null 2>&1 ; then # 没有定义这个函数,这里才去定义一个默认的
function __git_clean(){
local LOCAL_GIT="${1}"
find $LOCAL_GIT/objects/ -type f -empty -delete -print
find $LOCAL_GIT/refs/ -type f -empty -delete -print
find $LOCAL_GIT/objects -name "tmp_*" -type f -delete -print
}
fi # end check declare function __git_clean

if ! declare -F __git_gc > /dev/null 2>&1 ; then # 没有定义这个函数,这里才去定义一个默认的
function __git_gc(){
local LOCAL_GIT="${1}"
local pack_num
local objk_num

pack_num=$(find $LOCAL_GIT/objects/pack/ -maxdepth 1 -mindepth 1 -name "*.pack" -type f|wc -l) # pack目录下面的 pack文件个数
objk_num=$(find $LOCAL_GIT/objects/ -maxdepth 1 -mindepth 1 -type d | wc -l) # objects 下面目录个数
if [[ $pack_num -gt 1 ]] || [[ $objk_num -gt 2 ]]; then
printinfo "exec ${LOCAL_GIT} git gc pack_num=${pack_num}, objk_num=${objk_num}"
git -C "$LOCAL_GIT" gc --no-prune || { # gc 失败了
if declare -F __git_gc_post > /dev/null 2>&1 ; then # 定义了这个后置 function 我们才去执行
__git_gc_post $LOCAL_GIT
fi
} # end git gc
else
printinfo "no need to exec ${LOCAL_GIT} git gc pack_num=${pack_num}, objk_num=${objk_num}"
fi
}
fi # end check declare function __git_gc

if ! declare -F __git_main > /dev/null 2>&1 ; then # 没有定义这个函数,这里才去定义一个默认的
function __git_main(){
local LOCAL_GIT="${1}"
printinfo "will run git gc for $LOCAL_GIT=[$@]"
__git_clean $LOCAL_GIT
__git_gc $LOCAL_GIT
__git_clean $LOCAL_GIT
}
fi # end check declare function __git_main

export -f printinfo __git_clean __git_gc __git_main

find . -name "*.git" -type d -prune -print0 | shuf -z | xargs -r -0 -P4 -n1 -I% bash -c '
__git_main "$@"
' "_" "%"
# end bash -c '' 循环对每个 find 出来的 路径进行 遍历的

)
}

方案5:使用GNU parallel命令控制并发数

GNU parallel命令是非常强大的并行计算命令,使用-j参数控制其并发数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#/bin/bash

all_num=10
thread_num=6

a=$(date +%H%M%S)

parallel -j 5 "sleep 1;echo {}" ::: `seq 1 10`

b=$(date +%H%M%S)

echo -e "startTime:\t$a"
echo -e "endTime:\t$b"


运行结果:
$ bash 5.sh
Academic tradition requires you to cite works you base your article on.
When using programs that use GNU Parallel to process data for publication
please cite:

O. Tange (2011): GNU Parallel - The Command-Line Power Tool,
;login: The USENIX Magazine, February 2011:42-47.

This helps funding further development; AND IT WON'T COST YOU A CENT.
If you pay 10000 EUR you should feel free to use GNU Parallel without citing.

To silence this citation notice: run 'parallel --citation'.

1
2
3
4
5
6
7
8
9
10
startTime: 120630
endTime: 120632

使用 wait + 数组 来管理后台进程

最后工作中实际 采用的一个例子

这个有并行多个任务,其中一个失败了,其他的还会继续执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# ############################################################
declare -A jobs # 声明一个字典

function reap() { # 定义一个函数
local L_CMD
local L_STATUS=0

for L_PID in ${!jobs[@]};
do
L_CMD=${jobs[${L_PID}]}
wait ${L_PID}; jobs[${L_PID}]=$?
if [[ ${jobs[${L_PID}]} -ne 0 ]]; then
L_STATUS=${jobs[${L_PID}]}
echo -e "[${L_PID}] Exited with status: ${L_STATUS}, ${L_CMD}"
fi
done
return ${L_STATUS}
}
# ############################################################

{ # make android
#some comands
} & jobs[$!]="comands 1"

{ # build amss
#some comands
} & jobs[$!]="comands 2"


reap || exit 1


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
########################################################################################################################

function handle_jobs() { # 用来 wait 子进程
local -n L_PIDSMAP=$1 # 外部传入的一个字典类型,这里使用到了 local -n。
local L_COMMAND
local L_STATUS=0 # 最后方法的返回值。0 代表成功
local L_PID
for L_PID in "${!L_PIDSMAP[@]}"; do
L_COMMAND=${L_PIDSMAP[${L_PID}]} # 通过这个pid字典获取value,
wait ${L_PID} ; L_PIDSMAP[${L_PID}]=$?
if [[ ${L_PIDSMAP[${L_PID}]} -ne 0 ]]; then
L_STATUS=${L_PIDSMAP[${L_PID}]}
echo -e "[${L_PID}] Exited with L_STATUS: ${L_STATUS}, ${L_COMMAND}"
fi
done
return ${L_STATUS}
}

########################################################################################################################

function do_fast() {
local -A L_JOBS # 定义一个关联数组,也就是一个字典类型的变量。

build_android & L_JOBS[$!]="build_android"
build_amss & L_JOBS[$!]="build_amss"

echo "the children jobs: ${!L_JOBS[*]}: ${L_JOBS[*]}"
handle_jobs L_JOBS || return 1 # 这里判断 并行的任务是否都成功结束了。

# 下面可以继续其他事情
}

这里特别注意 local -A L_JOBS # 定义一个关联数组,也就是一个字典类型的变量。已经函数内部接收这个变量的地方 local -n L_PIDSMAP=$1 # 外部传入的一个字典类型。

函数内使用local或者declare来声明一个局部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
declare: declare [-aAfFgilnrtux] [-p] [名称[=值] ...]
设定变量值和属性。

声明变量并且赋予它们属性。如果没用给定名称,
则显示所有变量的属性和值。

选项:
-f 限制动作或显示为只函数名称和定义
-F 限制仅显示函数名称 (以及行号和源文件名,当调试时)
-g 当用于 shell 函数内时创建全局变量; 否则忽略
-p 显示每个 NAME 变量的属性和值

设定属性的选项:
-a 使 NAME 成为下标数组 (如果支持)
-A 使 NAME 成为关联数组 (如果支持)
-i 使 NAME 带有 `integer' (整数)属性
-l 将 NAME 在赋值时转为小写
-n 使 NAME 成为指向一个以其值为名称的变量的引用
-r 将 NAME 变为只读
-t 使 NAME 带有 `trace' (追踪)属性
-u 将 NAME 在赋值时转为大写
-x 将 NAME 导出

用 `+' 代替 `-' 会关闭指定选项。

带有整数属性的变量在赋值时将使用算术估值(见
`let' 命令)

在函数中使用时,`declare' 使 NAME 成为本地变量,和 `local'
命令一致。`-g' 选项抑制此行为。

退出状态:
返回成功除非使用了无效选项或者发生错误。

使用 tail -f 来等待后台进程

1
2


总结

1
2
3
4
5
6
7
8
9
10
11
12
13
“多线程”的好处不言而喻,虽然shell中并没有真正的多线程,
但上述解决方案可以实现“多线程”的效果,重要的是,
在实际编写脚本时应有这样的考虑和实现。

另外:
方案3、4、5虽然都可以控制并发数量,但方案3显然写起来太繁琐。

方案4和5都以非常简洁的形式完成了控制并发数的效果,
但由于方案5的parallel命令非常强大,所以十分建议系统学习下。

方案3、4、5设置的并发数均为5,实际编写时可以将该值作为一个参数传入。


参考资料

http://www.cnitblog.com/sysop/archive/2008/11/03/50974.aspx

https://jerkwin.github.io/2013/12/14/Bash%E8%84%9A%E6%9C%AC%E5%AE%9E%E7%8E%B0%E6%89%B9%E9%87%8F%E4%BD%9C%E4%B8%9A%E5%B9%B6%E8%A1%8C%E5%8C%96/

https://thinkycx.me/2019-01-14-use-mulitiple-processes-to-accelerate-for-loop-in-bash.html