KeiganALI の API と基本的な使い方
APIとは
API(Application Programming Interface)
とは 異なるアプリ・ソフトウェアを繋げる仕組みのことを挿します。目には見えないコンセントのようなイメージです。
APIの活用事例
Web
サイトへのログイン時に、SNS
アカウントを利用する機能などに、API
が使用されています。
APIを利用するメリット
- ユーザーの利便性が上がります(異なるアプリ・ソフトウェアの連携を取ることができます)
- 連携先の最新情報を取得できます
- 開発を効率化できます
APIを利用するデメリット
- 仕様が変更された場合作り変える必要があります
- サービスが終了した場合、利用が不可となります
- 一部有料のものがあり、ランニングコストがかかります
KeiganALIで使用可能なAPI
KeiganALIでは、以下の2種類のAPI
を実装しています。
REST API (Representational State Transfer) | MQTT API (Message Queueing Telemetry Transport) |
---|---|
HTTPプロトコル | MQTTプロトコル |
汎用性が高く扱えるデータの種類も多いのが特長です。 | HTTPプロトコルよりも送受信するデータ通信量が軽いのが特長です。(CPU負荷、電力消費量が小さい) |
画像ファイル等の比較的大きなデータの転送が可能です。 | 双方向、1対多数の通信が可能であり、IoT(Internet of Things)システムでは広く採用されています。 |
KeiganAMRでは、制御命令を与えたり、マップやタスクセットなど、種々のデータの読み書きを行います。 | KeiganAMRでは、主にAMRの状態(位置、ステータス、センサー情報など)を取得する場合に用います。 |
REST APIを利用したALIとの通信 概念図
REST API
requests
HTTP
にはデータの参照や更新などのメソッドが用意されています。HTTP
メソッドを操作しやすくしたものがHTTP
ライブラリであり、『requests』
は、Python
で使われるHTTP
ライブラリの一種です。『requests』
を使うとHTTP
メソッド(WebAPI
を使ったデータの取得等の)に対して、人が読みやすく、何を意図しているのかが分かりやすいコードの記述が可能です。
参考HP: https://pypi.org/project/requests/
requestsのインストール
requests
はサードパーティーのライブラリなので、プログラム内で、使用するには、事前に、インストールが必要です。 コマンドプロンプトから、以下のコマンド等にて、『requests』
モジュールのインストールを実行してください。
>pip install requests
requestsの使い方(ALIで使用する主な関数)
- requests.get関数:
GET
メソッドに対応 - requests.post関数:
POST
メソッドに対応 - requests.put関数:
PUT
メソッドに対応 - requests.delete関数:
DELETE
メソッドに対応
上記の関数にURL
、必要なパラメーターを設定して、コールすると、リクエストがそのURL
に送信され、その通信先から、レスポンスがrequests.Response
クラスのインスタンスとして返信されます。
requests.get関数
例:タスクセットリストの取得
Path | Method | Function |
---|---|---|
/v2/taskset/tasksets | GET | taskSet のリストを取得する |
Pythonコード例:
import requests #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.xxx.x.xx:9085/v2/taskset/tasksets' #IPアドレスを入力
params = {'mapId':XXX} #mapIdを入力
res = requests.get(url, json=params)
data = json.loads(res.text)
print(data)
requests
ライブラリを使用してHTTP
リクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。
ライブラリのインポート:
import requests
import json
requests
ライブラリとJSON
ライブラリをインポートしています。
URLとパラメータの設定:
url = 'http://192.xxx.x.xx:9085/v2/taskset/tasksets'
params = {'mapId': XXX}
URL
とパラメータを設定します。URL
にはHTTP
リクエストを送信する先のアドレスを指定し、params
にはリクエストに含めるパラメータを設定します。ここでは、mapId
というキーにXXX
を指定しています。
HTTPリクエストの送信:
res = requests.get(url, json=params)
requests.get()
関数を使用して、GET
メソッドでURL
にリクエストを送信します。パラメータはJSON
形式で指定しています。
結果の表示:
print(data)
変換した結果をコンソールに表示します。
このコードは、指定したURL
にGET
メソッドでリクエストを送信し、パラメータを含めることができます。レスポンスはresオブジェクトに格納されます。
requests.post関数
例:タスクセットの再生
Path | Method | Function |
---|---|---|
/v2/taskset/exec | POST | 指定idの taskSet を再生する |
Pythonコード例:
import requests #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.XXX.X.XX:9085/v2/taskset/exec' #IPアドレスを入力
params = {"cmdSetId":XXX} #tasksetIdを入力
res = requests.post(url, json=params)
data = json.loads(res.text)
print(data)
requests
ライブラリを使用してHTTP
リクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。
ライブラリのインポート:
import requests
import json
requests
ライブラリとJSON
ライブラリをインポートしています。
URLとパラメータの設定:
url = 'http://192.XXX.X.XX:9085/v2/taskset/exec'
params = {"cmdSetId":XXX, "count":XXX, "startIndex":XXX}
URL
とパラメータを設定します。URL
にはHTTP
リクエストを送信する先のアドレスを指定し、params
にはリクエストに含めるパラメータを設定します。ここでは、cmdSetId
、count
、startIndex
というキーにそれぞれXXX
を指定しています。
HTTPリクエストの送信:
res = requests.post(url, json=params)
requests.post()
関数を使用して、POST
メソッドでURL
にリクエストを送信します。パラメータはJSON
形式で指定しています。
レスポンスの処理:
data = json.loads(res.text)
res.text
を使用してレスポンスの内容を取得し、JSON.loads()
関数を使用してJSON
文字列をPython
の辞書オブジェクトに変換します。
結果の表示:
print(data)
変換した結果をコンソールに表示します。
このコードは、指定したURL
にPOST
メソッドでリクエストを送信し、パラメータを含めることができます。レスポンスの結果はJSON
形式で取得し、それをPython
の辞書オブジェクトに変換して表示します。
requests.put関数
例:PUTメソッドに対応
Path | Method | Function |
---|---|---|
/v2/taskset/name | PUT | 指定idのタスクセットの名前変更 |
Pythonコード例:
import requests #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.XXX.X.XX:9085/v2/taskset/name' #IPアドレスを入力
params = { "cmdSetId":XXX, "name":XXX } #cmdSetId, nameを入力
res = requests.put(url, json=params)
data = json.loads(res.text)
print(data)
requests
ライブラリを使用してHTTP
リクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。
ライブラリのインポート:
import requests
import json
requests
ライブラリとJSON
ライブラリをインポートしています。
URLとパラメータの設定:
url = 'http://192.XXX.X.XX:9085/v2/taskset/name'
params = {"cmdSetId":XXX, "name":XXX}
URL
とパラメータを設定します。URL
にはHTTP
リクエストを送信する先のアドレスを指定し、params
にはリクエストに含めるパラメータを設定します。ここでは、cmdSetId
とname
というキーにそれぞれXXX
を指定しています。
HTTPリクエストの送信:
res = requests.put(url, json=params)
requests.put()
関数を使用して、PUT
メソッドでURL
にリクエストを送信します。パラメータはJSON
形式で指定しています。
レスポンスの処理:
data = json.loads(res.text)
res.text
を使用してレスポンスの内容を取得し、json.loads()
関数を使用してJSON
文字列をPython
の辞書オブジェクトに変換します。
結果の表示:
print(data)
変換した結果をコンソールに表示します。
このコードは、指定したURL
にPUT
メソッドでリクエストを送信し、パラメータを含めることができます。レスポンスの結果はJSON
形式で取得し、それをPython
の辞書オブジェクトに変換して表示します。
requests.delete関数
Path | Method | Function |
---|---|---|
/v2/taskset/delete | DELETE | taskSet を削除する |
Pythonコード例:
import requests #「requests」ライブラリのインストールが必要です
import json
url = 'http://192.XXX.X.XX:9085/v2/taskset/delete' #IPアドレスを入力
params = {'cmdSetId': XXX } #cmdSetIdを入力
res = requests.delete(url, json=params)
data = json.loads(res.text)
print(data)
requests
ライブラリを使用してHTTP
リクエストを行うためのサンプルコードです。以下で各部分の機能と動作を解説します。
ライブラリのインポート:
import requests
import json
requests
ライブラリとJSON
ライブラリをインポートしています。
URLとパラメータの設定:
url = 'http://192.XXX.X.XX:9085/v2/taskset/delete'
params = {'cmdSetId': XXX}
URL
とパラメータを設定します。URL
にはHTTP
リクエストを送信する先のアドレスを指定し、params
にはリクエストに含めるパラメータを設定します。ここでは、cmdSetId
というキーにXXX
を指定しています。
HTTPリクエストの送信:
res = requests.delete(url, json=params)
requests.delete()
関数を使用して、DELETE
メソッドでURL
にリクエストを送信します。パラメータはJSON
形式で指定しています。
レスポンスの処理:
data = json.loads(res.text)
res.text
を使用してレスポンスの内容を取得し、json.loads()
関数を使用してJSON
文字列をPython
の辞書オブジェクトに変換します。
結果の表示:
print(data)
変換した結果をコンソールに表示します。
このコードは、指定したURL
にDELETE
メソッドでリクエストを送信し、パラメータを含めることができます。レスポンスの結果はJSON
形式で取得し、それをPython
の辞書オブジェクトに変換して表示します。
MQTT API
- paho.mqtt.client
MQTT
は、軽量メッセージングプロトコルで、最小限のコードと帯域幅で信頼性の高いリアルタイム通信が可能です。paho.mqtt.client
は、さまざまなセキュリティ対応、使いやすい、幅広く使用されている等の利点があります。
参考HP:https://pypi.org/project/paho-mqtt/#client
- paho.mqtt.clientのインストール
Paho.mqtt.client
はサードパーティーのライブラリなので、プログラム内で、使用するには、事前に、インストールが必要です。 コマンドプロンプトから、以下のコマンド等にて、Paho.mqtt.client
モジュールのインストールを実行してください。
>pip install paho.mqtt
Publish
Topic | Function | Direction | 送信タイミング |
---|---|---|---|
status | マシンのステータスを取得する | Publish | 100ms毎 |
Pythonコード例:
import paho.mqtt.client as paho #「paho.mqtt.client」ライブラリのインストールが必要です。
def on_connect(client, userdata, flags, rc):
print("rc: "+str(rc))
def on_message(self, obj, msg):
print(msg.topic + " " + str(msg.payload))
def on_publish(self, obj, mid):
print("mid: "+str(mid))
def on_subscribe(self, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))
def on_log(self, obj, level, string):
print(string)
if __name__ == '__main__':
mqttc = paho.Client()#mqtt
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe
mqttc.connect("192.XXX.X.XX", 9075, 60) #IPアドレス, ポート, キープアライブ
mqttc.subscribe("status", 0) #TOPIC
mqttc.loop_forever()
paho.mqtt.client
ライブラリを使用してMQTT
通信を行うためのサンプルコードです。以下で各部分の機能と動作を解説します。
ライブラリのインポート:
import paho.mqtt.client as paho
paho.mqtt.client
ライブラリをpaho
という別名でインポートしています。このライブラリを使用することで、MQTT
通信を行うための機能を利用できます。
コールバック関数の定義:
def on_connect(client, userdata, flags, rc):
print("rc: "+str(rc))
on_connect
関数は、サーバーへの接続が確立されたときに実行されるコールバック関数です。この場合、接続の結果コード(rc)
を表示しています。
コールバック関数の定義:
def on_message(self, obj, msg):
print(msg.topic + " " + str(msg.payload))
on_message
関数は、メッセージが受信されたときに実行されるコールバック関数です。受信したトピック名とペイロードを表示しています。
コールバック関数の定義:
def on_publish(self, obj, mid):
print("mid: "+str(mid))
on_publish
関数は、メッセージのパブリッシュが完了したときに実行されるコールバック関数です。パブリッシュのメッセージIDを表示しています。
コールバック関数の定義:
def on_subscribe(self, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))
on_subscribe
関数は、購読が完了したときに実行されるコールバック関数です。購読のメッセージIDとQoSのレベルを表示しています。
コールバック関数の定義:
def on_log(self, obj, level, string):
print(string)
on_log
関数は、デバッグログを表示するためのコールバック関数です。
メインの処理:
if __name__ == '__main__':
mqttc = paho.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe
mqttc.connect("192.XXX.X.XX", 9075, 60) #IPアドレス, ポート, キープアライブ
mqttc.subscribe("status", 0) #TOPIC
mqttc.loop_forever()
メインの処理では、paho.Client()
を使用してMQTT
クライアントを作成し、各コールバック関数を設定しています。そして、指定したIPアドレス
とポートに接続し、status
というトピックを購読します。最後に、mqttc.loop_forever()
を呼び出すことで、メッセージの受信やコールバック関数の処理を継続的に行います。
このコードは、指定したIPアドレス
とポートで動作するMQTT
ブローカーに接続し、status
トピックからメッセージを受信する基本的な処理を行います。
Subscribe
Topic | Function | Direction |
---|---|---|
control/joy | マニュアル操作を行う | Subscribe |
Pythonコード例:
import paho.mqtt.client as paho #「paho.mqtt.client」ライブラリのインストールが必要です。
import json
def on_connect(client, userdata, flags, rc):
print("rc: "+str(rc))
def on_message(self, obj, msg):
print(msg.topic + " " + str(msg.payload))
def on_publish(self, obj, mid):
print("mid: "+str(mid))
def on_subscribe(self, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))
def on_log(self, obj, level, string):
print(string)
topic = "control/joy"
message = {
"data":x #:動作モード
}
# 0:No operation
#1:forward
# 2:Diagonal right forward(Curve diameter is constant)
# 4:Right back retreat(Curve diameter is constant)
# 5:back
# 6:Left back retreat(Curve diameter is constant)
# 7:Turn left
# 8:Diagonal left forward(Curve diameter is constant
if __name__ == '__main__':
mqttc = paho.Client() #mqtt
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
message_json = json.dumps(message)
mqttc.connect("xxx.xxx.xxx.xxx", 9075, 60) #IPアドレス, ポート, キープアライブ
mqttc.publish(topic, message_json) #TOPIC
mqttc.loop_forever()
paho.mqtt.client
ライブラリを使用してMQTT
通信を行うためのサンプルコードです。以下で各部分の機能と動作を解説します。
ライブラリのインポート:
import paho.mqtt.client as paho
import json
paho.mqtt.client
ライブラリとJSON
ライブラリをインポートしています。
コールバック関数の定義:
def on_connect(client, userdata, flags, rc):
print("rc: "+str(rc))
on_connect
関数は、サーバーへの接続が確立されたときに実行されるコールバック関数です。この場合、接続の結果コード(rc)
を表示しています。
コールバック関数の定義:
def on_message(self, obj, msg):
print(msg.topic + " " + str(msg.payload))
on_message
関数は、メッセージが受信されたときに実行されるコールバック関数です。受信したトピック名とペイロードを表示しています。
コールバック関数の定義:
def on_publish(self, obj, mid):
print("mid: "+str(mid))
on_publish
関数は、メッセージのパブリッシュが完了したときに実行されるコールバック関数です。パブリッシュのメッセージID
を表示しています。
コールバック関数の定義:
def on_subscribe(self, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos))
on_subscribe
関数は、購読が完了したときに実行されるコールバック関数です。購読のメッセージID
とQoS
のレベルを表示しています。
コールバック関数の定義:
def on_log(self, obj, level, string):
print(string)
on_log
関数は、デバッグログを表示するためのコールバック関数です。
トピックとメッセージの設定:
topic = "control/joy"
message = {
"data": x
}
トピック名とメッセージを設定します。メッセージ内のdata
キーには、xに対応する動作モードが入ることを想定しています。
メインの処理:
if __name__ == '__main__':
mqttc = paho.Client()
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
message_json = json.dumps(message)
mqttc.connect("xxx.xxx.xxx.xxx", 9075, 60) #IPアドレス, ポート, キープアライブ
mqttc.publish(topic, message_json) #TOPIC
mqttc.loop_forever()
メインの処理では、paho.Client()
を使用してMQTT
クライアントを作成し、各コールバック関数を設定しています。メッセージをJSON
形式に変換し、指定したIPアドレス
とポートに接続し、指定したトピックにメッセージをパブリッシュします。最後に、mqttc.loop_forever()
を呼び出すことで、メッセージの送信やコールバック関数の処理を継続的に行います。
このコードは、指定したIPアドレス
とポートで動作するMQTT
ブローカーに接続し、指定したトピックにメッセージをパブリッシュする基本的な処理を行います。
参考コード例 Python
REST API:タスクセットの実行
MQTT API:ステータス情報の取得
Ctl + C で停止
import requests # Need to install "requests" library
import json
import paho.mqtt.client as mqtt
import time
url = 'http://XXX.XXX.XXX.XXX:9085/v2/taskset/exec' #Enter IP address
params = {"cmdSetId": XXX } #enter cmdSetId
res = requests.post(url, json=params)
data = json.loads(res.text)
print(data)
PROTOCOL_VERSION = mqtt.MQTTv311
# MQTT broker address
BROKER_ADDRESS = "XXX.XXX.XXX.XXX" # Please change this to ALI's IP address.
# MQTT broker port
BROKER_PORT = 9075
# MQTT topics and run
TOPIC = "status"
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT broker")
time.sleep(5)
client.subscribe(TOPIC)
else:
print("Connection failed")
def on_message(client, userdata, msg):
if msg.topic=="status":
payload=json.loads(msg.payload.decode('utf-8'))
#print(str(payload) + "\n") #Normal String view
message = json.dumps(payload, indent=4)
print(message + "\n") #For JSON format view
def main():
client = mqtt.Client(protocol=PROTOCOL_VERSION)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER_ADDRESS, BROKER_PORT)
try:
client.loop_forever()
except KeyboardInterrupt: #Terminate by using ctl^c on keyboard
print("Terminating")
client.disconnect()
if __name__ == "__main__":
main()