Cucco’s Compute Hack

コンピュータ関係の記事を書いていきます。

複数ファイルを連続して読むプログラム

複数に分割されたファイルを決まった順番で、連続して読む。
csv.readerっぽく。forで1行ずつ読めるようにイテレータとして実装。

import csv

class MyIterator(object):
    def __init__(self, readFileNames, skipHeader=False):
        self.readFileNames = readFileNames
        self.currentFileIndex = -1
        self.fp = None
        self.csv_reader = None
        self.skipHeader = skipHeader

    def __iter__(self):
        # next()はselfが実装してるのでそのままselfを返す
        return self

    def __next__(self):

        if self.csv_reader is None:
            #1個目のファイルを開く
            self.nextfile()

        try:
            value=next(self.csv_reader)
            return value
        except StopIteration:
            #終端まで来たら、このStopIterationが投げられる。
            try:
                self.nextfile()
                value=next(self.csv_reader)
                return value
            except StopIteration:
                # 次のファイルがなかったので、例外を投げてイテレータを終わる。
                raise (StopIteration)
        except:
            # ここには来ない。
            raise (StopIteration)
    
    def nextfile(self):
        self.currentFileIndex = self.currentFileIndex + 1
        if self.fp is not None:
            self.fp.close()
        if self.currentFileIndex > len(self.readFileNames)-1:
            # 次のファイルはないので例外を投げる
            raise(StopIteration)
        else:
            self.fp=open(self.readFileNames[self.currentFileIndex],mode='r',newline='\n')
            self.csv_reader=csv.reader(self.fp, delimiter=',', quotechar='"')

            if self.skipHeader is True:
                next(self.csv_reader)


readFileNames=[]
readFileNames.append("D:/data/data_01.csv")
readFileNames.append("D:/data/data_02.csv")
readFileNames.append("D:/data/data_03.csv")

my_iterator = MyIterator(readFileNames,skipHeader=True)

for value in my_iterator:
    print(value)

my_iterator = MyIterator(readFileNames,skipHeader=False)

for value in my_iterator:
    print(value)

Neural Network Consoleによる学習済みニューラルネットワークの利用

環境構築

ベースはanaconda。nnablaがなく、importに失敗するのでパッケージ追加しました。以下のコマンドを管理者で開いたコンソールで実行。pip自体の更新が必要な場合もあり。

pip install ipykernel
pip install nnabla

コードの作成と推論の実行

以下を参考に、アヤメのデータで学習して、推論するところまで。
学習はGUI使いました。学習済みパラメータファイルが20180520_142204というフォルダにある想定です。

チュートリアル:Neural Network Consoleによる学習済みニューラルネットワークのNeural Network Librariesを用いた利用方法2種 – Docs - Neural Network Console

import nnabla as nn
import nnabla.functions as F
import nnabla.parametric_functions as PF

#loss関数を削るのでyが不要になる。
#def network(x, y,test=False):
def network(x, test=False):
    # Input -> 4
    # BatchNormalization
    with nn.parameter_scope('BatchNormalization'):
        h = PF.batch_normalization(x, (1,), 0.9, 0.0001, not test)
    # Affine -> 50
    with nn.parameter_scope('Affine'):
        h = PF.affine(h, (50,))
    # ReLU
    h = F.relu(h, True)
    # Affine_2 -> 20
    with nn.parameter_scope('Affine_2'):
        h = PF.affine(h, (20,))
    # ReLU_2
    h = F.relu(h, True)
    # Dropout
    if not test:
        h = F.dropout(h)
    # Affine_3 -> 3
    with nn.parameter_scope('Affine_3'):
        h = PF.affine(h, (3,))
    # Softmax
    h = F.softmax(h)
    # SquaredError
    # 不要なのでコメントアウト
    # h = F.squared_error(h, y)
    return h

以降が追加した推論用のコード。実際には上のコードと同じファイルに記載。

# load parameters
# \を/に書き換え必要あり。オリジナルのブログはシングルクオートが全角なので書き換えが必要。
nn.load_parameters('C:/iris_sample/iris_sample.files/20180520_142204/parameters.h5')

# Prepare input variable
# 3つ推論する場合
x=nn.Variable((3,4))
# 1つ推論する場合
x1=nn.Variable((1,4))
x2=nn.Variable((1,4))
x3=nn.Variable((1,4))

# Let input data to x.d
# x.d = ...
x.d=[[5.1, 3.5, 1.4, 0.2],[7,   3.2, 4.7, 1.4],[6.3, 3.3, 6,  2.5]]
#yの1番目が大きな値になる。listを作って渡せばいいのは楽。
x1.d=[5.1, 3.5, 1.4, 0.2]
#yの2番目が大きな値になる。
x2.d=[7,   3.2, 4.7, 1.4]
#yの3番目が大きな値になる。
x3.d=[6.3, 3.3, 6,  2.5]

