ソースに絡まるエスカルゴ

貧弱プログラマの外部記憶装置です。

【python/javascript/html】簡単なローカルAPIサーバを作る

 APIを使って色々やる場面があるかと思います。

 以下の記事でも緯度経度から標高を取得するAPIを使いました。

 開発をしていく中でAPIを利用する側はある程度できていてもAPI側がまだ未実装という場合が割とあると思います。そうすると実装が終わってからでないと実際の動作確認ができないということになります。ですが、簡単なローカルAPIを作って定数でも返すようにすればAPI側がまだ実装されていなくてもある程度動作確認ができるので便利です。

 調べてみるとpythonで割と簡単にローカルAPIサーバを作れることが判明したので、今回はその備忘録になります。

 では、始めます。


0:前提
 今回はAPIとして以下のものを考えていきます。

GET API UPDATE API
URL http://localhost:3000/ http://localhost:4000/
Content-type json, utf-8 json, utf-8
request(例) なし {
  "data" : {
    "name" : "テスト",
    "value" : 100
  }
}
response(例) {
  "data" : {
    "name" : "テスト",
    "value" : 100
  },
  "call_count": 0
}
{
  "result" : "OK",
  "data" : {
    "name" : "テスト",
    "value" : 100
  }
}

 簡単に説明すると以下のような動きを想定したAPIになります。

  • GET API、UDATE APIどちらもjson形式でやりとりする
  • GET APIのcall_countはGET APIが呼ばれた回数の値とする
  • UDATE APIではrequestのjsonの値でnameとvalueが更新される
  • UDATE APIの戻り値のresultは更新できた場合OK、失敗した場合NGの文字列を設定する

 かなりざっくりとしたものですが、とりあえず単純なGETとUPDATEのAPIを作っていくという感じです。


1:ローカルAPIサーバの作り方
 書き方は簡単で以下の記述でjsonを返すローカルAPIサーバを作ることができます。

・sample_server.py

# -*- coding:utf-8 -*-
#
# 本プログラムを実行している状態で以下のアドレスを開くとjsonの戻り値が表示される。
# http://localhost:2000/
# 
# プログラムの終了はCtrl+C。それでも止まらない場合はCtrl+Pauseで終了させる。
from wsgiref.simple_server import make_server
import json

API_PORT = 2000


# 文字列のバイトサイズを計算する関数
def getStringByteSize(str):
    return len(str.encode('utf-8'))


# APIが呼ばれた時に実行される関数
def app(environ, response):
  ### 引数「environ」はrequestの情報を含んでおり、以下の方法で取得できる ###
  # wsgi_input = environ["wsgi.input"]
  # try:
  #   # requestのjsonを取得
  #   request_content_length = int(environ["CONTENT_LENGTH"])
  #   request_data = json.loads(wsgi_input.read(request_content_length))
  #   print(request_data)
  # except ValueError:
  #   print("--- error ---")
  ######################################################################

  json_data = {'message':'hoge'} # jsonのレスポンス
  content_length = getStringByteSize(json.dumps(json_data)) # Content-Lengthを計算

  status = '200 OK'
  header = [
      ('Access-Control-Allow-Origin', '*'),   # 許可するアクセス
      ('Access-Control-Allow-Methods', '*'),  # 許可するメソッド
      ('Access-Control-Allow-Headers', "X-Requested-With, Origin, X-Csrftoken, Content-Type, Accept"), # 許可するヘッダー
      ('Content-type', 'application/json; charset=utf-8'), # utf-8のjson形式
      ('Content-Length', str(content_length)) # Content-Lengthが合っていないとブラウザエラー
  ]
  response(status, header)

  return [json.dumps(json_data).encode("utf-8")]


# APIサーバを実行する関数
with make_server('', API_PORT, app) as httpd:
  print("Start API. URL is http://localhost:" + str(API_PORT) + "/ ...")
  httpd.serve_forever()

 PowerShellなりコマンドプロンプトなりを起動させて上記のプログラムを実行すると、以下のようにURLが表示されてサーバが起動します。

$ python sample_server.py
Start API. URL is http://localhost:2000/ ...

 この状態でブラウザを開いて「http://localhost:2000/」とURLを打ち込んで表示させると、以下のようにソースコードにおいてresponseとして定義したjsonの値が表示されます。
f:id:rikoubou:20210524173053p:plain

 requestでjsonが来た場合は、コメントアウト部分に書いてある方法で取得することができます。

 このサーバを止める場合は「Ctrl + C」キー、それでも止まらない場合は「Ctrl + Pause」キーで止めます。

 ちなみにヘッダー部分ですが、これらについてあまり詳しく調べられていないので今回は詳しく言及しません。
 簡単に言うとセキュリティ関係やデータ形式をどうするかという記述をしている部分なので、これらを記述していないとブラウザ側でエラーとなる場合があるので記述しておいてください。あくまでテスト用として許可しているだけなので、このヘッダー設定のまま実運用するとセキュリティ面で問題が発生するので注意してください。


