Skip to content

KeiganALI の API と基本的な使い方

APIとは

API(Application Programming Interface) とは 異なるアプリ・ソフトウェアを繋げる仕組みのことを挿します。目には見えないコンセントのようなイメージです。

image

APIの活用事例

Webサイトへのログイン時に、SNSアカウントを利用する機能などに、APIが使用されています。

image

APIを利用するメリット

  • ユーザーの利便性が上がります(異なるアプリ・ソフトウェアの連携を取ることができます)
  • 連携先の最新情報を取得できます
  • 開発を効率化できます

APIを利用するデメリット

  • 仕様が変更された場合作り変える必要があります
  • サービスが終了した場合、利用が不可となります
  • 一部有料のものがあり、ランニングコストがかかります
※ KeiganAMR のAPI は、本サイトで公開しているものについては全て無償となります。

KeiganALIで使用可能なAPI

KeiganALIでは、以下の2種類のAPIを実装しています。

REST API (Representational State Transfer)MQTT API (Message Queueing Telemetry Transport) 
HTTPプロトコルMQTTプロトコル
汎用性が高く扱えるデータの種類も多いのが特長です。HTTPプロトコルよりも送受信するデータ通信量が軽いのが特長です。(CPU負荷、電力消費量が小さい)
画像ファイル等の比較的大きなデータの転送が可能です。双方向、1対多数の通信が可能であり、IoT(Internet of Things)システムでは広く採用されています。
KeiganAMRでは、制御命令を与えたり、マップやタスクセットなど、種々のデータの読み書きを行います。KeiganAMRでは、主にAMRの状態(位置、ステータス、センサー情報など)を取得する場合に用います。

REST APIを利用したALIとの通信 概念図

image

REST API

requests

HTTPにはデータの参照や更新などのメソッドが用意されています。HTTPメソッドを操作しやすくしたものがHTTPライブラリであり、『requests』 は、Python で使われるHTTP ライブラリの一種です。『requests』を使うとHTTPメソッド(WebAPIを使ったデータの取得等の)に対して、人が読みやすく、何を意図しているのかが分かりやすいコードの記述が可能です。

参考HP: https://pypi.org/project/requests/

requestsのインストール

requestsはサードパーティーのライブラリなので、プログラム内で、使用するには、事前に、インストールが必要です。 コマンドプロンプトから、以下のコマンド等にて、『requests』 モジュールのインストールを実行してください。

python
>pip install requests

requestsの使い方(ALIで使用する主な関数)

  • requests.get関数:GETメソッドに対応
  • requests.post関数:POSTメソッドに対応
  • requests.put関数:PUTメソッドに対応
  • requests.delete関数:DELETEメソッドに対応

上記の関数にURL、必要なパラメーターを設定して、コールすると、リクエストがそのURLに送信され、その通信先から、レスポンスがrequests.Responseクラスのインスタンスとして返信されます。

requests.get関数

例:タスクセットリストの取得

PathMethodFunction
/v2/taskset/tasksetsGETtaskSet のリストを取得する

Pythonコード例:

python
import requests                   #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.xxx.x.xx:9085/v2/taskset/tasksets'      #IPアドレスを入力
params = {'mapId':XXX}                                    #mapIdを入力
res = requests.get(url, json=params)
data = json.loads(res.text)
print(data)

requestsライブラリを使用してHTTPリクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。

ライブラリのインポート:

python
import requests
import json

requestsライブラリとJSONライブラリをインポートしています。

URLとパラメータの設定:

python
url = 'http://192.xxx.x.xx:9085/v2/taskset/tasksets'
params = {'mapId': XXX}

URLとパラメータを設定します。URLにはHTTPリクエストを送信する先のアドレスを指定し、paramsにはリクエストに含めるパラメータを設定します。ここでは、mapId というキーにXXXを指定しています。

HTTPリクエストの送信:

python
res = requests.get(url, json=params)

requests.get()関数を使用して、GETメソッドでURLにリクエストを送信します。パラメータはJSON形式で指定しています。

結果の表示:

python
print(data)

変換した結果をコンソールに表示します。

このコードは、指定したURLGETメソッドでリクエストを送信し、パラメータを含めることができます。レスポンスはresオブジェクトに格納されます。

requests.post関数

例:タスクセットの再生

PathMethodFunction
/v2/taskset/execPOST指定idの taskSet を再生する

Pythonコード例:

python
import requests    #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.XXX.X.XX:9085/v2/taskset/exec'    #IPアドレスを入力
params = {"cmdSetId":XXX}    #tasksetIdを入力
res = requests.post(url, json=params)
data = json.loads(res.text)
print(data)