# Build network for inference
# test=Trueで、ドロップアウトの機能を停止する。BatchNormalizationへの影響は不明。
y = network(x, test=True)

# Execute inference
y.forward()
print(y.d)

実行結果。指数表示なので大小関係わかりにくいがあってるはず。

#[[9.9891686e-01 9.9650992e-04 8.6744032e-05]
# [7.5185834e-03 7.6037079e-01 2.3211062e-01]
# [7.9514321e-06 3.5711315e-02 9.6428072e-01]]

時刻ちょうどに実行する その3

5秒に1回のタスクAと、15秒に1回のタスクBがある。
タスク自体は、マルチプロセスで動く。
タスクの実行中は、メインプロセスはある作業を実施できない。
タスクAまたはタスクBを実行した場合は、メインタスクを1回だけ実行する。

import multiprocessing
import datetime
import time

class Worker(multiprocessing.Process):
	def __init__(self,t,workload=2):
		self.t=t
		self.workload=workload
		super(Worker, self).__init__()
	
	def run(self):
		#print(self.name,self.t)
		time.sleep(self.workload)
		return

def uctdatetime00sec(pattern=None):
	"""
	秒以下が0のdatetimeを返す。
	pattern="future"の場合は、直近の未来の時刻を返す。
	"""
	time_now=datetime.datetime.utcnow()
	if pattern=="future":
		time_offset=datetime.timedelta(minutes=1) # 1分先にずらしておく。
		time_now=time_now + time_offset
		
	time_now=datetime.datetime(time_now.year, time_now.month, time_now.day, time_now.hour, time_now.minute, second=0, microsecond=0, tzinfo=None)
	
	return time_now

if __name__ == '__main__':
	taskA=None
	taskB=None
	run_main_task=False

	#仮の時刻
	time_now=datetime.datetime.utcnow()
	
	time_offset=datetime.timedelta(seconds=60) #繰り返しの間隔(秒)
	
	time_offset_taskA = datetime.timedelta(seconds=5) #繰り返しの間隔(秒)
	time_offset_taskB = datetime.timedelta(seconds=15) #繰り返しの間隔(秒)
	
	time_done_taskA=time_now
	time_done_taskB=time_now
	
	print(time_now)
	
	#0秒ちょうどの時刻から開始。
	time_now=uctdatetime00sec(pattern="future")
	
	time_next_taskA=time_now + time_offset_taskA
	time_next_taskB=time_now + time_offset_taskB
	
	print("demo wills start at ", time_now)
	
	time_end=time_now + time_offset
	
	while time_end > datetime.datetime.utcnow():#デモプログラムなので1分で終わらせる。
		
		time_now=datetime.datetime.utcnow()
		
		if time_now >= time_next_taskA:
			time_next_taskA=time_now + time_offset_taskA
			while time_now > time_next_taskA:
				print("skip taskA")
				time_next_taskA=time_now + time_offset_taskA
				
			#5秒に1回のタスクをここに記述(非同期)
			taskA=Worker(time_now,workload=1) # startは2回呼べないのでオブジェクトを作りなおし
			taskA.start()
			print("taskA start",time_now)
			run_main_task=True
			
		if time_now >= time_next_taskB:
			time_next_taskB=time_now + time_offset_taskB
			while time_now > time_next_taskB:
				print("skip taskB")
				time_next_taskB=time_now + time_offset_taskB
				
			time_next_taskB=time_now + time_offset_taskB
			#15秒に1回のタスクをここに記述(非同期)
			taskB=Worker(time_now,workload=1) # startは2回呼べないのでオブジェクトを作りなおし
			taskB.start()
			print("taskB start",time_now)
			run_main_task=True
		
		#全部のタスクが終わるのを待つ
		#動いていないタスクは待てない。。。
		#ifの評価は左の論理が優先。
		if (taskA is not None) and (taskA.is_alive()):
			taskA.join()
		if (taskB is not None) and (taskB.is_alive()):
			taskB.join()
		
		if run_main_task==True:
			print("main task",datetime.datetime.utcnow())
			#変更後に1回メインタスクを実行したので次の変更まで何もしない。
			run_main_task=False
		
	print("end")

実行結果

