Cucco’s Compute Hack

コンピュータ関係の記事を書いていきます。

multiprocessingのサンプルコード

multiprocessingのサンプルコード。
マルチコア処理してほしい&処理には共通の情報を利用する、という条件あり。
メンバ変数の書き換えは、returnには反映されるが、実行のたびに1に戻っている感じ。

# -*- coding: utf-8 -*-

from multiprocessing import Pool
import time
import os

def f(x):
	print("f(x)",os.getpid())
	time.sleep(1)
	print(x)
	return x*x

class MyClass():
	def __init__(self):
		print("init MyClass")
		self.foo=1 #共通の情報
	def g(self,x):
		print("MyClass g(x)",os.getpid())
		time.sleep(1)
		#共通の情報にアクセスできる?→できた。
		print(x,self.foo)
		#共通の情報の値を書き換えはできないみたい。returnには反映されるが、1に戻っている感じ。
		self.foo=2
		return x*x+self.foo

if __name__ == '__main__':
	print("main()",os.getpid())
	myclass=MyClass()
	
	#普通の関数を実行
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(f, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])
		
	print("----------------------------")
	
	#クラスの中の関数を実行
	#新しいpoolにしてるのでプロセスIDは変わる。
	with Pool(2) as pool:
		multiple_results = [pool.apply_async(myclass.g, (i,)) for i in range(5)]
		print([res.get() for res in multiple_results])

実行結果

main() 6120
init MyClass
f(x) 4252
f(x) 2784
0
f(x) 4252
1
f(x) 2784
2
f(x) 4252
3
4
[0, 1, 4, 9, 16]
                                                      • -
MyClass g(x) 7020 MyClass g(x) 5204 0 1 MyClass g(x) 7020 1 1 MyClass g(x) 5204 2 1 MyClass g(x) 7020 3 1 4 1 [2, 3, 6, 11, 18]

Threadを使うときの"TypeError: function1() got multiple values for argument 'arg1'"

threading.Threadを使って引数持つ関数を実行すると、以下のようなエラーになることがある。
run()に直接関数を記述せず、すでにある関数をrun()の中で実行しようとすると起きる。

"TypeError: function1() got multiple values for argument 'arg1'"

原因はよくわからない。

とりあえず回避した例。func1でself.kwargsを使うのと何が違うのか・・・。

# -*- coding: utf-8 -*-
import threading

class MyThread(threading.Thread):
	def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None):
		#threading.Threadとしてのコンストラクタ
		threading.Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None)
		
		#threadの中で使う引数(辞書)
		self.kwargs = kwargs
		return
		
	def func1(self,*args,**kwargs):
		x=kwargs["kwargs"]
		a=x["a"]
		b=x["b"]
		c=x["c"]
		print("thread main fun1",a,b,c)
		return
		
	def run(self):
		#実際の処理は、func1で行う。
		self.func1(kwargs=self.kwargs)
		return

def main():
	kwargs={"a":1,"b":2,"c":3}
	t=MyThread(kwargs=kwargs)
	t.start()

if __name__== '__main__':
	main()

時刻ちょうどに実行する

プログラム

# -*- coding: utf-8 -*-
import threading
import datetime
from time import sleep

"""
1秒ごとに交互に実行する。
時刻は現在時刻を取得して、1/100秒単位くらいでx秒ちょうどに開始したい。
→時刻取得の関数が重たいのか、精度が出ているかどうかよくわからない。
"""
class MyThread(threading.Thread):
	def __init__(self, odd = True ,second = 2):
		self.dt_offset=datetime.timedelta(seconds=second) #繰り返しの間隔(秒)
		self.stop_event = threading.Event() #停止させるためのフラグ
		
		self.dt_now=datetime.datetime.utcnow()
		
		self.odd=odd
		if odd==True:
			self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=0, microsecond=0, tzinfo=None)
		else:
			self.dt_next=datetime.datetime(self.dt_now.year, self.dt_now.month, self.dt_now.day, self.dt_now.hour, self.dt_now.minute, second=1, microsecond=0, tzinfo=None)
			
		threading.Thread.__init__(self) #threadとしてのコンストラクタ
		
	def stop(self):
		#終了させるためのもの。
		print("Thread Stop Event")
		self.stop_event.set()
	
	def run(self):
		while not self.stop_event.is_set():
			
			self.dt_next = self.dt_next + self.dt_offset
			self.dt_now = datetime.datetime.utcnow()
			
			if self.dt_now > self.dt_next:
				continue
			
			#時刻になるまで待ち
			sleep( timedelta2sec( self.dt_next - self.dt_now ) )
			
			#実行したい内容をここに記述
			print(self.name,self.dt_offset.seconds,self.odd,datetime.datetime.utcnow())
			
		print("thread end")
		return

