首页 > 系统相关 >【Shell-多并发】使用Shell脚本在循环中进行多并发的操作

【Shell-多并发】使用Shell脚本在循环中进行多并发的操作

时间:2024-09-10 11:26:54浏览次数:8  
标签:脚本 文件 Shell 描述符 Dfs echo 并发 管道 date

【Shell-多并发】使用Shell脚本在循环中进行多并发的操作
1)方式一 (利用 for 循环)
1.1.案例一(缺少 wait 导致执行顺序有误)
1.2.案例二(缺少控制阻塞导致任务全部执行)
1.3.案例三(最终版)
2)方式二(利用命名管道来做任务队列)
3)Shell 脚本中 $ 的含义
1)方式一 (利用 for 循环)
1.1.案例一(缺少 wait 导致执行顺序有误)
> for i in `seq 1 10`
do
sleep 1 &; echo $i
done
echo "all weakup"

这个例子要求在 for 循环中的所有命令(sleep 1)都执行完之后,打印 “all weakup”。如果按照这段脚本,发现情况并不是这样的,因为 for 循环不会等待 sleep 命令执行结束后才结束,而是把命令提交给系统后自己就退出了,进而还没有1个 sleep 执行完毕之前,“all weakup” 就已经打印了。

1.2.案例二(缺少控制阻塞导致任务全部执行)
为了达到题目要求,需要在 echo “all weakup” 命令之前,加上 wait命令,意为等待上面所有 & 作用过的后台任务执行结束后才继续往下。

> for i in `seq 1 10`
do
sleep 1 &; echo $i
done
wait
echo "all weakup"

1.3.案例三(最终版)
上面的示例中,for 循环会将所有命令转为后台执行。显然,如果每个命令需要比较大的开销,并且循环次数太多,这个方法并不可取。那么,要求 for 循环有部分执行或循环达到一定次数后就要 wait,等待前一批次的所提交的任务执行完之后,再提交一定数量命令(再循环一定次数)后再继续 wait。虽然这种方案并不优雅,但至少不会导致一次性向系统提交过多后台任务。

degree=4
for i in `seq 1 10`
do
sleep 1 & # 提交到后台的任务
echo $i
[ `expr $i % $degree` -eq 0 ] && wait
done

上面示例,设置了一个变量 degree,用来表示并行度,在整个循环中控制阻塞的关键就是于语句 [ expr $i % $degree -eq 0 ] && wait,即在第 n 次循环时,如果 n 恰好对 degree 求模等于0时,那么循环先阻塞,等待前面 n 个后台任务执行完毕后再继续,以此类推。

2)方式二(利用命名管道来做任务队列)
cosDbName=${cosDbName}
x8vDbName=${x8vDbName}
x5lTableName=${x5lTableName}
x8vTableName=${x8vTableName}
sampleDateFile=${sampleDateFile}
parallelism=${parallelism}
start_time=`date +%s` #定义脚本运行的开始时间

[ -e /tmp/fd1 ] || mkfifo /tmp/fd1 #创建有名管道
exec 3<>/tmp/fd1 #创建文件描述符,以可读(<)可写(>)的方式关联管道文件,这时候文件描述符3就有了有名管道文件的所有特性
rm -rf /tmp/fd1 #关联后的文件描述符拥有管道文件的所有特性,所以这时候管道文件可以删除,我们留下文件描述符来用就可以了
for ((i=1;i<=${parallelism};i++))
do
echo >&3 #&3代表引用文件描述符3,这条命令代表往管道里面放入了一个"令牌"
done

for date in `cat /opt/corns/${sampleDateFile}`
do

read -u3

{
kinit -kt /opt/conf/x9e.keytab [email protected]

yarn jar /opt/corns/cos-distcp-1.12-3.1.0.jar -libjars /opt/corns/cos_api-bundle-5.6.69.jar,/opt/corns/hadoop-cos-3.1.0-8.1.7.jar \
-Dfs.cosn.credentials.provider=org.apache.hadoop.fs.auth.SimpleCredentialProvider \
-Dfs.cosn.userinfo.secretId=******************************** \
-Dfs.cosn.userinfo.secretKey=******************************** \
-Dfs.cosn.bucket.region=ap-guangzhou \
-Dfs.cosn.impl=org.apache.hadoop.fs.CosFileSystem \
-Dfs.AbstractFileSystem.cosn.impl=org.apache.hadoop.fs.CosN \
-Dmapred.job.queue.name=*** \
--bandWidth=50 \
--taskNumber=10 \
--workerNumber=1 \
--jobName=cos2hdfs-${x5lTableName}-${date} \
--skipMode=length \
--checkMode=length \
--src cosn://buckets-name/user/x5l/hive/${cosDbName}/${x5lTableName}/sample_date=${date}/ \
--dest hdfs://prdns/warehouse/tablespace/external/hive/${x8vDbName}.db/${x8vTableName}/sample_date=${date}/

hadoop distcp \
-D mapred.task.timeout=60000000 \
-D mapreduce.job.name=hdfs2s3-${x8vTableName}-${date} \
-Dmapred.job.queue.name=x9e \
-Dfs.s3a.access.key=******************* \
-Dfs.s3a.secret.key=************************************** \
-Dfs.s3a.endpoint=test01obs.gaccloud.com.cn \
-Dfs.s3a.connection.ssl.enabled=true \
-Dfs.s3a.signing-algorithm=S3SignerType \
-Dfs.s3a.ssl.channel.mode=default_jsse_with_gcm \
-direct \
-bandwidth=150 \
-m=20 \
-numListstatusThreads=40 \
hdfs://prdns/warehouse/tablespace/external/hive/${x8vDbName}.db/${x8vTableName}/sample_date=${date}/* \
s3a://buckets-name/prd/data/${x8vDbName}/${x8vTableName}/sample_date=${date}/

if [ $? -eq 0 ]; then
echo ${date}": succeed"
else
break
fi

echo >&3
} &
done
wait

