ソースに絡まるエスカルゴ

貧弱プログラマの外部記憶装置です。

【python】別スレッドのExceptionを検知してそのスレッドを再起動させる

 pythonの記事は色々書いてきたものの、スレッドについては今まであまり触れてきませんでした。
 自分の勉強がてら「別スレッドのExceptionを検知してそのスレッドを再起動させる」ことができたのでその備忘録です。

 では始めます。


1:スレッドの書き方
 pythonでのスレッドの書き方は以下の通りです。

・test_thread.py

import threading
from time import sleep
from datetime import datetime

def test_thread1():
    date = datetime.now().strftime("%Y%m%d_%H%M%S")
    print("thread1 start! :{}".format(date))
    sleep(5)
    date = datetime.now().strftime("%Y%m%d_%H%M%S")
    print("thread1 end!!! :{}".format(date))

def test_thread2():
    date = datetime.now().strftime("%Y%m%d_%H%M%S")
    print("thread2 start! :{}".format(date))
    sleep(3)
    date = datetime.now().strftime("%Y%m%d_%H%M%S")
    print("thread2 end!!! :{}".format(date))

def main():
    t1 = threading.Thread(target=test_thread1) # test_thread1関数をスレッドに設定
    t2 = threading.Thread(target=test_thread2) # test_thread2関数をスレッドに設定
    
    # それぞれのスレッドを開始
    t1.start()
    t2.start()

if __name__ == '__main__':
    main()

 上記のプログラムを実行するとtest_thread1の関数とtest_thread2の関数が並列に実行されますが、test_thread2の方がsleep時間が短いので先に「thread2 end!!!」が表示され、その後に「thread1 end!!!」が表示されます。

$ python test_thread.py
thread1 start! :20200513_173130
thread2 start! :20200513_173130
thread2 end!!! :20200513_173133
thread1 end!!! :20200513_173135

 また実行するスレッドをクラスにして記述することができます。

・test_class_thread.py

import threading
from time import sleep
from datetime import datetime


# 実行するスレッドのクラス
class TestThread(threading.Thread):

    # コンストラクタ(特に何もしないのであれば再定義する必要なし)
    def __init__(self, name, sleep_time):
        threading.Thread.__init__(self)
        self.name = name
        self.sleep_time = sleep_time

    # スレッド実行
    def run(self):
        date = datetime.now().strftime("%Y%m%d_%H%M%S")
        print(self.name + " start! :{}".format(date))
        sleep(self.sleep_time)
        date = datetime.now().strftime("%Y%m%d_%H%M%S")
        print(self.name + " end!!! :{}".format(date))


def main():
    # 実行するクラスのスレッドをそれぞれ作成
    th1 = TestThread("thread1", 5)
    th2 = TestThread("thread2", 3)

    # スレッドスタート
    th1.start()
    th2.start()

if __name__ == '__main__':
    main()

 実行結果は関数をスレッドにして実行した時と変わりません。ただクラス分けをしていると様々な処理を分けたり、管理しやすくなります。


2:通常スレッドとデーモンスレッド
 pythonには通常のスレッドとデーモンスレッドの2種類があります。

 通常スレッドは「メインスレッドが終了した場合でも動く」スレッドで、デーモンスレッドは「メインスレッドが終了した場合に強制的に終了する」スレッドという違いがあります。

 デーモンスレッド化するには「thread.start()」を実行する前に「thread.setDaemon(True)」と記述すればそのスレッドがデーモンスレッドになります。

 具体的な動きを見た方がわかりやすいので、まずは通常スレッドとして以下のプログラムを実行してみます。

・normal_thread.py

import threading
from time import sleep
from datetime import datetime


# 実行するスレッドのクラス
class TestThread(threading.Thread):

    # コンストラクタ(特に何もしないのであれば再定義する必要なし)
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    # スレッド実行
    def run(self):
        cnt = 0
        while cnt < 10:
            date = datetime.now().strftime("%Y%m%d_%H%M%S")
            print(self.name + " start! :{}".format(date))
            sleep(1)
            cnt = cnt + 1


def main():
    # 実行するクラスのスレッドをそれぞれ作成
    th1 = TestThread("thread1")
    th2 = TestThread("thread2")

    # デーモンスレッドにする
    # th1.setDaemon(True)
    # th2.setDaemon(True)

    # スレッドスタート
    th1.start()
    th2.start()

    while True:
        print("main function")
        sleep(1)