2:ローカルAPIサーバサンプル
 書き方がわかったので、実際に「0:前提」に書いた想定のローカルAPIサーバサンプルを以下に示します。またそれらがちゃんと動いているかを確認するためのhtmlとjavascriptも載せておきます(2021/05/29:ソースコードを少し修正しました)。

・local_test_api.py

# -*- coding:utf-8 -*-
from wsgiref.simple_server import make_server
import json
import threading
import time

GET_API_PORT    = 3000
UPDATE_API_PORT = 4000

GET_API_JSON_PATH    = "./get_api.json"
UPDATE_API_JSON_PATH = "./update_api.json"

STATUS = '200 OK'

call_count = 0
get_data_json    = None
update_data_json = None


def main():
    # Get APIレスポンス用json
    global get_data_json
    get_data_json = load_json_file(GET_API_JSON_PATH)

    # Update APIレスポンス用json
    global update_data_json
    update_data_json  = load_json_file(UPDATE_API_JSON_PATH)

    # Get APIスレッド実行
    get_app_thread = threading.Thread(target=start_get_api_server)
    get_app_thread.setDaemon(True)
    get_app_thread.start()

    # Update APIスレッド実行
    update_app_thread = threading.Thread(target=start_update_api_server)
    update_app_thread.setDaemon(True)
    update_app_thread.start()

    # 無限ループ
    while True:
        time.sleep(1)


# jsonファイルを読み込む関数
def load_json_file(json_path):
    with open(json_path, "r", encoding="utf-8_sig") as f:
        json_data = json.load(f)
    return json_data


# Get APIサーバを起動する関数
def start_get_api_server():
    with make_server('', GET_API_PORT, get_app) as httpd:
        print("Start Get API. URL is http://localhost:" + str(GET_API_PORT) + "/ ...")
        httpd.serve_forever()


# Get APIの処理
def get_app(environ, response):
    global call_count
    call_count = call_count + 1

    get_data_json['call_count'] = call_count
    content_length = getStringByteSize(json.dumps(get_data_json)) # Content-Lengthを計算

    header = [
        ('Access-Control-Allow-Origin', '*'),   # 許可するアクセス
        ('Access-Control-Allow-Methods', '*'),  # 許可するメソッド
        ('Access-Control-Allow-Headers', "X-Requested-With, Origin, X-Csrftoken, Content-Type, Accept"), # 許可するヘッダー
        ('Content-type', 'application/json; charset=utf-8'), # utf-8のjson形式
        ('Content-Length', str(content_length)) # Content-Lengthが合っていないとブラウザエラー
    ]

    response(STATUS, header) # ヘッダー設定
    return [json.dumps(get_data_json).encode("utf-8")]


# Update APIサーバを起動する関数
def start_update_api_server():
    with make_server('', UPDATE_API_PORT, update_app) as httpd:
        print("Start Update API. URL is http://localhost:" + str(UPDATE_API_PORT) + "/ ...")
        httpd.serve_forever()


# Update APIの処理
def update_app(environ, response):
    wsgi_input = environ["wsgi.input"]
    try:
        # POSTされたjsonを取得
        request_content_length = int(environ["CONTENT_LENGTH"])
        request_data = json.loads(wsgi_input.read(request_content_length))
        print(request_data)

        if request_data['data']['name'] == "":
            # nameが空文字の場合NGとする
            update_data_json['result'] = "NG"
        else:
            # 更新OKの場合は値を設定
            update_data_json['result'] = "OK"
            update_data_json['data']['name'] = request_data['data']['name']
            update_data_json['data']['value'] = int(request_data['data']['value'])
            # GET側も更新
            get_data_json['data']['name'] = request_data['data']['name']
            get_data_json['data']['value'] = int(request_data['data']['value'])

    except ValueError:
        print("--- error ---")

    content_length = getStringByteSize(json.dumps(update_data_json)) # Content-Lengthを計算

    header = [
        ('Access-Control-Allow-Origin', '*'),   # 許可するアクセス
        ('Access-Control-Allow-Methods', '*'),  # 許可するメソッド
        ('Access-Control-Allow-Headers', "X-Requested-With, Origin, X-Csrftoken, Content-Type, Accept"), # 許可するヘッダー
        ('Content-type', 'application/json; charset=utf-8'), # utf-8のjson形式
        ('Content-Length', str(content_length)) # Content-Lengthが合っていないとブラウザエラー
    ]

    response(STATUS, header) # ヘッダー設定
    return [json.dumps(update_data_json).encode("utf-8")]


# 文字列のバイトサイズを計算する関数
def getStringByteSize(str):
    return len(str.encode('utf-8'))


if __name__ == '__main__':
    main()


・get_api.json

{
   "data" : {
      "name"  : "テスト",
      "value" : 100
   },
   "call_count": 0
}


・update_api.json

{
   "result" : "OK",
   "data" : {
      "name"  : "テスト",
      "value" : 100
   }
}


・index.html