def timedelta2sec(t):
	return t.days * 3600 * 24 + t.seconds + t.microseconds / 1000000

if __name__ == '__main__':
	x=MyThread(second=2, odd=True)
	y=MyThread(second=2, odd=False)
	
	x.start()
	y.start()
	
	#15秒したら終了させる。
	sleep(15)
	x.stop()
	y.stop()
	
	#スレッドの終了まち
	x.join()
	y.join()
	
	print("プログラム終了")


実行結果

python.exe timerThreadTest.py
Thread-2 2 False 2017-09-06 05:13:45.017042
Thread-1 2 True 2017-09-06 05:13:46.021042
Thread-2 2 False 2017-09-06 05:13:47.020042
Thread-1 2 True 2017-09-06 05:13:48.008042
Thread-2 2 False 2017-09-06 05:13:49.005042
Thread-1 2 True 2017-09-06 05:13:50.018042
Thread-2 2 False 2017-09-06 05:13:51.012042
Thread-1 2 True 2017-09-06 05:13:52.029042
Thread-2 2 False 2017-09-06 05:13:53.026042
Thread-1 2 True 2017-09-06 05:13:54.018042
Thread-2 2 False 2017-09-06 05:13:55.027042
Thread-1 2 True 2017-09-06 05:13:56.021042
Thread-2 2 False 2017-09-06 05:13:57.012042
Thread-1 2 True 2017-09-06 05:13:58.022042
Thread-2 2 False 2017-09-06 05:13:59.027042
Thread Stop Event
Thread Stop Event
Thread-1 2 True 2017-09-06 05:14:00.016042
thread end
Thread-2 2 False 2017-09-06 05:14:01.007042
thread end
プログラム終了

ファイルの最後の1行を読む

ファイルの最後の1行を読みたい時。

# -*- coding: utf-8

file_name="some_file.csv"

with open(file_name,'r') as f:
	num_lines = sum(1 for line in f)
	f.seek(0)
	for line in range(0, num_lines-1):
		f.readline()
	line = f.readline()
print(line)

QueueをつかったPython マルチスレッド

マルチスレッドのテストプログラム。
Listをもらって、加算して、結果をグローバルのリストに書き込む。

import threading
import queue
import time

commonList=[]
q=queue.Queue()

def worker():
	"""
		マルチスレッドで走らせる関数
		Queueからデータをもらう。
		結果をcommonListに書き込む
		計算内容や書き込む場所はqに書いてある
	"""
	global q
	global commonList
	print("thread Start")
	
	while True:
		time.sleep(0.1) #Ctrl+Cで終了させるおまじない。
		item = q.get()
		if item is None:
			print("thread None")
			break
		commonList.append((item.ID,sum(item.values)))
		q.task_done()
	print("thread End")

def main():
	global q
	global commonList
	num_worker_threads=3
	a=Job([1,2,3],0)
	b=Job([3,4,5],1)
	c=Job([6,7,8],2)
	d=Job([9,0,1],3)
	e=Job([2,3,4],4)
	f=Job([5,6,7,8],5)
	
	threadItem=[a,b,c,d,e,f]
	
	threads=[]
	
	#スレッドを立てる。作業はあとでQueueで渡す。
	for i in range(num_worker_threads):
		t = threading.Thread(target=worker)
		t.start()
		threads.append(t)
	
	#itemを投入。データを渡す。
	for item in threadItem:
		q.put(item)
	
	# block until all tasks are done qが空になるのを待っている。
	q.join()

	#スレッド停止命令(None)の投入
	for i in range(num_worker_threads):
		q.put(None)
	
	#スレッドの終了まち
	for t in threads:
		t.join()
	
	#結果の表示
	print(commonList)

class Job():
	#xはリスト,yは書き込み先のリストのインデックス
	def __init__(self,x,y):
		self.values=x.copy()
		self.ID=y

if __name__ == '__main__':
	main()

結果

C:\Python34>python multiThreadTest.py
thread Start
thread Start
thread Start
thread None
thread End
thread None
thread End
thread None
thread End
[(0, 6), (1, 12), (2, 21), (3, 10), (4, 9), (5, 26)]

DBに特定のデータがあるかどうかを確認する

DB内にデータがあるかどうか確認する。
あった時は更新(update)、なかった時は挿入(insert)するように処理すればよい。