if __name__ == '__main__':
    main()

 上記のプログラムを実行して10回繰り返す前に「Ctrl + C」でmain関数を終了させます。
 すると実行している他のスレッドの2つが10回繰り返すまではプログラムは終了しません。これが通常スレッドです。

 次にデーモンスレッド化するためにsetDaemonのところのコメントアウトを外した以下のプログラムを実行してみます。

daemon_thread.py

import threading
from time import sleep
from datetime import datetime


# 実行するスレッドのクラス
class TestThread(threading.Thread):

    # コンストラクタ(特に何もしないのであれば再定義する必要なし)
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    # スレッド実行
    def run(self):
        cnt = 0
        while cnt < 10:
            date = datetime.now().strftime("%Y%m%d_%H%M%S")
            print(self.name + " start! :{}".format(date))
            sleep(1)
            cnt = cnt + 1


def main():
    # 実行するクラスのスレッドをそれぞれ作成
    th1 = TestThread("thread1")
    th2 = TestThread("thread2")

    # デーモンスレッドにする
    th1.setDaemon(True)
    th2.setDaemon(True)

    # スレッドスタート
    th1.start()
    th2.start()

    while True:
        print("main function")
        sleep(1)

if __name__ == '__main__':
    main()

 するとmain関数を「Ctrl + C」で終了させるのと同時に実行されている他のスレッドも強制終了します。これがデーモンスレッドです。


3:別スレッドのExceptionを検知してそのスレッドを再起動させる
 簡単にスレッドの書き方について書いてきましたが、いよいよ本題です。

 run関数の中でExceptionが発生するとスレッドが終了するのでそれを検知してスレッドを再起動させたい場合があります。

 その方法は色々あるとは思いますが、今回はQueueを使う方法を紹介します。結論からいうと。以下のような記述でできます。

・thread_exception_catch.py

import sys
import threading
import queue
import random
from time import sleep
from datetime import datetime

# 実行するスレッドのクラス
class ExcThread(threading.Thread):

    # コンストラクタ
    def __init__(self, bucket, name):
        threading.Thread.__init__(self)
        self.bucket = bucket
        self.name = name

    # スレッド実行
    def run(self):
        print(self.name + " start!")
        try:
            while True:
                random_num = random.randint(1, 10)
                date = datetime.now().strftime("%Y%m%d_%H%M%S")
                print(self.name + " :{}, {}".format(random_num, date))
                # 乱数が8より大きい場合はException
                if random_num > 8:
                    raise Exception(self.name + ' Exception!')
                sleep(1)
        except Exception:
            # Exceptionの情報をQueueに追加
            error_data = [self.name, sys.exc_info()]
            self.bucket.put(error_data)


def main():
    print("---- thread Exception Catch start ----")

    thread_name_list = ["th-1", "th-2", "th-3", "th-4"] # スレッド名のリスト
    thread_dict = dict()
    error_queue = queue.Queue()

    # スレッド名の分だけスレッド開始
    for name in thread_name_list:
        thread_start(thread_dict, name, error_queue)

    try:
        while True:
            # 無限ループは「Ctrl+C」で終了させる
            # キューからデータがなくなるまで取り出しを行う
            while not error_queue.empty():
                error_data = error_queue.get(block=False) # Queueのエラー内容を取得
                name = error_data[0]

                # 念のためスレッドの死活判定してエラーの表示+スレッド作り直して始動
                if not thread_dict[name].isAlive():
                    printException(error_data[1])
                    thread_start(thread_dict, name, error_queue)
    except KeyboardInterrupt:
        # 「Ctrl+C」を検知
        printException(sys.exc_info())
        print("---- thread Exception Catch end ----")


# スレッドを開始する関数
def thread_start(thread_dict, name, error_queue):
    thread_dict[name] = ExcThread(error_queue, name)
    # メインスレッドが終了したら他のスレッドも終了させたいのでデーモンスレッドにしている
    thread_dict[name].setDaemon(True)
    thread_dict[name].start()


# エラーの内容を表示する関数
def printException(error_data):
    exc_type, exc_obj, exc_trace = error_data
    print(exc_type, exc_obj, exc_trace)


if __name__ == '__main__':
    main()

 ソースコードを見ればわかると思いますが、共通のQueueを各スレッドの引数に設定して各スレッドの中でExceptionが発生した場合そのQueueの中にスレッド名とエラーが設定されます。そのQueueに設定されたエラーを取得して該当スレッドを再起動させるという方法を行ってます。


 以上が別スレッドのExceptionを検知してそのスレッドを再起動させる方法です。

 スレッドにあまり慣れてないので変なところがあるかとは思いますが、スレッド毎に別の処理をやらせてそこでエラーが出たら自動で再起動させるみたいなことができたので色々と使えそうです。


・参考資料