跳转至

Threaded Processing In IDL

原文链接: https://www.nv5geospatialsoftware.com/Learn/Blogs/Blog-Details/threaded-processing-in-idl

17800 Rate this article:

5.0

Threaded Processing In IDL

Anonym Monday, June 16, 2014

The IDL_IDLBridge is a useful feature which helps you perform multi-process operations within a single IDL process.  While there are functions which make use of a systems thread pool, most of those functions are math routines.  The IDL_IDLBridge allows you to utilize unused threads on your system.  To start out, let's look at some code which reads in each file in IDL's installation directory and computes the average number of characters per line.

compile_opt idl2

; Let's find all files inside of IDL's examples/data directory

filepath = filepath('')

filelist = file_search(filepath,'*', /TEST_REGULAR)

cpl = 0

tic

; Loop through the files and calculate the characters per line

foreach file, filelist do begin

lines = file_lines(file)

if lines gt 0 then begin

data = strarr(lines)

openr, lun, file, /get_lun

readf, lun, data

free_lun, lun

cpl = (total(strlen(data)) / lines)

endif

endforeach

print, cpl/n_elements(filelist)

toc

In order to convert this code to use the IDL_IDLBridge, we need to covert this logic to use a master-controller paradigm.  The first thing we need to do is isolate the work which will be done on the worker threads.  For, this example, we have a loop which performs the same operation over and over with minimal dependence on variables outside of the loop.  Let's start by taking that functionality and putting it in its own function.

pro bridgeFunction, file, data

compile_opt idl2

lines = file_lines(file)

if lines gt 0 then begin

data = strarr(lines)

openr, lun, file, /get_lun

readf, lun, data

free_lun, lun

data = (total(strlen(data)) / lines)

endif else begin

data = 0

endelse

end

Note: If you are using the IDL_IDLBridge make sure your functions are on your IDL_PATH.  A worker will only look for a function on the PATH.  If it can't find it, the program will fail.

Next, let's set up the master.  The master is responsible for determining how many workers are needed, how to split up the work, and giving workers work when they are free.  The first thing the master needs to do is figure out how many IDL_IDLBridge objects are needed and create them.  This is system specific, but a good place to start is typically half the total number of threads available on the system.

; Create a bridge for half the total threads on the system

oBridge = objarr(!cpu.TPOOL_NTHREADS/2)

for i=0, oBridge.length-1 do begin

oBridge[i] = obj_new('IDL_IDLBridge', $

Callback='bridgeFunctionCallback')

oBridge[i].setProperty, userData=0

endfor

The USERDATA and CALLBACK are used to determine which processes have completed execution and will be explained later.  The next step is setting up our iteration.  For each file in our directory, we want to tell a worker to count the characters per line.

while filesProcessed lt nFiles do begin

for i=0, oBridge.length-1 do begin

oBridge[i].execute, "bridgeFunction,'" + $

filelist[nextIndex] + "', data"

cpl += oBridge[i]->getVar('data')

endfor

endwhile

Notice there is a problem with this logic.  Our code is still not threaded!  While each file will be processed in a different thread, each thread will complete before the next thread starts, thus loosing the benefit of a threaded design.  This problem is easily solved by added the /NOWAIT keyword to our call to execute.  One consequence of the /NOWAIT keyword, is that we are responsible for checking to make sure each bridge has completed its execution.  Lucky for us, the CALLBACK on the IDL_IDLBridge object can help us accomplish this. 

pro bridgeFunctionCallback, status, error, node, userdata

compile_opt idl2

node->setProperty, userData=2

end

IDL will call the callback when the thread has ended execution, we can use this to signal the master the worker has completed working on its file and is ready for another.  An important thing to keep in mind when programming in the master-worker paradigm is the state of a worker.  In this example we have three states: a ready for work state, a running state, and a finished execution state.  We can represent these states in the USERDATA field.  Accounting for these states, our iteration becomes:

; Process each file

while filesProcessed lt nFiles do begin

for i=0, oBridge.length-1 do begin

oBridge[i].getProperty, userdata=status

; Check the status of our thread

switch (status) of

0: begin

; Assign it work if there is work to be had

