Cucco’s Compute Hack

コンピュータ関係の記事を書いていきます。

seabornを使ってみた ヒートマップを描画する

元データをピボット処理してヒートマップに描画する例。

f:id:Cucco:20201123144251p:plain

ヒートマップの際の並べ替え指定が面倒なので、昇順に並べればよいような行/列名にしておいたほうがよさそう。

各列の最大値に対して、強調枠を設定。枠の場所の指定方法が、dfの並びと同様、左上がゼロ。位置関係の取得はループで回している。

ソース
import pandas as pd
import seaborn as sns

import matplotlib.pyplot as plt
import os

time_5 ='time_5'
time_10 ='time_10'
time_15 = 'time_15'

list_2d =[[time_5,1,9],
[time_5,2,6],
[time_5,3,3],
[time_5,-1,9],
[time_5,-2,6],
[time_5,-3,3],
[time_10,1,8],
[time_10,2,5],
[time_10,3,2],
[time_10,-1,8],
[time_10,-2,5],
[time_10,-3,2],
[time_15,1,7],
[time_15,2,4],
[time_15,3,1],
[time_15,-1,7],
[time_15,-2,4],
[time_15, -3, 1]]

df = pd.DataFrame(list_2d,columns=['Time', 'Class', 'Score'])
print(df)
#       Time  Class  Score
#0    time_5      1      9
#1    time_5      2      6
#2    time_5      3      3
#3    time_5     -1      9
#4    time_5     -2      6
#5    time_5     -3      3
#6   time_10      1      8
#7   time_10      2      5
#8   time_10      3      2
#9   time_10     -1      8
#10  time_10     -2      5
#11  time_10     -3      2
#12  time_15      1      7
#13  time_15      2      4
#14  time_15      3      1
#15  time_15     -1      7
#16  time_15     -2      4
#17  time_15     -3      1

print(df.pivot(index='Time', columns='Class', values='Score'))
# Class    -3  -2  -1   1   2   3
# Time
# time_10   2   5   8   8   5   2
# time_15   1   4   7   7   4   1
# time_5    3   6   9   9   6   3


pivot = df.pivot(index='Class', columns='Time', values='Score')

print("ピボットの順番並べ替え")
pivot = pivot.sort_index(axis=0, ascending=False, inplace=False)
pivot = pivot.loc[:, ['time_5', 'time_10', 'time_15']]

print(pivot)

sns.heatmap(pivot, cmap='Blues', annot=True, fmt='d', linewidths=.5,square=True)

# plt.show()では、次のエラーになる
# 'AxesSubplot' object has no attribute 'show'
# import matplotlib.pyplot as plt が必要。

# snsからpltへの間のデータの受け渡しがどうなっているのか未確認。
plt.show(block=False)

# ファイルに保存
img_file_name= os.path.dirname(os.path.abspath(__file__)) + os.sep + "heatmap.png"
plt.savefig(img_file_name)

# 各列、最大値に対して強調表示
for col_index,col_index_name in enumerate(pivot.columns.values):
    max_val= pivot[col_index_name].max()
    for row_index, row_value in enumerate(pivot[col_index_name] == max_val):
        if row_value==True:
        # Rectangleは、左上が(0,0)
            ax.add_patch(Rectangle((col_index, row_index), 1, 1, fill=False, edgecolor='red', lw=3))

plt.show(block=False)
img_file_name= os.path.dirname(os.path.abspath(__file__)) + os.sep + "heatmap_w_patch_loc.png"
plt.savefig(img_file_name)
結果
f:id:Cucco:20201122175734p:plain
ヒートマップ

並べ替えをしなかった場合

f:id:Cucco:20201122180030p:plain
ヒートマップ。並べ替えをしなかった場合

各列の最大値に対して強調枠設定

f:id:Cucco:20201123144251p:plain
強調枠を設定

リストを列に見立てて、列を拡張(追加)。DBMSにテーブル定義して値も格納する。

リストを列に見立て、列を拡張(追加)していけるクラス。
最終的にはDBMSにテーブル定義して値も格納する。

ファイルの名前とか、クラスの名前とか、どう表現すればよいのかやや悩み。

プログラム list_manager_for_table.py
import sqlite3