C:\>python C:\multiProcessTest2.py
2018-03-16 14:13:07.936899
demo wills start at  2018-03-16 14:14:00
taskA start 2018-03-16 14:14:05.000162
main task 2018-03-16 14:14:06.433244
taskA start 2018-03-16 14:14:10.000448
main task 2018-03-16 14:14:11.359526
taskA start 2018-03-16 14:14:15.000734
taskB start 2018-03-16 14:14:15.000734
main task 2018-03-16 14:14:16.387814
taskA start 2018-03-16 14:14:20.001020
main task 2018-03-16 14:14:21.356098
taskA start 2018-03-16 14:14:25.001306
main task 2018-03-16 14:14:26.358384
taskA start 2018-03-16 14:14:30.001592
taskB start 2018-03-16 14:14:30.001592
main task 2018-03-16 14:14:31.419673
taskA start 2018-03-16 14:14:35.001878
main task 2018-03-16 14:14:36.352956
taskA start 2018-03-16 14:14:40.002164
main task 2018-03-16 14:14:41.358242
taskA start 2018-03-16 14:14:45.002450
taskB start 2018-03-16 14:14:45.002450
main task 2018-03-16 14:14:46.407531
taskA start 2018-03-16 14:14:50.002736
main task 2018-03-16 14:14:51.363814
taskA start 2018-03-16 14:14:55.003022
main task 2018-03-16 14:14:56.368100
end

時刻ちょうどに実行する その2

マルチプロセス版の時刻ちょうどに実行する
2秒ごとと5秒ごとに実行する。ただし開始は0秒から。

import multiprocessing
import datetime
import time

class Worker(multiprocessing.Process):
	def __init__(self,queue,interval=5):
		self.interval=interval
		self.q=q
		
		self.time_now=datetime.datetime.utcnow()
		self.time_offset=datetime.timedelta(seconds=interval) #繰り返しの間隔(秒)
		
		#秒を0にする。
		self.time_next=datetime.datetime(self.time_now.year, self.time_now.month, self.time_now.day, self.time_now.hour, self.time_now.minute, second=0, microsecond=0, tzinfo=None)
		
		while self.time_next < self.time_now:
			self.time_next=self.time_next+self.time_offset
		
		#super(Worker, self).__init__()
		
	def timedelta2sec(self,t):
		return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000
		
	def run(self):
		
		self.time_now = datetime.datetime.utcnow()
		while self.time_next < self.time_now:
			self.time_next=self.time_next+self.time_offset
		time.sleep(self.timedelta2sec(self.time_next-self.time_now))
			
		while self.q.empty()==True:	#qに何か入ってくれば終了する。
			#今の時間を表示
			self.time_now = datetime.datetime.utcnow()
			print(self.name,self.time_now)
			
			self.time_next = self.time_next + self.time_offset
			self.time_now = datetime.datetime.utcnow()
			
			while self.time_next < self.time_now:
				self.time_next=self.time_next+self.time_offset
			
			time.sleep(self.timedelta2sec(self.time_next-self.time_now))
		return

if __name__ == '__main__':

	q=multiprocessing.SimpleQueue() #終了フラグを入れるために使う
	
	x=Worker(q,interval=2)
	y=Worker(q,interval=5)

	x.start()
	y.start()
	
	time.sleep(20)
	
	print("stop process")
	q.put("STOP")
	
	x.join()
	y.join()
	
	print("end")

実行結果

>C:\Python36\python.exe C:\multiProcessTest.py
Worker-1 2018-03-15 11:36:48.010984
Worker-1 2018-03-15 11:36:50.001188
Worker-2 2018-03-15 11:36:50.001188
Worker-1 2018-03-15 11:36:52.002991
Worker-1 2018-03-15 11:36:54.006795
Worker-2 2018-03-15 11:36:55.006197
Worker-1 2018-03-15 11:36:56.006599
Worker-1 2018-03-15 11:36:58.007403
Worker-2 2018-03-15 11:37:00.009206
Worker-1 2018-03-15 11:37:00.009206
Worker-1 2018-03-15 11:37:02.009010
Worker-1 2018-03-15 11:37:04.007215
Worker-2 2018-03-15 11:37:05.006616
Worker-1 2018-03-15 11:37:06.002419
stop process
end

スーパークラスのコンストラクタを忘れているとこんなエラーになる。

    assert self._popen is None, 'cannot start a process twice'
AttributeError: 'Worker' object has no attribute '_popen'

sqliteをインメモリで使ってみた

やりたいことは以下。全部できた。
インメモリDBは早い。

  1. インメモリで動かす
  2. 日付、日時を格納する
  3. 日時、日時の新しいN件だけをDB内に維持する
  4. 日付、日時の新しいM件を取り出す

ソースコード

import sqlite3
import time
import datetime

start=time.time()

con = sqlite3.connect(":memory:")
# ファイルの場合は280,000件で20秒
#con = sqlite3.connect("sqlitedb.db")
# ファイルの場合は1000件で20秒
con.isolation_level = None # None で自動コミットモード
cur = con.cursor()

