ソースに絡まるエスカルゴ

貧弱プログラマの外部記憶装置です。

【Arduino】Seeed Studio XIAO ESP32C3でMQTTのPublishを行う

 過去の記事で色々とMQTTについての記事を書いてきました。

 今回は実際にありそうな組み合わせとして、Seeed Stuido XIAO ESP32C3から別端末のMQTTブローカーへPublishを行う記事になります。
 前提としてArduino IDEでXIAO ESP32C3を使えるようにしておく必要があるので注意してください。
 また、詳しい内容は参考資料に挙げているページ様を参照してください。


 では、始めます。


1:ArduinoMqttClientライブラリのインストール
 ESP32系でMQTTを扱うライブラリは色々と存在しますが、今回はArduino公式が出している「ArduinoMqttClient」を使っていきます。

 Arduino IDEを起動させて「ライブラリマネージャーのアイコン」をクリックし、出てきた検索欄に「ArduinoMqttClient」と入力し、出てきたライブラリの「インストール」ボタンをクリックしてインストールします。

 以下のように「installed」のマークになっていればインストールは完了しています。


2:ArduinoMqttClientを使ったサンプル
 サンプルスケッチは以下のようになります。
 前提として今回のサンプルは過去記事で説明したブローカーに対してPublishするものとなります。つまりポート番号は「1883」、トピック名は「test」という前提で記述しています。またブローカーのIPアドレスWiFiSSDとパスワードは適宜書き換えて動かしてください。

・XIAO_ESP32C3_mqtt_test.ino

#include <WiFi.h>
#include <ArduinoMqttClient.h>

char *ssid     = "******"; // 適宜書き換える
char *password = "******"; // 適宜書き換える

const char broker[]   = "192.168.xxx.xxx"; // 適宜書き換える
const int  brokerPort = 1883;              // 適宜書き換える
const char topic[]    = "test";            // 適宜書き換える

// WiFiクライアントとMQTTクライアントの準備
WiFiClient wifiClient;
MqttClient mqttClient(wifiClient);

void setup() {
  Serial.begin(115200);
  delay(1000);

  Serial.println("--- setup start ---");

  // WiFi接続
  Serial.println("Connecting wifi");
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    Serial.print(".");
    delay(500);
  }
  Serial.println("Connected!");

  // MQTTブローカー接続
  Serial.println("Connecting broker");
  while (!mqttClient.connect(broker, brokerPort)) {
    Serial.print(".");
    delay(500);
  };
  Serial.println("Connected!");

  Serial.println("--- setup end ---");
}

void loop() {
  // メッセージ送信(Publish)
  mqttClient.beginMessage(topic);
  mqttClient.print("Hello from ESP32C3.");
  mqttClient.endMessage();
  // 送信処理終了を表示
  Serial.println("send message!");

  delay(5000);
}

 上記のスケッチの必要な部分を適宜書き換えて実行しシリアルモニタを確認すると、以下のようにWiFiとBrokerに接続した後、5秒間隔でPublishしたことがわかるメッセージが表示されます。

13:54:42.045 -> --- setup start ---
13:54:42.045 -> Connecting wifi
13:54:42.086 -> .Connected!
13:54:42.586 -> Connecting broker
13:54:43.036 -> Connected!
13:54:43.036 -> --- setup end ---
13:54:43.081 -> send message!
13:54:48.091 -> send message!
13:54:53.055 -> send message!

 Subscribe側では以下のように5秒毎に「Hello from ESP32C3.」のメッセージを受信して表示されます。

$ mosquitto_sub -h "192.168.xxx.xxx" -t test
Hello from ESP32C3.
Hello from ESP32C3.
Hello from ESP32C3.


 以上がArduinoを使ってSeeed Studio XIAO ESP32C3でMQTTのPublishを行う方法になります。

 今回はやっていませんが、ライブラリのGithubを見るとSubscribeする関数も用意されているようなので、SubscribeもESP32側でできるみたいです。
 ライブラリがまだBeta版でバグがある可能性もありますが、割と簡単にPublishできることがわかったのでIoTなどで割と使えるのではないかと思います。


・参考資料

【Raspberry Pi】MQTTを使ってみる

 以前の記事でWindowsでMQTTを使ってみた記事を書きました。

 Windows機をMQTTサーバ(ブローカー)にすることは、正直ほとんどないと思います。

 なので今回は割とありそうなRaspberry PiをブローカーとしてMQTTを使ってみる方法の記事になります。
 より詳しく知りたい場合は参考資料に挙げているページ様を参照してください。

 では、始めます。


1:Mosquittoのインストール
 Windowsの時と同じようにMosquittoをインストールします。

 LXTerminalを起動させて以下のコマンドを実行します。

sudo apt install -y mosquitto mosquitto-clients

 このコマンドでMQTTブローカーとクライアントをインストールしています。


2:MQTTのPub/Subを試す
 1のインストールが終わったら、LXTerminalを3つ立ち上げて以下のコマンドをそれぞれのウィンドウで実行します。

・ブローカー用ウィンドウ

