Cucco’s Compute Hack

コンピュータ関係の記事を書いていきます。

時刻ちょうどに実行する その2

マルチプロセス版の時刻ちょうどに実行する
2秒ごとと5秒ごとに実行する。ただし開始は0秒から。

import multiprocessing
import datetime
import time

class Worker(multiprocessing.Process):
	def __init__(self,queue,interval=5):
		self.interval=interval
		self.q=q
		
		self.time_now=datetime.datetime.utcnow()
		self.time_offset=datetime.timedelta(seconds=interval) #繰り返しの間隔(秒)
		
		#秒を0にする。
		self.time_next=datetime.datetime(self.time_now.year, self.time_now.month, self.time_now.day, self.time_now.hour, self.time_now.minute, second=0, microsecond=0, tzinfo=None)
		
		while self.time_next < self.time_now:
			self.time_next=self.time_next+self.time_offset
		
		#super(Worker, self).__init__()
		
	def timedelta2sec(self,t):
		return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000
		
	def run(self):
		
		self.time_now = datetime.datetime.utcnow()
		while self.time_next < self.time_now:
			self.time_next=self.time_next+self.time_offset
		time.sleep(self.timedelta2sec(self.time_next-self.time_now))
			
		while self.q.empty()==True:	#qに何か入ってくれば終了する。
			#今の時間を表示
			self.time_now = datetime.datetime.utcnow()
			print(self.name,self.time_now)
			
			self.time_next = self.time_next + self.time_offset
			self.time_now = datetime.datetime.utcnow()
			
			while self.time_next < self.time_now:
				self.time_next=self.time_next+self.time_offset
			
			time.sleep(self.timedelta2sec(self.time_next-self.time_now))
		return

if __name__ == '__main__':

	q=multiprocessing.SimpleQueue() #終了フラグを入れるために使う
	
	x=Worker(q,interval=2)
	y=Worker(q,interval=5)

	x.start()
	y.start()
	
	time.sleep(20)
	
	print("stop process")
	q.put("STOP")
	
	x.join()
	y.join()
	
	print("end")

実行結果

>C:\Python36\python.exe C:\multiProcessTest.py
Worker-1 2018-03-15 11:36:48.010984
Worker-1 2018-03-15 11:36:50.001188
Worker-2 2018-03-15 11:36:50.001188
Worker-1 2018-03-15 11:36:52.002991
Worker-1 2018-03-15 11:36:54.006795
Worker-2 2018-03-15 11:36:55.006197
Worker-1 2018-03-15 11:36:56.006599
Worker-1 2018-03-15 11:36:58.007403
Worker-2 2018-03-15 11:37:00.009206
Worker-1 2018-03-15 11:37:00.009206
Worker-1 2018-03-15 11:37:02.009010
Worker-1 2018-03-15 11:37:04.007215
Worker-2 2018-03-15 11:37:05.006616
Worker-1 2018-03-15 11:37:06.002419
stop process
end

スーパークラスのコンストラクタを忘れているとこんなエラーになる。

    assert self._popen is None, 'cannot start a process twice'
AttributeError: 'Worker' object has no attribute '_popen'

sqliteをインメモリで使ってみた

やりたいことは以下。全部できた。
インメモリDBは早い。

  1. インメモリで動かす
  2. 日付、日時を格納する
  3. 日時、日時の新しいN件だけをDB内に維持する
  4. 日付、日時の新しいM件を取り出す

ソースコード

import sqlite3
import time
import datetime

start=time.time()

con = sqlite3.connect(":memory:")
# ファイルの場合は280,000件で20秒
#con = sqlite3.connect("sqlitedb.db")
# ファイルの場合は1000件で20秒
con.isolation_level = None # None で自動コミットモード
cur = con.cursor()

# Create table
cur.execute('''CREATE TABLE stocks
             (date text, ts timestamp, trans text, symbol text, qty real, price real, hoge integer)''')

for i in range(100):
	# Insert a row of data
	now = datetime.datetime.now()
	cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)",(now,str(i)))
	time.sleep(0.01)

# Save (commit) the changes
# con.commit()

elapsed_time=time.time()-start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

# 新しいほうから3件だけを表示する。
for row in cur.execute("SELECT * From stocks ORDER BY ts DESC LIMIT 3"):
	print(row)

print("-------------------------------------------")

# レコード数を最新の6件だけ残して、古い行を削除する
cur.execute("delete from stocks where ts IN (select ts from stocks order by ts desc LIMIT -1 OFFSET 6)")

