Cucco’s Compute Hack

コンピュータ関係の記事を書いていきます。

sqliteをインメモリで使ってみた

やりたいことは以下。全部できた。
インメモリDBは早い。

  1. インメモリで動かす
  2. 日付、日時を格納する
  3. 日時、日時の新しいN件だけをDB内に維持する
  4. 日付、日時の新しいM件を取り出す

ソースコード

import sqlite3
import time
import datetime

start=time.time()

con = sqlite3.connect(":memory:")
# ファイルの場合は280,000件で20秒
#con = sqlite3.connect("sqlitedb.db")
# ファイルの場合は1000件で20秒
con.isolation_level = None # None で自動コミットモード
cur = con.cursor()

# Create table
cur.execute('''CREATE TABLE stocks
             (date text, ts timestamp, trans text, symbol text, qty real, price real, hoge integer)''')

for i in range(100):
	# Insert a row of data
	now = datetime.datetime.now()
	cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)",(now,str(i)))
	time.sleep(0.01)

# Save (commit) the changes
# con.commit()

elapsed_time=time.time()-start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

# 新しいほうから3件だけを表示する。
for row in cur.execute("SELECT * From stocks ORDER BY ts DESC LIMIT 3"):
	print(row)

print("-------------------------------------------")

# レコード数を最新の6件だけ残して、古い行を削除する
cur.execute("delete from stocks where ts IN (select ts from stocks order by ts desc LIMIT -1 OFFSET 6)")

elapsed_time=time.time()-start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

for row in cur.execute("SELECT * From stocks"):
	print(row)

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.

con.close()

実行結果

