IDL 中的线程处理
原文链接: https://www.nv5geospatialsoftware.com/Learn/Blogs/Blog-Details/threaded-processing-in-idl
17800 文章评分:
5.0
IDL 中的线程处理
匿名 2014年6月16日,星期一
IDL_IDLBridge 是一个有用的功能,可以帮助您在单个 IDL 进程中执行多进程操作。虽然有一些函数利用了系统的线程池,但其中大部分是数学例程。IDL_IDLBridge 允许您利用系统上未使用的线程。首先,我们来看一段代码,它读取 IDL 安装目录中的每个文件,并计算每行的平均字符数。
compile_opt idl2
; 让我们找到 IDL 示例/数据目录内的所有文件
filepath = filepath('')
filelist = file_search(filepath,'*', /TEST_REGULAR)
cpl = 0
tic
; 循环遍历文件并计算每行字符数
foreach file, filelist do begin
lines = file_lines(file)
if lines gt 0 then begin
data = strarr(lines)
openr, lun, file, /get_lun
readf, lun, data
free_lun, lun
cpl = (total(strlen(data)) / lines)
endif
endforeach
print, cpl/n_elements(filelist)
toc
为了将此代码转换为使用 IDL_IDLBridge,我们需要将此逻辑转换为使用主从控制器范式。我们需要做的第一件事是隔离将在工作线程上完成的工作。对于这个例子,我们有一个循环,它一遍又一遍地执行相同的操作,几乎不依赖于循环外部的变量。让我们首先将该功能提取出来,放到它自己的函数中。
pro bridgeFunction, file, data
compile_opt idl2
lines = file_lines(file)
if lines gt 0 then begin
data = strarr(lines)
openr, lun, file, /get_lun
readf, lun, data
free_lun, lun
data = (total(strlen(data)) / lines)
endif else begin
data = 0
endelse
end
注意:如果您使用 IDL_IDLBridge,请确保您的函数位于您的 IDL_PATH 上。工作线程只会在 PATH 上查找函数。如果找不到,程序将失败。
接下来,让我们设置主控制器。主控制器负责确定需要多少个工作线程、如何划分工作,并在工作线程空闲时分配任务。主控制器需要做的第一件事是确定需要多少个 IDL_IDLBridge 对象并创建它们。这取决于具体系统,但一个好的起点通常是系统总线程数的一半。
; 为系统总线程数的一半创建一个桥
oBridge = objarr(!cpu.TPOOL_NTHREADS/2)
for i=0, oBridge.length-1 do begin
oBridge[i] = obj_new('IDL_IDLBridge', $
Callback='bridgeFunctionCallback')
oBridge[i].setProperty, userData=0
endfor
USERDATA 和 CALLBACK 用于确定哪些进程已完成执行,稍后将进行解释。下一步是设置我们的迭代。对于目录中的每个文件,我们希望告诉一个工作线程去计算每行的字符数。
while filesProcessed lt nFiles do begin
for i=0, oBridge.length-1 do begin
oBridge[i].execute, "bridgeFunction,'" + $
filelist[nextIndex] + "', data"
cpl += oBridge[i]->getVar('data')
endfor
endwhile
注意这个逻辑有一个问题。我们的代码仍然不是线程化的!虽然每个文件将在不同的线程中处理,但每个线程会在下一个线程开始前完成,从而失去了线程化设计的好处。这个问题很容易解决,只需在对 execute 的调用中添加 /NOWAIT 关键字。使用 /NOWAIT 关键字的一个后果是,我们需要负责检查确保每个桥(bridge)已完成其执行。幸运的是,IDL_IDLBridge 对象上的 CALLBACK 可以帮助我们完成这个任务。
pro bridgeFunctionCallback, status, error, node, userdata
compile_opt idl2
node->setProperty, userData=2
end
当线程结束执行时,IDL 将调用回调,我们可以利用这一点向主控制器发出信号,表示工作线程已完成对其文件的工作并准备好处理另一个文件。在主从范式编程中需要牢记的一点是工作线程的状态。在这个例子中,我们有三种状态:准备就绪状态、运行状态和完成执行状态。我们可以在 USERDATA 字段中表示这些状态。考虑到这些状态,我们的迭代变为:
; 处理每个文件
while filesProcessed lt nFiles do begin
for i=0, oBridge.length-1 do begin
oBridge[i].getProperty, userdata=status
; 检查我们线程的状态
switch (status) of
0: begin
; 如果有工作,就分配给它
if nextIndex lt nFiles then begin
oBridge[i].setProperty, userData=1
oBridge[i].execute, "bridgeFunction,'" + $
filelist[nextIndex] + "', data",/nowait
nextIndex++
endif
break
end
2: begin
; 获取结果
filesProcessed++
cpl += oBridge[i]->getVar('data')
oBridge[i].setProperty, userData=0
break
end
else: begin
end
endswitch
endfor
endwhile
当我们还有文件要处理时,我们检查每个线程,看看是否需要分配文件给它们处理。如果一个线程完成了一个文件,我们使用 GETVAR 获取输出,并将线程设置为就绪状态,这将在循环的下一次迭代中被选中。
最后要注意的是创建 IDL_IDLBridge 的开销。如果您有一个运行时间很短的任务,通常使用单线程执行会比使用 IDL_IDLBridge 更快。然而,创造性地使用 IDL_IDLBridge 可以显著减少处理时间。在我的机器上,计算 IDL 安装目录(超过 20,000 个文件)的平均每行字符数花了 72.182 秒。使用线程化代码后,只用了 46.931 秒。这有多酷?
抢先看: 我曾见过使用类似代码处理大型数据文件的情况。每晚一个 cron 任务会启动一个 IDL 进程,查找所有新文件并进行处理。在 IDL 的下一个版本中,我们将引入 WATCHFOLDER 例程,它会监视指定文件夹内的变化,并在检测到变化时发出 CALLBACK。通过 WATCHFOLDER 和 IDL_IDLBridge,您可以创建一个线程化系统,在新文件到达时即时处理它们。
以下是所使用的文件。将每个部分复制到其各自命名的文件中,并确保将其保存在 IDL 路径的某个位置。
; bridgeFunction.pro
;-------------------
pro bridgeFunction, file, data
compile_opt idl2
lines = file_lines(file)
if lines gt 0 then begin
data = strarr(lines)
openr, lun, file, /get_lun
readf, lun, data
free_lun, lun
data = (total(strlen(data)) / lines)
endif else begin
data = 0
endelse
end
; bridgeExample.pro
;------------------
pro bridgeFunctionCallback, status, error, node, userdata
compile_opt idl2
node->setProperty, userData=2
end
;-----------------
pro bridgeexample
compile_opt idl2
tic
; 为系统线程数的一半创建桥
print, '使用 ',strtrim(!cpu.TPOOL_NTHREADS/2,2),' 个线程...'
oBridge = objarr(!cpu.TPOOL_NTHREADS/2)
for i=0, oBridge.length-1 do begin
oBridge[i] = obj_new('IDL_IDLBridge', $
callback='bridgeFunctionCallback')
oBridge[i].setProperty, userData=0
endfor
; 设置变量
filepath = filepath('')
filelist = file_search(filepath,'*',/TEST_REGULAR)
filesProcessed = 0
nextIndex=0
nFiles = n_elements(filelist)
cpl = 0
; 处理每个文件
while filesProcessed lt nFiles do begin
for i=0, oBridge.length-1 do begin
oBridge[i].getProperty, userdata=status
; 检查线程的状态
switch (status) of
0: begin
; 如果有工作,分配工作
if nextIndex lt nFiles then begin
oBridge[i].setProperty, userData=1
oBridge[i].execute, "bridgeFunction,'" + $
filelist[nextIndex] + "', data",/nowait
nextIndex++
endif
break
end
2: begin
; 获取结果
filesProcessed++
cpl += oBridge[i]->getVar('data')
oBridge[i].setProperty, userData=0
break
end
else: begin
end
endswitch
endfor
endwhile
print,'每行平均字符数:',cpl/nFiles
toc
end
; nonbridgeExample.pro
;---------------------
pro nonBridgeExample
compile_opt idl2
; 让我们找到 IDL 示例/数据目录内的所有文件
filepath = filepath('')
filelist = file_search(filepath,'*', /TEST_REGULAR)
cpl = 0
tic
; 循环遍历文件并计算每行字符数
foreach file, filelist do begin
; 执行一些处理
bridgefunction,file,data
cpl += data
endforeach
print, '每行平均字符数:', cpl/n_elements(filelist)
toc
end
干杯