requestsライブラリを使用してHTTPリクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。

ライブラリのインポート:

python
import requests
import json

requestsライブラリとJSONライブラリをインポートしています。

URLとパラメータの設定:

python
url = 'http://192.XXX.X.XX:9085/v2/taskset/exec'
params = {"cmdSetId":XXX, "count":XXX, "startIndex":XXX}

URLとパラメータを設定します。URLにはHTTPリクエストを送信する先のアドレスを指定し、paramsにはリクエストに含めるパラメータを設定します。ここでは、cmdSetIdcountstartIndexというキーにそれぞれXXXを指定しています。

HTTPリクエストの送信:

python
res = requests.post(url, json=params)

requests.post()関数を使用して、POSTメソッドでURLにリクエストを送信します。パラメータはJSON形式で指定しています。

レスポンスの処理:

python
data = json.loads(res.text)

res.textを使用してレスポンスの内容を取得し、JSON.loads()関数を使用してJSON文字列をPythonの辞書オブジェクトに変換します。

結果の表示:

python
print(data)

変換した結果をコンソールに表示します。

このコードは、指定したURLPOSTメソッドでリクエストを送信し、パラメータを含めることができます。レスポンスの結果はJSON形式で取得し、それをPythonの辞書オブジェクトに変換して表示します。

requests.put関数

例:PUTメソッドに対応

PathMethodFunction
/v2/taskset/namePUT指定idのタスクセットの名前変更

Pythonコード例:

python
import requests          #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.XXX.X.XX:9085/v2/taskset/name'       #IPアドレスを入力
params = { "cmdSetId":XXX, "name":XXX }                #cmdSetId, nameを入力
res = requests.put(url, json=params)
data = json.loads(res.text)
print(data)

requestsライブラリを使用してHTTPリクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。

ライブラリのインポート:

python
import requests
import json

requestsライブラリとJSONライブラリをインポートしています。

URLとパラメータの設定:

python
url = 'http://192.XXX.X.XX:9085/v2/taskset/name'
params = {"cmdSetId":XXX, "name":XXX}

URLとパラメータを設定します。URLにはHTTPリクエストを送信する先のアドレスを指定し、paramsにはリクエストに含めるパラメータを設定します。ここでは、cmdSetIdnameというキーにそれぞれXXXを指定しています。

HTTPリクエストの送信:

python
res = requests.put(url, json=params)

requests.put()関数を使用して、PUTメソッドでURLにリクエストを送信します。パラメータはJSON形式で指定しています。

レスポンスの処理:

python
data = json.loads(res.text)

res.textを使用してレスポンスの内容を取得し、json.loads()関数を使用してJSON文字列をPythonの辞書オブジェクトに変換します。

結果の表示:

python
print(data)

変換した結果をコンソールに表示します。

このコードは、指定したURLPUTメソッドでリクエストを送信し、パラメータを含めることができます。レスポンスの結果はJSON形式で取得し、それをPythonの辞書オブジェクトに変換して表示します。

requests.delete関数

PathMethodFunction
/v2/taskset/deleteDELETEtaskSet を削除する

Pythonコード例:

python
import requests         #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.XXX.X.XX:9085/v2/taskset/delete'        #IPアドレスを入力
params = {'cmdSetId': XXX }                               #cmdSetIdを入力
res = requests.delete(url, json=params)
data = json.loads(res.text)
print(data)

requestsライブラリを使用してHTTPリクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。

ライブラリのインポート:

python
import requests
import json

requestsライブラリとJSONライブラリをインポートしています。

URLとパラメータの設定:

python
url = 'http://192.XXX.X.XX:9085/v2/taskset/delete'
params = {'cmdSetId': XXX}

URLとパラメータを設定します。URLにはHTTPリクエストを送信する先のアドレスを指定し、paramsにはリクエストに含めるパラメータを設定します。ここでは、cmdSetId というキーにXXXを指定しています。

HTTPリクエストの送信:

python
res = requests.delete(url, json=params)

requests.delete()関数を使用して、DELETEメソッドでURLにリクエストを送信します。パラメータはJSON形式で指定しています。

レスポンスの処理:

python
data = json.loads(res.text)

res.textを使用してレスポンスの内容を取得し、json.loads()関数を使用してJSON文字列をPythonの辞書オブジェクトに変換します。

結果の表示:

python
print(data)

変換した結果をコンソールに表示します。

このコードは、指定したURLDELETEメソッドでリクエストを送信し、パラメータを含めることができます。レスポンスの結果はJSON形式で取得し、それをPythonの辞書オブジェクトに変換して表示します。

