MQTT リアルタイムフィードトピック

小林

最後の更新7日前

1.概要
UbiBot MQTT リアルタイムフィードサービスは、MQTT プロトコルを介して UbiBot プラットフォームからリアルタイムでデータフィードを配信するサービスです。クライアントは自分自身または共有チャンネルのフィードトピックを購読することで、オリジナルのセンサーデータを即座に受信できます。購読状態を維持するためには、ハートビート機構が必要です。


注意:
 チャンネルフィードサービスは、UbiBot Plus メンバーシップのブロンズレベル以上のユーザーのみ利用可能です。無料アカウントでは利用できません。

2.MQTT接続詳細
  • Host: mqtt-api.ubibot.com
  • Port: 1883 or 8883 (for SSL encrypted connection)
  • WebSocket Port: 8083 or 8084 (for SSL encrypted connection)
  • WebSocket Path:/mqtt
  • Username: Format: user_id=USER_ID
  • Password: Format: account_key=ACCOUNT_KEY

USER_IDおよびACCOUNT_KEYは、UbiBotコンソールから取得した実際の認証情報に置き換えてください。

3.MQTTサブスクリプションハートビート

リアルタイムフィードデータを受信するには、クライアントは以下のURLに対してハートビートHTTP GETリクエストを送信する必要があります:

https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping

サブスクリプションを維持するためには、少なくとも300秒ごとにハートビートを送信する必要があります。指定された時間内にハートビートが受信されない場合、サーバーはそのアカウントに対してMQTTフィードデータのプッシュを停止します。240秒ごとにハートビートを送信することを推奨します。ハートビートを過度に頻繁に送信しないよう(例:10秒ごと以上)ご注意ください。

4.ハートビートリクエストパラメータ

account_key(文字列、必須)
user_id(任意):他のユーザーIDのカンマ区切りリスト(最大20個)、そのユーザーのデバイスをサブスクライブするために使用します。

5.MQTT トピック

  • 自分のアカウントのすべてのフィードデータを購読するには:
        /user/USER_ID/channel_feeds/#
  • 特定のチャンネルを購読するには:
        /user/USER_ID/channel_feeds/CHANNEL_ID


USER_ID と CHANNEL_ID を適切に置き換えてください。また、他のアカウントのフィードを購読するには、そのユーザーの user_id を指定できます。
注意: 他のユーザーがデータ共有(共有許可)を UbiBot コンソールで自分のアカウントに対して有効にしている必要があります。そうでない場合、そのユーザーのデータは受信できません。

Python – MQTT フィード購読の例

# -*- coding: utf-8 -*-
# UbiBot MQTT Feed Subscription with Heartbeat (Python)


import paho.mqtt.client as mqtt

import threading
import requests

import time


# Replace with your actual credentials

USER_ID = "your_user_id"

ACCOUNT_KEY = "your_account_key"

OTHER_USER_IDS = "" # Optional, e.g., "user1,user2"


# MQTT connection settings

MQTT_HOST = "mqtt-api.ubibot.com"

MQTT_PORT = 1883

MQTT_USERNAME = f"user_id={USER_ID}" MQTT_PASSWORD = f"account_key={ACCOUNT_KEY}"

MQTT_TOPIC = f"/user/{USER_ID}/channel_feeds/#"


# Heartbeat settings

HEARTBEAT_URL = "https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping"

HEARTBEAT_INTERVAL = 240 # seconds


# Heartbeat function

def send_heartbeat():

params = {

"account_key": ACCOUNT_KEY

}

if OTHER_USER_IDS:

params["user_id"] = OTHER_USER_IDS

try:

response = requests.get(HEARTBEAT_URL, params=params, timeout=5)

print(f"[HEARTBEAT] Sent. Status: {response.status_code}, Response: {response.text}") except Exception as e:

print(f"[HEARTBEAT] Failed: {e}")

# Schedule next heartbeat

threading.Timer(HEARTBEAT_INTERVAL, send_heartbeat).start()

# MQTT Callbacks

def on_message(client, userdata, msg):

print(f"[RECV] Topic: {msg.topic}")

print(f"[RECV] Payload: {msg.payload.decode()}")

def on_connect(client, userdata, flags, rc):

if rc == 0:

print("[INFO] Connected successfully.")

client.subscribe(MQTT_TOPIC)

print(f"[INFO] Subscribed to: {MQTT_TOPIC}")

else:

print(f"[ERROR] Connection failed with code {rc}")

# Start MQTT client

client = mqtt.Client()

client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) client.on_connect = on_connect

client.on_message = on_message

print("[INFO] Connecting to MQTT broker...")

client.connect(MQTT_HOST, MQTT_PORT, 60)

# Start heartbeat thread

send_heartbeat()

# Start MQTT loop

client.loop_forever()

NodeJS – MQTT フィード購読の例