stop_time=`date +%s` #定义脚本运行的结束时间

echo "TIME:`expr $stop_time - $start_time`"
exec 3<&- #关闭文件描述符的读
exec 3>&- #关闭文件描述符的写


命名管道

命名管道处理的思路:
就相当于此时有10个开水房间,配有10把钥匙,此时有100个人要打开水,那么前10个人抢到钥匙,就先进去打开水,后面的90个人就需要等前面一个人出来后还回钥匙,在拿着钥匙进去打开水,这样就能控制100个人打开水的任务,同时不会将系统的资源一次性耗用太多,增加压力,减小处理速度。

知识点:

1、命名管道的特性
如果管道内容为空,则阻塞
管道具有读一个少一个,存一个读一个的性质,放回去的可以重复取
可以实现队列控制

2、如果管道放一段内容没有人取,则会阻塞
解决上述问题,可以通过文件描述符
文件描述符具有管道的所有特性,同时还具有一个管道不具有的特性:无限存不阻塞,无限取不阻塞,无需关注管道内容

命名管道创建方式

# 1、创建命名管道
mkfifo /tmp/fl
#2、创建文件描述符100,并关联到管道文件
exec 100<>/tmp/fl
#3、调用文件描述符,向管道里存放内容,同时也表示将用完的管道内容在放回管道
echo >&100
#4、读取文件描述符关联管道中的内容
`read -u100``
#5、关闭文件描述符的读和写
exec 100<&-
exec 100>&-

3)Shell 脚本中 $ 的含义
$0 这个程式的执行名字
$n 这个程式的第n个参数值,n=1…9
$* 这个程式的所有参数,此选项参数可超过9个。
$# 这个程式的参数个数
$$ 这个程式的PID(脚本运行的当前进程ID号)
$! 执行上一个背景指令的PID(后台运行的最后一个进程的进程ID号)
$? 执行上一个指令的返回值 (显示最后命令的退出状态。0表示没有错误,其他任何值表明有错误)
$- 显示shell使用的当前选项,与set命令功能相同
@ 跟 @ 跟@跟*类似,但是可以当作数组用
————————————————

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/weixin_53543905/article/details/131420670

标签:脚本,文件,Shell,描述符,Dfs,echo,并发,管道,date
From: https://www.cnblogs.com/cheyunhua/p/18406069

相关文章

  • PARTIII-Oracle事物管理-数据并发性和一致性
    9.数据并发性和一致性本章解释了Oracle数据库如何在多用户数据库环境中维护一致性的数据。本章包含以下部分:数据并发性和一致性的介绍Oracle数据库事务隔离级别的概述Oracle数据库锁定机制的概述自动锁定的概述手动数据锁定的概述用户定义锁的概述9.1.数据并发性和一......
  • js中eval执行的脚本参数不固定,并且脚本中有return时,如何处理
    最近就遇到了用eval执行动态脚本时,拿不到return中的值的问题,如下界面:点击测试函数,进行计算,计算时遇到两点问题:1.传入的参数是动态的,如何解决变量声明问题?2.eval进行执行脚本时,拿不到return的值?现在将代码贴下,查看解决方案:functionemrscript():any{letobj={}......
  • 并发编程:线程池(上)
    一、什么是线程池?顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。二、为什么要用线程池?池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、HTTP连接池等等都是对这个思......
  • 并发编程:线程池(下)
    一、线程池常用的阻塞队列有哪些?新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。容量为Integer.MAX_VALUE的LinkedBlockingQueue(有界阻塞队列):FixedT......
  • 并发编程:ThreadLocal
    一、ThreadLocal有什么用?通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢?JDK中自带的ThreadLocal类正是为了解决这样的问题。ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类......
  • Java并发编程实战 11 | 线程活跃问题(死锁,活锁和饥饿)
    并发应用程序的“活跃度”指的是它及时执行并完成任务的能力。活跃性问题则是指程序无法最终得到预期的运行结果。相比于线程安全问题,存活性问题可能会导致更严重的后果。例如,死锁会使程序完全停滞,导致无法继续执行。常见的活跃性问题包括以下三种:1.死锁(Deadlock)死锁发......
  • linux shell一键查看服务器资源利用率之memory
    基于free-m命令实现[root@logstash~]#free-mtotalusedfreesharedbuff/cacheavailableMem:39313513346112343355Swap:396703967[root@logs......
  • 【Linux】全面讲解 Shell 变量的那些事
    本文内容均来自个人笔记并重新梳理,如有错误欢迎指正!如果对您有帮助,烦请点赞、关注、转发、订阅专栏!专栏订阅入口Linux专栏 | Docker专栏 | Kubernetes专栏往期精彩文章【Docker】(全网首发)KylinV10下MySQL容器内存占用异常的解决方法【Docker】(全网首发)Kyli......
  • 安装mayavi命令,使用cxfreeze打包python脚本
    pipinstallnumpy出现Anewreleaseofpipisavailable:23.2.1->24.2输入:python-mpipinstall--upgradepipsetuptoolswheelpipinstallPyQt5pipinstallvtkpipinstallmayavipipinstallscipypipinstallconfigobj 查看当前项目的依赖包:pipfr......
  • 几种并发模式的实现
    常见的几个需要并发的场景,具体的实现方式asyncio异步并发importasyncioasyncdeftask(id,delay):awaitasyncio.sleep(delay)returnf"Task{id}completed"asyncdefmain():tasks=[task(1,2),task(2,1),task(3,3)]done,pending=await......