MQTT API

  • paho.mqtt.client

MQTTは、軽量メッセージングプロトコルで、最小限のコードと帯域幅で信頼性の高いリアルタイム通信が可能です。paho.mqtt.clientは、さまざまなセキュリティ対応、使いやすい、幅広く使用されている等の利点があります。

参考HP:https://pypi.org/project/paho-mqtt/#client

  • paho.mqtt.clientのインストール

Paho.mqtt.clientはサードパーティーのライブラリなので、プログラム内で、使用するには、事前に、インストールが必要です。 コマンドプロンプトから、以下のコマンド等にて、Paho.mqtt.client モジュールのインストールを実行してください。

python
>pip install paho.mqtt

Publish

TopicFunctionDirection送信タイミング
statusマシンのステータスを取得するPublish100ms毎

Pythonコード例:

python
import paho.mqtt.client as paho     #「paho.mqtt.client」ライブラリのインストールが必要です。

def on_connect(client, userdata, flags, rc):
    print("rc: "+str(rc))

def on_message(self, obj, msg):
    print(msg.topic + " " + str(msg.payload))

def on_publish(self, obj, mid):
    print("mid: "+str(mid))

def on_subscribe(self, obj, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))

def on_log(self, obj, level, string):
    print(string)

if __name__ == '__main__':
    mqttc = paho.Client()#mqtt
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_subscribe = on_subscribe
    mqttc.connect("192.XXX.X.XX", 9075, 60)  #IPアドレス, ポート, キープアライブ
    mqttc.subscribe("status", 0)                 #TOPIC
    mqttc.loop_forever()

paho.mqtt.clientライブラリを使用してMQTT通信を行うためのサンプルコードです。以下で各部分の機能と動作を解説します。

ライブラリのインポート:

python
import paho.mqtt.client as paho

paho.mqtt.clientライブラリをpahoという別名でインポートしています。このライブラリを使用することで、MQTT通信を行うための機能を利用できます。

コールバック関数の定義:

python
def on_connect(client, userdata, flags, rc):
print("rc: "+str(rc))

on_connect関数は、サーバーへの接続が確立されたときに実行されるコールバック関数です。この場合、接続の結果コード(rc)を表示しています。

コールバック関数の定義:

python
def on_message(self, obj, msg):
print(msg.topic + " " + str(msg.payload))

on_message関数は、メッセージが受信されたときに実行されるコールバック関数です。受信したトピック名とペイロードを表示しています。

コールバック関数の定義:

python
def on_publish(self, obj, mid):
print("mid: "+str(mid))

on_publish関数は、メッセージのパブリッシュが完了したときに実行されるコールバック関数です。パブリッシュのメッセージIDを表示しています。

コールバック関数の定義:

python
def on_subscribe(self, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))

on_subscribe関数は、購読が完了したときに実行されるコールバック関数です。購読のメッセージIDとQoSのレベルを表示しています。

コールバック関数の定義:

python
def on_log(self, obj, level, string):
print(string)

on_log関数は、デバッグログを表示するためのコールバック関数です。

メインの処理:

python
if __name__ == '__main__':
    mqttc = paho.Client()
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_subscribe = on_subscribe
    mqttc.connect("192.XXX.X.XX", 9075, 60)    #IPアドレス, ポート, キープアライブ
    mqttc.subscribe("status", 0)                         #TOPIC
    mqttc.loop_forever()

メインの処理では、paho.Client()を使用してMQTTクライアントを作成し、各コールバック関数を設定しています。そして、指定したIPアドレスとポートに接続し、statusというトピックを購読します。最後に、mqttc.loop_forever()を呼び出すことで、メッセージの受信やコールバック関数の処理を継続的に行います。

このコードは、指定したIPアドレスとポートで動作するMQTTブローカーに接続し、statusトピックからメッセージを受信する基本的な処理を行います。

Subscribe

TopicFunctionDirection
control/joyマニュアル操作を行うSubscribe

Pythonコード例:

python
import paho.mqtt.client as paho             #「paho.mqtt.client」ライブラリのインストールが必要です。
import json
def on_connect(client, userdata, flags, rc):
    print("rc: "+str(rc))
def on_message(self, obj, msg):
    print(msg.topic + " " + str(msg.payload))
def on_publish(self, obj, mid):
    print("mid: "+str(mid))
def on_subscribe(self, obj, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))
def on_log(self, obj, level, string):
    print(string)