sudo systemctl start mosquitto

・Subscriber用ウインドウ

mosquitto_sub -h localhost -t test

・Publisher用ウインドウ

mosquitto_pub -h localhost -t test -m "test message"

 ブローカーを起動させるコマンドは違いますが、Pub/SubのコマンドはWindowsと変わりません。
 上記を実際に実行すると、Subscriber用ウインドウでPublishされる度に「test message」と表示されます。

 ブローカーが動いているかどうかの確認は以下のコマンドで確認します。

sudo systemctl status mosquitto

 実際に実行して以下のように「Active: active (running)」の文字があれば起動しています。

$ sudo systemctl status mosquitto
● mosquitto.service - Mosquitto MQTT Broker
     Loaded: loaded (/lib/systemd/system/mosquitto.service; enabled; preset: enabled)
     Active: active (running) since Tue 2024-04-02 13:32:25 JST; 16s ago

 また以下のコマンドでプロセスを確認してもブローカーが動いているかどうかの確認ができます。

ps aux | grep mosquitto

 実行した中に以下のようなプロセスがあれば、ブローカーが起動しています。

$ ps aux | grep mosquitto
mosquit+  2101  0.0  0.3  12136  6272 ?        Ss   16:05   0:00 /usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf

 ブローカーを止めるコマンドは以下の通りです。

sudo systemctl stop mosquitto


3:別端末からのMQTT通信を許可する
 別端末からのMQTT通信を許可するには、Windowsの時と同じようにmosquitto.confファイルを編集します。

 なんのエディタでも良いですが、以下のようにmosquitto.confファイルを開きます(今回はnano)を使いました。

sudo nano /etc/mosquitto/mosquitto.conf

 ファイルの末尾に以下の2行を追記します。

listener 1883
allow_anonymous true

 実際に追記すると以下のようになります。

 追記後上書き保存をすれば設定は完了です。


4:別端末からのMQTT通信を試す
 この方法もWindowsと同じです。
 LXTerminalを3つ立ち上げて以下のコマンドをそれぞれのウィンドウで実行します。

・ブローカー用ウィンドウ

sudo systemctl start mosquitto

・Subscriber用ウインドウ

mosquitto_sub -h "192.168.0.1" -t test

・Publisher用ウインドウ

mosquitto_pub -h "192.168.0.1" -t test -m "test message"

 上記のようにホスト部分をRaspberry PiIPアドレスに書き換えれば別端末からのPub/Subの通信ができるようになります。


 以上がRaspberry PiでMQTTを使う方法になります。

 インストール方法やブローカーの起動、停止などの手順が違いましたが、それ以外はWindowsと変わらずに扱うことができます。
 実際問題としてこの方法の方が使う機会が多いと思いますし、また手順としても楽なので気軽にMQTTを試すことができると思います。


・参考資料

【Windows】WindowsでMQTTを使ってみる

 今回はWindowsでMQTTの環境を構築してみた時の備忘録になります。

 MQTT自体の簡単な説明も入れますが、詳しくは参考資料に挙げているページ様を参照するか、各自で調べるようお願いします。


 では、始めます。


0:MQTTの簡単な説明
 MQTTは「軽量な非同期通信」方法としてIoTやROSなどで使われています。

 MQTTの構成は以下のようになっています。

名称 説明
Publisher 配信者。データを渡す側。
Subscriber 購読者。データを受け取る側。
Broker ブローカー。PublisherとSubscriberを中継するMQTTサーバ。
Topic トピック。配信、購読する際のキー。

 MQTT通信の大まかな流れは以下のようになります。

  1. Publisher(配信者)がTopicを指定しデータをBrokerに送信する(略してPubと呼ぶ場合もある)。
  2. Subscriber(購読者)がBrokerに指定したTopicのデータが来ていないか問い合わせる(略してSubと呼ぶ場合もある)。
  3. データが来ていた場合、指定したトピックのデータをSubscriber(購読者)がBrokerから受信する。

 このように通信することでPublisher側もSubscriber側もお互いに相手を意識せず、ただトピックのみを知っていれば通信できることがわかると思います。また一度接続をすると継続して送受信を行えます。
 また冒頭で述べたように、MQTTによる通信はMQTTサーバを経由するため「リアルタイム通信ではなく非同期処理」になります。この非同期処理であることは利点でもありますが、場合によっては問題にもなるので注意してください。

 MQTTについてなんとなくわかったと思うので、次からは実際にWindowsでMQTT通信を行うための環境を構築していきます。


