在 2022.9.29 举办的线下 Shifu Meetup 活动中,来自边无际的杨希杰在现场展示用 Shifu 接入多个实际物联网设备,通过这种直观的接入方式展示了 Shifu 框架接入设备迅速、无单点故障、隔离性好、应用开发便捷等优点。
这次活动总共接入了五个设备,分别是 MQTT服务器、RS485的温湿度计 和 RS485的LED、西门子 S7 PLC、海康威视摄像头——这些都是比较常见的物联网设备。下面来让我们回忆一下接入过程吧。


创建集群并安装Shifu
首先我们需要在本地开启 Docker。使用 Windows 或 macOS 的搜索打开 Docker Desktop,最小化到后台即可。

之后我们需要用 kind 创建一个k8s集群。后续 Shifu 和物联网设备的数字孪生都会以 Pod 的形式存在于这个集群中:
# 创建集群
$ sudo kind create cluster --image="kindest/node:v1.24.0"
# 提前准备镜像导入集群
$ sudo docker pull bitnami/kube-rbac-proxy:0.13.1 
$ sudo docker pull edgehub/shifu-controller:v0.1.1
$ sudo docker pull nginx:1.21
$ sudo kind load docker-image bitnami/kube-rbac-proxy:0.13.1 edgehub/shifu-controller:v0.1.1 nginx:1.21

Shifu 支持一键安装,只需要先克隆Shifu仓库,之后用一条命令部署即可:
# 安装shifu
$ git clone https://github.com/Edgenesis/shifu.git
$ cd shifu
$ sudo kubectl apply -f pkg/k8s/crd/install/shifu_install.yml
# 跑一个应用程序 之后会用到
$ sudo kubectl run --image=nginx:1.21 nginx

您也可以查看更详细的本地安装Shifu教程。
设备接入
MQTT
测试MQTT服务器
我们已经部署了一个MQTT服务器,可以先打开两个shell进行测试:
# shellA
$ mosquitto_sub -h 82.157.170.202 -t topic0
# shellB
$ mosquitto_pub -h 82.157.170.202 -t topic0 -m "哈哈哈"
可以看到发送的信息可以被正确接收。

接入设备
接下来我们可以先修改对应的配置,下载对应的镜像,然后用 kubectl apply 命令一键将MQTT服务器作为一个数字孪生接入 Shifu。
修改examples/my_mqtt/mqtt_deploy中的spec.address为82.157.170.202:1883,spec.protocolSettings.MQTTSetting.MQTTTopic为topic0。
$ sudo docker pull edgehub/deviceshifu-http-mqtt:v0.1.1
$ sudo kind load docker-image edgehub/deviceshifu-http-mqtt:v0.1.1
$ sudo kubectl apply -f examples/my_mqtt/mqtt_deploy

读取数据
我们可以通过在集群中启动一个 nginx 应用来与数字孪生交互。
$ sudo kubectl exec -it nginx -- bash
$ curl http://deviceshifu-mqtt.deviceshifu.svc.cluster.local/mqtt_data

连接温度计和LED
连接设备至电脑
- 温度计使用串口服务器通过网线连接至电脑
- LED使用RS485转USB的芯片连接至电脑
本地启动HTTP服务
因为目前 Shifu 还不支持 Modbus协议,所以我们需要将Modbus读取的数据转为HTTP数据。
$ cd api_thermometer
$ uvicorn --host 0.0.0.0 --port 23330 main:app
$ cd api_led
$ uvicorn --host 0.0.0.0 --port 23331 main:app
api_thermometer文件夹内容
main.py
from fastapi import FastAPI
from typing import List
from pymodbus.client.sync import ModbusTcpClient
app = FastAPI()
def getHumidityAndTemperature() -> List[float]:
    """
    返回从TAS-LAN-460得到的温度和湿度
    """
    client = ModbusTcpClient(host='192.168.0.80', port=10123) # TAS-LAN-460的端口
    client.connect()
    SLAVE = 0x01
    r = client.read_holding_registers(address=0x0000, count=2, unit=SLAVE)
    print("自己拿到的数据", r.registers)
    client.close()
    result = [r.registers[0] / 10, r.registers[1] / 10]
    return result
@app.get("/")
def root():
    return { "message": "Hello World" }
@app.get("/temperature")
def getTemperature():
    temperature = getHumidityAndTemperature()[1]
    return { "value": f"{temperature}" }