# Create table
cur.execute('''CREATE TABLE stocks
             (date text, ts timestamp, trans text, symbol text, qty real, price real, hoge integer)''')

for i in range(100):
	# Insert a row of data
	now = datetime.datetime.now()
	cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)",(now,str(i)))
	time.sleep(0.01)

# Save (commit) the changes
# con.commit()

elapsed_time=time.time()-start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

# 新しいほうから3件だけを表示する。
for row in cur.execute("SELECT * From stocks ORDER BY ts DESC LIMIT 3"):
	print(row)

print("-------------------------------------------")

# レコード数を最新の6件だけ残して、古い行を削除する
cur.execute("delete from stocks where ts IN (select ts from stocks order by ts desc LIMIT -1 OFFSET 6)")

elapsed_time=time.time()-start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

for row in cur.execute("SELECT * From stocks"):
	print(row)

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.

con.close()

実行結果

C:\Python34>C:\Python34\inmemorysqlite.py
elapsed_time:1.5490062236785889[sec]
('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)
('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98)
('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97)
-------------------------------------------
elapsed_time:1.5490062236785889[sec]
('2006-01-05', '2018-02-21 23:29:50.665637', 'BUY', 'RHAT', 100.0, 35.14, 94)
('2006-01-05', '2018-02-21 23:29:50.681237', 'BUY', 'RHAT', 100.0, 35.14, 95)
('2006-01-05', '2018-02-21 23:29:50.696837', 'BUY', 'RHAT', 100.0, 35.14, 96)
('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97)
('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98)
('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)

リングバッファと計算

# -*- coding: utf-8 -*-

class RingBuffer():
	def __init__(self,bufferSize):
		self.size = bufferSize
		self.pc= 0 #次に使うバッファの番号
		self.data=[None]*self.size #固定長さのリスト
		self.valid=False #リングバッファがいっぱいになったらTrueにする。
	
	def push(self,value):
		print("pc=",self.pc)
		self.data[self.pc]=value
		self.pc=self.pc+1
		
		if self.pc==self.size:
			self.pc=0
			self.valid=True
			
	def max(self):
		if self.valid:
			return max(self.data)
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return max(self.data[0:self.pc-1])
		
	def min(self):
		if self.valid:
			return min(self.data)
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return min(self.data[0:self.pc-1])
				
	def average(self):
		if self.valid:
			return sum(self.data)/self.size
		else:
			if self.pc==0:
				return None
			else:
				print("pc=",self.pc)
				return sum(self.data[0:self.pc-1])/self.size
				
	def dump(self):
		return self.data

def main():
	rb=RingBuffer(5)
	
	print(rb.dump())
	print(rb.max()) #バッファ内の最大を返す
	print(rb.min()) #バッファ内の最小を返す
	
	rb.push(1)
	rb.push(2)
	rb.push(3)
	rb.push(4)
	print(rb.dump())
	print("max=",rb.max()) #バッファ内の最大を返す
	print("min=",rb.min()) #バッファ内の最小を返す
	rb.push(1)
	rb.push(1)
	rb.push(2)
	rb.push(2)
	
	print(rb.dump())
	print(rb.max()) #バッファ内の最大を返す
	print(rb.min()) #バッファ内の最小を返す
	print(rb.average()) #バッファ内の最小を返す
if __name__== '__main__':
	main()

multiprocessingのサンプルコード

multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。

# -*- coding: utf-8 -*-

from multiprocessing import Pool
import time
import os

def f(x):
	print("f(x)",os.getpid())
	time.sleep(1)
	print(x)
	return x*x

class MyClass():
	def __init__(self):
		print("init MyClass")
		self.foo=1 #共通の情報
	def g(self,x):
		print("MyClass g(x)",os.getpid())
		time.sleep(1)
		#共通の情報にアクセスできる?→できた。
		print(x,self.foo)
		#共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。
		self.foo=2
		return x*x+self.foo

if __name__ == '__main__':
	print("main()",os.getpid())
	myclass=MyClass()
	
	#普通の関数を実行
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(f, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])
		
	print("----------------------------")
	
	#クラスの中の関数を実行
	#新しいpoolにしてるのでプロセスIDは変わる。
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])

実行結果

main() 6120
init MyClass
f(x) 4252
f(x) 2784
0
f(x) 4252
1
f(x) 2784
2
f(x) 4252
3
4
[0, 1, 4, 9, 16]
                                                      • -
MyClass g(x) 7020 MyClass g(x) 5204 0 1 MyClass g(x) 7020 1 1 MyClass g(x) 5204 2 1 MyClass g(x) 7020 3 1 4 1 [2, 3, 6, 11, 18]