1:mosquittoのインストール
 今回はMQTTを使うためにmosquittoをインストールします。まずは以下のURLにアクセスしてください。

 以下のようなページが表示されるので環境に合ったものを選びます。自分の場合はWindowsの64bitなので赤枠で囲んだ方を選択しました。

 ダウンロードしたインストーラをダブルクリックで起動させます。

 インストーラを起動させると以下のような画面になるので「Next」をクリックします。

 インストールする内容選択画面になりますが、デフォルトのままで「Next」をクリックします。

 インストール先を選択して「Install」をクリックします。特にインストール先を気にしない場合はデフォルトのままでよいと思います。

 インストールが開始されます。

 インストールが終わると以下の画面になるので「Finish」をクリックします。

 インストール後は環境変数にパスを通します。

 Windowsの検索欄に「環境変数」と入力して出てきた「システム環境変数の編集」をクリックします。

 システムのプロパティが開くので「詳細設定」タブの「環境変数」をクリックします。

 今回はシステム全体で使えるようにしたいので、下側のシステム環境変数の「Path」を選択した状態で「編集」をクリックします。

「新規」をクリックしてmosquittoのパスを入力して追加し「OK」をクリックします。

 デフォルトのままインストールした場合は以下のパスを設定します。

C:\Program Files\mosquitto

 環境変数のパスを設定できたら、実際に動くか試してみます。

 コマンドプロンプトPowerShellを立ち上げて以下のコマンドを実行してみてください。

mosquitto -v

 実行して以下のように「running」のメッセージが表示されればmosquittoのブローカーが起動していることになります。ちなみに「Ctrl + Cキー」でブローカーを停止できます。

>mosquitto -v
~~色々なメッセージ~~
1711348549: mosquitto version 2.0.18 running

 コマンド実行時にエラーなどが出た場合はインストールが失敗しているか、環境変数のパスが通っていないのでもう一度確認してみてください。


2:MQTT通信を試してみる
 ブローカーの起動が確認できたので、次は実際にMQTTでの通信を試してみます。

 0のところで説明したように、MQTTにはブローカー、Subscriber、Publisherの3つが必要です。

 なので、まずはコマンドプロンプトまたはPowerShellを3つ立ち上げます。そして以下のコマンドを上から順番にそれぞれのウインドウで実行します。

・ブローカー用ウィンドウ

mosquitto -v

・Subscriber用ウインドウ

mosquitto_sub -h localhost -t test

・Publisher用ウインドウ

mosquitto_pub -h localhost -t test -m "test message"

 ブローカーを起動した状態でSubscribeとPublishのコマンドを実行すると、Subscribe側のウインドウに以下のように送信したメッセージが表示されます。

> mosquitto_sub -h localhost -t test
test message

 PC内のローカルでMQTTを行うのであればここまでの手順で終わりになります。ですが、別PCとのMQTT通信などを行う場合は以降の設定が必要になります。


3:mosquitto.confの編集
 mosquittoのインストールフォルダ内に「mosquitto.conf」ファイルがあるので開きます。

 ファイルの末尾に以下の2行を追加します。

listener 1883
allow_anonymous true

 少し解説すると「listener 1883」でMQTTで外部との通信で使用するポートを指定しています。このlistenerの行を複数書くことで複数のポートを指定することもできます。
「allow_anonymous true」ではログイン認証を行わない設定にしています。セキュリティを高めるのであれば「false」と設定してください。

 このファイルがProgram Files配下にある場合は編集時に管理者権限を要求される可能性がありますが、その場合は権利者権限でファイルを編集してください。


4:ファイアウォールの許可
 同一ネットワーク上の別の端末からMQTTを受け取りたい場合はファイアウォールの設定をする必要があります。

 Windowsの検索欄に「ファイアウォール」と入力して出てきた中にある「セキュリティが強化されたWindows Defender ファイアウォール」をクリックします。

 アプリが立ち上がるので左側にある「受信の規則」をクリックし、右側にある「新しい規則」をクリックします。

 受信規則の追加画面が出てきます。今回は特定ポートでの受信を設定したいので「ポート」を選択して「次へ」をクリックします。

 プロトコルおよびポートでは「TCP」を選択して「特定のローカルポート」にmosquitto.confで設定したポート番号を入力し「次へ」をクリックします。

 操作では「接続を許可する」を選択して「次へ」をクリックします。

 プロファイルではどのネットワークに接続している時に使用するかにチェックを入れます。基本的にはパブリックのみで良いかと思います。チェックを入れたら「次へ」をクリックします。

 名前では名前と説明を入力します。今回はわかりやすく名前は「mosquitto_port」としました。説明も記述してもよいですが今回は空にしました。記入したら「完了」をクリックします。

 完了すると「受信の規則」に設定した名前の規則が追加されます。

 これでファイアウォールの許可は完了です。


5:サービスからのブローカー起動
 2の手順ではブローカーをmosquittoのコマンドから起動させていましたが、このコマンドではローカルサーバとして起動するだけで許可したファイアウォールが適応されないようでした。

 なので4で設定したファイアウォールが適応されるやり方でブローカーを起動します。

 Windowsの検索欄に「サービス」と入力して出てきた「サービス」をクリックします。

 サービス画面が立ち上がるので、一覧の中にある「Mosquitto Broker」を右クリックして出てきた「開始」をクリックします。

 状態が「実行中」となり、ファイアウォールの設定でのブローカーが起動されます。

 停止させたい時は右クリックして出てきた中から「停止」を選択すれば停止されます。

 Windowsが立ち上がった時に自動起動させたい時や手動起動に切り替えたい時などは、右クリックして出てきた中にある「プロパティ」をクリックします。

 スタートアップの種類のプルダウンから必要なものに切り替えて「OK」をクリックすると、適応されます。

 また、毎回サービスから手動で起動や停止をさせるのは面倒な場合は、管理者権限でPowerShellを立ち上げて以下のコマンドで起動と停止を行うこともできます。

