OddThinking

A blog for odd things and odd thoughts.

Python Consumer Thread Shutdown Design Notes

Oh well, it worked well yesterday. Let’s try another session of rubber-ducking.

The design so far is that there is a proxy handler object that runs in the client thread(s) and a consumer thread in the background. The consumer thread monitors a Queue.Queue for work items, which is the standard Python idiom.

Ideally, the consumer thread should remain until it the clients no longer exist, then it should complete any outstanding I/O (if any) and then immediately terminate.

What’s the best way to ensure that happens?

If the project goes well, the proxy handler will be instantiated by other Python developers, who will use an existing interface, so I cannot make any assumptions that they will clean up nicely after themselves.

Let’s look at some of the options:

Daemon Threads

One option is to declare the consumer thread to be a daemon thread. If the interpreter finds that all the real threads are completed, and only daemon threads remain, it immediately terminates the daemon threads, and hence the application.

This termination appears to be with prejudice. It isn’t a graceful shutdown, but the thread just stops executing.

This is unsatisfactory, as it will terminate the consumer thread before it completes any outstanding I/O. Given logging is often the last action of a dying application, it is important to hang around for long enough to record its final words.

Unfortunately, it isn’t possible to change the daemon status on a running thread; I can’t declare it is a daemon thread now, but not when it has work to do.

Call to join()

If the client is shutting down and no longer cares about real-time behaviour, the handler object in the client thread could call queue.join() which causes it to block until all the tasks in the queue has been completed.

After that, the client thread could safely terminate. The consumer thread (declared as a daemon thread) would then shut-down when all the other threads do.

If the client calls close(), the join() could be done then. This sounds perfect.

However, if the client neglects to call close() before terminating, it causes the consumer thread to quit suddenly before the I/O is completed.

Sentinel Values

If the client can declare that it no longer requires the handler (by calling close()), then the client thread can push a sentinel value onto the queue. The client thread may then terminate immediately.

When the consumer thread (declared NOT to be a daemon thread, in this case) catches up with its backlog, and sees the sentinel value, it can shut the thread down.

Again, this is perfect, when the client calls close(). When the client neglects to call close() it causes the application to freeze up when the main thread terminates.

Destructors

Python has garbage collection. When an object is no longer required, its destructor, if any, is normally called. The destructor could call the close() method, to ensure the object is properly shut-down.

However, Python destructors are rather fragile. If the garbage collector detects a cycle, it won’t clean it up if there is an object with a destructor involved. Also, destructors are not consistently called when a thread terminates.

This was the design I had originally envisioned. My attempts to use the destructor have been only intermittently successful.

Polling

Rather than have the consumer thread block forever waiting for input, it could timeout and do some check to see if the client thread is still around.

Firstly, this has issues with how it would detect the “client thread”, which could really by many threads.

Secondly, this either is polling is either done frequently enough to make it inefficient background noise for the CPU (especially for an infrequently required service), or it is done infrequently enough that the program takes a long time to terminate, just because of the logging. Neither seems satisfactory.

Thread per Work Item

One thread could be created per transaction (e.g. emitting a message). This thread would run until the transaction was completed, and then terminate.

This strikes me as very inefficient (although it may be worth testing). In the case where the system is asked to send a flood of emails, it would take some time to allocate all of the new threads. It would also be memory and CPU intensive, meaning it may well cause other problems for the application.

Using atexit

A function may be registered with the atexit module. When the interpreter is shutting down, those functions are called.

The logging module registers a shut-down method which cleans up all handlers.

However, the atexit functions are only called when the program is terminating. The problem here is that the consumer thread is causing the program to not terminate properly, so the atexit functions are not being called.

Daemon Thread / atexit Combination

Oooh, here is an idea that came from rubber-ducking.

What if the consumer thread was a daemon thread, so it terminated suddenly, but the client thread, during its atexit-driven shutdown, went to check on the queue and processed any remaining items? That is, when the producer was finished, the consumer thread died, the producer woke up for a last ditched effort to act like a consumer to finish off any remaining tasks.

The item that was currently being processed by the consumer thread when it died would be lost. I can think of ways to reduce the size of the window of potential lossage, but not to eradicate it and not without introducing the counter-risk that an item would be processed twice.

[Stop Press: I thought of a way to guarantee at-least-once semantics. If every item was posted to the queue twice (and was guaranteed to be in immediate succession, which could be done with a semaphore), then the single consumer thread could process the first one and then discard the second one. The producer thread could process each unique one it finds. That would ensure no log message would be lost, but one might be emitted twice.]

[Stop, stop press: No, that assumes that there is only one producer-turned-consumer, but there may be many client threads shutting down.]

Multiprocessing

The multiprocessing module in Python offers a thread-like interface to processes. It might provide an interface that allowed atexit to be used.

However, as well as imposing yet more constraints on the client (e.g. the main module must be importable), it also isn’t supported by the (default) logging locking system. The provided (protected) handler is probably not process-safe, and runs the risk of corrupting the log messages.

What else?

I am out of ideas here…

… I may have to assume that the client will jump through hoops to ensure that the handler object is properly closed when the program ends, which will make this project unsuitable for a broader audience.

