跳转至

IDL 中的线程处理

原文链接: https://www.nv5geospatialsoftware.com/Learn/Blogs/Blog-Details/threaded-processing-in-idl

17800 文章评分:

5.0

IDL 中的线程处理

匿名 2014年6月16日,星期一

IDL_IDLBridge 是一个有用的功能,可以帮助您在单个 IDL 进程中执行多进程操作。虽然有一些函数利用了系统的线程池,但其中大部分是数学例程。IDL_IDLBridge 允许您利用系统上未使用的线程。首先,我们来看一段代码,它读取 IDL 安装目录中的每个文件,并计算每行的平均字符数。

compile_opt idl2

; 让我们找到 IDL 示例/数据目录内的所有文件

filepath = filepath('')

filelist = file_search(filepath,'*', /TEST_REGULAR)

cpl = 0

tic

; 循环遍历文件并计算每行字符数

foreach file, filelist do begin

lines = file_lines(file)

if lines gt 0 then begin

data = strarr(lines)

openr, lun, file, /get_lun

readf, lun, data

free_lun, lun

cpl = (total(strlen(data)) / lines)

endif

endforeach

print, cpl/n_elements(filelist)

toc

为了将此代码转换为使用 IDL_IDLBridge,我们需要将此逻辑转换为使用主从控制器范式。我们需要做的第一件事是隔离将在工作线程上完成的工作。对于这个例子,我们有一个循环,它一遍又一遍地执行相同的操作,几乎不依赖于循环外部的变量。让我们首先将该功能提取出来,放到它自己的函数中。

pro bridgeFunction, file, data

compile_opt idl2

lines = file_lines(file)

if lines gt 0 then begin

data = strarr(lines)

openr, lun, file, /get_lun

readf, lun, data

free_lun, lun

data = (total(strlen(data)) / lines)

endif else begin

data = 0

endelse

end

注意:如果您使用 IDL_IDLBridge,请确保您的函数位于您的 IDL_PATH 上。工作线程只会在 PATH 上查找函数。如果找不到,程序将失败。

接下来,让我们设置主控制器。主控制器负责确定需要多少个工作线程、如何划分工作,并在工作线程空闲时分配任务。主控制器需要做的第一件事是确定需要多少个 IDL_IDLBridge 对象并创建它们。这取决于具体系统,但一个好的起点通常是系统总线程数的一半。

; 为系统总线程数的一半创建一个桥

oBridge = objarr(!cpu.TPOOL_NTHREADS/2)

for i=0, oBridge.length-1 do begin

oBridge[i] = obj_new('IDL_IDLBridge', $

Callback='bridgeFunctionCallback')

oBridge[i].setProperty, userData=0

endfor

USERDATA 和 CALLBACK 用于确定哪些进程已完成执行,稍后将进行解释。下一步是设置我们的迭代。对于目录中的每个文件,我们希望告诉一个工作线程去计算每行的字符数。

while filesProcessed lt nFiles do begin

for i=0, oBridge.length-1 do begin

oBridge[i].execute, "bridgeFunction,'" + $

filelist[nextIndex] + "', data"

cpl += oBridge[i]->getVar('data')

endfor

endwhile

注意这个逻辑有一个问题。我们的代码仍然不是线程化的!虽然每个文件将在不同的线程中处理,但每个线程会在下一个线程开始前完成,从而失去了线程化设计的好处。这个问题很容易解决,只需在对 execute 的调用中添加 /NOWAIT 关键字。使用 /NOWAIT 关键字的一个后果是,我们需要负责检查确保每个桥(bridge)已完成其执行。幸运的是,IDL_IDLBridge 对象上的 CALLBACK 可以帮助我们完成这个任务。

pro bridgeFunctionCallback, status, error, node, userdata

compile_opt idl2

node->setProperty, userData=2

end

当线程结束执行时,IDL 将调用回调,我们可以利用这一点向主控制器发出信号,表示工作线程已完成对其文件的工作并准备好处理另一个文件。在主从范式编程中需要牢记的一点是工作线程的状态。在这个例子中,我们有三种状态:准备就绪状态、运行状态和完成执行状态。我们可以在 USERDATA 字段中表示这些状态。考虑到这些状态,我们的迭代变为:

; 处理每个文件

while filesProcessed lt nFiles do begin

for i=0, oBridge.length-1 do begin

oBridge[i].getProperty, userdata=status