<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset = "utf-8">
  <meta http-equiv="Cache-Control" content="no-cache">
  <title>ローカルAPIサーバテスト</title>
  <!-- <script src="./jquery-3.6.0.js"></script> -->
  <script src="https://code.jquery.com/jquery-3.6.0.js"></script>
</head>
<body>
  name:<input id="name" value="">
  <br>
  value:<input id="value" value="">
  <br>
  <button id="update_button" onclick="update()">UPDATE</button>
  <button id="get_button" onclick="get()">GET</button>
  <br>
  <div id="text">ここに結果が表示されます</div>
<script src="./script.js"></script>
</body>
</html>


・script.js

var update_connect_flg = false;
var get_connect_flg    = false;

// Get APIを実行する関数
function getApi() {
   return $.ajax({
      url           : 'http://localhost:3000/',
      type          : 'GET',
      dataType      : 'JSON',
      scriptCharset : 'utf-8',
      timespan      : 100
   });
}

// Update APIを実行する関数
function updateApi(send_json) {
   // JSONにエンコード
   var json_data = JSON.stringify(send_json);

   return $.ajax({
      url           : 'http://localhost:4000/',
      type          : 'POST',
      data          : json_data,
      dataType      : 'JSON',
      contentType   : 'application/json; charset=utf-8',
      contentLength : json_data.length,
      timespan      : 100
   });
}

// Getを実行する関数
function get() {
   if (get_connect_flg) { return; }

   get_connect_flg = true;

   getApi().done(function(res) {
      // getApiの結果を処理
      console.log(res);
      let res_data = "[GET response]<br>";
      res_data    += "name: "   + res.data.name + "<br>";
      res_data    += "value: "  + String(res.data.value) + "<br>";
      res_data    += "call_count: " + String(res.call_count);
      $('#text').html(res_data); // 内容を表示
   }).fail(function() {
      // 失敗した時の処理
      $('#text').html("GETに失敗しました");
   }).always(function() {
      // 常に実行される処理
      get_connect_flg = false;
   });
}

// Updateを実行する関数
function update() {
   if (update_connect_flg) { return; }

   // 各値を取得
   const name  = $('#name').val();
   const value = $('#value').val();

   // json作成
   const send_json = {
      'data' : {
         'name'  : name,
         'value' : parseInt(value)
      }
   };
   console.log(send_json);

   update_connect_flg = true;
   updateApi(send_json).done(function(res) {
      // updateApiの結果を処理
      console.log(res);
      if ("OK" == res.result) {
         // OKの処理
         let res_data = "[UPDATE response]<br>";
         res_data    += "result: " + res.result + "<br>";
         res_data    += "name: " + res.data.name + "<br>";
         res_data    += "value: " + String(res.data.value) + "<br>";
         $('#text').html(res_data); // 内容を表示
      } else {
         // NGの処理
         let res_data = "[UPDATE response]<br>";
         res_data    += "result: " + res.result + "<br>";
         res_data    += "name: " + res.data.name + "<br>";
         res_data    += "value: " + String(res.data.value) + "<br>";
         $('#text').html("更新に失敗しました<br>" + res_data); // 内容を表示
      }
   }).fail(function(res) {
      // 失敗した時の処理
      $('#text').html("UPDATEに失敗しました");
   }).always(function() {
      // 常に実行される処理
      update_connect_flg = false;
   });
}

 使い方として、まず「local_test_api.py」を実行してGetとUpdateのAPIサーバを起動させます。

$ python local_test_api.py
Start Update API. URL is http://localhost:4000/ ...
Start Get API. URL is http://localhost:3000/ ...

 このサーバを起動させた状態で「index.html」を立ち上げて「GET」ボタンをクリックするとGetのAPIが、「UPDATE」ボタンをクリックするとinputタグに入力された値でUpdate APIが呼ばれ、結果がdivタグ部分に表示されます。

 実際に動かすと以下のようになります。
f:id:rikoubou:20210524181335g:plain

 少し説明すると「local_test_api.py」でGetとUpdateのAPIサーバをスレッドとして個別に実行しています。
 APIサーバが動いている状態で「index.html」を立ち上げて「GET」ボタンをクリックするとjavascriptのget( )関数が実行され、GetのAPIの結果を取得してdivタグ部分に表示させています。「UPDATE」ボタンをクリックするとjavascriptのupdate( )関数が実行され、inputタグに入力された値で作ったjsonを使ってUpdate APIが呼ばれ、その結果をdivタグ部分に表示させています。
 またNGを再現するために、nameの値が空の場合にNGを返すようにAPIサーバ側で処理をしています。


 以上がpythonjavascript、htmlで簡単なローカルAPIサーバを作る方法です。

 本当はpythonだけで完結しても良かったのですが、画面に表示させたりした方がいいかと思いそちらも載せてみました。
 その結果ソースコード部分がかなり長くなってしまいましたが、このサンプルを少し改造すれば割と応用が利くかと思います。


・2021/05/25:追記
 今回のサンプルソースを一応公開しておきます。


・参考資料