C:\Python34>C:\Python34\inmemorysqlite.py
elapsed_time:1.5490062236785889[sec]
('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)
('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98)
('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97)
-------------------------------------------
elapsed_time:1.5490062236785889[sec]
('2006-01-05', '2018-02-21 23:29:50.665637', 'BUY', 'RHAT', 100.0, 35.14, 94)
('2006-01-05', '2018-02-21 23:29:50.681237', 'BUY', 'RHAT', 100.0, 35.14, 95)
('2006-01-05', '2018-02-21 23:29:50.696837', 'BUY', 'RHAT', 100.0, 35.14, 96)
('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97)
('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98)
('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)

ログをディスクに吐いてるようなので、それもメモリにすればもうちょっと早くなる??未評価。

    dbconnection =sqlite3.connect(":memory:", check_same_thread = False)
    dbconnection.isolation_level = None
    dbcursor = dbconnection.cursor()
    dbcursor.execute('PRAGMA temp_store=MEMORY;')
    dbcursor.execute('PRAGMA journal_mode=MEMORY;')

リングバッファと計算

# -*- coding: utf-8 -*-

class RingBuffer():
	def __init__(self,bufferSize):
		self.size = bufferSize
		self.pc= 0 #次に使うバッファの番号
		self.data=[None]*self.size #固定長さのリスト
		self.valid=False #リングバッファがいっぱいになったらTrueにする。
	
	def push(self,value):
		print("pc=",self.pc)
		self.data[self.pc]=value
		self.pc=self.pc+1
		
		if self.pc==self.size:
			self.pc=0
			self.valid=True
			
	def max(self):
		if self.valid:
			return max(self.data)
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return max(self.data[0:self.pc-1])
		
	def min(self):
		if self.valid:
			return min(self.data)
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return min(self.data[0:self.pc-1])
				
	def average(self):
		if self.valid:
			return sum(self.data)/self.size
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return sum(self.data[0:self.pc-1])/self.size
				
	def dump(self):
		return self.data

def main():
	rb=RingBuffer(5)
	
	print(rb.dump())
	print(rb.max()) #バッファ内の最大を返す
	print(rb.min()) #バッファ内の最小を返す
	
	rb.push(1)
	rb.push(2)
	rb.push(3)
	rb.push(4)
	print(rb.dump())
	print("max=",rb.max()) #バッファ内の最大を返す
	print("min=",rb.min()) #バッファ内の最小を返す
	rb.push(1)
	rb.push(1)
	rb.push(2)
	rb.push(2)
	
	print(rb.dump())
	print(rb.max()) #バッファ内の最大を返す
	print(rb.min()) #バッファ内の最小を返す
	print(rb.average()) #バッファ内の最小を返す
if __name__== '__main__':
	main()

multiprocessingのサンプルコード

multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。

# -*- coding: utf-8 -*-

from multiprocessing import Pool
import time
import os

def f(x):
	print("f(x)",os.getpid())
	time.sleep(1)
	print(x)
	return x*x

class MyClass():
	def __init__(self):
		print("init MyClass")
		self.foo=1 #共通の情報
	def g(self,x):
		print("MyClass g(x)",os.getpid())
		time.sleep(1)
		#共通の情報にアクセスできる?→できた。
		print(x,self.foo)
		#共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。
		self.foo=2
		return x*x+self.foo

if __name__ == '__main__':
	print("main()",os.getpid())
	myclass=MyClass()
	
	#普通の関数を実行
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(f, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])
		
	print("----------------------------")
	
	#クラスの中の関数を実行
	#新しいpoolにしてるのでプロセスIDは変わる。
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])

実行結果

main() 6120
init MyClass
f(x) 4252
f(x) 2784
0
f(x) 4252
1
f(x) 2784
2
f(x) 4252
3
4
[0, 1, 4, 9, 16]
                                                      • -
MyClass g(x) 7020 MyClass g(x) 5204 0 1 MyClass g(x) 7020 1 1 MyClass g(x) 5204 2 1 MyClass g(x) 7020 3 1 4 1 [2, 3, 6, 11, 18]

Threadを使うときの"TypeError: function1() got multiple values for argument 'arg1'"

threading.Threadを使って引数持つ関数を実行すると、以下のようなエラーになることがある。
run()に直接関数を記述せず、すでにある関数をrun()の中で実行しようとすると起きる。

"TypeError: function1() got multiple values for argument 'arg1'"

原因はよくわからない。

とりあえず回避した例。func1でself.kwargsを使うのと何が違うのか・・・。

# -*- coding: utf-8 -*-
import threading

class MyThread(threading.Thread):
	def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None):
		#threading.Threadとしてのコンストラクタ
		threading.Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None)
		
		#threadの中で使う引数(辞書)
		self.kwargs = kwargs
		return
		
	def func1(self,*args,**kwargs):
		x=kwargs["kwargs"]
		a=x["a"]
		b=x["b"]
		c=x["c"]
		print("thread main fun1",a,b,c)
		return
		
	def run(self):
		#実際の処理は、func1で行う。
		self.func1(kwargs=self.kwargs)
		return

def main():
	kwargs={"a":1,"b":2,"c":3}
	t=MyThread(kwargs=kwargs)
	t.start()

if __name__== '__main__':
	main()

時刻ちょうどに実行する

プログラム

# -*- coding: utf-8 -*-
import threading
import datetime
from time import sleep

"""
1秒ごとに交互に実行する。
時刻は現在時刻を取得して、1/100秒単位くらいでx秒ちょうどに開始したい。
→時刻取得の関数が重たいのか、精度が出ているかどうかよくわからない。
"""
class MyThread(threading.Thread):
	def __init__(self, odd = True ,second = 2):
		self.dt_offset=datetime.timedelta(seconds=second) #繰り返しの間隔(秒)
		self.stop_event = threading.Event() #停止させるためのフラグ
		
		self.dt_now=datetime.datetime.utcnow()
		
		self.odd=odd
		if odd==True:
			self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=0, microsecond=0, tzinfo=None)
		else:
			self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=1, microsecond=0, tzinfo=None)
			
		threading.Thread.__init__(self) #threadとしてのコンストラクタ
		
	def stop(self):
		#終了させるためのもの。
		print("Thread Stop Event")
		self.stop_event.set()
	
	def run(self):
		while not self.stop_event.is_set():
			
			self.dt_next = self.dt_next + self.dt_offset
			self.dt_now = datetime.datetime.utcnow()
			
			if self.dt_now > self.dt_next:
				continue
			
			#時刻になるまで待ち
			sleep( timedelta2sec( self.dt_next - self.dt_now ) )
			
			#実行したい内容をここに記述
			print(self.name,self.dt_offset.seconds,self.odd,datetime.datetime.utcnow())
			
		print("thread end")
		return