; 检查我们线程的状态

switch (status) of

0: begin

; 如果有工作,就分配给它

if nextIndex lt nFiles then begin

oBridge[i].setProperty, userData=1

oBridge[i].execute, "bridgeFunction,'" + $

filelist[nextIndex] + "', data",/nowait

nextIndex++

endif

break

end

2: begin

; 获取结果

filesProcessed++

cpl += oBridge[i]->getVar('data')

oBridge[i].setProperty, userData=0

break

end

else: begin

end

endswitch

endfor

endwhile

当我们还有文件要处理时,我们检查每个线程,看看是否需要分配文件给它们处理。如果一个线程完成了一个文件,我们使用 GETVAR 获取输出,并将线程设置为就绪状态,这将在循环的下一次迭代中被选中。

最后要注意的是创建 IDL_IDLBridge 的开销。如果您有一个运行时间很短的任务,通常使用单线程执行会比使用 IDL_IDLBridge 更快。然而,创造性地使用 IDL_IDLBridge 可以显著减少处理时间。在我的机器上,计算 IDL 安装目录(超过 20,000 个文件)的平均每行字符数花了 72.182 秒。使用线程化代码后,只用了 46.931 秒。这有多酷?

抢先看: 我曾见过使用类似代码处理大型数据文件的情况。每晚一个 cron 任务会启动一个 IDL 进程,查找所有新文件并进行处理。在 IDL 的下一个版本中,我们将引入 WATCHFOLDER 例程,它会监视指定文件夹内的变化,并在检测到变化时发出 CALLBACK。通过 WATCHFOLDER 和 IDL_IDLBridge,您可以创建一个线程化系统,在新文件到达时即时处理它们。

以下是所使用的文件。将每个部分复制到其各自命名的文件中,并确保将其保存在 IDL 路径的某个位置。

; bridgeFunction.pro

;-------------------

pro bridgeFunction, file, data

compile_opt idl2

lines = file_lines(file)

if lines gt 0 then begin

data = strarr(lines)

openr, lun, file, /get_lun

readf, lun, data

free_lun, lun

data = (total(strlen(data)) / lines)

endif else begin

data = 0

endelse

end

; bridgeExample.pro

;------------------

pro bridgeFunctionCallback, status, error, node, userdata

compile_opt idl2

node->setProperty, userData=2

end

;-----------------

pro bridgeexample

compile_opt idl2

tic

; 为系统线程数的一半创建桥

print, '使用 ',strtrim(!cpu.TPOOL_NTHREADS/2,2),' 个线程...'

oBridge = objarr(!cpu.TPOOL_NTHREADS/2)

for i=0, oBridge.length-1 do begin

oBridge[i] = obj_new('IDL_IDLBridge', $

callback='bridgeFunctionCallback')

oBridge[i].setProperty, userData=0

endfor

; 设置变量

filepath = filepath('')

filelist = file_search(filepath,'*',/TEST_REGULAR)

filesProcessed = 0

nextIndex=0

nFiles = n_elements(filelist)

cpl = 0

; 处理每个文件

while filesProcessed lt nFiles do begin

for i=0, oBridge.length-1 do begin

oBridge[i].getProperty, userdata=status

; 检查线程的状态

switch (status) of

0: begin

; 如果有工作,分配工作

if nextIndex lt nFiles then begin

oBridge[i].setProperty, userData=1

oBridge[i].execute, "bridgeFunction,'" + $

filelist[nextIndex] + "', data",/nowait

nextIndex++

endif

break

end

2: begin

; 获取结果

filesProcessed++

cpl += oBridge[i]->getVar('data')

oBridge[i].setProperty, userData=0

break

end

else: begin

end

endswitch

endfor

endwhile

print,'每行平均字符数:',cpl/nFiles

toc

end

; nonbridgeExample.pro

;---------------------

pro nonBridgeExample

compile_opt idl2

; 让我们找到 IDL 示例/数据目录内的所有文件

filepath = filepath('')

filelist = file_search(filepath,'*', /TEST_REGULAR)

cpl = 0

tic

; 循环遍历文件并计算每行字符数

foreach file, filelist do begin   

; 执行一些处理

bridgefunction,file,data

cpl += data

endforeach

print, '每行平均字符数:', cpl/n_elements(filelist)

toc

end

干杯

在 ENVI 中检索远程数据的 5 大方法 为什么 Hadoop 是件大事