Logging when using multiple CPUs

The problem

I have recently worked a lot with large social media datasets (e.g. in my project on social media discourse after refugee arrival). A common issue is that there is so much data that you cannot actually work with it on your own computer and instead need to rely on high-performance computing (HPC) providers, such as AWS or Cineca. Typically, you connect to the HPC system via your terminal, and especially when running long tasks, you may want to receive intermediate information about how things are going. This is where logging comes in. Logging in Python is quite easy thanks to the built-in logging module.

import logging
logger = logging.getLogger(__name__)
logging.basicConfig(filename='logfile.log', level=logging.INFO)
logger.info('Some logging info.')
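
If you run this as a script, logfile.log should contain a line like the following (basicConfig's default format shows the level, the logger name, and the message):

INFO:__main__:Some logging info.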

But things become more difficult when you are using multiple CPUs to run your code, e.g. via multiprocessing. Ideally, we would like to save the logs from all processes (i.e. CPUs) into the same logfile. Unfortunately, this is not supported in Python, because when multiple processes try to write to the same file at the same time, their writes can conflict.

From the Python docs:

Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python.

Therefore, we will have to use a workaround.

The solution: A logging process

The TL;DR solution to the problem is to create an extra process that collects the logs from all other processes and writes them into a single file. To do so, we can make use of Python's utilities for communicating between processes. Specifically, we will use the Manager() class of the multiprocessing module, which lets us share a Queue() between processes and use it for communication.
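
If you have not used a managed queue before, here is a minimal sketch of the idea (the function and variable names are just for illustration): the parent process creates the queue via a Manager(), passes it to a child process, and both can put() and get() items on it.

import multiprocessing as mp

def child(queue):
    # The child can put objects onto the shared queue
    queue.put("hello from the child process")

if __name__=='__main__':
    m = mp.Manager()
    queue = m.Queue()
    p = mp.Process(target=child, args=(queue,))
    p.start()
    p.join()
    # The parent can read what the child put onto the queue
    print(queue.get())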

Setup multiprocessing

Let’s start by setting up a basic multiprocessing task. First, we import the required modules:

import logging 
import logging.handlers
import multiprocessing as mp
from time import sleep

For this example, we will let our workers perform some simple multiplication and then wait a while.

def worker_function(num1,num2):
    result = num1*num2
    sleep(1)
    return result

def main():
    args = [(1,2),(4,5),(3,6),
            (12,2),(22,2),(11,11)] #the numbers we want to multiply
    with mp.Pool(3) as pool:
        # Execute the tasks using pool.starmap_async()
        result_async = pool.starmap_async(worker_function, args)
        # Wait for the async results
        result_async.wait()
        # Get the results from the async result object
        results = result_async.get()
    print(results)

if __name__=='__main__':
    main()

Here is the output:

[2, 20, 18, 24, 44, 121]

Logging listener

Perfect. With the basic framework in place, let's get to logging. We will start with a listener function that runs in a separate process, collects all the logs, and writes them to a file.

def logging_listener(queue,filepath,level=logging.INFO):
    logger = logging.getLogger()
    logger.setLevel(level)
    fh = logging.FileHandler(filepath,"a")
    fo = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(fo)
    logger.addHandler(fh)

    while True:
        record = queue.get()
        if record is None:
            break
        logger.log(record[0],record[1])

The listener receives a Queue() object, a shared object that we will define later. The queue can pass basic Python objects between processes. In our case, each item in the queue will be a tuple consisting of the level at which we would like to log the message and the message itself. The second input is a filepath where we would like our log to be created, and the final input is the level at which we would like to log, which defaults to logging.INFO. The function then sets up a simple logger with a FileHandler() and a Formatter(). The actual logging happens in the while loop. First, we get the next record from the queue. The get() method works similarly to pop() for lists: it returns the oldest entry and removes it from the queue (blocking until an entry is available). We then check whether the record is None, in which case we stop the loop; otherwise, we log the record.

We can now adjust our main() function:

def main(log_file):
    m = mp.Manager()
    queue = m.Queue(-1)
    listener = mp.Process(target=logging_listener,
                          args=(queue,log_file,logging.INFO,))
    listener.start() # Start the listener process

    args = [(1,2,queue),(4,5,queue),(3,6,queue),
            (12,2,queue),(22,2,queue),(11,11,queue)]
            #the numbers we want to multiply and the queue
    with mp.Pool(3) as pool:
        # Execute the tasks using pool.starmap_async()
        result_async = pool.starmap_async(worker_function, args)
        # Wait for the async results
        result_async.wait()
        # Get the results from the async result object
        results = result_async.get()

    queue.put_nowait(None)
    listener.join()

    print(results)

What did we change? First, we added a manager that holds the queue we will share with the workers. Using a managed queue matters here: a plain mp.Queue() cannot simply be passed as an argument to pool workers, while a Manager().Queue() can. Next, we create a new process that runs the logging_listener() function, to which we pass the queue, the log_file, and the level at which we want to log. The next line starts the process. We also adjusted the arguments that we want to pass to our worker function by adding the queue to every set of arguments; this means we will also need to adjust the worker function, but let's go step by step. The final two new lines of main() stop the listener process once everything else is done: we put None into the queue, because we decided earlier that the while loop in logging_listener() will stop when it encounters None, and listener.join() then waits for the listener process to finish.

Logging inside the workers

Let's now adjust the worker_function().

def worker_function(num1,num2,queue):
    queue.put((logging.INFO,f"Multiplying {num1} by {num2}"))
    result = num1*num2
    sleep(1)
    queue.put((logging.INFO,f"Result of {num1} * {num2} is {result}"))
    return result

The adjusted worker_function() now also takes the queue as input. To log anything, we can simply use queue.put(). To match what the listener expects, we pass a tuple: first the level at which we want to log, then the message.
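
If we now call main() with, e.g., 'logfile.log' as the log_file, the file will contain something along these lines (the timestamps are purely illustrative, and the order of the lines will vary because the workers run concurrently):

2024-05-13 10:15:02,123 - INFO - Multiplying 1 by 2
2024-05-13 10:15:02,124 - INFO - Multiplying 4 by 5
2024-05-13 10:15:02,125 - INFO - Multiplying 3 by 6
2024-05-13 10:15:03,130 - INFO - Result of 1 * 2 is 2
...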

A slightly fancier version

We can improve the solution a little bit by creating a class that we can use as a logger.

class MPLogger():
    def __init__(self,queue):
        self.queue = queue
    
    def info(self,message):
        self.queue.put_nowait((logging.INFO,message))
    
    def warning(self,message):
        self.queue.put_nowait((logging.WARNING,message))    
    
    def debug(self,message):
        self.queue.put_nowait((logging.DEBUG,message))

The worker_function() would then simply look as follows:

def worker_function(num1,num2,queue):
    logger=MPLogger(queue)
    logger.info(f"Multiplying {num1} by {num2}")
    result = num1*num2
    sleep(1)
    logger.info(f"Result of {num1} * {num2} is {result}")
    return result

Using the MPLogger() class allows us to use the logging syntax we are used to while still logging to only one file.
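
As a side note: the standard library's logging.handlers module (which we imported at the top but have not used so far) provides QueueHandler and QueueListener, which implement essentially the same pattern. Here is a rough sketch of how our example could look with them; this is only meant to show the idea, not a drop-in replacement for the code above.

import logging
import logging.handlers
import multiprocessing as mp

def worker_function(num1,num2,queue):
    # Attach a QueueHandler once per worker process so normal logging calls go through the queue
    logger = logging.getLogger(__name__)
    if not logger.handlers:
        logger.addHandler(logging.handlers.QueueHandler(queue))
        logger.setLevel(logging.INFO)
    logger.info(f"Multiplying {num1} by {num2}")
    return num1*num2

def main(log_file):
    m = mp.Manager()
    queue = m.Queue(-1)
    fh = logging.FileHandler(log_file,"a")
    fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    # The QueueListener runs in a background thread and writes queued records via the file handler
    listener = logging.handlers.QueueListener(queue,fh)
    listener.start()

    args = [(1,2,queue),(4,5,queue)]
    with mp.Pool(2) as pool:
        results = pool.starmap(worker_function, args)

    listener.stop()
    print(results)

if __name__=='__main__':
    main('logfile.log')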