sqliteをインメモリで使ってみた
やりたいことは以下。全部できた。
インメモリDBは早い。
- インメモリで動かす
- 日付、日時を格納する
- 日時、日時の新しいN件だけをDB内に維持する
- 日付、日時の新しいM件を取り出す
import sqlite3 import time import datetime start=time.time() con = sqlite3.connect(":memory:") # ファイルの場合は280,000件で20秒 #con = sqlite3.connect("sqlitedb.db") # ファイルの場合は1000件で20秒 con.isolation_level = None # None で自動コミットモード cur = con.cursor() # Create table cur.execute('''CREATE TABLE stocks (date text, ts timestamp, trans text, symbol text, qty real, price real, hoge integer)''') for i in range(100): # Insert a row of data now = datetime.datetime.now() cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)",(now,str(i))) time.sleep(0.01) # Save (commit) the changes # con.commit() elapsed_time=time.time()-start print ("elapsed_time:{0}".format(elapsed_time) + "[sec]") # 新しいほうから3件だけを表示する。 for row in cur.execute("SELECT * From stocks ORDER BY ts DESC LIMIT 3"): print(row) print("-------------------------------------------") # レコード数を最新の6件だけ残して、古い行を削除する cur.execute("delete from stocks where ts IN (select ts from stocks order by ts desc LIMIT -1 OFFSET 6)") elapsed_time=time.time()-start print ("elapsed_time:{0}".format(elapsed_time) + "[sec]") for row in cur.execute("SELECT * From stocks"): print(row) # We can also close the connection if we are done with it. # Just be sure any changes have been committed or they will be lost. con.close()
実行結果
C:\Python34>C:\Python34\inmemorysqlite.py elapsed_time:1.5490062236785889[sec] ('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99) ('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98) ('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97) ------------------------------------------- elapsed_time:1.5490062236785889[sec] ('2006-01-05', '2018-02-21 23:29:50.665637', 'BUY', 'RHAT', 100.0, 35.14, 94) ('2006-01-05', '2018-02-21 23:29:50.681237', 'BUY', 'RHAT', 100.0, 35.14, 95) ('2006-01-05', '2018-02-21 23:29:50.696837', 'BUY', 'RHAT', 100.0, 35.14, 96) ('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97) ('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98) ('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)
ログをディスクに吐いてるようなので、それもメモリにすればもうちょっと早くなる??未評価。
dbconnection =sqlite3.connect(":memory:", check_same_thread = False) dbconnection.isolation_level = None dbcursor = dbconnection.cursor() dbcursor.execute('PRAGMA temp_store=MEMORY;') dbcursor.execute('PRAGMA journal_mode=MEMORY;')
リングバッファと計算
# -*- coding: utf-8 -*- class RingBuffer(): def __init__(self,bufferSize): self.size = bufferSize self.pc= 0 #次に使うバッファの番号 self.data=[None]*self.size #固定長さのリスト self.valid=False #リングバッファがいっぱいになったらTrueにする。 def push(self,value): print("pc=",self.pc) self.data[self.pc]=value self.pc=self.pc+1 if self.pc==self.size: self.pc=0 self.valid=True def max(self): if self.valid: return max(self.data) else: if self.pc==0: return None else: print("pc=",self.pc) return max(self.data[0:self.pc-1]) def min(self): if self.valid: return min(self.data) else: if self.pc==0: return None else: print("pc=",self.pc) return min(self.data[0:self.pc-1]) def average(self): if self.valid: return sum(self.data)/self.size else: if self.pc==0: return None else: print("pc=",self.pc) return sum(self.data[0:self.pc-1])/self.size def dump(self): return self.data def main(): rb=RingBuffer(5) print(rb.dump()) print(rb.max()) #バッファ内の最大を返す print(rb.min()) #バッファ内の最小を返す rb.push(1) rb.push(2) rb.push(3) rb.push(4) print(rb.dump()) print("max=",rb.max()) #バッファ内の最大を返す print("min=",rb.min()) #バッファ内の最小を返す rb.push(1) rb.push(1) rb.push(2) rb.push(2) print(rb.dump()) print(rb.max()) #バッファ内の最大を返す print(rb.min()) #バッファ内の最小を返す print(rb.average()) #バッファ内の最小を返す if __name__== '__main__': main()
multiprocessingのサンプルコード
multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。
# -*- coding: utf-8 -*- from multiprocessing import Pool import time import os def f(x): print("f(x)",os.getpid()) time.sleep(1) print(x) return x*x class MyClass(): def __init__(self): print("init MyClass") self.foo=1 #共通の情報 def g(self,x): print("MyClass g(x)",os.getpid()) time.sleep(1) #共通の情報にアクセスできる?→できた。 print(x,self.foo) #共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。 self.foo=2 return x*x+self.foo if __name__ == '__main__': print("main()",os.getpid()) myclass=MyClass() #普通の関数を実行 with Pool(2) as pool: multiple_results = [pool.apply_async(f, (i,)) for i in range(5)] print([res.get() for res in multiple_results]) print("----------------------------") #クラスの中の関数を実行 #新しいpoolにしてるのでプロセスIDは変わる。 with Pool(2) as pool: multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)] print([res.get() for res in multiple_results])
実行結果
main() 6120 init MyClass f(x) 4252 f(x) 2784 0 f(x) 4252 1 f(x) 2784 2 f(x) 4252 3 4 [0, 1, 4, 9, 16]
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
Threadを使うときの"TypeError: function1() got multiple values for argument 'arg1'"
threading.Threadを使って引数持つ関数を実行すると、以下のようなエラーになることがある。
run()に直接関数を記述せず、すでにある関数をrun()の中で実行しようとすると起きる。
"TypeError: function1() got multiple values for argument 'arg1'"
原因はよくわからない。
とりあえず回避した例。func1でself.kwargsを使うのと何が違うのか・・・。
# -*- coding: utf-8 -*- import threading class MyThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None): #threading.Threadとしてのコンストラクタ threading.Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None) #threadの中で使う引数(辞書) self.kwargs = kwargs return def func1(self,*args,**kwargs): x=kwargs["kwargs"] a=x["a"] b=x["b"] c=x["c"] print("thread main fun1",a,b,c) return def run(self): #実際の処理は、func1で行う。 self.func1(kwargs=self.kwargs) return def main(): kwargs={"a":1,"b":2,"c":3} t=MyThread(kwargs=kwargs) t.start() if __name__== '__main__': main()
時刻ちょうどに実行する
プログラム
# -*- coding: utf-8 -*- import threading import datetime from time import sleep """ 1秒ごとに交互に実行する。 時刻は現在時刻を取得して、1/100秒単位くらいでx秒ちょうどに開始したい。 →時刻取得の関数が重たいのか、精度が出ているかどうかよくわからない。 """ class MyThread(threading.Thread): def __init__(self, odd = True ,second = 2): self.dt_offset=datetime.timedelta(seconds=second) #繰り返しの間隔(秒) self.stop_event = threading.Event() #停止させるためのフラグ self.dt_now=datetime.datetime.utcnow() self.odd=odd if odd==True: self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=0, microsecond=0, tzinfo=None) else: self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=1, microsecond=0, tzinfo=None) threading.Thread.__init__(self) #threadとしてのコンストラクタ def stop(self): #終了させるためのもの。 print("Thread Stop Event") self.stop_event.set() def run(self): while not self.stop_event.is_set(): self.dt_next = self.dt_next + self.dt_offset self.dt_now = datetime.datetime.utcnow() if self.dt_now > self.dt_next: continue #時刻になるまで待ち sleep( timedelta2sec( self.dt_next - self.dt_now ) ) #実行したい内容をここに記述 print(self.name,self.dt_offset.seconds,self.odd,datetime.datetime.utcnow()) print("thread end") return def timedelta2sec(t): return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000 if __name__ == '__main__': x=MyThread(second=2, odd=True) y=MyThread(second=2, odd=False) x.start() y.start() #15秒したら終了させる。 sleep(15) x.stop() y.stop() #スレッドの終了まち x.join() y.join() print("プログラム終了")
実行結果
python.exe timerThreadTest.py Thread-2 2 False 2017-09-06 05:13:45.017042 Thread-1 2 True 2017-09-06 05:13:46.021042 Thread-2 2 False 2017-09-06 05:13:47.020042 Thread-1 2 True 2017-09-06 05:13:48.008042 Thread-2 2 False 2017-09-06 05:13:49.005042 Thread-1 2 True 2017-09-06 05:13:50.018042 Thread-2 2 False 2017-09-06 05:13:51.012042 Thread-1 2 True 2017-09-06 05:13:52.029042 Thread-2 2 False 2017-09-06 05:13:53.026042 Thread-1 2 True 2017-09-06 05:13:54.018042 Thread-2 2 False 2017-09-06 05:13:55.027042 Thread-1 2 True 2017-09-06 05:13:56.021042 Thread-2 2 False 2017-09-06 05:13:57.012042 Thread-1 2 True 2017-09-06 05:13:58.022042 Thread-2 2 False 2017-09-06 05:13:59.027042 Thread Stop Event Thread Stop Event Thread-1 2 True 2017-09-06 05:14:00.016042 thread end Thread-2 2 False 2017-09-06 05:14:01.007042 thread end プログラム終了
QueueをつかったPython マルチスレッド
マルチスレッドのテストプログラム。
Listをもらって、加算して、結果をグローバルのリストに書き込む。
import threading import queue import time commonList=[] q=queue.Queue() def worker(): """ マルチスレッドで走らせる関数 Queueからデータをもらう。 結果をcommonListに書き込む 計算内容や書き込む場所はqに書いてある """ global q global commonList print("thread Start") while True: time.sleep(0.1) #Ctrl+Cで終了させるおまじない。 item = q.get() if item is None: print("thread None") break commonList.append((item.ID,sum(item.values))) q.task_done() print("thread End") def main(): global q global commonList num_worker_threads=3 a=Job([1,2,3],0) b=Job([3,4,5],1) c=Job([6,7,8],2) d=Job([9,0,1],3) e=Job([2,3,4],4) f=Job([5,6,7,8],5) threadItem=[a,b,c,d,e,f] threads=[] #スレッドを立てる。作業はあとでQueueで渡す。 for i in range(num_worker_threads): t = threading.Thread(target=worker) t.start() threads.append(t) #itemを投入。データを渡す。 for item in threadItem: q.put(item) # block until all tasks are done qが空になるのを待っている。 q.join() #スレッド停止命令(None)の投入 for i in range(num_worker_threads): q.put(None) #スレッドの終了まち for t in threads: t.join() #結果の表示 print(commonList) class Job(): #xはリスト,yは書き込み先のリストのインデックス def __init__(self,x,y): self.values=x.copy() self.ID=y if __name__ == '__main__': main()
結果
C:\Python34>python multiThreadTest.py thread Start thread Start thread Start thread None thread End thread None thread End thread None thread End [(0, 6), (1, 12), (2, 21), (3, 10), (4, 9), (5, 26)]