Hands-On Enterprise Application Development with Python

Thread synchronization

As we saw in the previous section, threads are easy to create in Python, but they come with their own gotchas. These must be addressed in any application that is targeted at production use. If they are ignored during development, they lead to the kind of hard-to-debug behavior that concurrent programs are notorious for.

So, how can we work around the problem we discussed in the previous section? If we look at it closely, it is a problem of synchronizing multiple threads. The desired behavior is to synchronize the writes to the file so that only one thread can write to it at any given point in time. This guarantees that no thread can start a write operation until the thread that is currently writing has completed its writes.

To implement such synchronization, we can use locks, which provide a simple way to synchronize threads. A thread that is about to start its write operation first acquires the lock. If the acquisition succeeds, the thread proceeds with its write. If a context switch happens in the middle of that write and another thread tries to start its own write, the second thread blocks, because the lock is already held. It stays blocked until the first thread releases the lock, so it can never write its data in the middle of a write operation that is already in progress.
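The blocking behavior described above can be observed with a minimal sketch that uses only the standard library. Here the main thread holds the lock to stand in for a write that is in progress. The function name try_to_write is hypothetical, and acquire(timeout=0.1) is used only so that the demo finishes instead of blocking forever.

```python
import threading

lock = threading.Lock()
results = []

def try_to_write():
    # Hypothetical writer: wait at most 0.1 s for the lock instead of blocking forever
    acquired = lock.acquire(timeout=0.1)
    results.append(acquired)
    if acquired:
        lock.release()

lock.acquire()  # main thread holds the lock, simulating a write in progress
t = threading.Thread(target=try_to_write)
t.start()
t.join()
print(results)  # [False]: the second thread could not get the lock

lock.release()  # the simulated write finishes
t = threading.Thread(target=try_to_write)
t.start()
t.join()
print(results)  # [False, True]: now the lock is free to acquire
```

With the default acquire() (no timeout), the first worker would simply wait until the main thread called release().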

In Python, locks are provided by the threading.Lock class, which offers two methods for acquiring and releasing a lock. A thread calls acquire() before executing the operation that needs protection; if another thread already holds the lock, the call blocks until that lock is released. Once the lock is acquired, the thread carries out the operation. As soon as the operation is finished, the thread calls release() so that the lock can be acquired by another thread that may be waiting for it.
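To see acquire() and release() in isolation, here is a small sketch built around a shared counter (hypothetical, not part of the converter) that four threads increment. The try/finally ensures release() runs even if the protected code raises; writing `with counter_lock:` is the equivalent shorthand.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        counter_lock.acquire()      # block until no other thread holds the lock
        try:
            counter += 1            # critical section: only one thread at a time
        finally:
            counter_lock.release()  # always hand the lock back, even on error

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```

`counter += 1` is a read-modify-write sequence, so without the lock, updates from different threads can be lost. With the lock, the total is always 4 × 10,000.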

Let's see how we can use locks to synchronize the threads in our JSON-to-YAML converter example. The following code sample showcases the use of locks:

import threading
import json
import yaml  # PyYAML (pip install pyyaml)

class JSONConverter(threading.Thread):
    def __init__(self, json_file, yaml_file, lock):
        threading.Thread.__init__(self)
        self.json_file = json_file
        self.yaml_file = yaml_file
        self.lock = lock  # the same lock instance is shared by all converters

    def run(self):
        print("Starting read for {}".format(self.json_file))
        # Each thread reads its own input file, so reads need no lock
        with open(self.json_file, 'r') as json_reader:
            self.json = json.load(json_reader)
        print("Read completed for {}".format(self.json_file))
        print("Writing {} to YAML".format(self.json_file))
        self.lock.acquire()  # We acquire a lock before writing
        try:
            with open(self.yaml_file, 'a') as yaml_writer:
                # explicit_start=True prefixes each dump with '---', so every
                # converted file becomes a separate document in the YAML stream
                yaml.dump(self.json, yaml_writer, explicit_start=True)
        finally:
            self.lock.release()  # Release the lock once our writes are done, even on error
        print("Conversion completed for {}".format(self.json_file))

files = ['file1.json', 'file2.json', 'file3.json']
write_lock = threading.Lock()
conversion_threads = []

for file in files:
    converter = JSONConverter(file, 'converted.yaml', write_lock)
    conversion_threads.append(converter)
    converter.start()

for cthread in conversion_threads:
    cthread.join()

print("Exiting")

In this example, we first create a single lock, write_lock, as an instance of the threading.Lock class. This same instance is then passed to every thread that needs to be synchronized. When a thread needs to write, it first acquires the lock and only then starts writing. Once its writes are complete, the thread releases the lock so that one of the other threads can acquire it. Note that the JSON files are read outside the lock, so the threads still read their inputs concurrently; only the writes to the shared converted.yaml file are serialized.
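The same pattern can be tried without PyYAML or any input JSON files. This sketch replaces converted.yaml with a temporary file and uses a hypothetical write_record() helper. Three threads share one lock, and each writes a multi-line record while holding it. The `with lock:` form calls acquire() on entry and release() on exit.

```python
import os
import tempfile
import threading
import time

def write_record(path, name, lock, lines=3):
    # Hypothetical helper: the whole multi-line record is written while holding
    # the shared lock, so no other thread can append in the middle of it
    with lock:
        with open(path, "a") as out:
            for i in range(lines):
                out.write(f"{name}: line {i}\n")
                out.flush()
                time.sleep(0.01)  # widen the window in which a context switch can occur

def main():
    write_lock = threading.Lock()  # one lock shared by all writer threads
    fd, path = tempfile.mkstemp(suffix=".txt")
    os.close(fd)
    try:
        workers = [
            threading.Thread(target=write_record, args=(path, f"thread-{n}", write_lock))
            for n in range(3)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        with open(path) as f:
            return f.read().splitlines()
    finally:
        os.remove(path)

lines = main()
print("\n".join(lines))
```

Because each record is written under the lock, the three lines from one thread always appear together in the file. The order of the threads themselves can still vary from run to run.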

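If a thread acquires a lock but never releases it, the program can deadlock, because every other thread that tries to acquire that lock will block forever. This can happen when a thread simply forgets to call release(), or when an exception is raised before release() is reached. To avoid deadlocks, make sure that every acquired lock is released once the thread finishes its operation, even when that operation fails.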
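A common way for a lock to be left held is an exception raised between acquire() and release(). The sketch below uses a hypothetical write() helper that fails when given None. It compares a bare acquire()/release() pair with the with statement and then checks Lock.locked() afterwards.

```python
import threading

def write(data):
    # Hypothetical write step that fails on bad input
    if data is None:
        raise ValueError("nothing to write")

def faulty_write(lock, data):
    lock.acquire()
    write(data)       # if this raises, the release() below is never reached
    lock.release()

def safe_write(lock, data):
    with lock:        # the lock is released when the block exits, even on an exception
        write(data)

def lock_left_held(write_fn):
    lock = threading.Lock()
    try:
        write_fn(lock, None)  # trigger the failure path
    except ValueError:
        pass
    return lock.locked()      # True means any later acquire() would block forever

print(lock_left_held(faulty_write))  # True
print(lock_left_held(safe_write))    # False
```

An explicit try/finally around the protected code gives the same guarantee as the with statement.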