topic = "control/joy"
message = {
    "data":x  #:動作モード
    }
        # 0:No operation
                #1:forward
                # 2:Diagonal right forward(Curve diameter is constant)
                # 4:Right back retreat(Curve diameter is constant)
                # 5:back
                # 6:Left back retreat(Curve diameter is constant)
                # 7:Turn left
                # 8:Diagonal left forward(Curve diameter is constant
if __name__ == '__main__':
    mqttc = paho.Client()    #mqtt
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish
    message_json = json.dumps(message)
    mqttc.connect("xxx.xxx.xxx.xxx", 9075, 60)     #IPアドレス, ポート, キープアライブ
    mqttc.publish(topic, message_json)           #TOPIC
    mqttc.loop_forever()

paho.mqtt.clientライブラリを使用してMQTT通信を行うためのサンプルコードです。以下で各部分の機能と動作を解説します。

ライブラリのインポート:

python
import paho.mqtt.client as paho
import json

paho.mqtt.clientライブラリとJSONライブラリをインポートしています。

コールバック関数の定義:

python
def on_connect(client, userdata, flags, rc):
print("rc: "+str(rc))

on_connect関数は、サーバーへの接続が確立されたときに実行されるコールバック関数です。この場合、接続の結果コード(rc)を表示しています。

コールバック関数の定義:

python
def on_message(self, obj, msg):
print(msg.topic + " " + str(msg.payload))

on_message関数は、メッセージが受信されたときに実行されるコールバック関数です。受信したトピック名とペイロードを表示しています。

コールバック関数の定義:

python
def on_publish(self, obj, mid):
print("mid: "+str(mid))

on_publish関数は、メッセージのパブリッシュが完了したときに実行されるコールバック関数です。パブリッシュのメッセージIDを表示しています。

コールバック関数の定義:

python
def on_subscribe(self, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))

on_subscribe関数は、購読が完了したときに実行されるコールバック関数です。購読のメッセージIDQoSのレベルを表示しています。

コールバック関数の定義:

python
def on_log(self, obj, level, string):
print(string)

on_log関数は、デバッグログを表示するためのコールバック関数です。

トピックとメッセージの設定:

python
topic = "control/joy"
message = {
"data": x
}

トピック名とメッセージを設定します。メッセージ内のdataキーには、xに対応する動作モードが入ることを想定しています。

メインの処理:

python
if __name__ == '__main__':
    mqttc = paho.Client()
    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish
    message_json = json.dumps(message)
    mqttc.connect("xxx.xxx.xxx.xxx", 9075, 60)     #IPアドレス, ポート, キープアライブ
    mqttc.publish(topic, message_json)   #TOPIC
    mqttc.loop_forever()

メインの処理では、paho.Client()を使用してMQTTクライアントを作成し、各コールバック関数を設定しています。メッセージをJSON形式に変換し、指定したIPアドレスとポートに接続し、指定したトピックにメッセージをパブリッシュします。最後に、mqttc.loop_forever()を呼び出すことで、メッセージの送信やコールバック関数の処理を継続的に行います。

このコードは、指定したIPアドレスとポートで動作するMQTTブローカーに接続し、指定したトピックにメッセージをパブリッシュする基本的な処理を行います。

参考コード例 Python

  • REST API:タスクセットの実行

  • MQTT API:ステータス情報の取得

  • Ctl + C で停止

python
import requests # Need to install "requests" library
import json
import paho.mqtt.client as mqtt
import time

url = 'http://XXX.XXX.XXX.XXX:9085/v2/taskset/exec' #Enter IP address
params = {"cmdSetId": XXX } #enter cmdSetId
res = requests.post(url, json=params)
data = json.loads(res.text)
print(data)
PROTOCOL_VERSION = mqtt.MQTTv311

# MQTT broker address
BROKER_ADDRESS = "XXX.XXX.XXX.XXX" # Please change this to ALI's IP address.

# MQTT broker port
BROKER_PORT = 9075

# MQTT topics and run
TOPIC = "status"
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker")
        time.sleep(5)
        client.subscribe(TOPIC)
    else:
        print("Connection failed")
def on_message(client, userdata, msg):
    if msg.topic=="status":
        payload=json.loads(msg.payload.decode('utf-8'))
        #print(str(payload) + "\n") #Normal String view
        message = json.dumps(payload, indent=4)
        print(message + "\n") #For JSON format view
def main():
    client = mqtt.Client(protocol=PROTOCOL_VERSION)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER_ADDRESS, BROKER_PORT)

    try:
        client.loop_forever()
    except KeyboardInterrupt: #Terminate by using ctl^c on keyboard
        print("Terminating")
        client.disconnect()

if __name__ == "__main__":
    main()

関連記事

© 株式会社Keigan