・起動

Start-Service -Name "mosquitto"

・停止

Stop-Service -Name "mosquitto"

 このサービスからブローカーを起動させた状態であれば、以下のようにブローカーが起動しているPCのIPアドレスなどを指定した形でPub/Subの通信ができます。

・Subscriber

mosquitto_sub -h "192.168.0.1" -t test

・Publisher

mosquitto_pub -h "192.168.0.1" -t test -m "test message"

 

 以上がWindowsでのMQTTの環境構築と使ってみた内容になります。

 Windows機自体をMQTTのブローカーとして使うことがあるかはわかりませんが、IoTなど色々なところで使われる軽めの通信方式なので、知っておくと色々使えるかと思います。


・参考資料

【Arduino】Seeed Studio XIAO ESP32C3でマルチスレッド

 今回はArduinoの記述でSeeed Studio XIAO ESP32C3のマルチスレッドをやってみた備忘録になります。
 詳しくは参考資料に挙げているページ様を参照してください。


 では、始めます。


1:マルチスレッドの書き方
 書き方は簡単で以下のxTaskCreatePinnedToCore関数を使用します。

TaskHandle_t p_task;

xTaskCreatePinnedToCore(
  task,       // タスク関数へのポインタ(スレッドで実行させたい関数を設定)
  "taskname", // タスク名(最大16文字まで)
  2048,       // スタックサイズ(Byte)
  NULL,       // タスクのパラメータのポインタ
  1,          // タスクの優先順位(0:低 - 25:高)
  &p_task,    // タスクのHandleへのポインタ
  0           // 利用するCPUコア(0か1を指定)
);

 第1引数には別スレッドで実行させたい関数を指定します。第2引数にはタスク名を文字列で指定し、第3引数はスタックサイズを指定します。
 第4引数には第1引数へ渡すパラメータ、第5引数にはタスクの優先順位、第6引数にはタスクHandleのポインタを指定します。
 第7引数にはタスクを実行するコア番号を指定しますが、XIAO ESP32C3の場合であればシングルコアなので0を指定することになります。

 また注意点として、別スレッドで実行する関数は「delayを1以上入れる」必要があるようです。delayを入れない場合は、他の処理を行う時間がなくなってしまうために動かなくなるようなので必ずdelayを入れるようにしてください。


2:マルチスレッドのサンプル
 書き方がわかったので実際にマルチスレッドを試してみます。
 Arduino IDEを開いて以下のスケッチを作成してみてください。

・XIAO_ESP32C3_multi_thread_test.ino

#include <Arduino.h>

// メインループ用カウンタ
unsigned long _loop_count = 0;
// スレッド用カウンタ
unsigned long _thread_count = 0;

// マルチスレッド用のタスク
TaskHandle_t _p_task;

void setup() {
  Serial.begin(115200);
  delay(1000);
  Serial.println("setup start");

  // マルチスレッドでタスクを実行
  xTaskCreatePinnedToCore(
    printThread,   // タスク関数へのポインタ(スレッドで実行させたい関数を設定)
    "printThread", // タスク名(最大16文字まで)
    2048,          // スタックサイズ(Byte)
    NULL,          // タスクのパラメータのポインタ
    1,             // タスクの優先順位(0:低 - 25:高)
    &_p_task,      // タスクのHandleへのポインタ
    0              // 利用するCPUコア(0か1を指定できるがXIAO ESP32C3は1つのCPUしかないので0となる)
  );

  Serial.println("setup end");
}

void loop() {
  Serial.print("loopCount: ");
  Serial.println(_loop_count);
  _loop_count++;
	delay(1000); // 1秒待つ
}

// マルチスレッドで実行する関数
void printThread(void *param) {
  while(true) {
    Serial.print("threadCount: ");
    Serial.println(_thread_count);
    _thread_count++;
    delay(3000); // 割込時間確保のためにdelayを入れる
  }
}

 このスケッチをXIAO ESP32C3に書き込んでシリアルモニタで確認すると、以下のように1秒毎にloopCountのカウンタが表示され、3秒毎にthreadCountのカウンタが表示されます。

14:06:13.025 -> setup start
14:06:13.025 -> setup end
14:06:13.025 -> loopCount: 0
14:06:13.025 -> threadCount: 0
14:06:13.822 -> loopCount: 1
14:06:14.839 -> loopCount: 2
14:06:15.827 -> threadCount: 1
14:06:15.827 -> loopCount: 3
14:06:16.828 -> loopCount: 4
14:06:17.809 -> loopCount: 5
14:06:18.809 -> threadCount: 2
14:06:18.809 -> loopCount: 6
14:06:19.814 -> loopCount: 7
14:06:20.814 -> loopCount: 8
14:06:21.794 -> threadCount: 3
14:06:21.838 -> loopCount: 9


 以上がArduinoにおけるSeeed Studio XIAO ESP32C3でマルチスレッドを使う方法になります。

 マイコンでもマルチスレッドが使えると何かと便利だと思うので、覚えておいて損はないかと思います。


