参考链接
他们的设计思想更加完善,而我只是初学者...
云逸之: ESP32CAM 视频小车
Yehchitsai: ESP32-CAM 使用 MicroPython 进行开发 - 使用图形化工具 Thonny (Mac)
b站王铭东老师
使用的配件
- esp32-cam
- 杜邦线若干
- L298N 双H桥电机驱动模块
- 智能小车套件
设计思想
利用websocket协议通信来控制小车的前进,后退与转向。其中esp32-cam端websocket通信使用第三方库micropython-async_websocket_client,server端使用fastapi完成开发。
esp32-cam代码
main.py
import uasyncio as asyncio
import network
from async_websocket_client import AsyncWebsocketClient
from EspPoint import EspPoint
# Base websocket URL of the control server; connect() appends `cpid` to it.
url = 'ws://192.168.101.7:8000/socket/'
# Identifier of this board, used as the last URL segment and passed to EspPoint.
cpid = 'esp32_1'
# Wi-Fi credentials handed to wlan.connect() in wifi_connect().
# NOTE(review): credentials are hard-coded in source — consider loading them from a config file.
wifi_name = "ChinaNet-44DM"
wifi_password = "123"
async def wifi_connect():
    """Bring up the station interface and block until Wi-Fi is connected.

    Activates the STA interface, then calls ``wlan.connect`` with the
    module-level ``wifi_name`` / ``wifi_password`` and sleeps 5 seconds,
    repeating until ``wlan.isconnected()`` reports True.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    while not wlan.isconnected():
        # Re-issue the connect request on every round so a failed attempt is retried.
        wlan.connect(wifi_name, wifi_password)
        # Yield to the scheduler while the association is in progress.
        await asyncio.sleep(5)
async def connect():
    """Keep a websocket session to the server alive, reconnecting forever.

    Each round performs the websocket handshake against ``url + cpid``. When
    the socket reports open, an ``EspPoint`` is created and run until it
    fails. Any error is reported and followed by ``wifi_connect()`` before the
    next round.
    """
    ws = AsyncWebsocketClient(5)
    while True:
        try:
            await ws.handshake(url + cpid)
            if await ws.open():
                print('ws start.')
                ep = EspPoint(ws, cpid)
                await ep.start()
            else:
                print('ws open error, try reconnect.')
        except Exception as exc:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt and task cancellation still propagate, and
            # show the actual error instead of hiding it.
            print('waln error!', exc)
            await wifi_connect()
# Script entry point: run the reconnecting websocket client until interrupted.
if __name__ == '__main__':
    asyncio.run(connect())
import uasyncio as asyncio
import time
import json
from machine import Pin
import network
# GPIO outputs that EspPoint.erecv() passes to EspPoint.distribute_res().
# NOTE(review): p12/p13 presumably drive the other motor's inputs — confirm against the wiring.
p12 = Pin(12, Pin.OUT)
p13 = Pin(13, Pin.OUT)
# Controls one motor
p14 = Pin(14, Pin.OUT)
p15 = Pin(15, Pin.OUT)
class ResponseCommand:
    """One JSON command message received over the websocket.

    A well-formed message is a JSON object with exactly four fields:
    ``message_type``, ``message_id``, ``action`` and ``payload``. For any
    other shape the four attributes stay ``None``, so callers can always read
    ``.action`` safely.
    """

    _FIELDS = ('message_type', 'message_id', 'action', 'payload')

    def __init__(self, recv_message):
        """Decode and parse a raw message.

        :param recv_message: raw JSON text whose fields are
            message_type, message_id, action, payload
        :raises ValueError: if ``recv_message`` is not valid JSON
        """
        self.origin_message = recv_message
        # Defaults guarantee the attributes exist even for malformed messages.
        self.message_type = None
        self.message_id = None
        self.action = None
        self.payload = None
        self.json_message = json.loads(recv_message)
        self.parse(self.json_message)

    @staticmethod
    def _is_valid(res):
        """Return True when ``res`` is a dict holding exactly the four expected fields."""
        if not isinstance(res, dict) or len(res) != 4:
            return False
        return all(name in res for name in ResponseCommand._FIELDS)

    def parse(self, res):
        """Copy the four fields from ``res`` onto the instance when it is well-formed."""
        if self._is_valid(res):
            self.message_type = res['message_type']
            self.message_id = res['message_id']
            self.action = res['action']
            self.payload = res['payload']
        # Otherwise this is the error case: the attributes keep their None
        # defaults and display() reports the problem.

    def __str__(self):
        return self.origin_message

    def display(self):
        """Print the parsed fields, or 'message error!' for a malformed message."""
        if self._is_valid(self.json_message):
            print([self.message_type, self.message_id, self.action, self.payload])
        else:
            print('message error!')
class EspPoint:
    """Board-side websocket peer: applies received drive commands and sends heartbeats."""

    # Output levels for each known action, listed in the order the pins are
    # written: (p13, p12, p14, p15).
    _ACTION_LEVELS = {
        'move front': (1, 0, 1, 0),  # forward
        'move back': (0, 1, 0, 1),   # backward
        'move left': (0, 1, 1, 0),   # turn left
        'move right': (1, 0, 0, 1),  # turn right
        'move stop': (0, 0, 0, 0),
    }

    def __init__(self, ws, _id) -> None:
        """Store the websocket client ``ws`` and this endpoint's identifier."""
        self._ws = ws
        self.id = _id

    async def start(self):
        """Run the receive loop and the heartbeat loop concurrently."""
        await asyncio.gather(self.erecv(), self.esend())

    async def wifi_connect(self):
        """Activate the station interface and retry connecting every 10 s until connected."""
        # NOTE(review): wifi_name / wifi_password are read as module globals here,
        # but they are only defined in main.py — confirm they are in scope where
        # this class lives before calling this method.
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        while not wlan.isconnected():
            wlan.connect(wifi_name, wifi_password)
            await asyncio.sleep(10)

    async def erecv(self):
        """Receive messages forever and apply each one to the motor pins."""
        while True:
            message = await self._ws.recv()
            if not message:
                print('recv message is none.')
                continue
            try:
                res = ResponseCommand(message)
            except (ValueError, KeyError, TypeError) as exc:
                # One malformed message must not end the receive loop and tear
                # down the whole connection; report it and keep listening.
                print('recv message is invalid:', exc)
                continue
            self.distribute_res(res, p12, p13, p14, p15)

    async def esend(self):
        """Send a "heartbeat" text frame every 10 seconds."""
        while True:
            await self._ws.send("heartbeat")
            await asyncio.sleep(10)

    def distribute_res(self, rc: "ResponseCommand", p12, p13, p14, p15):
        """Drive the four output pins according to ``rc.action``.

        Known actions are 'move front', 'move back', 'move left',
        'move right' and 'move stop'. For anything else, including a command
        without an ``action``, the pins are left untouched and
        ``rc.display()`` is called.
        """
        action = getattr(rc, 'action', None)
        # Only strings can be known actions; this also keeps an unhashable
        # value (e.g. a JSON list) from breaking the dict lookup.
        levels = self._ACTION_LEVELS.get(action) if isinstance(action, str) else None
        if levels is None:
            rc.display()
            return
        for pin, level in zip((p13, p12, p14, p15), levels):
            pin.value(level)
