Timing out Lua coroutines

Time for “that's for another post [1]”—this time, handling timeouts in Lua [2] coroutines.

So, I have this Lua coroutine running, and I want to make a DNS (Domain Name Service) request:

>
```
host = dns.request("www.conman.org.","a")
```

It takes a non-trivial amount of time—time that could be used to run other coroutines. But the operation could take longer than expected (say, two seconds, one second longer than we want to wait) or never even complete (because we lost the request packet, or the reply packet). In that case, we want to timeout the operation.

Step one—let's set a limit to how long we can run:

>
```
timeout(1) -- time out in one second
host,err = dns.request("www.conman.org.","a") -- DNS query
timeout(0) -- no longer need the timeout
```

The trick now is to implement the code behind timeout(). First we need to have some way of storing the coroutines (technically, a reference to the coroutine) that are waiting to timeout, and some easy way to determine which ones are timed out. For this, I used a binary heap [3], or techically, a “binary min heap” to store the coroutine reference. The “min” variation because the nodes are ordered by the “awake” time, and times in the future are larger (and thus, the next coroutine that will timeout next will always appear in the first spot in the binary min heap).

Along with the awake value and the coroutine reference, I have a trigger flag. This flag is important for the “cancelling a timeout” case. An earlier version of the code searched sequentially through the list, making “cancelling” an expensive operation (O(n) [4]) compared to “setting” a timeout (O(log(n))). I decided it was easier to just have a flag, and keep a secondary index, keyed by coroutine, to the correct node in the binary min tree. That way, cancelling a timeout is cheap (an amortized O(1)) with the following code:

>
```
function timeout(when)
local co = coroutine.running()
if when == 0 then
if TQUEUE[co] then -- guard to make sure we have an entry
TQUEUE[co].trigger = false -- disarm timeout trigger
end
else
TQUEUE:insert(when,co) -- otherwise, set the timeout
end
end
```

(I should note that the code I presented last Thursday was buggy [5]—I realized I wasn't keeping the invariant condition necessary for a binary min heap (child nodes have a larger value than the parent node) by setting the awake field to 0 (a child would then have a smaller value than its parent)—it didn't break the code but it did make it behave a bit oddly)

I also maintain a run queue—a list of coroutines that are ready to run, used in the main loop:

>
```
function mainloop()
local timeout = -1 -- the timeout to wait for new packets
local now = clock.get()
while #TQUEUE > 0 do -- check timeout queue
local obj = TQUEUE[1] -- next coroutine to timeout
if obj.trigger then -- do we trigger?
timeout = obj.awake - now -- if so, when
if timeout > 0 then -- if now now (or before)
break -- stop looking through the timeout queue
end
table.insert(RQUEUE,obj) -- insert coroutine into the run queue
end
TQUEUE:delete() -- delete the entry from the timeout queue
end
if #RQUEUE > 0 then -- if there's anything in the run queue
timeout = 0 -- we set our timeout
end
-- ---------------------------------------------------
-- Receive packets and process---see this post [6] for some details; the actual
-- code this isn't
-- ---------------------------------------------------
for _,event in ipairs(poll:events(timeout)) do
event.obj(event)
end
-- -----------------------------------------------------------
-- run each coroutine in the run queue (some details snipped)
-- -----------------------------------------------------------
while #RQUEUE > 0 do
local obj = table.remove(RQUEUE,1)
if coroutine.status(obj.co) == 'suspended' then
coroutine.resume(obj.co)
end
end
return mainloop()
end
```

The code initially sets the timeout to wait for network activity to -1, or “wait indefinitely.” It then runs through the timeout queue, checking each item to see if there's a coroutine that's triggered to run. If it comes across a coroutine that has been “disarmed” then it's simply ignored and deleted from the timeout queue. Otherwise, it checks to see if the awake time is still in the future and if so, it's done with the timeout queue; otherwise, it moves the coroutine reference to the run queue and deletes it from the timeout queue.

Eventually, it'll get out of tha loop with a timeout of “indefinite” (if there were no coroutines waiting for a timeout) or some fixed amount of time until the next coroutine times out. It then checks the run queue, and if there is anything in that list, it sets the network activity timeout to 0 (if there is no packets, return immedately).

It then checks for network packets and for each one, process the packet (which in turn might schedule more coroutines to run). After that, it then goes through the run queue, resuming each coroutine in the list. After that, it starts over again.

[1] /boston/2015/02/12.1

[2] http://www.lua.org/

[3] http://en.wikipedia.org/wiki/Binary_heap

[4] http://en.wikipedia.org/wiki/Big_O_notation

[5] /boston/2015/02/12.1

[6] /boston/2015/02/05.1

Gemini Mention this post

Contact the author