・参考資料

【python】ロガーについて

 今までにpythonで色々とやってきましたが、ロガーをまともに実装したことがなかったので今回はロガーについての記事になります。

 あくまで自分で必要だと思った部分のみを記述しているだけなので、詳しくは参考資料に挙げているページ様を参照してください。


 では、始めます。


1:pythonのロガー
 pythonには標準ライブラリの「logging」があるので、これを使うのが楽だと思います。

 python公式のloggingの内容ほぼそのままですが、最低限のロガーを書くと以下のようになります。

・logger_test.py

# -*- coding: utf-8 -*-
import logging

logger = logging.getLogger(__name__)

def main():
  logging.basicConfig(filename='myapp.log', level=logging.INFO)
  logger.info('Started')
  logger.debug('debug output') # ログレベルがinfoなのでdebugのメッセージは表示されない
  logger.info('Finished')

if __name__ == '__main__':
  main()

 logging.getLogger関数でロガーのインスタンスを作成しています。引数にはログの名前を設定しますが、面倒な場合は今回のように「__name__」としてモジュール名を渡すのが楽かと思います。
 そしてこのロガーのインスタンスに対してファイル名やログレベルを設定し、ログの内容を出力しています。

 このソースを実行すると同階層に「myapp.log」というファイルが作成され、中を開くと以下のようになっていると思います。

INFO:__main__:Started
INFO:__main__:Finished

 debugのログが出ていないことが確認できますが、これは設定したログレベルによるものです。


2:loggingのログレベル
 ログレベルは「出力するログ」をコントロールするために設定します。

 例えば「開発中にはログとして出しておきたいが、正式リリース時には出さないようにしたい」という時などにはこのログレベルを適切に切り替えることで対応できます。

 python公式のloggingページから抜き出し簡単にした表になりますが、ログレベルは以下のようになっています。

ログレベル 数値 目的 用途
logging.NOTSET 0 - DEBUGよりも更に詳しく出したい時に使う
logging.DEBUG 10 開発時に必要なログを出力 開発時に値がちゃんと入っているかなどを確認したい時に使う
logging.INFO 20 想定通りの動作に必要なログを出力 クラスや関数名を出すことでその処理がちゃんと実行されているかなどの時に使う
logging.WARNING 30 ソフトウェア上で想定される異常などのログを出力 バリデーションエラーなど、想定されうる異常などの時に使う
logging.ERROR 40 ソフトウェアの機能が実行できない場合のログを出力 try exceptなどで処理するような明らかなエラーが出る時などに使う
logging.CRITICAL 50 ソフトウェアが実行できない重大エラーの場合のログを出力 -

 自分の経験ではDEBUG~ERRORをよく使い、NOTSETとCRITICALはほとんど使わない印象です。CRITICALはこのレベルのログを出す場合はそもそもソフトウェアがまともに動かない場合だと思うので、ロガーに出す余裕もないんじゃないかと個人的には思っています。NOTSETは全てのログレベルのものを出力したい時にloggerのインスタンスに設定する時に使う印象です。

 この表を頭に入れた上で1の結果をもう一度見てみると、「level=logging.INFO」でログレベルをINFOに設定しているため、INFOよりも値が低いDEBUGのログが出力されていないことがわかるかと思います。


3:loggingのサンプル
 loggingの方法がわかったので、ある程度使えるような形のロガーサンプルを動かしてみます。
 以下の2つのファイルを同階層に作成してください。

・common_logger.py

# -*- coding: utf-8 -*-
import os
import logging


