Python キルスレッド
- Python でスレッドを強制終了するには、スレッドで例外を発生させます
- 
          
            Python で traceを使用してスレッドを強制終了する
- Python でスレッドを強制終了するための停止フラグの作成/リセット
- 
          
            Python で multiprocessingモジュールを使用してスレッドを強制終了する
- Python で与えられたスレッドをデーモンスレッドとして設定してスレッドを強制終了する
- 
          
            Python で隠し _stop()関数を使用してスレッドを強制終了する
 
プログラマーの間では悪いプログラミング慣行としてフラグが立てられていますが、Python ではスレッドを強制終了する必要がある場合があります。このチュートリアルでは、Python でスレッドを強制終了するためのさまざまな方法を示します。
脅威に突然終止符を打つことの欠点は、タスクをバックグラウンドで開いたままにして、問題を引き起こす可能性があります。
さらに、Python は、Python でスレッドを直接強制終了する手段を提供していません。つまり、この重要なタスクを実装するための抜け穴や間接的な方法を見つけることを意味します。
ここでは、Python でスレッドを強制終了するいくつかの方法に焦点を当てて説明します。
Python でスレッドを強制終了するには、スレッドで例外を発生させます
このメソッドは、PyThreadState_SetAsyncExc() 関数を利用します。この関数は、指定されたスレッドで非同期的に例外を発生させます。
次のコードは、Python でスレッドを強制終了するために、スレッドで例外を発生させます。
import threading
import ctypes
import time
class twe(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        try:
            while True:
                print("running " + self.name)
        finally:
            print("ended")
    def get_id(self):
        if hasattr(self, "_thread_id"):
            return self._thread_id
        for id, thread in threading._active.items():
            if thread is self:
                return id
    def raise_exception(self):
        thread_id = self.get_id()
        resu = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(SystemExit)
        )
        if resu > 1:
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
            print("Failure in raising exception")
x = twe("Thread A")
x.start()
time.sleep(2)
x.raise_exception()
x.join()
コードが実行されると、例外が発生するとすぐに、プログラム制御が例外ハンドラーの try ブロックをバイパスできるため、run() 関数が強制終了されます。
次に、join() 関数が呼び出されて、最後の打撃を与え、run() 関数を強制終了します。
Python で trace を使用してスレッドを強制終了する
Python でスレッドを強制終了する同じタスクを実装する別の方法は、指定されたスレッドに trace をインストールして、スレッドの実行を変更することです。
次のコードは、トレースを使用して Python でスレッドを強制終了します。
import sys
import time
import threading
import trace
class KThread(threading.Thread):
    def __init__(self, *args, **keywords):
        threading.Thread.__init__(self, *args, **keywords)
        self.killed = False
    def start(self):
        self.__run_backup = self.run
        self.run = self.__run
        threading.Thread.start(self)
    def __run(self):
        sys.settrace(self.globaltrace)
        self.__run_backup()
        self.run = self.__run_backup
    def globaltrace(self, frame, why, arg):
        if why == "call":
            return self.localtrace
        else:
            return None
    def localtrace(self, frame, why, arg):
        if self.killed:
            if why == "line":
                raise SystemExit()
        return self.localtrace
    def kill(self):
        self.killed = True
def exfu():
    print("The function begins")
    for i in range(1, 100):
        print(i)
        time.sleep(0.2)
    print("The function ends")
x = KThread(target=exfu)
x.start()
time.sleep(1)
x.kill()
上記のコードは次の出力を提供します。
The function begins
1
2
3
4
5
ここでは、元の threading.Thread クラスのサブセットである KThread クラスを利用します。KThread クラスは、kill() 関数をコードに実装します。
Python でスレッドを強制終了するための停止フラグの作成/リセット
コードで停止フラグを宣言できます。これにより、スレッドが停止したときにスレッドの実行が停止します。
次のコードは、Python でスレッドを強制終了するための停止フラグを作成します。
import threading
import time
def frun():
    while True:
        print("thread running")
        global stop_threads
        if stop_threads:
            break
stop_threads = False
x = threading.Thread(target=frun)
x.start()
time.sleep(1)
stop_threads = True
x.join()
print("killed the thread.")
Python で multiprocessing モジュールを使用してスレッドを強制終了する
multiprocessing モジュールを使用すると、プロセスを生成できます。プロセスと動作は、どちらも API を使用するため、threading モジュールと同様です。
terminate() は特定のプロセスを強制終了できます。これは、スレッド自体を強制終了するよりも比較的安全で複雑ではありません。
次のコードは、multiprocessing モジュールを使用して Python でスレッドを強制終了します。
import multiprocessing
import time
def cp():
    while True:
        for i in range(20):
            print("Process: ", i)
            time.sleep(0.05)
x = multiprocessing.Process(target=cp)
x.start()
time.sleep(0.5)
x.terminate()
print("Terminated the child process")
Python で与えられたスレッドをデーモンスレッドとして設定してスレッドを強制終了する
デーモンスレッドは、メインプログラムが終了すると自動的に強制終了されるスレッドです。特定のスレッドをデーモンスレッドとして設定して、Python で特定のスレッドを強制終了できます。
次のコードは、指定されたスレッドをデーモンスレッドとして設定し、Python でスレッドを強制終了します。
import threading
import time
import sys
def exfu():
    while True:
        time.sleep(0.5)
        print("Thread alive, but it will die on program termination")
x = threading.Thread(target=exfu)
x.daemon = True
x.start()
time.sleep(2)
sys.exit()
Python で隠し _stop() 関数を使用してスレッドを強制終了する
文書化されていませんが、非表示の _stop() 関数は、Python でスレッドを強制終了するタスクを実装できます。
次のコードは、非表示の _stop() 関数を使用して Python のスレッドを強制終了します。
import time
import threading
class th1(threading.Thread):
    def __init__(self, *args, **kwargs):
        super(th1, self).__init__(*args, **kwargs)
        self._stop = threading.Event()
    def stop(self):
        self._stop.set()
    def stopped(self):
        return self._stop.isSet()
    def run(self):
        while True:
            if self.stopped():
                return
            print("Hello, world!")
            time.sleep(1)
x = th1()
x.start()
time.sleep(5)
x.stop()
x.join()
Vaibhhav is an IT professional who has a strong-hold in Python programming and various projects under his belt. He has an eagerness to discover new things and is a quick learner.
LinkedIn