MQTT リアルタイムフィードトピック
小林
最後の更新7日前
注意:
チャンネルフィードサービスは、UbiBot Plus メンバーシップのブロンズレベル以上のユーザーのみ利用可能です。無料アカウントでは利用できません。
- Host: mqtt-api.ubibot.com
- Port: 1883 or 8883 (for SSL encrypted connection)
- WebSocket Port: 8083 or 8084 (for SSL encrypted connection)
- WebSocket Path:/mqtt
- Username: Format: user_id=USER_ID
- Password: Format: account_key=ACCOUNT_KEY
USER_IDおよびACCOUNT_KEYは、UbiBotコンソールから取得した実際の認証情報に置き換えてください。
3.MQTTサブスクリプションハートビート
リアルタイムフィードデータを受信するには、クライアントは以下のURLに対してハートビートHTTP GETリクエストを送信する必要があります:
https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping
サブスクリプションを維持するためには、少なくとも300秒ごとにハートビートを送信する必要があります。指定された時間内にハートビートが受信されない場合、サーバーはそのアカウントに対してMQTTフィードデータのプッシュを停止します。240秒ごとにハートビートを送信することを推奨します。ハートビートを過度に頻繁に送信しないよう(例:10秒ごと以上)ご注意ください。
4.ハートビートリクエストパラメータ
account_key(文字列、必須)
user_id(任意):他のユーザーIDのカンマ区切りリスト(最大20個)、そのユーザーのデバイスをサブスクライブするために使用します。
5.MQTT トピック
- 自分のアカウントのすべてのフィードデータを購読するには:
- 特定のチャンネルを購読するには:
USER_ID と CHANNEL_ID を適切に置き換えてください。また、他のアカウントのフィードを購読するには、そのユーザーの user_id を指定できます。
注意: 他のユーザーがデータ共有(共有許可)を UbiBot コンソールで自分のアカウントに対して有効にしている必要があります。そうでない場合、そのユーザーのデータは受信できません。
Python – MQTT フィード購読の例
# -*- coding: utf-8 -*-
# UbiBot MQTT Feed Subscription with Heartbeat (Python)
import paho.mqtt.client as mqtt
import threading
import requests
import time
# Replace with your actual credentials
USER_ID = "your_user_id"
ACCOUNT_KEY = "your_account_key"
OTHER_USER_IDS = "" # Optional, e.g., "user1,user2"
# MQTT connection settings
MQTT_HOST = "mqtt-api.ubibot.com"
MQTT_PORT = 1883
MQTT_USERNAME = f"user_id={USER_ID}" MQTT_PASSWORD = f"account_key={ACCOUNT_KEY}"
MQTT_TOPIC = f"/user/{USER_ID}/channel_feeds/#"
# Heartbeat settings
HEARTBEAT_URL = "https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping"
HEARTBEAT_INTERVAL = 240 # seconds
# Heartbeat function
def send_heartbeat():
params = {
"account_key": ACCOUNT_KEY
}
if OTHER_USER_IDS:
params["user_id"] = OTHER_USER_IDS
try:
response = requests.get(HEARTBEAT_URL, params=params, timeout=5)
print(f"[HEARTBEAT] Sent. Status: {response.status_code}, Response: {response.text}") except Exception as e:
print(f"[HEARTBEAT] Failed: {e}")
# Schedule next heartbeat
threading.Timer(HEARTBEAT_INTERVAL, send_heartbeat).start()
# MQTT Callbacks
def on_message(client, userdata, msg):
print(f"[RECV] Topic: {msg.topic}")
print(f"[RECV] Payload: {msg.payload.decode()}")
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("[INFO] Connected successfully.")
client.subscribe(MQTT_TOPIC)
print(f"[INFO] Subscribed to: {MQTT_TOPIC}")
else:
print(f"[ERROR] Connection failed with code {rc}")
# Start MQTT client
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) client.on_connect = on_connect
client.on_message = on_message
print("[INFO] Connecting to MQTT broker...")
client.connect(MQTT_HOST, MQTT_PORT, 60)
# Start heartbeat thread
send_heartbeat()
# Start MQTT loop
client.loop_forever()
NodeJS – MQTT フィード購読の例
// Node.js – MQTT Feed Subscription Example with Heartbeat
const mqtt = require('mqtt');
const https = require('https');
const querystring = require('querystring');
// Replace with your actual credentials
const USER_ID = 'your_user_id';
const ACCOUNT_KEY = 'your_account_key';
const OTHER_USER_IDS = ''; // Optional, e.g., 'user1,user2'
const options = {
username: `user_id=${USER_ID}`,
password: `account_key=${ACCOUNT_KEY}`
};
const topic = `/user/${USER_ID}/channel_feeds/#`;
const client = mqtt.connect('mqtt://mqtt-api.ubibot.com:1883', options);
client.on('connect', () => {
console.log('[INFO] Connected to MQTT broker.');
client.subscribe(topic, (err) => {
if (!err) {
console.log('[INFO] Subscribed to:', topic);
} else {
console.error('[ERROR] Subscribe failed:', err.message);
}
});
// Start sending heartbeat
sendHeartbeat(); setInterval(sendHeartbeat, 240000); // every 240 seconds });
client.on('message', (topic, message) => {
console.log(`[RECV] Topic: ${topic}`);
console.log(`[RECV] Payload: ${message.toString()}`);
});
function sendHeartbeat() {
const params = {
account_key: ACCOUNT_KEY
};
if (OTHER_USER_IDS) {
params.user_id = OTHER_USER_IDS;
}
const query = querystring.stringify(params);
const url = `https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping?${query}`;
https.get(url, (res) => {
let data = '';
res.on('data', (chunk) => { data += chunk; });
res.on('end', () => {
console.log(`[HEARTBEAT] Status: ${res.statusCode}, Response: ${data}`);
});
}).on('error', (err) => {
console.error(`[HEARTBEAT] Error: ${err.message}`);
});
}
C# – MQTT フィード購読の例
// C# – MQTT Feed Subscription Example with Heartbeat
// Requires MQTTnet (via NuGet) and System.Net.Http
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class Program
{
private static readonly string USER_ID = "your_user_id";
private static readonly string ACCOUNT_KEY = "your_account_key"; private static readonly string OTHER_USER_IDS = ""; // Optional: "user1,user2"
private static readonly string TOPIC = $"/user/{USER_ID}/channel_feeds/#";
private static readonly string HEARTBEAT_URL = "https://webapi.ubibot.com/mqtt-user-feeds/subcribe-ping";
private static readonly int HEARTBEAT_INTERVAL = 240; // seconds
private static readonly HttpClient httpClient = new HttpClient();
static async Task Main(string[] args)
{
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("mqtt-api.ubibot.com", 1883) .WithCredentials($"user_id={USER_ID}", $"account_key={ACCOUNT_KEY}")
.WithCleanSession()
.Build();
mqttClient.UseConnectedHandler(async e =>
{
Console.WriteLine("[INFO] Connected to MQTT broker.");
await mqttClient.SubscribeAsync(TOPIC);
Console.WriteLine($"[INFO] Subscribed to: {TOPIC}");
// Start heartbeat loop
_ = Task.Run(() => StartHeartbeatLoop());
});
mqttClient.UseApplicationMessageReceivedHandler(e =>
{
Console.WriteLine($"[RECV] Topic: {e.ApplicationMessage.Topic}"); Console.WriteLine($"[RECV] Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
});
mqttClient.UseDisconnectedHandler(e =>
{
Console.WriteLine("[WARN] Disconnected from MQTT broker.");
});
Console.WriteLine("[INFO] Connecting...");
await mqttClient.ConnectAsync(options);
Console.WriteLine("[INFO] Press any key to exit.");
Console.ReadLine();
}
static async Task StartHeartbeatLoop()
{
while (true)
{
try
{
var uriBuilder = new UriBuilder(HEARTBEAT_URL);
var query = $"account_key={ACCOUNT_KEY}";
if (!string.IsNullOrEmpty(OTHER_USER_IDS))
{
query += $"&user_id={OTHER_USER_IDS}";
}
uriBuilder.Query = query;
var response = await httpClient.GetAsync(uriBuilder.Uri);
var result = await response.Content.ReadAsStringAsync();
Console.WriteLine($"[HEARTBEAT] Status: {response.StatusCode}, Response: {result}");
}
catch (Exception ex)
{
Console.WriteLine($"[HEARTBEAT] Error: {ex.Message}");
}
await Task.Delay(HEARTBEAT_INTERVAL * 1000);
}
}
}