if nextIndex lt nFiles then begin

oBridge[i].setProperty, userData=1

oBridge[i].execute, "bridgeFunction,'" + $

filelist[nextIndex] + "', data",/nowait

nextIndex++

endif

break

end

2: begin

; Capture the results

filesProcessed++

cpl += oBridge[i]->getVar('data')

oBridge[i].setProperty, userData=0

break

end

else: begin

end

endswitch

endfor

endwhile

While we still have files to process, we check each of our threads to see if they need to be assigned a file to work on.  If a thread is done with a file, we fetch the output with GETVAR and set the thread to a ready state which will be picked up on the next iteration of the loop. 

The last thing to note is the overhead of creating an IDL_IDLBridge.  If you have a short running task, it will often be faster to execute in a single thread instead of using an IDL_IDLBridge.  However, being creative with your IDL_IDLBridge can lead to marked decrease in processing time.  On my machine, calculating the average character per line for the IDL install directory (over 20,000 files) took 72.182 seconds.  Using the threaded code, it took only 46.931 seconds.  How cool is that?

Sneak Peak:  I've seen code like this used to process large data files.  Every night a cron job would kick off an IDL process which would find all of the new files and process them.  In the next release of IDL we are introducing the WATCHFOLDER routine which will watch for changes inside of a specified folder and issue a CALLBACK when a change is noticed.  With WATCHFOLDER and IDL_IDLBridge, you could create a threaded system which would process new files when they arrived.

Below are the files used.  Copy each section into its own named file and make sure to save them somewhere on IDL's path.

; bridgeFunction.pro

;-------------------

pro bridgeFunction, file, data

compile_opt idl2

lines = file_lines(file)

if lines gt 0 then begin

data = strarr(lines)

openr, lun, file, /get_lun

readf, lun, data

free_lun, lun

data = (total(strlen(data)) / lines)

endif else begin

data = 0

endelse

end

; bridgeExample.pro

;------------------

pro bridgeFunctionCallback, status, error, node, userdata

compile_opt idl2

node->setProperty, userData=2

end

;-----------------

pro bridgeexample

compile_opt idl2

tic

; Create a bridge for half the threads on the system

print, 'Using ',strtrim(!cpu.TPOOL_NTHREADS/2,2),' threads...'

oBridge = objarr(!cpu.TPOOL_NTHREADS/2)

for i=0, oBridge.length-1 do begin

oBridge[i] = obj_new('IDL_IDLBridge', $

callback='bridgeFunctionCallback')

oBridge[i].setProperty, userData=0

endfor

; Set up our variables

filepath = filepath('')

filelist = file_search(filepath,'*',/TEST_REGULAR)

filesProcessed = 0

nextIndex=0

nFiles = n_elements(filelist)

cpl = 0

; Process each file

while filesProcessed lt nFiles do begin

for i=0, oBridge.length-1 do begin

oBridge[i].getProperty, userdata=status

; Check the status of our thread

switch (status) of

0: begin

; Assign it work if there is work to be had

if nextIndex lt nFiles then begin

oBridge[i].setProperty, userData=1

oBridge[i].execute, "bridgeFunction,'" + $

filelist[nextIndex] + "', data",/nowait

nextIndex++

endif

break

end

2: begin

; Capture the results

filesProcessed++

cpl += oBridge[i]->getVar('data')

oBridge[i].setProperty, userData=0

break

end

else: begin

end

endswitch

endfor

endwhile

print,'Average characters per line:',cpl/nFiles

toc

end

; nonbridgeExample.pro

;---------------------

pro nonBridgeExample

compile_opt idl2

; Let's find all files inside of IDL's examples/data directory

filepath = filepath('')

filelist = file_search(filepath,'*', /TEST_REGULAR)

cpl = 0

tic

; Loop through the files and calculate the characters per line

foreach file, filelist do begin   

; Perform some processing

bridgefunction,file,data

cpl += data

endforeach

print, 'Average characters per line:', cpl/n_elements(filelist)

toc

end

Cheers

The Top 5 Ways to Retrieve Remote Data in ENVI Why Hadoop is Kind of a Big Deal