elapsed_time=time.time()-start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

for row in cur.execute("SELECT * From stocks"):
	print(row)

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.

con.close()

実行結果

C:\Python34>C:\Python34\inmemorysqlite.py
elapsed_time:1.5490062236785889[sec]
('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)
('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98)
('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97)
-------------------------------------------
elapsed_time:1.5490062236785889[sec]
('2006-01-05', '2018-02-21 23:29:50.665637', 'BUY', 'RHAT', 100.0, 35.14, 94)
('2006-01-05', '2018-02-21 23:29:50.681237', 'BUY', 'RHAT', 100.0, 35.14, 95)
('2006-01-05', '2018-02-21 23:29:50.696837', 'BUY', 'RHAT', 100.0, 35.14, 96)
('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97)
('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98)
('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)

ログをディスクに吐いてるようなので、それもメモリにすればもうちょっと早くなる??未評価。

    dbconnection =sqlite3.connect(":memory:", check_same_thread = False)
    dbconnection.isolation_level = None
    dbcursor = dbconnection.cursor()
    dbcursor.execute('PRAGMA temp_store=MEMORY;')
    dbcursor.execute('PRAGMA journal_mode=MEMORY;')

リングバッファと計算

# -*- coding: utf-8 -*-

class RingBuffer():
	def __init__(self,bufferSize):
		self.size = bufferSize
		self.pc= 0 #次に使うバッファの番号
		self.data=[None]*self.size #固定長さのリスト
		self.valid=False #リングバッファがいっぱいになったらTrueにする。
	
	def push(self,value):
		print("pc=",self.pc)
		self.data[self.pc]=value
		self.pc=self.pc+1
		
		if self.pc==self.size:
			self.pc=0
			self.valid=True
			
	def max(self):
		if self.valid:
			return max(self.data)
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return max(self.data[0:self.pc-1])
		
	def min(self):
		if self.valid:
			return min(self.data)
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return min(self.data[0:self.pc-1])
				
	def average(self):
		if self.valid:
			return sum(self.data)/self.size
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return sum(self.data[0:self.pc-1])/self.size
				
	def dump(self):
		return self.data

def main():
	rb=RingBuffer(5)
	
	print(rb.dump())
	print(rb.max()) #バッファ内の最大を返す
	print(rb.min()) #バッファ内の最小を返す
	
	rb.push(1)
	rb.push(2)
	rb.push(3)
	rb.push(4)
	print(rb.dump())
	print("max=",rb.max()) #バッファ内の最大を返す
	print("min=",rb.min()) #バッファ内の最小を返す
	rb.push(1)
	rb.push(1)
	rb.push(2)
	rb.push(2)
	
	print(rb.dump())
	print(rb.max()) #バッファ内の最大を返す
	print(rb.min()) #バッファ内の最小を返す
	print(rb.average()) #バッファ内の最小を返す
if __name__== '__main__':
	main()

multiprocessingのサンプルコード

multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。

# -*- coding: utf-8 -*-

from multiprocessing import Pool
import time
import os

def f(x):
	print("f(x)",os.getpid())
	time.sleep(1)
	print(x)
	return x*x

class MyClass():
	def __init__(self):
		print("init MyClass")
		self.foo=1 #共通の情報
	def g(self,x):
		print("MyClass g(x)",os.getpid())
		time.sleep(1)
		#共通の情報にアクセスできる?→できた。
		print(x,self.foo)
		#共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。
		self.foo=2
		return x*x+self.foo

if __name__ == '__main__':
	print("main()",os.getpid())
	myclass=MyClass()
	
	#普通の関数を実行
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(f, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])
		
	print("----------------------------")
	
	#クラスの中の関数を実行
	#新しいpoolにしてるのでプロセスIDは変わる。
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])

実行結果

main() 6120
init MyClass
f(x) 4252
f(x) 2784
0
f(x) 4252
1
f(x) 2784
2
f(x) 4252
3
4
[0, 1, 4, 9, 16]
                                                      • -
MyClass g(x) 7020 MyClass g(x) 5204 0 1 MyClass g(x) 7020 1 1 MyClass g(x) 5204 2 1 MyClass g(x) 7020 3 1 4 1 [2, 3, 6, 11, 18]

Threadを使うときの"TypeError: function1() got multiple values for argument 'arg1'"

threading.Threadを使って引数持つ関数を実行すると、以下のようなエラーになることがある。
run()に直接関数を記述せず、すでにある関数をrun()の中で実行しようとすると起きる。

"TypeError: function1() got multiple values for argument 'arg1'"

原因はよくわからない。

とりあえず回避した例。func1でself.kwargsを使うのと何が違うのか・・・。

# -*- coding: utf-8 -*-
import threading

class MyThread(threading.Thread):
	def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None):
		#threading.Threadとしてのコンストラクタ
		threading.Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None)
		
		#threadの中で使う引数(辞書)
		self.kwargs = kwargs
		return
		
	def func1(self,*args,**kwargs):
		x=kwargs["kwargs"]
		a=x["a"]
		b=x["b"]
		c=x["c"]
		print("thread main fun1",a,b,c)
		return
		
	def run(self):
		#実際の処理は、func1で行う。
		self.func1(kwargs=self.kwargs)
		return

def main():
	kwargs={"a":1,"b":2,"c":3}
	t=MyThread(kwargs=kwargs)
	t.start()

if __name__== '__main__':
	main()

時刻ちょうどに実行する

プログラム

# -*- coding: utf-8 -*-
import threading
import datetime
from time import sleep

"""
1秒ごとに交互に実行する。
時刻は現在時刻を取得して、1/100秒単位くらいでx秒ちょうどに開始したい。
→時刻取得の関数が重たいのか、精度が出ているかどうかよくわからない。
"""
class MyThread(threading.Thread):
	def __init__(self, odd = True ,second = 2):
		self.dt_offset=datetime.timedelta(seconds=second) #繰り返しの間隔(秒)
		self.stop_event = threading.Event() #停止させるためのフラグ
		
		self.dt_now=datetime.datetime.utcnow()
		
		self.odd=odd
		if odd==True:
			self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=0, microsecond=0, tzinfo=None)
		else:
			self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=1, microsecond=0, tzinfo=None)
			
		threading.Thread.__init__(self) #threadとしてのコンストラクタ
		
	def stop(self):
		#終了させるためのもの。
		print("Thread Stop Event")
		self.stop_event.set()
	
	def run(self):
		while not self.stop_event.is_set():
			
			self.dt_next = self.dt_next + self.dt_offset
			self.dt_now = datetime.datetime.utcnow()
			
			if self.dt_now > self.dt_next:
				continue
			
			#時刻になるまで待ち
			sleep( timedelta2sec( self.dt_next - self.dt_now ) )
			
			#実行したい内容をここに記述
			print(self.name,self.dt_offset.seconds,self.odd,datetime.datetime.utcnow())
			
		print("thread end")
		return

