使用 Spawn 和计时器管理并发异步任务
13698 评价本文:
未评分
使用 Spawn 和计时器管理并发异步任务
匿名 2015年9月28日,星期一
关于 IDL 的一个常见抱怨是其单线程特性,以及你无法通过运行单个 PRO 例程来充分利用所有 CPU。我在此并非要宣布任何新的 IDL 多线程版本,而是要讨论一位同事向我展示的一个非常酷的实现,它允许你从 IDL 分支出多个进程并管理它们的生命周期。你可能会争辩说我可以在 Python 中做到这一点,甚至在一定程度上可以通过简单的 shell 脚本来实现,但如果没有 IDL 和 ENVI 对数据格式的丰富支持,将很难实现一个通用的解决方案来检查你的输入数据并确定如何划分问题,更不用说将结果重新组装成最终的输出产品。
主要组件是 IDL 的 Spawn 过程,但其用法有些非标准。Spawn() 最常见的用法是构建一个命令字符串,然后将其传递给 Spawn(),同时带上 stdout 的位置参数,可能还有 stderr。这里的主要问题是这是一个阻塞调用,并且没有 stdin 参数,因此你最终需要将数据写入磁盘文件,并在命令字符串中添加重定向或管道。前者可以通过 NOWAIT 关键字克服,但这仅在 Windows 上有效,而且我不完全确定如何捕获 stdout 或 stderr 输出。相反,我们将使用 UNIT 关键字,这使得调用变为非阻塞,并在一个 LUN 中包装了一个双向管道,因此你可以轻松地向 stdin 发送数据并读取 stdout 的结果。当然,如果我们在 Spawn() 调用后加上一个 "while not EOF(lun)" 循环,那么我们什么也没得到,因为我们的代码仍然会有效阻塞,直到外部进程终止并且我们读取完它的 stdout。这时计时器就派上用场了,它允许我们生成多个进程,并在它们完成之前重新访问它们。因此,我的同事创建了这个 AsyncSpawner 类,它正是做这个的,允许你入队任意多的 Spawn 请求,并带有一个回调,当进程在 stdout 上输出内容时会被调用。我们使用一个类来维护所有已发出作业请求和运行进程状态的目录。
为了本文简洁,AsyncSpawn 类已被简化,通常你会希望有更多的错误检查和可配置性。"started" 布尔成员指示是否应该运行任何计时器来处理事件,这由适当命名的 Start 和 Stop 方法控制。传递给 Init() 的可配置 CONCURRENCY 参数控制可以同时运行多少进程。"queue" 成员是一个列表,按接收顺序存储作业请求。有两个哈希成员,一个用于正在运行的作业,以它们的 shell 进程 ID 为键;另一个用于查询进程 stdout 管道的计时器,以计时器 ID 为键。
客户端使用 Enqueue 来请求生成一个进程。第一个参数是你希望传递给 Spawn() 的精确字符串,STDIN 关键字设置为你想写入进程 stdin 管道的任何字符串或字符串数组。CALLBACK 关键字指定一个过程名或一个包含 JobCallback 过程的类的对象引用。无论哪种情况,回调过程都会传递两个参数:一个字符串消息和一个包含当前作业状态的字典。输入参数的值都存储在一个字典对象中(用于不区分大小写和点号访问成员),然后添加到列表成员 queue 中。
主要方法是 ExecutiveLoop,它最初由 Start 调用,随后由使用特殊用户数据值 -1 的计时器调用。如果 "started" 状态被 Stop 方法更改为 False,那么它将不执行任何操作。当 "started" 为 True 时,它将尽可能多地出队作业请求并生成它们,直到达到并发限制。然后,它将设置一个新的计时器,在 0.1 秒后调用自己。当它出队一个作业请求时,它会使用 UNIT 和 PID 关键字调用 Spawn,以便它可以分别向 stdin 写入和从 stdout 读取,同时拥有 shell 进程 ID 作为唯一标识符。一旦任何 stdin 数据被发送并刷新到 stdin,就会创建一个字典来存储作业的状态,该字典随后以进程 ID 为键添加到 "running" 成员哈希中。作业状态包括请求的完整命令字符串、stdin 内容、回调过程名/对象引用、进程的 LUN、进程的进程 ID、一个包含所有 stdout 内容的列表以及进程的开始时间。在客户端实现回调过程时,由客户端负责不更改此信息中的任何部分,否则可能会破坏程序。存储作业状态后,回调过程会以 "spawned" 消息被调用,然后启动一个计时器,使用作业的进程 ID 作为用户数据。
AsyncSpawn 类实现了一个 HandleTimerEvent 方法,计时器到期时会调用此方法。此方法接受两个参数:计时器 ID 和用户数据,用户数据要么是从 Spawn 返回的进程 ID,要么是 -1,表示应再次调用 ExecutiveLoop。虽然使用一个计时器并在所有作业 LUN 上进行选择会更简洁,但 FILE_POLL_INPUT() 方法似乎不能按我期望的方式工作,至少在 Windows 上是这样。因此,我们为每个作业 LUN 以及一个为 ExecuteLoop 使用了单独的计时器。当用户数据传入一个正值时,我们以该进程 ID 调用 PollJobStatus 方法。
PollJobStatus 方法正如其名。它从 "running" 成员获取作业状态字典,并检查该作业的 stdout LUN 是否已到达其 EOF 标记。如果是,则释放 LUN,从 "running" 中移除该作业,并以 "done" 消息和最终作业状态调用回调。如果尚未到达 EOF,则使用 FILE_POLL_INPUT() 验证管道上是否有活动,并从 stdout 读取一个字符串。该字符串被添加到作业的 stdout 状态中,然后以 "updated" 消息和当前作业状态调用回调。
这大致描述了 AsyncSpawn 的最小化实现,代码如下:
function AsyncSpawn::Init, CONCURRENCY=concurrency compile_opt idl2 self.started = !False self.concurrency = (N_Elements(concurrency) eq 1) ? (1>concurrency<20) : 1 self.timers = Hash() self.running = Hash() self.queue = List() return, 1 end
pro AsyncSpawn::Cleanup compile_opt idl2 self.Stop end
pro AsyncSpawn::Start compile_opt idl2 Timer.Block if (~self.started) then begin self.started = !True self.ExecutiveLoop endif Timer.Unblock end
pro AsyncSpawn::Stop compile_opt idl2 Timer.Block self.started = !False ; 终止现有计时器 foreach t, self.timers.Keys() do !null = Timer.Cancel(t) self.timers = Hash() ; 释放与活动作业连接的管道的 LUN foreach job, self.running, pid do Free_Lun, job.unit Timer.Unblock end
pro AsyncSpawn::Enqueue, cmd, STDIN=stdin, CALLBACK=callback compile_opt idl2 if (~ISA(callback, /SCALAR, /STRING) && ~Obj_Valid(callback)) $ then Message, 'CALLBACK must be scalar string or object' if (~ISA(stdin, /STRING)) $ then Message, 'STDIN must be string or string array' self.queue.Add, Dictionary('cmd', cmd, 'stdin', stdin, $ 'callback', callback) end
pro AsyncSpawn::ExecutiveLoop compile_opt idl2 ; 这是主循环,它运行排队的作业并管理监视作业结果的轮询计时器 while (self.started && ~self.queue.IsEmpty() && $ (self.running.Count() lt self.concurrency)) do begin req = self.queue.Remove(0) Spawn, req.cmd, UNIT=u, PID=pid, /NOSHELL, /HIDE ; 打印到管道,它将被生成的进程中的 stdin 读取 printf, u, req.stdin flush, u ; 将作业添加到运行目录 job = Dictionary('cmd', req.cmd, 'stdin', req.stdin, 'unit', u, $ 'callback', req.callback, 'pid', pid, $ 'stdout', List(), 'start', SysTime(/Seconds))
self.*running*[pid] = job
**if** (**Obj_Valid**(req.*callback*)) **then** **begin**
**Call_Method**, 'JobCallback', req.*callback*, 'spawned', job
**endif** **else** **begin**
**Call_Procedure**, req.*callback*, 'spawned', job
**endelse**
; 如果未停止,设置一个计时器来检查这个新作业
**if** (self.*started*) **then** self.*timers*[**Timer**.**Set**(**0.1**, self, pid)] = **!True**
endwhile ; 如果未停止,设置一个计时器调用我自己 if (self.started) then self.timers[Timer.Set(0.1, self, -1)] = !True end
pro AsyncSpawn::PollJobStatus, pid compile_opt idl2 job = self.running[pid] if (EOF(job.unit)) then begin Free_Lun, job.unit self.running.Remove, pid if (Obj_Valid(job.callback)) then begin Call_Method, 'JobCallback', job.callback, 'done', job endif else begin Call_Procedure, job.callback, 'done', job endelse return endif ; 检查此作业的 stdout 上是否有内容 if (File_Poll_Input(job.unit, TIMEOUT=0) gt 0) then begin s = '' readf, job.unit, s job.stdout.Add, s if (Obj_Valid(job.callback)) then begin Call_Method, 'JobCallback', job.callback, 'updated', job endif else begin Call_Procedure, job.callback, 'updated', job endelse endif ; 如果未停止,设置一个计时器重新检查此作业 if (self.started) then self.timers[Timer.Set(0.1, self, pid)] = !True end
pro AsyncSpawn::HandleTimerEvent, id, pid compile_opt idl2 self.timers.Remove, id if (pid eq -1) then begin self.ExecutiveLoop endif else begin self.PollJobStatus, pid endelse end
pro AsyncSpawn__define compile_opt idl2 !null = {AsyncSpawn, $ started: !False, $ concurrency: 0L, $ timers: Hash(), $ running: Hash(), $ queue: List() $ } end
下一步是展示这个类的实际应用。我创建了一个简单的 IDL 过程,可以使用 "idl –e" 语法从命令行运行 (https://www.nv5geospatialsoftware.com/docs/command_line_options_for.html#-e)。它叫做 ProcessRasterTile,有三个输入关键字:URI 是要加载的栅格文件名,SUB_RECT 是要处理的空间子集,INDEX 是要计算的光谱指数名称。它将启动 ENVI,加载栅格,进行空间子集处理,并计算请求的光谱指数。然后它获取一个临时文件名并将光谱指数栅格导出到该文件,并打印文件名和一个特殊的 "OUTPUT=" 前缀,这个前缀可以被 spawn 回调识别。这并不是一个真正需要并行化的过程,但这只是展示这种可能性的一个简短示例。我确实添加了一些错误处理,任何来自 API 调用的错误消息都会在返回前打印到 stdout。
pro ProcessRasterTile, URI=uri, SUB_RECT=subRect, INDEX=index
compile_opt idl2
nv = ENVI(/HEADLESS) oRaster = nv.OpenRaster(uri, ERROR=error) if (StrLen(error) gt 0) then begin print, 'Error opening raster: ' + error return endif
oSubRaster = ENVISubsetRaster(oRaster, SUB_RECT=subRect, ERROR=error) if (StrLen(error) gt 0) then begin print, 'Error subsetting raster: ' + error return endif
oIndexRaster = ENVISpectralIndexRaster(oSubRaster, INDEX=index, ERROR=error) if (StrLen(error) gt 0) then begin print, 'Error calculating spectral index: ' + error return endif
outFilename = nv.GetTemporaryFilename('dat') oIndexRaster.Export, outFilename, 'ENVI'
print, 'OUTPUT=' + outFilename nv.close end
下一部分是回调,我选择将其实现为一个类,因为我需要在所有进程完成之前维护多个进程的状态,并且我不想使用公共块变量来做到这一点。我称这个类为 SpawnCallbackHandler,因为这就是它的工作。这个类有一个布尔 "done" 状态,这在我简单的测试驱动程序中是避免垃圾回收所必需的。它还有两个哈希成员,"jobs" 用于活动作业,"finishedJobs" 用于已完成的作业。这个类的主要目的是它的 JobCallback 方法,该方法由 SpawnAsync 类调用。当它收到 "spawned" 消息时,它将新作业添加到其 "jobs" 哈希中,使用 PID 作为键。当它收到 "updated" 消息时,它更新 "jobs" 哈希中的作业字典;当它收到 "done" 消息时,它将作业字典从 "jobs" 哈希移动到 "finishedJobs" 哈希。有一些错误检查确保 PID 是唯一的且只存在于正确的哈希中。然后它检查是否没有正在运行的作业,只有已完成的作业,在这种情况下,它会调用其 AssembleFinalProduct 方法,一个更健壮的版本会将此作为一个回调过程,接收 "finishedJobs" 哈希作为输入。
AssembleFinalProduct 方法遍历所有已完成的作业,在它们的 stdout 列表中查找带有特殊 "OUTPUT=" 前缀的行。每一个都会被编目,然后可以传递给 ENVI::OpenRaster() 来加载所有的 NDVI 图块。然后我创建一个 ENVIMosaicRaster,它被导出到另一个临时文件名并打印出来,以便你知道它是什么。
function SpawnCallbackHandler::Init
compile_opt idl2 self.done = !False self.jobs = Hash() self.finishedJobs = Hash() return, 1 end
pro SpawnCallbackHandler::Cleanup compile_opt idl2 foo = 1 end
function SpawnCallbackHandler::Done compile_opt idl2 return, self.done end
pro SpawnCallbackHandler::JobCallback, msg, job compile_opt idl2 case msg.ToLower() of 'spawned': begin if (self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin Message, 'Already know about job with PID ' + StrTrim(job.pid, 2) endif self.jobs[job.pid] = job end 'updated': begin if (~self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin Message, 'Update to unknown job with PID ' + StrTrim(job.pid, 2) endif self.jobs[job.pid] = job end 'done': begin if (~self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin Message, 'Completion of unknown job with PID ' + StrTrim(job.pid, 2) endif self.jobs.Remove, job.pid self.finishedJobs[job.pid] = job end endcase ; 它们都完成了吗? if (self.jobs.IsEmpty() && ~self.finishedJobs.IsEmpty()) then begin self.AssembleFinalProduct endif end
pro SpawnCallbackHandler::AssembleFinalProduct compile_opt idl2 jobUrls = List() foreach job, self.finishedJobs do begin foreach line, job.stdout do begin if (line.StartsWith('OUTPUT=')) then begin jobUrls.Add, (line.Substring(7)).Trim() endif endforeach endforeach ; 启动 ENVI 并加载所有输出栅格 nv = ENVI(/HEADLESS) tiles = List() foreach url, jobUrls do begin tiles.Add, nv.OpenRaster(url) endforeach ; 对栅格进行镶嵌并导出结果 mosaic = ENVIMosaicRaster(tiles.ToArray()) outFilename = nv.GetTemporaryFilename('dat') mosaic.Export, outFilename, 'ENVI' print, outFilename nv.close self.done = !True end
pro SpawnCallbackHandler__define compile_opt idl2
!null = {SpawnCallbackHandler, $ done: !False, $ jobs: Hash(), $ finishedJobs: Hash() $ }
end
我的测试驱动程序代码如下所示。由于此示例没有任何持久性,我不得不使用 SpawnCallbackHandler::Done() 方法来使其和 AsyncSpawn 对象在所有内容完成之前保持活动状态。如果在基于小部件的应用程序中使用类似的方法,则不需要该方法。
pro test_parallelRasterTile
compile_opt idl2
filename = 'C:\Program Files\Exelis\ENVI53\data\qb_boulder_msi' subRect = [[0, 0, 511, 511], [512, 0, 1023, 511], $ [0, 512, 511, 1023], [512, 512, 1023, 1023]] index = 'NDVI'
spawner = AsyncSpawn(CONCURRENCY=4) handler = SpawnCallbackHandler()
for i = 0, 3 do begin cmd = 'C:\Program Files\Exelis\IDL85\bin\bin.x86_64\idl.exe -e ' + $ '"ProcessRasterTile, URI=''' + filename + ''', SUB_RECT=[' + $ StrJoin(StrTrim(Reform(subRect[*,i]), 2), ',') + '], INDEX=''' + index + '''"' spawner.Enqueue, cmd, STDIN='', CALLBACK=handler endfor spawner.Start while (~handler.Done()) do begin Wait, 0.5 endwhile end