cur.execute(sql,data)に渡した時にSQL文がうまく展開されない?問題で困った。

# -*- coding: utf-8

import mysql.connector
import datetime

config = {
    'user': 'root',
    'password': 'password',
    'host': 'localhost',
    'database':'testdb',
    'charset':'utf8'
}
cnx = mysql.connector.connect(**config)
cur = cnx.cursor(buffered=True)

#tableの削除と作成
sql = 'DROP TABLE IF EXISTS TEST_UPDATE;'
cur.execute(sql)
cnx.commit()
sql = 'CREATE TABLE IF NOT EXISTS TEST_UPDATE (\
							id INT UNSIGNED NOT NULL AUTO_INCREMENT,\
							time DATETIME DEFAULT NULL, \
							value1 INT UNSIGNED DEFAULT NULL ,\
							value2 INT UNSIGNED DEFAULT NULL ,\
							primary key(id),\
							unique (time));'
cur.execute(sql)
cnx.commit()

#初期データのインサート
sql = 'INSERT INTO TEST_UPDATE (time, value1,value2) VALUES (%s, %s, %s);'
data= ('2017-5-01T05:50:00','1','1')
cur.execute(sql, data)

sql = 'INSERT INTO TEST_UPDATE (time, value1,value2) VALUES (%s, %s, %s);'
data= ('2017-5-01T05:50:05','2','2')
cur.execute(sql, data)
cnx.commit()

#この時点でのテーブルの中身
#mysql> select * from test_update;
#+----+---------------------+--------+--------+
#’ id | time                | value1 | value2 |
#+----+---------------------+--------+--------+
#|  1 | 2017-05-01 05:50:00 |      1 |      1 |
#|  2 | 2017-05-01 05:50:05 |      2 |      2 |
#+----+---------------------+--------+--------+
#2 rows in set (0.00 sec)

#データの有無の確認1
sql= "select exists(select * from test_update where time=%s);"
#data=(datetime.datetime(2017,5,1,5,50,00),)
data= ('2017-5-01T05:50:00',)#なぜかカンマが必要。
try:
	cur.execute(sql,data)
except mysql.connector.Error as err:
	print(cur.statement)#実行したSQL文の確認
	raise

#print(cur.fetchone())
#あった時…(1,)
#なかった時…(0,)
if cur.fetchone()[0]==0:
	print("なかった")
else:
	print("あった")

#データの有無の確認2
#テーブル名をdataで渡そうとすると、クォートがついてしまってうまく動かない。
tablename='test_update'
sql= "select exists(select * from " + tablename +" where time=%s);"
data= ('2017-5-01T05:50:05',)
try:
	cur.execute(sql,data)
except mysql.connector.Error as err:
	print(cur.statement)#実行したSQL文の確認
	raise

if cur.fetchone()[0]==0:
	print("なかった")
else:
	print("あった")

#データの有無の確認3
tablename='test_update'
sql= "select exists(select * from " + tablename +" where time=%s);"
data= ('2017-5-01T05:50:15',)
try:
	cur.execute(sql,data)
except mysql.connector.Error as err:
	print(cur.statement)#実行したSQL文の確認
	raise

if cur.fetchone()[0]==0:
	print("なかった")
else:
	print("あった")

cur.close()
cnx.close()
print('end')

pythonのunittestのコード

ちょっと前に書いた記事compute-cucco.hatenablog.comのテストコード。というか抽象クラスは関係ないので、ただのテストコード。
raiseに対するテストコードの書き方が分からない。。。

# -*- coding: utf-8 -*-
import unittest
import movingCalcs
from movingCalcs import factory

class TestmovingCalcs(unittest.TestCase):
	"""test class of movingCalcs.py
	"""

	def test_add(self):
		"""test method for Add
		"""
		node = factory("Add")
		value1 = 1
		value2 = 2
		expected = 3
		actual = node.calc(value1, value2)
		self.assertEqual(expected, actual)

	def test_sub(self):
		"""test method for sub
		"""
		node = factory("Sub")
		value1 = 1
		value2 = 2
		expected = -1
		actual = node.calc(value1, value2)
		self.assertEqual(expected, actual)

	def test_zero(self):
		"""test method for Zero
		"""
		node = factory("Zero")
		value1 = 1
		value2 = 2
		expected = 0
		actual = node.calc(value1, value2)
		self.assertEqual(expected, actual)

	def test_other(self):
		"""test method for other
		"""
		with self.assertRaises(factory):
			node = factory("Other")

if __name__ == "__main__":
    unittest.main()