// Node.js – MQTT Feed Subscription Example with Heartbeat

const mqtt = require('mqtt');

const https = require('https');

const querystring = require('querystring');

// Replace with your actual credentials

const USER_ID = 'your_user_id';

const ACCOUNT_KEY = 'your_account_key';

const OTHER_USER_IDS = ''; // Optional, e.g., 'user1,user2'

const options = {

username: `user_id=${USER_ID}`,

password: `account_key=${ACCOUNT_KEY}`

};

const topic = `/user/${USER_ID}/channel_feeds/#`;

const client = mqtt.connect('mqtt://mqtt-api.ubibot.com:1883', options);

client.on('connect', () => {

console.log('[INFO] Connected to MQTT broker.');

client.subscribe(topic, (err) => {

if (!err) {

console.log('[INFO] Subscribed to:', topic);

} else {

console.error('[ERROR] Subscribe failed:', err.message);

}
});

// Start sending heartbeat

sendHeartbeat(); setInterval(sendHeartbeat, 240000); // every 240 seconds });

client.on('message', (topic, message) => {

console.log(`[RECV] Topic: ${topic}`);

console.log(`[RECV] Payload: ${message.toString()}`);

});

function sendHeartbeat() {

const params = {

account_key: ACCOUNT_KEY

};

if (OTHER_USER_IDS) {

params.user_id = OTHER_USER_IDS;

}

const query = querystring.stringify(params);

const url = `https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping?${query}`;

https.get(url, (res) => {

let data = '';

res.on('data', (chunk) => { data += chunk; });

res.on('end', () => {

console.log(`[HEARTBEAT] Status: ${res.statusCode}, Response: ${data}`);

});

}).on('error', (err) => {

console.error(`[HEARTBEAT] Error: ${err.message}`);

});

}

C# – MQTT フィード購読の例

// C# – MQTT Feed Subscription Example with Heartbeat
// Requires MQTTnet (via NuGet) and System.Net.Http

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Client.Options;

using System;

using System.Net.Http;

using System.Text;

using System.Threading;

using System.Threading.Tasks;

class Program

{

private static readonly string USER_ID = "your_user_id";

private static readonly string ACCOUNT_KEY = "your_account_key"; private static readonly string OTHER_USER_IDS = ""; // Optional: "user1,user2"

private static readonly string TOPIC = $"/user/{USER_ID}/channel_feeds/#";

private static readonly string HEARTBEAT_URL = "https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping";

private static readonly int HEARTBEAT_INTERVAL = 240; // seconds

private static readonly HttpClient httpClient = new HttpClient();

static async Task Main(string[] args)

{

var factory = new MqttFactory();

var mqttClient = factory.CreateMqttClient();

var options = new MqttClientOptionsBuilder()

.WithTcpServer("mqtt-api.ubibot.com", 1883) .WithCredentials($"user_id={USER_ID}", $"account_key={ACCOUNT_KEY}")

.WithCleanSession()

.Build();

mqttClient.UseConnectedHandler(async e =>

{

Console.WriteLine("[INFO] Connected to MQTT broker.");

await mqttClient.SubscribeAsync(TOPIC);

Console.WriteLine($"[INFO] Subscribed to: {TOPIC}");

// Start heartbeat loop

_ = Task.Run(() => StartHeartbeatLoop());

});

mqttClient.UseApplicationMessageReceivedHandler(e =>

{

Console.WriteLine($"[RECV] Topic: {e.ApplicationMessage.Topic}"); Console.WriteLine($"[RECV] Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");

});

mqttClient.UseDisconnectedHandler(e =>

{

Console.WriteLine("[WARN] Disconnected from MQTT broker.");

});

Console.WriteLine("[INFO] Connecting...");

await mqttClient.ConnectAsync(options);

Console.WriteLine("[INFO] Press any key to exit.");

Console.ReadLine();

}

static async Task StartHeartbeatLoop()

{

while (true)

{

try

{

var uriBuilder = new UriBuilder(HEARTBEAT_URL);

var query = $"account_key={ACCOUNT_KEY}";

if (!string.IsNullOrEmpty(OTHER_USER_IDS))

{

query += $"&user_id={OTHER_USER_IDS}";

}

uriBuilder.Query = query;

var response = await httpClient.GetAsync(uriBuilder.Uri);

var result = await response.Content.ReadAsStringAsync();

Console.WriteLine($"[HEARTBEAT] Status: {response.StatusCode}, Response: {result}");

}
catch (Exception ex)

{

Console.WriteLine($"[HEARTBEAT] Error: {ex.Message}");

}

await Task.Delay(HEARTBEAT_INTERVAL * 1000);

}

}

}

この記事は役に立ちましたか。

0 人中 0 人がこの記事を気に入っています

ヘルプが必要ですか?メッセージをお送りください