@app.get("/humidity")
def getHumidity():
    humidity = getHumidityAndTemperature()[0]
    return { "value": f"{humidity}" }
requirements.txt
fastapi
pymodbus
api_led文件夹内容
main.py
from fastapi import FastAPI
from pymodbus.client.sync import ModbusSerialClient
from typing import List, Dict
app = FastAPI()
class ZhongshengLed:
    """
    DEVICE_NAME = "中盛数码管显示屏"
    """
    def __init__(self, device_address: int = 0x01, port: str = '/dev/tty.usbserial-14120') -> None:
        self.device_address = device_address
        self.client = ModbusSerialClient(method='rtu', port=port,  stopbits=1, bytesize=8, parity='N', baudrate=9600, timeout=2.0)
    
    def setLedCharacter(self, position: int, character: str):
        self.setLedAscii(position=position, ascii_value=ZhongshengLed.character2ascii[character])
    def setLedAscii(self, position: int, ascii_value: int):
        self.client.connect()
        self.client.write_register(address=position, value=ascii_value, unit=self.device_address)
        self.client.close()
    def setFourLedsString(self, string: str):
        self.setFourLedsAsciis(ascii_values=[ZhongshengLed.character2ascii[string[0]], ZhongshengLed.character2ascii[string[1]], ZhongshengLed.character2ascii[string[2]], ZhongshengLed.character2ascii[string[3]]])
    def setFourLedsAsciis(self, ascii_values: List[int]):
        self.client.connect()
        self.client.write_registers(address=ZhongshengLed.LedPosition.one, values=ascii_values, unit=self.device_address)
        self.client.close()
    class LedPosition:
        one = 0
        two = 1
        three = 2
        four = 3
    
    character2ascii: Dict[str, int] = {
        "0": 0x30, "1": 0x31, "2": 0x32, "3": 0x33, "4": 0x34, 
        "5": 0x35, "6": 0x36, "7": 0x37, "8": 0x38, "9": 0x39,
        ".": 0x2e, "-": 0x2d, " ": 0x20
    }
    def setDot(self, count: int = 1):
        self.client.connect()
        self.client.write_register(address=16, value=count, unit=self.device_address)
        self.client.close()
    def setNegative(self, isNegative: bool = False):
        self.client.connect()
        self.client.write_register(address=17, value=1 if isNegative else 0, unit=self.device_address)
        self.client.close()
    
    def setFloat(self, value: float):
        """
        显示一位的小数
        """
        self.setDot(count=1)
        if value < 0:
            self.setNegative(True)
        else:
            self.setNegative(False)
        data = int(abs(value) * 10)
        self.client.connect()
        self.client.write_register(address=7, value=data, unit=self.device_address)
        # self.client.write_register(address=16, value=value, unit=self.device_address)
        self.client.close()
    def setBrightness(self, brightness: int = 7):
        self.client.connect()
        self.client.write_register(address=14, value=brightness, unit=self.device_address)
        self.client.close()
device = ZhongshengLed()
@app.get("/")
def root():
    return { "message": "Hello World" }
@app.get("/setfloat/{value}")
def setTemperature(value: float):
    device.setFloat(value=value)
    return { "OK": "OK" }
@app.get("/setfloat/{value}")
def setTemperature(value: float):
    device.setFloat(value=value)
    return { "OK": "OK" }
@app.get("/setfloat")
def setTemperature(value: float = 0.0):
    device.setFloat(value=value)
    return { "OK": "OK" }
requirements.txt
fastapi
pymodbus
本地验证
$ curl http://localhost:23330/temperature
$ curl http://localhost:23330/humidity
$ curl http://localhost:23331/setfloat\?value\=123.4

接入设备
- 修改http_thermometer/deployment/http_edgedevice.yaml中的ip地址。
- 修改http_led/deployment/http_edgedevice.yaml中的ip地址。
$ sudo docker pull edgehub/deviceshifu-http-http:v0.1.1
$ sudo kind load docker-image edgehub/deviceshifu-http-http:v0.1.1
$ sudo kubectl apply -f examples/my_http_led/deployment
$ sudo kubectl apply -f examples/my_http_thermometer/deployment
与设备交互
打开 nginx 与温湿度计交互:
$ sudo kubectl exec -it nginx -- bash
$ curl http://my-thermometer.deviceshifu.svc.cluster.local/temperature
$ curl http://my-thermometer.deviceshifu.svc.cluster.local/humidity
$ curl http://my-led.deviceshifu.svc.cluster.local/setfloat?value=23.4
应用开发
将温度和湿度读取,然后在LED上面间歇显示出来。
$ sudo docker build -t yangxijie/connection:v0.0.1 .
$ sudo docker images | grep connection
yangxijie/connection  v0.0.1  a9526147ddad  2 minutes ago  125MB
$ sudo kind load docker-image yangxijie/connection:v0.0.1
$ sudo kubectl run --image=yangxijie/connection:v0.0.1 connection-name
该应用的图解如下:

可以看到应用跑起来之后,LED显示屏上交替显示

当前文件夹文件
main.py
import time
import requests
import json
isLocal = False
localIp = "192.168.31.138"
flag = -1
while True:
    flag += 1
    # [拿到数据]
    if flag % 2 == 0:
        # 拿到温度
        url = f"http://{localIp}:23330/temperature" if isLocal else "http://my-thermometer.deviceshifu.svc.cluster.local/temperature"
    else:
        # 拿到湿度
        url = f"http://{localIp}:23330/humidity" if isLocal else "http://my-thermometer.deviceshifu.svc.cluster.local/humidity"
    res = requests.get(url)
    # [转换数据]
    try:
        value = json.loads(res.text)['value']
        print("DEBUG", value)
        # [显示数据]
        led_url = f"http://{localIp}:23331/setfloat?value={value}" if isLocal else f"http://my-led.deviceshifu.svc.cluster.local/setfloat?value={value}"
        requests.get(led_url)
    except:
        print("DEBUG", res.text)
    time.sleep(2)
requirements.txt
requests
Dockerfile
FROM python:3.9-slim-bullseye
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["python3", "main.py"]
西门子PLC
接入设备
需要先修改IP地址为PLC的地址。
$ sudo docker pull edgehub/deviceshifu-http-http:v0.1.1
$ sudo docker pull edgehub/plc-device:v0.0.1
$ sudo kind load docker-image edgehub/deviceshifu-http-http:v0.1.1 edgehub/plc-device:v0.0.1
$ sudo kubectl apply -f examples/my_plc/plc-deployment
与设备交互
这里我们修改PLC上的一个比特,可以看到PLC的上的指示灯亮起来。在实际场景中,PLC会控制机械臂等大型设备进行操作。
$ sudo kubectl run nginx --image=nginx:1.21 -n deviceshifu 
$ sudo kubectl exec -it nginx -n deviceshifu -- bash
$ curl "deviceshifu-plc/sendsinglebit?rootaddress=Q&address=0&start=0&digit=1&value=1"; echo

海康威视摄像头
接入设备
获取摄像头ip地址,修改rtsp/camera-deployment/deviceshifu-camera-deployment.yaml中的ip地址
$ sudo docker pull edgehub/deviceshifu-http-http:v0.1.1
$ sudo docker pull edgehub/camera-python:v0.0.1
$ sudo kind load docker-image edgehub/deviceshifu-http-http:v0.1.1 edgehub/camera-python:v0.0.1
$ sudo kubectl apply -f examples/my_rtsp/camera-deployment
与设备交互
在集群中可以使用nginx查看设备信息:
# 集群中使用curl交互
$ sudo kubectl exec -it nginx -- bash
$ curl http://deviceshifu-camera.deviceshifu.svc.cluster.local/info

可以看到摄像头的数字孪生支持的指令为:

查看摄像头捕获的图片,我们需要将端口转发到本机,在浏览器上访问。
# 本机浏览器访问
$ sudo kubectl port-forward svc/deviceshifu-camera -n deviceshifu 8080:80
# 输入`localhost:8080/info`查看信息
# 输入`localhost:8080/capture`获取图片
# 输入`localhost:8080/move/{up|down|left|right}`操控相机位置
# 输入`localhost:8080/stream?timeout=0`获取实时视频流
总结
此次 Shifu Meetup 活动的顺利举办。可以看到 Shifu 能够让开发者快速接入设备、将各种协议统一转为HTTP方便管理和后续应用开发。Shifu 也有无单点故障、隔离性好等多种优势。
如果您对 Shifu 产生兴趣,欢迎访问 Shifu官网 了解更多。也欢迎您在 Shifu的GitHub仓库 给项目一个star!