class CommonLogger():
  """共通ロガークラス"""
  # ログ出力ディレクトリ(本ファイルがある場所からの相対パス)
  LOG_DIR    = "log/"
  # 出力ログファイル名
  LOG_FILE   = "common.log"
  # ログフォーマット
  LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
  # ログ文字コード
  LOG_ENCODE = "utf-8"
  # ログ出力のレベル
  LOG_LEVEL = logging.DEBUG

  _instance     = None
  _log_dir_path = ""
  _log_path     = ""
  _logger       = None


  def __new__(cls):
    """インスタンス生成前に実行される関数"""
    # 以下の判定はシングルトンにするために行っている
    if cls._instance is None:
        print("create logger")
        # 初めてインスタンスが作成される時の処理
        cls._instance = super().__new__(cls)
        # ログディレクトリを作成
        cls._log_dir_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), cls.LOG_DIR)
        os.makedirs(cls._log_dir_path, exist_ok=True)
        # ログファイルパス作成
        cls._log_dir_path = os.path.join(cls._log_dir_path, cls.LOG_FILE)
        # 各種ログの設定
        cls._setup_logger(cls)

    return cls._instance


  def _setup_logger(cls):
    """ログの設定を行う関数"""
    # ログの設定
    cls._logger = logging.getLogger(__name__)
    cls._logger.setLevel(cls.LOG_LEVEL)
    # 標準出力のログの設定
    st_handler = logging.StreamHandler()
    st_handler.setLevel(cls.LOG_LEVEL)
    st_handler.setFormatter(logging.Formatter(cls.LOG_FORMAT))
    # ログファイル出力の設定
    fl_handler = logging.FileHandler(filename=cls._log_dir_path, encoding=cls.LOG_ENCODE)
    fl_handler.setLevel(cls.LOG_LEVEL)
    fl_handler.setFormatter(logging.Formatter(cls.LOG_FORMAT))
    # ハンドラーを設定
    cls._logger.addHandler(st_handler)
    cls._logger.addHandler(fl_handler)


  def __init__(self):
    """コンストラクタ"""
    pass


  def debug(self, output_class, message):
    """DEBUGレベルのログを出力する関数"""
    output = "[" + output_class.__name__ + "] " + message
    self._logger.debug(output)


  def info(self, output_class, message):
    """INFOレベルのログを出力する関数"""
    output = "[" + output_class.__name__ + "] " + message
    self._logger.info(output)


  def warning(self, output_class, message):
    """WARNINGレベルのログを出力する関数"""
    output = "[" + output_class.__name__ + "] " + message
    self._logger.warning(output)


  def error(self, output_class, message):
    """ERRORレベルのログを出力する関数"""
    output = "[" + output_class.__name__ + "] " + message
    self._logger.error(output)


  def critical(self, output_class, message):
    """CRITICALレベルのログを出力する関数"""
    output = "[" + output_class.__name__ + "] " + message
    self._logger.critical(output)


・common_logger_test.py

# -*- coding: utf-8 -*-
from common_logger import CommonLogger

class LogClass:
  """ログを出力するクラス1"""
  def __init__(self):
    """コンストラクタ"""
    self.logger = CommonLogger()

  def output_log(self):
    self.logger.debug(self.__class__, "debugです")
    self.logger.info(self.__class__, "infoです")
    self.logger.warning(self.__class__, "warningです")
    self.logger.error(self.__class__, "errorです")
    self.logger.critical(self.__class__, "criticalです")


class Log2Class:
  """ログを出力するクラス2"""
  def __init__(self):
    """コンストラクタ"""
    self.logger = CommonLogger()

  def output_log(self):
    self.logger.debug(self.__class__, "debug2です")
    self.logger.info(self.__class__, "info2です")
    self.logger.warning(self.__class__, "warning2です")
    self.logger.error(self.__class__, "error2です")
    self.logger.critical(self.__class__, "critical2です")


if __name__ == '__main__':
  log1 = LogClass()  # ロガーを作成
  log2 = Log2Class() # ロガーを作成(シングルトンなのでLogClassと同じインスタンスを取得)

  # それぞれログ出力
  log1.output_log()
  log2.output_log()


 上記の2つのファイルを同階層に作成し「common_logger_test.py」を実行すると、logディレクトリが作成されその中に「common.log」ファイルが出力されています。そしてそのcommon.logの中身を見ると以下のようになっているはずです。

2024-03-21 15:53:31,180 DEBUG    [LogClass] debugです
2024-03-21 15:53:31,181 INFO     [LogClass] infoです
2024-03-21 15:53:31,181 WARNING  [LogClass] warningです
2024-03-21 15:53:31,181 ERROR    [LogClass] errorです
2024-03-21 15:53:31,181 CRITICAL [LogClass] criticalです
2024-03-21 15:53:31,181 DEBUG    [Log2Class] debug2です
2024-03-21 15:53:31,181 INFO     [Log2Class] info2です
2024-03-21 15:53:31,181 WARNING  [Log2Class] warning2です
2024-03-21 15:53:31,184 ERROR    [Log2Class] error2です
2024-03-21 15:53:31,184 CRITICAL [Log2Class] critical2です

 少し解説するとcommon_logger.pyが今回自分が作成したある程度汎用的なロガーのクラスです。それをcommon_logger_test.pyから呼び出して使っています。
 common_logger.pyのCommonLoggerクラスは一応シングルトンとなるようにしています。シングルトンは「一度インスタンスが作成されていれば、コンストラクタが何度呼ばれても同じそのインスタンスを返す」ものです。ロガーなので、毎回同じインスタンスを返すようにするのが適切だと思いシングルトンとなるようにしています。
 またログ出力のフォーマットも日時、ログレベル、クラス名、メッセージ内容の4つを出力させるようにしています。これぐらいの情報をログとして出していれば、何か問題が起こったときでも問題個所を特定しやすくなるかと思います。


 以上がpythonでのロガー実装方法になります。

 詳しい人からすると全然足りていないような作りのサンプルコードだとは思いますが、必要最低限だとこのような形になると思います。
 またログのHandlerとして「TimedRotatingFileHandler」を使用するとログローテーションなどもpython側で設定できるようなので、それを使うのも良いかと思います。

 ちなみにマルチプロセスやマルチスレッドを使っている場合はシングルトンでもログ出力がぶつかってしまう可能性も考えられるので、その場合はQueueなどを使ってそこに順次突っ込んでいく形が良いような気がします。

 このようにロガーと一言で言っても、用途や使い方、どの程度までログに出すかの取り決めなど、ちゃんとやろうとすると考えることが多くなりそうなので、詳しい方がいれば色々と教えていただきたいと思いました。