class list2d_for_table():
    def __init__(self,table_name):
        self.table_name = table_name
        self.column_names_list = []
        self.column_values_list = []
        self.column_type_list = []

        self.rows=None
    
    def concat_column(self,value_list, column_name, column_type):
        if self.rows is None:
            self.rows = len(value_list)
        elif self.rows != len(value_list):
            raise

        self.column_names_list.append(column_name)
        self.column_values_list.append(value_list)

        if column_type == "INTEGER" or column_type == "REAL" or column_type == "TEXT":
            self.column_type_list.append(column_type)
        else:
            # 型は「INTEGER」「REAL」「TEXT」に限る。
            raise ValueError("型は「INTEGER」「REAL」「TEXT」に限る。")

    def build_sql_create_table(self):
        # CREATE TABLE test_table ( pi INTEGER, alphabet TEXT )
        # 「NULL」「INTEGER」「REAL」「TEXT」「BLOB」

        internal = []
        for i in range(len(self.column_type_list)):
            word="{0} {1}".format(self.column_names_list[i], self.column_type_list[i])
            internal.append(word)
        
        sql = "CREATE TABLE {0} ( {1} )".format(self.table_name, ", ".join(internal))
        
        return sql

    def to_dbms(self, db_cursor):
        for i in range(self.rows):  # 全ての行に対して、
            internal=[]
            for col_index in range(len(self.column_type_list)):  # 各列に対して
                if self.column_type_list[col_index] == "TEXT":
                    internal.append(f"'{self.column_values_list[col_index][i]}'")
                else:
                    if self.column_values_list[col_index][i] is None:
                        internal.append('NULL')
                    else:
                        internal.append(str(self.column_values_list[col_index][i]))
            
            sql=f"INSERT INTO { self.table_name } VALUES ({', '.join(internal)})"
            db_cursor.execute(sql)

if __name__ == "__main__":

    x = list2d_for_table("test_table")
    y = [3, 1, 4, 1, 5, 9,  None,'null']
    z = ['a', 'b', 'c', 'd', 'e', 'f', 'None','null']

    x.concat_column(y,"pi","INTEGER")
    x.concat_column(z,"alphabet","TEXT")
    print(x.build_sql_create_table())


    con = sqlite3.connect(":memory:")
    con.isolation_level = None
    cur = con.cursor()
    cur.execute('PRAGMA temp_store=MEMORY;')
    cur.execute('PRAGMA journal_mode=MEMORY;')

    cur.execute(x.build_sql_create_table())

    for row in cur.execute("select sql from sqlite_master where type='table'"):
        print(row)

    x.to_dbms(cur)

    for row in cur.execute("select * from test_table"):
        print(row)
結果
CREATE TABLE test_table ( pi INTEGER, alphabet TEXT )
('CREATE TABLE test_table ( pi INTEGER, alphabet TEXT )',)
(3, 'a')
(1, 'b')
(4, 'c')
(1, 'd')
(5, 'e')
(9, 'f')
(None, 'None')
(None, 'null')

シフト幅を変更できるリスト一括比較

こんな感じの比較。2つ先とか⁺3つ先とかシフト幅を変更できる。
f:id:Cucco:20201010165817p:plain

プログラム
def compair(x, y, y_shift, negative=0, na=None):
    '''
    リストを利用した一括の比較
    x[i]<=y[i+y_shift]の結果のリストを返す。
    Trueのときには1
    Falseの時にはnegativeの値が入る。
    比較する値がない場合は、naの値が入る。
    '''
    if len(x) != len(y):
        raise
    elif abs(y_shift) >= len(x):
        raise

    z = [na] * len(x)
    
    if y_shift < 0:
        for i in range(len(x) + y_shift):  # y_shiftは負の数なので、ループ回数は減る。
            if x[i - y_shift] is None:
                z[i - y_shift] = na
            elif y[i] is None:
                z[i - y_shift] = na
            elif  x[i - y_shift] <= y[i]:
                z[i - y_shift] = 1
            else:
                z[i- y_shift] = negative
    else:
        for i in range(len(x) - y_shift):
            if x[i] is None:
                z[i] = na
            elif y[i + y_shift] is None:
                z[i] = na
            elif x[i] <= y[i + y_shift]:
                z[i] = 1
            else:
                z[i] = negative
    return z