def timedelta2sec(t):
	return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000

if __name__ == '__main__':
	x=MyThread(second=2, odd=True)
	y=MyThread(second=2, odd=False)
	
	x.start()
	y.start()
	
	#15秒したら終了させる。
	sleep(15)
	x.stop()
	y.stop()
	
	#スレッドの終了まち
	x.join()
	y.join()
	
	print("プログラム終了")


実行結果

python.exe timerThreadTest.py
Thread-2 2 False 2017-09-06 05:13:45.017042
Thread-1 2 True 2017-09-06 05:13:46.021042
Thread-2 2 False 2017-09-06 05:13:47.020042
Thread-1 2 True 2017-09-06 05:13:48.008042
Thread-2 2 False 2017-09-06 05:13:49.005042
Thread-1 2 True 2017-09-06 05:13:50.018042
Thread-2 2 False 2017-09-06 05:13:51.012042
Thread-1 2 True 2017-09-06 05:13:52.029042
Thread-2 2 False 2017-09-06 05:13:53.026042
Thread-1 2 True 2017-09-06 05:13:54.018042
Thread-2 2 False 2017-09-06 05:13:55.027042
Thread-1 2 True 2017-09-06 05:13:56.021042
Thread-2 2 False 2017-09-06 05:13:57.012042
Thread-1 2 True 2017-09-06 05:13:58.022042
Thread-2 2 False 2017-09-06 05:13:59.027042
Thread Stop Event
Thread Stop Event
Thread-1 2 True 2017-09-06 05:14:00.016042
thread end
Thread-2 2 False 2017-09-06 05:14:01.007042
thread end
プログラム終了

ファイルの最後の1行を読む

ファイルの最後の1行を読みたい時。

# -*- coding: utf-8

file_name="some_file.csv"

with open(file_name,'r') as f:
	num_lines = sum(1 for line in f)
	f.seek(0)
	for line in range(0, num_lines-1):
		f.readline()
	line = f.readline()
print(line)