・参考資料

【python】livereloadを使って画面を自動更新するWebページを作ってみる

 最近pythonでの簡単なサーバの作り方をよく調べています。
 その中で画面を自動更新できるライブラリである「livereload」というものを知ったので、今回はそれについての備忘録になります。

 詳しい情報は参考資料に挙げているページ制作者様のGithubなどを参照してください。


 では、始めます。


1:livereloadについて
 livereloadは対象ファイルが更新された場合に処理を実行させ、その処理が終わった後にWebページのreloadが行われるようにするためのライブラリです。

 日本語で説明すると少しごちゃごちゃしていますが、要は以下のような処理ができるようになります。

  1. 対象ファイルが更新される
  2. 更新時の処理を実行
  3. Webページ更新

 例えば「1つページしか存在しないようなWebページサーバで定期的に何かデータを受信してその値を画面に表示させる」というようなものを作りたい場合、このlivereloadを使えば比較的簡単に実装できます。


2:livereloadのインストール
 インストールは簡単で以下のpipコマンドでインストールできます。

pip install livereload


3:livereloadの使い方
 livereloadの使い方は簡単で、以下のように書きます。

from livereload import Server # livereloadライブラリ読み込み

def reload():
  # 画面更新前に行いたい処理の関数
  pass

server = Server()
# 引数filepathには監視対象のファイルパス、funcには監視対象ファイルが更新された時に実行する関数を設定
server.watch(filepath="target.txt", func=reload)
# 引数hostにはホスト名、portにはポート番号、rootには表示するHTMLファイルを設定
server.serve(host="ホスト名", port=4000, root="index.html")

 上記の内容を見ればわかるかと思いますが、livereloadのserver.watchのfilepath引数でlivereloadで監視するファイルを指定します。そしてfunc引数には画面更新前に実行させたい関数を指定します。
 その後server.serveでホスト名とポート、表示させたいHTMLファイルを指定して実行することで、livereloadのサーバを起動させることができます。


4:livereloadを使ったサンプル
 では実際にlivereloadのサンプルを作って実行してみます。

 以下の「template.html」と「page_livereload.py」のファイルを作成し、同じフォルダ内(同じ階層)に格納してください。

・template.html

<html lang="ja">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <meta http-equiv="Cache-Control" content="no-cache">
    <title>livereloadのテスト</title>
</head>
<body>
画面更新回数:$count
</body>
</html>

・page_livereload.py

# -*- coding: utf-8 -*-
import os
import platform
import socket
import threading
import time
from string import Template

from livereload import Server # livereloadライブラリ読み込み


# 画面を自動更新するクラス
class PageLiveReload:
  TEMPLATE_PATH     = "template.html"
  UPDATE_FILE_PATH  = "target.txt"
  DISPLAY_HTML_PATH = "index.html"
  PORT_NUM          = 4000

  _ipaddress         = ""
  _template          = None
  _target_path       = ""
  _display_html_path = ""
  _update_count      = 0 # 画面更新カウント数


  def __init__(self):
    # コンストラクタ(何もしない)
    pass


  def get_ipadress(self):
    # IPアドレスを取得する関数
    ipaddress = ""
    if platform.system() == "Windows":
        # Windowsの場合のIPアドレス取得
        host = socket.gethostname()
        ipaddress = socket.gethostbyname(host)
    elif platform.system() == "Linux":
        # Linuxの場合のIPアドレス取得
        connect_interface = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        connect_interface.connect(("8.8.8.8", 80))
        ipaddress = connect_interface.getsockname()[0]
        connect_interface.close()
    return ipaddress


  def setup(self):
    # 本ファイルまでのディレクトリのパスを取得
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # テンプレートとなるhtmlを読み込む
    template_html_path = os.path.join(base_dir, self.TEMPLATE_PATH)
    self._template = Template(open(template_html_path, "r", encoding="utf-8").read())
    # 画面更新判断用ファイルパスの設定
    self._target_path = os.path.join(base_dir, self.UPDATE_FILE_PATH)
    # 画面表示させるHTMLファイルパスの設定
    self._display_html_path = os.path.join(base_dir, self.DISPLAY_HTML_PATH)
    # IPアドレスを取得
    self._ipaddress = self.get_ipadress()
    # 更新判断用ファイルを作成
    with open(self._target_path, mode='w', encoding="utf-8") as f:
      f.write("")
    # HTMLを初期化(作成)
    self.update_html()


  def main(self):
    server = Server()
    # 引数filepathには監視対象のファイルパス、funcには監視対象ファイルが更新された時に実行する関数を設定
    server.watch(filepath=self._target_path, func=self.reload)
    # Webページのアドレスをprintで表示
    print("http://" + self._ipaddress + ":" + str(self.PORT_NUM))
    # 引数hostにはホスト名、portにはポート番号、rootには表示するHTMLファイルを設定
    server.serve(host=self._ipaddress, port=self.PORT_NUM, root=self._display_html_path)


  def reload(self):
    # 監視対象ファイルが更新された時に実行する関数(この関数実行後にページがreloadされる)

    # カウントアップ
    self._update_count += 1
    # HTML更新
    self.update_html()


  def update_html(self):
    # テンプレートから表示させるHTMLの内容を取得
    html_content = self._template.safe_substitute(count=str(self._update_count))
    # HTMLファイルに書き込み
    with open(self._display_html_path, mode='w', encoding="utf-8") as f:
      f.write(html_content)