Server端代码
websocket_32.py
from fastapi import APIRouter, Depends, WebSocket
import json
router = APIRouter()
# Maps client_id -> the WebSocket accepted for that client; written by
# websocket_endpoint() and looked up by run().
client_dict = {}
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Accept a client websocket, register it, and echo every text frame back.

    The socket is stored in ``client_dict`` under ``client_id`` so that
    ``run()`` can push commands to it. Each received text is wrapped as the
    ``action`` of a four-field JSON message and sent back to the same client.

    NOTE(review): no route registration on ``router`` is visible in this
    listing — confirm where this handler is attached.
    """
    await websocket.accept()
    client_dict[client_id] = websocket
    try:
        while True:
            data = await websocket.receive_text()
            res = {
                'message_type': 'client send',
                'message_id': 'client send',
                'action': data,
                'payload': {}
            }
            await websocket.send_text(json.dumps(res))
    finally:
        # The loop only ends by an exception (e.g. the client disconnecting).
        # Drop the stale socket so run() does not send to a dead connection,
        # but leave the entry alone if the same client has already
        # reconnected and replaced it.
        if client_dict.get(client_id) is websocket:
            del client_dict[client_id]
async def run(client_id, action):
    """Send a drive command to a connected client.

    :param client_id: key under which the client registered in ``client_dict``
    :param action: value placed in the ``action`` field of the message
    :return: ``None`` after the message is sent, or the string
        ``'client not found!'`` when no such client is registered
    """
    ws = client_dict.get(client_id)
    if ws is None:
        return 'client not found!'
    res = {
        'message_type': 'server run',
        'message_id': 'server run',
        'action': action,
        'payload': {}
    }
    await ws.send_text(json.dumps(res))
End
该项目,未完成,正在进行中...