multiprocessingのサンプルコード
multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。
# -*- coding: utf-8 -*- from multiprocessing import Pool import time import os def f(x): print("f(x)",os.getpid()) time.sleep(1) print(x) return x*x class MyClass(): def __init__(self): print("init MyClass") self.foo=1 #共通の情報 def g(self,x): print("MyClass g(x)",os.getpid()) time.sleep(1) #共通の情報にアクセスできる?→できた。 print(x,self.foo) #共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。 self.foo=2 return x*x+self.foo if __name__ == '__main__': print("main()",os.getpid()) myclass=MyClass() #普通の関数を実行 with Pool(2) as pool: multiple_results = [pool.apply_async(f, (i,)) for i in range(5)] print([res.get() for res in multiple_results]) print("----------------------------") #クラスの中の関数を実行 #新しいpoolにしてるのでプロセスIDは変わる。 with Pool(2) as pool: multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)] print([res.get() for res in multiple_results])
実行結果
main() 6120 init MyClass f(x) 4252 f(x) 2784 0 f(x) 4252 1 f(x) 2784 2 f(x) 4252 3 4 [0, 1, 4, 9, 16]
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Threadを使うときの"TypeError: function1() got multiple values for argument 'arg1'"
threading.Threadを使って引数持つ関数を実行すると、以下のようなエラーになることがある。
run()に直接関数を記述せず、すでにある関数をrun()の中で実行しようとすると起きる。
"TypeError: function1() got multiple values for argument 'arg1'"
原因はよくわからない。
とりあえず回避した例。func1でself.kwargsを使うのと何が違うのか・・・。
# -*- coding: utf-8 -*- import threading class MyThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None): #threading.Threadとしてのコンストラクタ threading.Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None) #threadの中で使う引数(辞書) self.kwargs = kwargs return def func1(self,*args,**kwargs): x=kwargs["kwargs"] a=x["a"] b=x["b"] c=x["c"] print("thread main fun1",a,b,c) return def run(self): #実際の処理は、func1で行う。 self.func1(kwargs=self.kwargs) return def main(): kwargs={"a":1,"b":2,"c":3} t=MyThread(kwargs=kwargs) t.start() if __name__== '__main__': main()
時刻ちょうどに実行する
プログラム
# -*- coding: utf-8 -*- import threading import datetime from time import sleep """ 1秒ごとに交互に実行する。 時刻は現在時刻を取得して、1/100秒単位くらいでx秒ちょうどに開始したい。 →時刻取得の関数が重たいのか、精度が出ているかどうかよくわからない。 """ class MyThread(threading.Thread): def __init__(self, odd = True ,second = 2): self.dt_offset=datetime.timedelta(seconds=second) #繰り返しの間隔(秒) self.stop_event = threading.Event() #停止させるためのフラグ self.dt_now=datetime.datetime.utcnow() self.odd=odd if odd==True: self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=0, microsecond=0, tzinfo=None) else: self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=1, microsecond=0, tzinfo=None) threading.Thread.__init__(self) #threadとしてのコンストラクタ def stop(self): #終了させるためのもの。 print("Thread Stop Event") self.stop_event.set() def run(self): while not self.stop_event.is_set(): self.dt_next = self.dt_next + self.dt_offset self.dt_now = datetime.datetime.utcnow() if self.dt_now > self.dt_next: continue #時刻になるまで待ち sleep( timedelta2sec( self.dt_next - self.dt_now ) ) #実行したい内容をここに記述 print(self.name,self.dt_offset.seconds,self.odd,datetime.datetime.utcnow()) print("thread end") return def timedelta2sec(t): return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000 if __name__ == '__main__': x=MyThread(second=2, odd=True) y=MyThread(second=2, odd=False) x.start() y.start() #15秒したら終了させる。 sleep(15) x.stop() y.stop() #スレッドの終了まち x.join() y.join() print("プログラム終了")
実行結果
python.exe timerThreadTest.py Thread-2 2 False 2017-09-06 05:13:45.017042 Thread-1 2 True 2017-09-06 05:13:46.021042 Thread-2 2 False 2017-09-06 05:13:47.020042 Thread-1 2 True 2017-09-06 05:13:48.008042 Thread-2 2 False 2017-09-06 05:13:49.005042 Thread-1 2 True 2017-09-06 05:13:50.018042 Thread-2 2 False 2017-09-06 05:13:51.012042 Thread-1 2 True 2017-09-06 05:13:52.029042 Thread-2 2 False 2017-09-06 05:13:53.026042 Thread-1 2 True 2017-09-06 05:13:54.018042 Thread-2 2 False 2017-09-06 05:13:55.027042 Thread-1 2 True 2017-09-06 05:13:56.021042 Thread-2 2 False 2017-09-06 05:13:57.012042 Thread-1 2 True 2017-09-06 05:13:58.022042 Thread-2 2 False 2017-09-06 05:13:59.027042 Thread Stop Event Thread Stop Event Thread-1 2 True 2017-09-06 05:14:00.016042 thread end Thread-2 2 False 2017-09-06 05:14:01.007042 thread end プログラム終了
QueueをつかったPython マルチスレッド
マルチスレッドのテストプログラム。
Listをもらって、加算して、結果をグローバルのリストに書き込む。
import threading import queue import time commonList=[] q=queue.Queue() def worker(): """ マルチスレッドで走らせる関数 Queueからデータをもらう。 結果をcommonListに書き込む 計算内容や書き込む場所はqに書いてある """ global q global commonList print("thread Start") while True: time.sleep(0.1) #Ctrl+Cで終了させるおまじない。 item = q.get() if item is None: print("thread None") break commonList.append((item.ID,sum(item.values))) q.task_done() print("thread End") def main(): global q global commonList num_worker_threads=3 a=Job([1,2,3],0) b=Job([3,4,5],1) c=Job([6,7,8],2) d=Job([9,0,1],3) e=Job([2,3,4],4) f=Job([5,6,7,8],5) threadItem=[a,b,c,d,e,f] threads=[] #スレッドを立てる。作業はあとでQueueで渡す。 for i in range(num_worker_threads): t = threading.Thread(target=worker) t.start() threads.append(t) #itemを投入。データを渡す。 for item in threadItem: q.put(item) # block until all tasks are done qが空になるのを待っている。 q.join() #スレッド停止命令(None)の投入 for i in range(num_worker_threads): q.put(None) #スレッドの終了まち for t in threads: t.join() #結果の表示 print(commonList) class Job(): #xはリスト,yは書き込み先のリストのインデックス def __init__(self,x,y): self.values=x.copy() self.ID=y if __name__ == '__main__': main()
結果
C:\Python34>python multiThreadTest.py thread Start thread Start thread Start thread None thread End thread None thread End thread None thread End [(0, 6), (1, 12), (2, 21), (3, 10), (4, 9), (5, 26)]
DBに特定のデータがあるかどうかを確認する
DB内にデータがあるかどうか確認する。
あった時は更新(update)、なかった時は挿入(insert)するように処理すればよい。
cur.execute(sql,data)に渡した時にSQL文がうまく展開されない?問題で困った。
# -*- coding: utf-8 import mysql.connector import datetime config = { 'user': 'root', 'password': 'password', 'host': 'localhost', 'database':'testdb', 'charset':'utf8' } cnx = mysql.connector.connect(**config) cur = cnx.cursor(buffered=True) #tableの削除と作成 sql = 'DROP TABLE IF EXISTS TEST_UPDATE;' cur.execute(sql) cnx.commit() sql = 'CREATE TABLE IF NOT EXISTS TEST_UPDATE (\ id INT UNSIGNED NOT NULL AUTO_INCREMENT,\ time DATETIME DEFAULT NULL, \ value1 INT UNSIGNED DEFAULT NULL ,\ value2 INT UNSIGNED DEFAULT NULL ,\ primary key(id),\ unique (time));' cur.execute(sql) cnx.commit() #初期データのインサート sql = 'INSERT INTO TEST_UPDATE (time, value1,value2) VALUES (%s, %s, %s);' data= ('2017-5-01T05:50:00','1','1') cur.execute(sql, data) sql = 'INSERT INTO TEST_UPDATE (time, value1,value2) VALUES (%s, %s, %s);' data= ('2017-5-01T05:50:05','2','2') cur.execute(sql, data) cnx.commit() #この時点でのテーブルの中身 #mysql> select * from test_update; #+----+---------------------+--------+--------+ #’ id | time | value1 | value2 | #+----+---------------------+--------+--------+ #| 1 | 2017-05-01 05:50:00 | 1 | 1 | #| 2 | 2017-05-01 05:50:05 | 2 | 2 | #+----+---------------------+--------+--------+ #2 rows in set (0.00 sec) #データの有無の確認1 sql= "select exists(select * from test_update where time=%s);" #data=(datetime.datetime(2017,5,1,5,50,00),) data= ('2017-5-01T05:50:00',)#なぜかカンマが必要。 try: cur.execute(sql,data) except mysql.connector.Error as err: print(cur.statement)#実行したSQL文の確認 raise #print(cur.fetchone()) #あった時…(1,) #なかった時…(0,) if cur.fetchone()[0]==0: print("なかった") else: print("あった") #データの有無の確認2 #テーブル名をdataで渡そうとすると、クォートがついてしまってうまく動かない。 tablename='test_update' sql= "select exists(select * from " + tablename +" where time=%s);" data= ('2017-5-01T05:50:05',) try: cur.execute(sql,data) except mysql.connector.Error as err: print(cur.statement)#実行したSQL文の確認 raise if cur.fetchone()[0]==0: print("なかった") else: print("あった") #データの有無の確認3 tablename='test_update' sql= "select exists(select * from " + tablename +" where time=%s);" data= ('2017-5-01T05:50:15',) try: cur.execute(sql,data) except mysql.connector.Error as err: print(cur.statement)#実行したSQL文の確認 raise if cur.fetchone()[0]==0: print("なかった") else: print("あった") cur.close() cnx.close() print('end')
pythonのunittestのコード
ちょっと前に書いた記事compute-cucco.hatenablog.comのテストコード。というか抽象クラスは関係ないので、ただのテストコード。
raiseに対するテストコードの書き方が分からない。。。
# -*- coding: utf-8 -*- import unittest import movingCalcs from movingCalcs import factory class TestmovingCalcs(unittest.TestCase): """test class of movingCalcs.py """ def test_add(self): """test method for Add """ node = factory("Add") value1 = 1 value2 = 2 expected = 3 actual = node.calc(value1, value2) self.assertEqual(expected, actual) def test_sub(self): """test method for sub """ node = factory("Sub") value1 = 1 value2 = 2 expected = -1 actual = node.calc(value1, value2) self.assertEqual(expected, actual) def test_zero(self): """test method for Zero """ node = factory("Zero") value1 = 1 value2 = 2 expected = 0 actual = node.calc(value1, value2) self.assertEqual(expected, actual) def test_other(self): """test method for other """ with self.assertRaises(factory): node = factory("Other") if __name__ == "__main__": unittest.main()