def compair_offset(x, y, y_shift, y_offset, negative=0, na=None):
    '''
    yの値は、xより、y_offset以上大きい場合。
    '''
    for i in range(len(y)):
        y[i] = y[i] - y_offset
    
    z = compair(x, y, y_shift, negative, na)
    return z


if __name__ == "__main__":

    x = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 9]
    y = [2, 7, 1, 8, 2, 8, 1, 8, 2, 8, 4, 6]

    print(compair(x, y, 0))
    print(compair(x, y, 1))

    print(compair(x, y, 2))
    print(compair(x, y, 10))
    print(compair(x, y, 11))

    print(compair(x, y, -1))
    print(compair(x, y, -2))
    print(compair(x, y, -10))
    print(compair(x, y, -11))

    print(compair(x, y, 3, na=0))
    print(compair(x, y, -3, negative=0))

    print(compair_offset(x, y, y_shift=0, y_offset=2, negative=0))

    print(compair_offset(x, y, y_shift=2, y_offset=2, negative=0))
結果
[0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0]
[1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, None]
[0, 1, 0, 1, 0, 0, 1, 1, 0, 1, None, None]
[1, 1, None, None, None, None, None, None, None, None, None, None]
[1, None, None, None, None, None, None, None, None, None, None, None]
[None, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0]
[None, None, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0]
[None, None, None, None, None, None, None, None, None, None, 0, 0]
[None, None, None, None, None, None, None, None, None, None, None, 0]
[1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0]
[None, None, None, 1, 1, 0, 1, 0, 1, 0, 1, 0]
[0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0]
[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, None, None]

テーブルの定義とデータをファイルに。インポートとエクスポート。

テーブルの定義とデータをファイルに。インポートとエクスポート。

import sqlite3
import datetime
import os
import csv

import time # ダミーデータを作るときのsleep用。

class tabledata_file_converter():
    """テーブルの情報をファイルに書き出す、ファイルからテーブルを復元する
    """
    @staticmethod
    def get_table_definition_from_sqlite_master(db_cursor, table_name):
        # dbmsからテーブル定義の情報を取得する
        sql = f"select sql from sqlite_master where type='table' and name='{table_name}'"
        db_cursor.execute(sql)
        row = db_cursor.fetchone()
        if row is None:
            raise
        return row[0]
        
    @staticmethod
    def build_fullpath_table_def_file(table_name, folder_path=None):
        # テーブル定義を書き込むファイルのパスを作る
        post_fix="_table_def.txt"
        if folder_path is None:
            file_fullpath = os.path.dirname(os.path.abspath(__file__)) + os.sep + table_name + post_fix
        else:
            file_fullpath = folder_path + os.sep + table_name + post_fix
        return file_fullpath

    @staticmethod
    def build_fullpath_table_dat_csv(table_name, folder_path=None):
        # テーブルのデータを書き込むファイルのパスを作る
        post_fix="_table_dat.csv"
        if folder_path is None:
            file_fullpath = os.path.dirname(os.path.abspath(__file__)) + os.sep + table_name + post_fix
        else:
            file_fullpath = folder_path + os.sep + table_name + post_fix
        return file_fullpath

    @staticmethod
    def save_def_file(table_name, table_definition, folder_path=None):
        # テーブル定義をファイルに書き込む
        save_file_fullpath = tabledata_file_converter.build_fullpath_table_def_file(table_name, folder_path)

        with open(save_file_fullpath, mode='w', encoding='utf-8') as f:
            f.write(table_definition)

    @staticmethod
    def read_from_def_file(table_name, folder_path=None):
        # テーブル定義をファイルから読み出す。
        read_file_fullpath = tabledata_file_converter.build_fullpath_table_def_file(table_name, folder_path)

        if os.path.isfile(read_file_fullpath) is False:
            raise
        
        with open(read_file_fullpath, mode='r', encoding='utf-8') as f:
            row = f.readline()
        return row
    
    @staticmethod
    def is_exist_table(db_cursor, tale_name):
        #テーブルの存在確認
        db_cursor.execute(f'SELECT COUNT(*) FROM sqlite_master WHERE TYPE="table" AND NAME="{tale_name}"')
        if db_cursor.fetchone() == (0,): #存在しないとき
            return False
        else:
            return True

    @staticmethod
    def list_to_sql_str(list_data):
        '''
        list_to_sql_str(["hoge", 3, 0.14])
        "'hoge',3,0.14" 
        '''
        y=[]
        for item in list_data:
            if type(item) is str:
                y.append("'"+item+"'")
            else:
                y.append(str(item))
        
        return ','.join(y)

    @staticmethod
    def load_from_csv(db_cursor, table_name, folder_path=None):
        # ファイル(データと定義)からテーブルを復元する

        # テーブルの有無確認 テーブルがあればエラー終了させる
        if __class__.is_exist_table(db_cursor, table_name) is True:
            raise

        # テーブルを定義する
        table_def_sql = __class__.read_from_def_file(table_name)
        db_cursor.execute(table_def_sql)

        read_file_fullpath = __class__.build_fullpath_table_dat_csv(table_name, folder_path)

        f = open(read_file_fullpath,mode='r',encoding='utf-8', newline='')
        csv_reader = csv.reader(f, delimiter=',', quotechar='"', lineterminator='\n')

        for row in  csv_reader:
            insert_sql = f"INSERT INTO {table_name} VALUES({ __class__.list_to_sql_str(row)})"
            db_cursor.execute(insert_sql)
        
    @staticmethod
    def save_csv(db_cursor, table_name, folder_path=None):
        save_file_fullpath=tabledata_file_converter.build_fullpath_table_dat_csv(table_name=table_name, folder_path=folder_path)
        
        f = open(save_file_fullpath,mode='w',encoding='utf-8', newline='')
        csv_writer = csv.writer(f, delimiter=',', quotechar='"', lineterminator='\n')
        
        for row in db_cursor.execute(f'SELECT * FROM {table_name}'):
            csv_writer.writerow(row)

        f.close()