17 CommentsCategories: S/W Dev

Comments

  1. You can use your thread per work item in a slightly different way.

    Create a thread that exits when you’re not processing something and the queue is empty. Then start it when you get a new item to log only if it’s not running already.

    As long as you’re processing anything, you should have a thread running which will prevent your actual processing thread from being killed. When you aren’t processing, there won’t be such a thread and the program will be able to exit.

    Pseudocode:

    ConsumerThread:
    . . while (true)
    . . . . if (queue.empty)
    . . . . . . ProtectorThread.Release
    . . . . queue.GetOne
    . . . . ProtectorThread.Protect
    . . . . process item

    ProtectorThread:
    . . Protect:
    . . . . clear semaphore
    . . . . t = new Thread
    . . Thread function:
    . . . . Wait for a semaphore to be set.
    . . Release:
    . . . . set a semaphore to end the event

  2. @configurator,

    I did actually think about (but didn’t document) an approach where I’d start a consumer thread only if a consumer thread wasn’t already started, but I could see a fatal flaw in my design. I think your design is much more clever that what I came up with – it avoids both a plethora of threads and the client having to wait for them to start – but it might suffer the same flaw:

    A application that is dying, and wants to log the cause, will push an item into the queue, and terminate. There is then a race condition – will the consumer thread get a time-slice, pull the record, call the protector thread and initiate the new Thread before the interpreter realises there are only daemon threads remaining, and terminates them all?

    A way to fix this is to move the call to protect into the proxy, where it would be run from the client’s thread. Then, once again, the client needs to wait for the thread to be started before it can return. I still consider starting a thread to be expensive, but I haven’t actually profiled it for years.

  3. Starting a thread is pretty quick AFAIK. At least in .Net it is, where threads are fake 🙂

    The advantage is that the slowdown won’t be repeated with every logging request, as long as the queue is full. You could put a small delay when creating a thread in my example, to make sure that repeated logging is all done in one batch and a thread isn’t created and destroted for each call. Or perhaps a delay before releasing the thread as in:
    if (queue.empty && queue.GetOne(timeout=100)) ProtectorThread.Release.
    That way as long as the thread exists the delay on the logging caller will be minimal.

  4. Configurator,

    I believed you, but I didn’t really believe you, until I did my own (moderately dodgy) profiling test.

    Python 2.6.4 on Vista (my dev machine, typical load), creating a thread took an average of 354 microseconds.

    Python 2.6.4 on Ubuntu (target machine, minimum spec virtual private server, typical load), creating a thread took an average of 78 microseconds.

    I’ve got to get over this out-dated aversion of waiting for a thread to start.

  5. Thanks for the ideas. I think I came up with a solution.

    When creating the consumer, pass it a handle to the parent thread:

    c = Consumer(parent=threading.current_thread())

    Then, in the consumer’s run loop you can check if the parent is still alive. If the parent is not alive, and the queue is empty, terminate:


    class Consumer(threading.Thread):
    . def __init__(self, parent):
    . . threading.Thread.__init__(self)
    . . . self.parent = parent
    . . . self.q = Queue()
    . def run(self):
    . . while True:
    . . . if not self.parent.isAlive() and self.q.empty():
    . . . . break
    . . else:
    . . . pass #do stuff

    If you have multiple producers, you could have a list of parents, and require them to all be dead. The consumer could then be a singleton, which adds a parent to the list each time its constructor is called.

  6. Thanks for considering this, Smaddox.

    If the “do stuff” section of your code is blocking, waiting for input to the queue, then the consumer thread will block forever when the parent dies, waiting for input.

    If the “do stuff” section of your code is non-blocking, and the loop will occur ever when there is no input, then this becomes the same as the Polling section listed in the article above, with the same performance trade-off.

  7. In another post, I pointed out the configurator’s idea, which I eventually implemented, may lead to a thread explosion.

    In the comments, configurator expressed surprise:

    I thought my suggestion would only create a thread while there isn’t one already alive? How would you have an exploding number of threads, then? You’d just have the one thread, busy in a pop-from-queue-and-send-email loop. […] Looking at the source code I see there is only one consumer thread that will ever get started. Maybe I missed something, but I fail to see the thread explosion.

  8. Let me start with the easy part.

    In the source code available here, there is a _delegate() method that is called each time a log message is handled. Its last line is this:

    Thread(name="NonblockingLogHandler.protector",
        target=NonblockingLogHandler._protect,
        args=(self.message_queue,)).start()

    So, each call to handle a log message results in a new Thread. If it gets in a tight loop, that is lots of new threads.

    So the potential explosion is there and waiting.

    Now, the hard part…

  9. The hard part is deciding whether what I implemented should actually be attributed to configurator! In my mind, I implemented a small variation of configurator’s idea. Perhaps it was a bigger change than I realised, and configurator shouldn’t be blamed!

    You can see in the comments above, I was clearly influenced by configurator’s suggestion that threads were cheap.

    I also went for a solution that, inspired by configurator, involves a protector thread that does nothing but stall the runtime from shutting down the program before the messages were logged.

    However, unlike configurator’s solution, which has a race condition, I have a protector thread per message, which survives until the message is emitted.

    I cannot recall why I made such a significant change to multiple protector threads, rather than a single Protector thread, except that I do remember scribbling on pieces of paper trying to make sure I can covered all the potential race conditions that were sprinkled around. I convinced myself that the solution I adopted wasn’t going to have any such race conditions.

    So, in conclusion:

    Credit to configurator for the idea of fast threads and protector threads.

    Credit/blame to me for the idea of thread-per-log-message.

    And I am still open to better solutions, but be aware that this area is trickier than it looks.

  10. Ah, I see, it’s the protector threads!
    Could you point out if there’s a race condition I’m missing in this pseudo-code example:


    ConsumerThread:
    . . while (true)
    . . . . if (queue.empty)
    . . . . . . lock (locker)
    . . . . . . . . if (queue.empty)
    . . . . . . . . . . ProtectorThread.Release
    . . . . queue.GetOne
    . . . . process item

    ProtectorThread:
    . . Protect:
    . . . . clear semaphore
    . . . . t = new Thread
    . . Thread function:
    . . . . Wait for a semaphore to be set.
    . . Release:
    . . . . set a semaphore to end the event

    LogFunction:
    . . lock (locker)
    . . . . ProtectorThread.Protect
    . . . . queue.Put(item)

    Assume the lock function works like C#’s lock. The LogFunction will be quick except in the case of multiple run-away thread emitting log messages in an infinite loop. This is similar to the problem in your current design, but won’t gobble up system resources – it would just slow down the logging function (because the locker is saturated)

  11. Also, doesn’t python have a Tasks or ThreadPool library?

  12. I appreciate your continued interest. It is a tougher nut to crack than it should be.

    Your pseudo-code is very similar in many ways to the actual implementation I have used (although I have used some language features, such as Queue.queue.join() that produces the same effect as your semaphore, without explicitly requiring one.

    I believe my implementation and this one suffer the same downside: When you get multiple calls to the log function, it makes multiple calls (sequential, due to your lock statement) to the Protect method, which creates a new thread for each.

    Now, the protect threads are consuming minimal system resources each, and, as proved before, can be created quickly, but enough of them being created quickly could theoretically cause problems. I’m sorry but I don’t see how this improves on the actual implementation.

  13. Does Python have Tasks or ThreadPool libraries?

    So many different ones, it makes your eyes water.

    The basic building blocks are threading.Thread and queue.Queue, which together normally give you most of what you need.

    What seems to be special with this logging application, is that the client never wants to see the result of the processing, and never closes down the pool cleanly, making the timing of the pool shutdown more ambiguous than it normally would be.

    Multiprocessing is another built-in library, offering pools, but it is about processes, not threads, and therefore has larger overhead.

    Many third-party libraries go the next step to offer process pools across multiple machines, using your favourite messaging stack.

    Some smaller third-party threadpool libraries are available, such as this one that I believe abandons work when the client shuts-down, or this one which requires the client to explicitly close up the threadpool resources.

    I haven’t found anything that has low-overhead and can handle shut-down of the client without notice, and then close itself down when it has finished the processing.

  14. Sorry, I was thinking one thing and typing another. My example doesn’t improve anything, in fact. Let’s make a small change to the protect method.


    ConsumerThread:
    . . while (true)
    . . . . if (queue.empty)
    . . . . . . lock (locker)
    . . . . . . . . if (queue.empty)
    . . . . . . . . . . ProtectorThread.Release
    . . . . queue.GetOne
    . . . . process item

    ProtectorThread:
    . . Protect:
    . . . . if (semaphore is set)
    . . . . . . clear semaphore
    . . . . . . t = new Thread
    . . Thread function:
    . . . . Wait for a semaphore to be set.
    . . Release:
    . . . . set a semaphore to end the event

    LogFunction:
    . . lock (locker)
    . . . . ProtectorThread.Protect
    . . . . queue.Put(item)

    Now, only one thread will ever exist. This was the entire point of the semaphore (and why you can’t use queue.join()) here – I just forgot to write down that line.

    Note that since protect-and-insert-to-queue is inside a lock, and check-queue-is-empty-and-release is in another, we should be race-condition free, I think.

  15. Nice. I stared at it for a while, and it looks good to me.

  16. I had a play today with this suggestion. I wanted to use this technique, but I wasn’t entirely happy with the encapsulation – detailed knowledge about the solution has been spread over three different parts of the code.

    I finally had a breakthrough.

    I now have a subclass of queue.Queue, called autoclosingqueue.Queue. It encapsulates the semaphore, the lock and the protector thread away from both the producer and the consumer.

    There is a requirement that the consumer call task_done(), which is an existing method on the queue.Queue object. Normally, it is optional to call this, depending on your design. With the new class, it is mandatory, so that the Protector thread can be notified when it is time to shut-down.

    It is a reusable class for anyone who has this same, rather unusual, problem with consumer threads.

    I will issue a new version of the NonblockingHandler containing this class, in due course.

  17. Excellent. Let me know how it goes when you’ve got the new version ready.

Leave a comment

You must be logged in to post a comment.