def timedelta2sec(t):
	return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000

if __name__ == '__main__':
	x=MyThread(second=2, odd=True)
	y=MyThread(second=2, odd=False)
	
	x.start()
	y.start()
	
	#15秒したら終了させる。
	sleep(15)
	x.stop()
	y.stop()
	
	#スレッドの終了まち
	x.join()
	y.join()
	
	print("プログラム終了")


実行結果

python.exe timerThreadTest.py
Thread-2 2 False 2017-09-06 05:13:45.017042
Thread-1 2 True 2017-09-06 05:13:46.021042
Thread-2 2 False 2017-09-06 05:13:47.020042
Thread-1 2 True 2017-09-06 05:13:48.008042
Thread-2 2 False 2017-09-06 05:13:49.005042
Thread-1 2 True 2017-09-06 05:13:50.018042
Thread-2 2 False 2017-09-06 05:13:51.012042
Thread-1 2 True 2017-09-06 05:13:52.029042
Thread-2 2 False 2017-09-06 05:13:53.026042
Thread-1 2 True 2017-09-06 05:13:54.018042
Thread-2 2 False 2017-09-06 05:13:55.027042
Thread-1 2 True 2017-09-06 05:13:56.021042
Thread-2 2 False 2017-09-06 05:13:57.012042
Thread-1 2 True 2017-09-06 05:13:58.022042
Thread-2 2 False 2017-09-06 05:13:59.027042
Thread Stop Event
Thread Stop Event
Thread-1 2 True 2017-09-06 05:14:00.016042
thread end
Thread-2 2 False 2017-09-06 05:14:01.007042
thread end
プログラム終了

ファイルの最後の1行を読む

ファイルの最後の1行を読みたい時。

# -*- coding: utf-8

file_name="some_file.csv"

with open(file_name,'r') as f:
	num_lines = sum(1 for line in f)
	f.seek(0)
	for line in range(0, num_lines-1):
		f.readline()
	line = f.readline()
print(line)

QueueをつかったPython マルチスレッド

マルチスレッドのテストプログラム。
Listをもらって、加算して、結果をグローバルのリストに書き込む。

import threading
import queue
import time

commonList=[]
q=queue.Queue()

def worker():
	"""
		マルチスレッドで走らせる関数
		Queueからデータをもらう。
		結果をcommonListに書き込む
		計算内容や書き込む場所はqに書いてある
	"""
	global q
	global commonList
	print("thread Start")
	
	while True:
		time.sleep(0.1) #Ctrl+Cで終了させるおまじない。
		item = q.get()
		if item is None:
			print("thread None")
			break
		commonList.append((item.ID,sum(item.values)))
		q.task_done()
	print("thread End")

def main():
	global q
	global commonList
	num_worker_threads=3
	a=Job([1,2,3],0)
	b=Job([3,4,5],1)
	c=Job([6,7,8],2)
	d=Job([9,0,1],3)
	e=Job([2,3,4],4)
	f=Job([5,6,7,8],5)
	
	threadItem=[a,b,c,d,e,f]
	
	threads=[]
	
	#スレッドを立てる。作業はあとでQueueで渡す。
	for i in range(num_worker_threads):
		t = threading.Thread(target=worker)
		t.start()
		threads.append(t)
	
	#itemを投入。データを渡す。
	for item in threadItem:
		q.put(item)
	
	# block until all tasks are done qが空になるのを待っている。
	q.join()

	#スレッド停止命令(None)の投入
	for i in range(num_worker_threads):
		q.put(None)
	
	#スレッドの終了まち
	for t in threads:
		t.join()
	
	#結果の表示
	print(commonList)

class Job():
	#xはリスト,yは書き込み先のリストのインデックス
	def __init__(self,x,y):
		self.values=x.copy()
		self.ID=y

if __name__ == '__main__':
	main()

結果

C:\Python34>python multiThreadTest.py
thread Start
thread Start
thread Start
thread None
thread End
thread None
thread End
thread None
thread End
[(0, 6), (1, 12), (2, 21), (3, 10), (4, 9), (5, 26)]