if __name__ == '__main__':
  # 準備
  page_reload = PageLiveReload()
  page_reload.setup()

  # マルチスレッドでlivereloadのサーバを実行
  web_reload_thread = threading.Thread(target=page_reload.main, daemon=True)
  web_reload_thread.start()

  # 無限ループ
  while True:
    time.sleep(10)

    # 10秒毎に画面を更新させる
    with open(page_reload._target_path, mode='w', encoding="utf-8") as f:
      f.write("")

 内容を少し解説すると、templare.htmlをテンプレートとして読み込み、livereloadで画面更新する際にカウントアップしたものを表示させるということをやっています。
 また画面の更新は「if __name__ == '__main__':」内に書いてあるようにおよそ10秒毎にしています。

 この2つのファイルを同じ階層に配置できたら、その階層まで移動して以下のコマンドで実行します。

python page_livereload.py

 実行するとURLがprint表示されるのでそのページをブラウザで開きます。

 ブラウザを開くと以下のように画面更新回数が表示されます。

 ブラウザを開いた状態で何もせずに待っていると、およそ10秒毎に画面が更新されて数値がカウントアップされていくようになっているはずです。


5:livereloadを使う際の注意点
 livereloadは便利ですが、注意点があります。

 それは以下のissueに書いてあるように「短い時間に監視対象ファイルが更新された場合、画面更新が無視される」というものです。

 ソースコードを見ると3秒以内に監視対象ファイルが更新された場合はlivereloadによる処理は実行されないようです。

 なので、更新タイミングが短くなることが予想される場合には今回のサンプルのように別途10秒毎に画面を更新するなどの対策が必要になります。

 また自分の環境だけかもしれませんが、Raspberry Pi OS(64bit)の環境でこのlivereloadを使おうとするとエラーが出て使えませんでした。
 Raspberry Pi OS(32bit)だと使えたので、32bitと64bitの違いで使えたり使えなかったりする可能性もあるので注意してください。

 以上がlivereloadを使って画面を自動更新する方法になります。

 限定的といえば限定的かもしれませんが、ちょっとしたWebページ用サーバであれば割と簡単にかけるこの方法でやってもよいのではないかと思います。


・参考資料

【python】PCのIPアドレスを取得する

 過去に簡単なAPIサーバを作る記事を書きました。

 この記事ではホスト名をlocalhostにしていましたが、ローカルネットワーク上のサーバのAPIを叩いて確認したい場合などがあります。
 その場合はホスト名にIPアドレスを設定すると思いますが、一々調べてソースコード内の定数を書き換えるのは面倒です。
 実行しているPCのIPアドレスを自動的に取得するようにしていれば一々書き直す必要がなくなって楽になると思ったので、今回は実行しているPCのIPアドレスを取得する方法の備忘録になります。

 詳しくは参考資料に挙げているページ様を参照してください。


 では、始めます。


・実行しているPCのIPアドレスを取得する方法
 結論から書きますが、以下の方法で取得できます。

・get_ipaddress.py

# -*- coding: utf-8 -*-
import platform
import socket


# 実行PCのIPアドレスを取得する関数
def get_my_ipadress():
    ipaddress = ""

    if platform.system() == "Windows":
        # Windowsの場合のIPアドレス取得
        host = socket.gethostname()
        ipaddress = socket.gethostbyname(host)
    elif platform.system() == "Linux":
        # Linuxの場合のIPアドレス取得
        connect_interface = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        connect_interface.connect(("8.8.8.8", 80))
        ipaddress = connect_interface.getsockname()[0]
        connect_interface.close()

    return ipaddress


if __name__ == '__main__':
   ipaddress = get_my_ipadress()
   print(ipaddress)

 ソースの中身を見ればわかると思いますが、WindowsLinuxでそれぞれ取得方法が違います。

 これを実行すると「192.168.XXX.XXX」のような形のIPアドレスを取得することができます。


 以上が実行中のPCのIPアドレスを取得する方法になります。

 ソースコードを書いてみれば単純ですが、ちょっとしたテストサーバをRaspberry Piに立てる時などには使うことになりそうです。


・参考資料