if __name__ == '__main__':
    con = sqlite3.connect(":memory:", check_same_thread=False)
    con.isolation_level = None

    cur = con.cursor()
    cur.execute('PRAGMA temp_store=MEMORY;')
    cur.execute('PRAGMA journal_mode=MEMORY;')

    # Create table
    cur.execute('CREATE TABLE stocks (date text, ts timestamp, trans text, symbol text, qty real, price real, add_col integer)')

    # ダミーデータを作る
    for i in range(100):
        # Insert a row of data
        now = datetime.datetime.now()
        cur.execute("INSERT INTO stocks VALUES ('2006-01-05',?,'BUY','RHAT',100,35.14,?)", (now, str(i)))
        time.sleep(0.01)

    # 新しいほうから3件だけを表示する。
    for row in cur.execute("SELECT * From stocks ORDER BY add_col DESC LIMIT 3"):
        print(row)

    # SQL定義の確認
    for row in cur.execute("select sql from sqlite_master where type='table' and name='stocks'"):
        print(row)

    table_definision = tabledata_file_converter.get_table_definition_from_sqlite_master(db_cursor=cur, table_name='stocks')
    print(table_definision)

    # テーブル定義の書き出し
    tabledata_file_converter.save_def_file(table_name='stocks', table_definition=table_definision)

    # テーブル定義の読み出し
    table_definision2 = tabledata_file_converter.read_from_def_file(table_name='stocks')
    print(table_definision2)

    # テーブルデータの書き出し
    tabledata_file_converter.save_csv(cur, "stocks")

    # テーブルの有無
    print(tabledata_file_converter.is_exist_table(cur,"stocks"))
    print(tabledata_file_converter.is_exist_table(cur, "stocksA"))

    # テーブルデータの復元(stocksA向けのファイルを事前に作っておく)
    tabledata_file_converter.load_from_csv(cur, "stocksA")

    # 復元されたデータの表示。新しいほうから3件だけを表示する。
    for row in cur.execute("SELECT * From stocksA ORDER BY add_col DESC LIMIT 3"):
        print(row)

    # 復元されたデータの表示。件数を表示する
    for row in cur.execute("SELECT COUNT(*) From stocksA ORDER BY add_col DESC LIMIT 3"):
        print(row)

    # SQL定義の確認
    for row in cur.execute("select sql from sqlite_master where type='table'"):
        print(row)

PlantUMLの配置図の位置調整

PlantUMLの配置図の、レイアウトというか位置調整がうまくいかなかったのでメモ

配置図の構文と機能

生成された図
f:id:Cucco:20200810202741p:plain
生成された図


