時刻ちょうどに実行する その2
マルチプロセス版の時刻ちょうどに実行する
2秒ごとと5秒ごとに実行する。ただし開始は0秒から。
import multiprocessing import datetime import time class Worker(multiprocessing.Process): def __init__(self,queue,interval=5): self.interval=interval self.q=q self.time_now=datetime.datetime.utcnow() self.time_offset=datetime.timedelta(seconds=interval) #繰り返しの間隔(秒) #秒を0にする。 self.time_next=datetime.datetime(self.time_now.year, self.time_now.month, self.time_now.day, self.time_now.hour, self.time_now.minute, second=0, microsecond=0, tzinfo=None) while self.time_next < self.time_now: self.time_next=self.time_next+self.time_offset #super(Worker, self).__init__() def timedelta2sec(self,t): return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000 def run(self): self.time_now = datetime.datetime.utcnow() while self.time_next < self.time_now: self.time_next=self.time_next+self.time_offset time.sleep(self.timedelta2sec(self.time_next-self.time_now)) while self.q.empty()==True: #qに何か入ってくれば終了する。 #今の時間を表示 self.time_now = datetime.datetime.utcnow() print(self.name,self.time_now) self.time_next = self.time_next + self.time_offset self.time_now = datetime.datetime.utcnow() while self.time_next < self.time_now: self.time_next=self.time_next+self.time_offset time.sleep(self.timedelta2sec(self.time_next-self.time_now)) return if __name__ == '__main__': q=multiprocessing.SimpleQueue() #終了フラグを入れるために使う x=Worker(q,interval=2) y=Worker(q,interval=5) x.start() y.start() time.sleep(20) print("stop process") q.put("STOP") x.join() y.join() print("end")
実行結果
>C:\Python36\python.exe C:\multiProcessTest.py Worker-1 2018-03-15 11:36:48.010984 Worker-1 2018-03-15 11:36:50.001188 Worker-2 2018-03-15 11:36:50.001188 Worker-1 2018-03-15 11:36:52.002991 Worker-1 2018-03-15 11:36:54.006795 Worker-2 2018-03-15 11:36:55.006197 Worker-1 2018-03-15 11:36:56.006599 Worker-1 2018-03-15 11:36:58.007403 Worker-2 2018-03-15 11:37:00.009206 Worker-1 2018-03-15 11:37:00.009206 Worker-1 2018-03-15 11:37:02.009010 Worker-1 2018-03-15 11:37:04.007215 Worker-2 2018-03-15 11:37:05.006616 Worker-1 2018-03-15 11:37:06.002419 stop process end
スーパークラスのコンストラクタを忘れているとこんなエラーになる。
assert self._popen is None, 'cannot start a process twice' AttributeError: 'Worker' object has no attribute '_popen'
sqliteをインメモリで使ってみた
やりたいことは以下。全部できた。
インメモリDBは早い。
- インメモリで動かす
- 日付、日時を格納する
- 日時、日時の新しいN件だけをDB内に維持する
- 日付、日時の新しいM件を取り出す
import sqlite3 import time import datetime start=time.time() con = sqlite3.connect(":memory:") # ファイルの場合は280,000件で20秒 #con = sqlite3.connect("sqlitedb.db") # ファイルの場合は1000件で20秒 con.isolation_level = None # None で自動コミットモード cur = con.cursor() # Create table cur.execute('''CREATE TABLE stocks (date text, ts timestamp, trans text, symbol text, qty real, price real, hoge integer)''') for i in range(100): # Insert a row of data now = datetime.datetime.now() cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)",(now,str(i))) time.sleep(0.01) # Save (commit) the changes # con.commit() elapsed_time=time.time()-start print ("elapsed_time:{0}".format(elapsed_time) + "[sec]") # 新しいほうから3件だけを表示する。 for row in cur.execute("SELECT * From stocks ORDER BY ts DESC LIMIT 3"): print(row) print("-------------------------------------------") # レコード数を最新の6件だけ残して、古い行を削除する cur.execute("delete from stocks where ts IN (select ts from stocks order by ts desc LIMIT -1 OFFSET 6)") elapsed_time=time.time()-start print ("elapsed_time:{0}".format(elapsed_time) + "[sec]") for row in cur.execute("SELECT * From stocks"): print(row) # We can also close the connection if we are done with it. # Just be sure any changes have been committed or they will be lost. con.close()
実行結果
C:\Python34>C:\Python34\inmemorysqlite.py elapsed_time:1.5490062236785889[sec] ('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99) ('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98) ('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97) ------------------------------------------- elapsed_time:1.5490062236785889[sec] ('2006-01-05', '2018-02-21 23:29:50.665637', 'BUY', 'RHAT', 100.0, 35.14, 94) ('2006-01-05', '2018-02-21 23:29:50.681237', 'BUY', 'RHAT', 100.0, 35.14, 95) ('2006-01-05', '2018-02-21 23:29:50.696837', 'BUY', 'RHAT', 100.0, 35.14, 96) ('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97) ('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98) ('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)
ログをディスクに吐いてるようなので、それもメモリにすればもうちょっと早くなる??未評価。
dbconnection =sqlite3.connect(":memory:", check_same_thread = False) dbconnection.isolation_level = None dbcursor = dbconnection.cursor() dbcursor.execute('PRAGMA temp_store=MEMORY;') dbcursor.execute('PRAGMA journal_mode=MEMORY;')
リングバッファと計算
# -*- coding: utf-8 -*- class RingBuffer(): def __init__(self,bufferSize): self.size = bufferSize self.pc= 0 #次に使うバッファの番号 self.data=[None]*self.size #固定長さのリスト self.valid=False #リングバッファがいっぱいになったらTrueにする。 def push(self,value): print("pc=",self.pc) self.data[self.pc]=value self.pc=self.pc+1 if self.pc==self.size: self.pc=0 self.valid=True def max(self): if self.valid: return max(self.data) else: if self.pc==0: return None else: print("pc=",self.pc) return max(self.data[0:self.pc-1]) def min(self): if self.valid: return min(self.data) else: if self.pc==0: return None else: print("pc=",self.pc) return min(self.data[0:self.pc-1]) def average(self): if self.valid: return sum(self.data)/self.size else: if self.pc==0: return None else: print("pc=",self.pc) return sum(self.data[0:self.pc-1])/self.size def dump(self): return self.data def main(): rb=RingBuffer(5) print(rb.dump()) print(rb.max()) #バッファ内の最大を返す print(rb.min()) #バッファ内の最小を返す rb.push(1) rb.push(2) rb.push(3) rb.push(4) print(rb.dump()) print("max=",rb.max()) #バッファ内の最大を返す print("min=",rb.min()) #バッファ内の最小を返す rb.push(1) rb.push(1) rb.push(2) rb.push(2) print(rb.dump()) print(rb.max()) #バッファ内の最大を返す print(rb.min()) #バッファ内の最小を返す print(rb.average()) #バッファ内の最小を返す if __name__== '__main__': main()
multiprocessingのサンプルコード
multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。
# -*- coding: utf-8 -*- from multiprocessing import Pool import time import os def f(x): print("f(x)",os.getpid()) time.sleep(1) print(x) return x*x class MyClass(): def __init__(self): print("init MyClass") self.foo=1 #共通の情報 def g(self,x): print("MyClass g(x)",os.getpid()) time.sleep(1) #共通の情報にアクセスできる?→できた。 print(x,self.foo) #共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。 self.foo=2 return x*x+self.foo if __name__ == '__main__': print("main()",os.getpid()) myclass=MyClass() #普通の関数を実行 with Pool(2) as pool: multiple_results = [pool.apply_async(f, (i,)) for i in range(5)] print([res.get() for res in multiple_results]) print("----------------------------") #クラスの中の関数を実行 #新しいpoolにしてるのでプロセスIDは変わる。 with Pool(2) as pool: multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)] print([res.get() for res in multiple_results])
実行結果
main() 6120 init MyClass f(x) 4252 f(x) 2784 0 f(x) 4252 1 f(x) 2784 2 f(x) 4252 3 4 [0, 1, 4, 9, 16]
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Threadを使うときの"TypeError: function1() got multiple values for argument 'arg1'"
threading.Threadを使って引数持つ関数を実行すると、以下のようなエラーになることがある。
run()に直接関数を記述せず、すでにある関数をrun()の中で実行しようとすると起きる。
"TypeError: function1() got multiple values for argument 'arg1'"
原因はよくわからない。
とりあえず回避した例。func1でself.kwargsを使うのと何が違うのか・・・。
# -*- coding: utf-8 -*- import threading class MyThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None): #threading.Threadとしてのコンストラクタ threading.Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None) #threadの中で使う引数(辞書) self.kwargs = kwargs return def func1(self,*args,**kwargs): x=kwargs["kwargs"] a=x["a"] b=x["b"] c=x["c"] print("thread main fun1",a,b,c) return def run(self): #実際の処理は、func1で行う。 self.func1(kwargs=self.kwargs) return def main(): kwargs={"a":1,"b":2,"c":3} t=MyThread(kwargs=kwargs) t.start() if __name__== '__main__': main()
時刻ちょうどに実行する
プログラム
# -*- coding: utf-8 -*- import threading import datetime from time import sleep """ 1秒ごとに交互に実行する。 時刻は現在時刻を取得して、1/100秒単位くらいでx秒ちょうどに開始したい。 →時刻取得の関数が重たいのか、精度が出ているかどうかよくわからない。 """ class MyThread(threading.Thread): def __init__(self, odd = True ,second = 2): self.dt_offset=datetime.timedelta(seconds=second) #繰り返しの間隔(秒) self.stop_event = threading.Event() #停止させるためのフラグ self.dt_now=datetime.datetime.utcnow() self.odd=odd if odd==True: self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=0, microsecond=0, tzinfo=None) else: self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=1, microsecond=0, tzinfo=None) threading.Thread.__init__(self) #threadとしてのコンストラクタ def stop(self): #終了させるためのもの。 print("Thread Stop Event") self.stop_event.set() def run(self): while not self.stop_event.is_set(): self.dt_next = self.dt_next + self.dt_offset self.dt_now = datetime.datetime.utcnow() if self.dt_now > self.dt_next: continue #時刻になるまで待ち sleep( timedelta2sec( self.dt_next - self.dt_now ) ) #実行したい内容をここに記述 print(self.name,self.dt_offset.seconds,self.odd,datetime.datetime.utcnow()) print("thread end") return def timedelta2sec(t): return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000 if __name__ == '__main__': x=MyThread(second=2, odd=True) y=MyThread(second=2, odd=False) x.start() y.start() #15秒したら終了させる。 sleep(15) x.stop() y.stop() #スレッドの終了まち x.join() y.join() print("プログラム終了")
実行結果
python.exe timerThreadTest.py Thread-2 2 False 2017-09-06 05:13:45.017042 Thread-1 2 True 2017-09-06 05:13:46.021042 Thread-2 2 False 2017-09-06 05:13:47.020042 Thread-1 2 True 2017-09-06 05:13:48.008042 Thread-2 2 False 2017-09-06 05:13:49.005042 Thread-1 2 True 2017-09-06 05:13:50.018042 Thread-2 2 False 2017-09-06 05:13:51.012042 Thread-1 2 True 2017-09-06 05:13:52.029042 Thread-2 2 False 2017-09-06 05:13:53.026042 Thread-1 2 True 2017-09-06 05:13:54.018042 Thread-2 2 False 2017-09-06 05:13:55.027042 Thread-1 2 True 2017-09-06 05:13:56.021042 Thread-2 2 False 2017-09-06 05:13:57.012042 Thread-1 2 True 2017-09-06 05:13:58.022042 Thread-2 2 False 2017-09-06 05:13:59.027042 Thread Stop Event Thread Stop Event Thread-1 2 True 2017-09-06 05:14:00.016042 thread end Thread-2 2 False 2017-09-06 05:14:01.007042 thread end プログラム終了