複数ファイルを連続して読むプログラム
複数に分割されたファイルを決まった順番で、連続して読む。
csv.readerっぽく。forで1行ずつ読めるようにイテレータとして実装。
import csv class MyIterator(object): def __init__(self, readFileNames, skipHeader=False): self.readFileNames = readFileNames self.currentFileIndex = -1 self.fp = None self.csv_reader = None self.skipHeader = skipHeader def __iter__(self): # next()はselfが実装してるのでそのままselfを返す return self def __next__(self): if self.csv_reader is None: #1個目のファイルを開く self.nextfile() try: value=next(self.csv_reader) return value except StopIteration: #終端まで来たら、このStopIterationが投げられる。 try: self.nextfile() value=next(self.csv_reader) return value except StopIteration: # 次のファイルがなかったので、例外を投げてイテレータを終わる。 raise (StopIteration) except: # ここには来ない。 raise (StopIteration) def nextfile(self): self.currentFileIndex = self.currentFileIndex + 1 if self.fp is not None: self.fp.close() if self.currentFileIndex > len(self.readFileNames)-1: # 次のファイルはないので例外を投げる raise(StopIteration) else: self.fp=open(self.readFileNames[self.currentFileIndex],mode='r',newline='\n') self.csv_reader=csv.reader(self.fp, delimiter=',', quotechar='"') if self.skipHeader is True: next(self.csv_reader) readFileNames=[] readFileNames.append("D:/data/data_01.csv") readFileNames.append("D:/data/data_02.csv") readFileNames.append("D:/data/data_03.csv") my_iterator = MyIterator(readFileNames,skipHeader=True) for value in my_iterator: print(value) my_iterator = MyIterator(readFileNames,skipHeader=False) for value in my_iterator: print(value)
Neural Network Consoleによる学習済みニューラルネットワークの利用
環境構築
ベースはanaconda。nnablaがなく、importに失敗するのでパッケージ追加しました。以下のコマンドを管理者で開いたコンソールで実行。
pip install ipykernel pip install nnabla
事前にpip自体の更新が必要な場合もあり。
python -m pip install --upgrade pip
コードの作成と推論の実行
以下を参考に、アヤメのデータで学習して、推論するところまで。
学習はGUI使いました。学習済みパラメータファイルが20180520_142204というフォルダにある想定です。
import nnabla as nn import nnabla.functions as F import nnabla.parametric_functions as PF #loss関数を削るのでyが不要になる。 #def network(x, y,test=False): def network(x, test=False): # Input -> 4 # BatchNormalization with nn.parameter_scope('BatchNormalization'): h = PF.batch_normalization(x, (1,), 0.9, 0.0001, not test) # Affine -> 50 with nn.parameter_scope('Affine'): h = PF.affine(h, (50,)) # ReLU h = F.relu(h, True) # Affine_2 -> 20 with nn.parameter_scope('Affine_2'): h = PF.affine(h, (20,)) # ReLU_2 h = F.relu(h, True) # Dropout if not test: h = F.dropout(h) # Affine_3 -> 3 with nn.parameter_scope('Affine_3'): h = PF.affine(h, (3,)) # Softmax h = F.softmax(h) # SquaredError # 不要なのでコメントアウト # h = F.squared_error(h, y) return h
以降が追加した推論用のコード。実際には上のコードと同じファイルに記載。
# load parameters # \を/に書き換え必要あり。オリジナルのブログはシングルクオートが全角なので書き換えが必要。 nn.load_parameters('C:/iris_sample/iris_sample.files/20180520_142204/parameters.h5') # Prepare input variable # 3つ推論する場合 x=nn.Variable((3,4)) # 1つ推論する場合 x1=nn.Variable((1,4)) x2=nn.Variable((1,4)) x3=nn.Variable((1,4)) # Let input data to x.d # x.d = ... x.d=[[5.1, 3.5, 1.4, 0.2],[7, 3.2, 4.7, 1.4],[6.3, 3.3, 6, 2.5]] #yの1番目が大きな値になる。listを作って渡せばいいのは楽。 x1.d=[5.1, 3.5, 1.4, 0.2] #yの2番目が大きな値になる。 x2.d=[7, 3.2, 4.7, 1.4] #yの3番目が大きな値になる。 x3.d=[6.3, 3.3, 6, 2.5] # Build network for inference # test=Trueで、ドロップアウトの機能を停止する。BatchNormalizationへの影響は不明。 y = network(x, test=True) # Execute inference y.forward() print(y.d)
実行結果。指数表示なので大小関係わかりにくいがあってるはず。
#[[9.9891686e-01 9.9650992e-04 8.6744032e-05] # [7.5185834e-03 7.6037079e-01 2.3211062e-01] # [7.9514321e-06 3.5711315e-02 9.6428072e-01]]
時刻ちょうどに実行する その3
5秒に1回のタスクAと、15秒に1回のタスクBがある。
タスク自体は、マルチプロセスで動く。
タスクの実行中は、メインプロセスはある作業を実施できない。
タスクAまたはタスクBを実行した場合は、メインタスクを1回だけ実行する。
import multiprocessing import datetime import time class Worker(multiprocessing.Process): def __init__(self,t,workload=2): self.t=t self.workload=workload super(Worker, self).__init__() def run(self): #print(self.name,self.t) time.sleep(self.workload) return def uctdatetime00sec(pattern=None): """ 秒以下が0のdatetimeを返す。 pattern="future"の場合は、直近の未来の時刻を返す。 """ time_now=datetime.datetime.utcnow() if pattern=="future": time_offset=datetime.timedelta(minutes=1) # 1分先にずらしておく。 time_now=time_now + time_offset time_now=datetime.datetime(time_now.year, time_now.month, time_now.day, time_now.hour, time_now.minute, second=0, microsecond=0, tzinfo=None) return time_now if __name__ == '__main__': taskA=None taskB=None run_main_task=False #仮の時刻 time_now=datetime.datetime.utcnow() time_offset=datetime.timedelta(seconds=60) #繰り返しの間隔(秒) time_offset_taskA = datetime.timedelta(seconds=5) #繰り返しの間隔(秒) time_offset_taskB = datetime.timedelta(seconds=15) #繰り返しの間隔(秒) time_done_taskA=time_now time_done_taskB=time_now print(time_now) #0秒ちょうどの時刻から開始。 time_now=uctdatetime00sec(pattern="future") time_next_taskA=time_now + time_offset_taskA time_next_taskB=time_now + time_offset_taskB print("demo wills start at ", time_now) time_end=time_now + time_offset while time_end > datetime.datetime.utcnow():#デモプログラムなので1分で終わらせる。 time_now=datetime.datetime.utcnow() if time_now >= time_next_taskA: time_next_taskA=time_now + time_offset_taskA while time_now > time_next_taskA: print("skip taskA") time_next_taskA=time_now + time_offset_taskA #5秒に1回のタスクをここに記述(非同期) taskA=Worker(time_now,workload=1) # startは2回呼べないのでオブジェクトを作りなおし taskA.start() print("taskA start",time_now) run_main_task=True if time_now >= time_next_taskB: time_next_taskB=time_now + time_offset_taskB while time_now > time_next_taskB: print("skip taskB") time_next_taskB=time_now + time_offset_taskB time_next_taskB=time_now + time_offset_taskB #15秒に1回のタスクをここに記述(非同期) taskB=Worker(time_now,workload=1) # startは2回呼べないのでオブジェクトを作りなおし taskB.start() print("taskB start",time_now) run_main_task=True #全部のタスクが終わるのを待つ #動いていないタスクは待てない。。。 #ifの評価は左の論理が優先。 if (taskA is not None) and (taskA.is_alive()): taskA.join() if (taskB is not None) and (taskB.is_alive()): taskB.join() if run_main_task==True: print("main task",datetime.datetime.utcnow()) #変更後に1回メインタスクを実行したので次の変更まで何もしない。 run_main_task=False print("end")
実行結果
C:\>python C:\multiProcessTest2.py 2018-03-16 14:13:07.936899 demo wills start at 2018-03-16 14:14:00 taskA start 2018-03-16 14:14:05.000162 main task 2018-03-16 14:14:06.433244 taskA start 2018-03-16 14:14:10.000448 main task 2018-03-16 14:14:11.359526 taskA start 2018-03-16 14:14:15.000734 taskB start 2018-03-16 14:14:15.000734 main task 2018-03-16 14:14:16.387814 taskA start 2018-03-16 14:14:20.001020 main task 2018-03-16 14:14:21.356098 taskA start 2018-03-16 14:14:25.001306 main task 2018-03-16 14:14:26.358384 taskA start 2018-03-16 14:14:30.001592 taskB start 2018-03-16 14:14:30.001592 main task 2018-03-16 14:14:31.419673 taskA start 2018-03-16 14:14:35.001878 main task 2018-03-16 14:14:36.352956 taskA start 2018-03-16 14:14:40.002164 main task 2018-03-16 14:14:41.358242 taskA start 2018-03-16 14:14:45.002450 taskB start 2018-03-16 14:14:45.002450 main task 2018-03-16 14:14:46.407531 taskA start 2018-03-16 14:14:50.002736 main task 2018-03-16 14:14:51.363814 taskA start 2018-03-16 14:14:55.003022 main task 2018-03-16 14:14:56.368100 end
時刻ちょうどに実行する その2
マルチプロセス版の時刻ちょうどに実行する
2秒ごとと5秒ごとに実行する。ただし開始は0秒から。
import multiprocessing import datetime import time class Worker(multiprocessing.Process): def __init__(self,queue,interval=5): self.interval=interval self.q=q self.time_now=datetime.datetime.utcnow() self.time_offset=datetime.timedelta(seconds=interval) #繰り返しの間隔(秒) #秒を0にする。 self.time_next=datetime.datetime(self.time_now.year, self.time_now.month, self.time_now.day, self.time_now.hour, self.time_now.minute, second=0, microsecond=0, tzinfo=None) while self.time_next < self.time_now: self.time_next=self.time_next+self.time_offset #super(Worker, self).__init__() def timedelta2sec(self,t): return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000 def run(self): self.time_now = datetime.datetime.utcnow() while self.time_next < self.time_now: self.time_next=self.time_next+self.time_offset time.sleep(self.timedelta2sec(self.time_next-self.time_now)) while self.q.empty()==True: #qに何か入ってくれば終了する。 #今の時間を表示 self.time_now = datetime.datetime.utcnow() print(self.name,self.time_now) self.time_next = self.time_next + self.time_offset self.time_now = datetime.datetime.utcnow() while self.time_next < self.time_now: self.time_next=self.time_next+self.time_offset time.sleep(self.timedelta2sec(self.time_next-self.time_now)) return if __name__ == '__main__': q=multiprocessing.SimpleQueue() #終了フラグを入れるために使う x=Worker(q,interval=2) y=Worker(q,interval=5) x.start() y.start() time.sleep(20) print("stop process") q.put("STOP") x.join() y.join() print("end")
実行結果
>C:\Python36\python.exe C:\multiProcessTest.py Worker-1 2018-03-15 11:36:48.010984 Worker-1 2018-03-15 11:36:50.001188 Worker-2 2018-03-15 11:36:50.001188 Worker-1 2018-03-15 11:36:52.002991 Worker-1 2018-03-15 11:36:54.006795 Worker-2 2018-03-15 11:36:55.006197 Worker-1 2018-03-15 11:36:56.006599 Worker-1 2018-03-15 11:36:58.007403 Worker-2 2018-03-15 11:37:00.009206 Worker-1 2018-03-15 11:37:00.009206 Worker-1 2018-03-15 11:37:02.009010 Worker-1 2018-03-15 11:37:04.007215 Worker-2 2018-03-15 11:37:05.006616 Worker-1 2018-03-15 11:37:06.002419 stop process end
スーパークラスのコンストラクタを忘れているとこんなエラーになる。
assert self._popen is None, 'cannot start a process twice' AttributeError: 'Worker' object has no attribute '_popen'
sqliteをインメモリで使ってみた
やりたいことは以下。全部できた。
インメモリDBは早い。
- インメモリで動かす
- 日付、日時を格納する
- 日時、日時の新しいN件だけをDB内に維持する
- 日付、日時の新しいM件を取り出す
import sqlite3 import time import datetime start=time.time() con = sqlite3.connect(":memory:") # ファイルの場合は280,000件で20秒 #con = sqlite3.connect("sqlitedb.db") # ファイルの場合は1000件で20秒 con.isolation_level = None # None で自動コミットモード cur = con.cursor() # Create table cur.execute('''CREATE TABLE stocks (date text, ts timestamp, trans text, symbol text, qty real, price real, hoge integer)''') for i in range(100): # Insert a row of data now = datetime.datetime.now() cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)",(now,str(i))) time.sleep(0.01) # Save (commit) the changes # con.commit() elapsed_time=time.time()-start print ("elapsed_time:{0}".format(elapsed_time) + "[sec]") # 新しいほうから3件だけを表示する。 for row in cur.execute("SELECT * From stocks ORDER BY ts DESC LIMIT 3"): print(row) print("-------------------------------------------") # レコード数を最新の6件だけ残して、古い行を削除する cur.execute("delete from stocks where ts IN (select ts from stocks order by ts desc LIMIT -1 OFFSET 6)") elapsed_time=time.time()-start print ("elapsed_time:{0}".format(elapsed_time) + "[sec]") for row in cur.execute("SELECT * From stocks"): print(row) # We can also close the connection if we are done with it. # Just be sure any changes have been committed or they will be lost. con.close()
実行結果
C:\Python34>C:\Python34\inmemorysqlite.py elapsed_time:1.5490062236785889[sec] ('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99) ('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98) ('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97) ------------------------------------------- elapsed_time:1.5490062236785889[sec] ('2006-01-05', '2018-02-21 23:29:50.665637', 'BUY', 'RHAT', 100.0, 35.14, 94) ('2006-01-05', '2018-02-21 23:29:50.681237', 'BUY', 'RHAT', 100.0, 35.14, 95) ('2006-01-05', '2018-02-21 23:29:50.696837', 'BUY', 'RHAT', 100.0, 35.14, 96) ('2006-01-05', '2018-02-21 23:29:50.712437', 'BUY', 'RHAT', 100.0, 35.14, 97) ('2006-01-05', '2018-02-21 23:29:50.728037', 'BUY', 'RHAT', 100.0, 35.14, 98) ('2006-01-05', '2018-02-21 23:29:50.743637', 'BUY', 'RHAT', 100.0, 35.14, 99)
ログをディスクに吐いてるようなので、それもメモリにすればもうちょっと早くなる??未評価。
dbconnection =sqlite3.connect(":memory:", check_same_thread = False) dbconnection.isolation_level = None dbcursor = dbconnection.cursor() dbcursor.execute('PRAGMA temp_store=MEMORY;') dbcursor.execute('PRAGMA journal_mode=MEMORY;')
リングバッファと計算
# -*- coding: utf-8 -*- class RingBuffer(): def __init__(self,bufferSize): self.size = bufferSize self.pc= 0 #次に使うバッファの番号 self.data=[None]*self.size #固定長さのリスト self.valid=False #リングバッファがいっぱいになったらTrueにする。 def push(self,value): print("pc=",self.pc) self.data[self.pc]=value self.pc=self.pc+1 if self.pc==self.size: self.pc=0 self.valid=True def max(self): if self.valid: return max(self.data) else: if self.pc==0: return None else: print("pc=",self.pc) return max(self.data[0:self.pc-1]) def min(self): if self.valid: return min(self.data) else: if self.pc==0: return None else: print("pc=",self.pc) return min(self.data[0:self.pc-1]) def average(self): if self.valid: return sum(self.data)/self.size else: if self.pc==0: return None else: print("pc=",self.pc) return sum(self.data[0:self.pc-1])/self.size def dump(self): return self.data def main(): rb=RingBuffer(5) print(rb.dump()) print(rb.max()) #バッファ内の最大を返す print(rb.min()) #バッファ内の最小を返す rb.push(1) rb.push(2) rb.push(3) rb.push(4) print(rb.dump()) print("max=",rb.max()) #バッファ内の最大を返す print("min=",rb.min()) #バッファ内の最小を返す rb.push(1) rb.push(1) rb.push(2) rb.push(2) print(rb.dump()) print(rb.max()) #バッファ内の最大を返す print(rb.min()) #バッファ内の最小を返す print(rb.average()) #バッファ内の最小を返す if __name__== '__main__': main()