umlファイルの上に書いたものが左に配置される。
矢印でつなぐ際に「-」では横でつながるが、「--」や「..」など、2個でつなぐと上下関係が生まれる。

ソースコード
@startuml deployment-diagram

node ノード1[
    node1
]
' 2段目の左にあるブロックから記載していく。
node ノード2[
    node2
]
node ノード3[
    node3
]
node ノード4[
    node4
]

' 矢印でつなぐ
ノード1 --> ノード3 :上下関係での接続 
ノード2 <- ノード3: 横関係での接続(左向き)
ノード3 -> ノード4: 横関係での接続(右向き)

' ノートの記載は他の図と同じ。
note left of ノード1 : 親のノード

@enduml

Python サブモジュールのディレクトリ構造とunittestの話

プログラムとunittestを同じディレクトリに置くパタンのテスト。
パッケージ(hello_submodule)の同列にtestディレクトリを作って、unittestのファイルを置く構成が標準らしいが、パスの設定関係がとても面倒なので。

プログラムとunittestを同じディレクトリに置くパタンの場合、カレントディレクトリがhello_submoduleにあるものとしてfromなどを記述すればよさそう。

ディレクトリ構造
─hello_submodule
  │  program_main.py
  │  program_main_test.py
  │  
  ├─.vscode
  │      launch.json
  │      settings.json
  │      
  ├─sub_module_1
  │      program_submod1.py
  │      program_submod1_test.py
  │      __init__.py
  │          
  └─sub_module_2
          program_submod2.py
          program_submod2_test.py
          __init__.py
ソースコード

1. program_main.py

from sub_module_1 import program_submod1
from sub_module_2.program_submod2 import Class_Submod2

def main_func():
    x = program_submod1.func1()
    y = Class_Submod2()
    z = y.func2()

    print("this function is program_main.main_func()")
    print(f"x={x}, y={y},z={z}")

    return 3

if __name__ == '__main__':
    main_func()

2. program_main_test.py

import unittest
import program_main

class Testprogram_main(unittest.TestCase):

    def test_main_func(self):
        self.assertEqual(program_main.main_func(), 3)


if __name__ == '__main__':
    unittest.main()

3. program_submod1.py

def func1():
    print("this function is sub_module_1.func1()")
    return (1)

4. program_submod1_test.py

import unittest
from sub_module_1 import program_submod1

class Testprogram_submod1(unittest.TestCase):

    def test_func1(self):
        self.assertEqual(program_submod1.func1(), 1)

if __name__ == '__main__':
    unittest.main()

5. program_submod2.py

class Class_Submod2:
    def __init__(self):
        self.v = 2
        pass

    def func2(self):
        print("this function is sub_module_2.func2() in Class_Submod2")
        return (self.v)

6. program_submod2_test.py

import unittest
from sub_module_2 import program_submod2

class Testprogram_submod2(unittest.TestCase):

    def test_func1(self):
        x=program_submod2.Class_Submod2()
        self.assertEqual(x.func2(), 2)

if __name__ == '__main__':
    unittest.main()

7. __init__.py
空のファイル。

8. setting.json(vscodeの設定ファイル)

{
    "python.testing.unittestArgs": [
        "-v",
        "-s",
        ".",
        "-p",
        "*_test.py"
    ],
    "python.testing.pytestEnabled": false,
    "python.testing.nosetestsEnabled": false,
    "python.testing.unittestEnabled": true
}
コマンドラインでのテスト実行例
(base) C:\dev\SampleCodes\hello_submodule>python -m unittest discover -p "*_test.py"
this function is sub_module_1.func1()
this function is sub_module_2.func2() in Class_Submod2
this function is program_main.main_func()
x=1, y=<sub_module_2.program_submod2.Class_Submod2 object at 0x0000021BF07B07C8>,z=2
.this function is sub_module_1.func1()
.this function is sub_module_2.func2() in Class_Submod2
.
----------------------------------------------------------------------
Ran 3 tests in 0.005s

OK
||>

*** パスの設定関係がとても面倒な理由
unittestファイル側から、テスト対象の関数が書いてある.pyファイルをimportしないといけないが、親ディレクトリをたどってimportする設定が手間なため。
全てのunittestファイルに以下を記載してもよいが手間。lynterが問題として検出することもある。

>||
import sys
sys.path.append(r"<パッケージのディレクトリ>")