1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import 'dart:async';
import 'dart:developer';
import 'dart:io';
import 'package:web_socket_channel/io.dart';
import 'socket_message.dart';
/// Signature of the listener stored in `Socket.onReceivedMessage`.
///
/// It is invoked with the `content` of a received message whose `type` is 3.
typedef MessageCallBack = void Function(dynamic message);
/// Shared WebSocket client with periodic heartbeats and automatic reconnect.
///
/// Obtain the single instance with [getInstance], open a connection with
/// [connect] and release it with [close]. Text messages are parsed with
/// `SocketMessageBody.fromJson`; the `content` of messages whose `type` is 3
/// is forwarded to [onReceivedMessage]. A message of `type` 2 received before
/// the connection ends marks a server-initiated disconnect, and its
/// `content['canReconnect']` flag decides whether a reconnect is attempted.
class Socket {
  /// Private constructor; use [getInstance].
  Socket._internal();

  /// Lazily created shared instance.
  static Socket? _instance;

  /// Returns the shared [Socket], creating it on first use.
  static Socket getInstance() => _instance ??= Socket._internal();

  /// The channel currently in use, or `null` while disconnected.
  IOWebSocketChannel? channel;

  /// Last non-empty text message parsed on the current connection.
  SocketMessageBody? _lastMessage;

  /// Periodic heartbeat timer; `null` when no heartbeat is running.
  Timer? heartbeatTimer;

  /// Interval between two heartbeat frames.
  Duration heartbeatInterval = const Duration(seconds: 15);

  /// Listener that receives the `content` of every `type == 3` message.
  MessageCallBack? onReceivedMessage;

  /// Arguments of the latest [connect] call, cached for reconnecting.
  String? url;
  Map<String, String>? params;
  Map<String, String>? headers;

  /// Opens a WebSocket to [url] with [params] merged into its query string
  /// and [headers] sent with the handshake.
  ///
  /// Does nothing when [url] is empty. Any connection that is still installed
  /// is replaced (and closed) once the new handshake succeeds. Errors thrown
  /// by [WebSocket.connect] propagate to the caller.
  Future<void> connect(
    String url,
    Map<String, String> params,
    Map<String, String> headers,
  ) async {
    if (url.isEmpty) {
      return;
    }
    // Cache the arguments so that reconnectWebSocket can reuse them.
    this.url = url;
    this.params = params;
    this.headers = headers;

    final uri = generateUrlWithParams(baseUrl: url, params: params);
    final socket = await WebSocket.connect(uri.toString(), headers: headers);

    if (this.url == null) {
      // The cached arguments were cleared (close() or a non-reconnectable
      // server disconnect) while the handshake was in flight: the connection
      // is no longer wanted, so drop it instead of installing it.
      unawaited(socket.close());
      return;
    }

    final previous = channel;
    final current = IOWebSocketChannel(socket);
    channel = current;
    // A fresh connection must not inherit the previous connection's state.
    _lastMessage = null;
    // Close a connection that is being replaced; its callbacks are ignored
    // from now on because it is no longer the installed channel.
    unawaited(previous?.sink.close());

    startHeartbeat();
    current.stream.listen(
      (message) {
        log("connectId: ${params['connectId']} Received message: $message");
        if (!identical(channel, current)) {
          // Late frame from a superseded or locally closed connection.
          return;
        }
        if (message is! String) {
          return;
        }
        if (message.isEmpty) {
          _lastMessage = null;
          return;
        }
        final body = SocketMessageBody.fromJson(message);
        if (body.type == 3) {
          onReceivedMessage?.call(body.content);
        }
        _lastMessage = body;
      },
      onError: (error) {
        log("WebSocket Error: $error");
      },
      onDone: () => _handleDone(current),
    );
  }

  /// Reacts to the end of [closed]'s stream.
  void _handleDone(IOWebSocketChannel closed) {
    log("WebSocket connection closed");
    if (!identical(channel, closed)) {
      // The channel was closed locally or replaced; nothing to recover.
      return;
    }
    if (_lastMessage?.type == 2) {
      // Server-initiated disconnect: reconnect only when the server allows it.
      if (_lastMessage?.content?['canReconnect'] == 1) {
        reconnectWebSocket();
      } else {
        _clean();
        // Without this the heartbeat would keep firing on a dead channel.
        _stopHeartbeat();
        channel = null;
        _lastMessage = null;
      }
      return;
    }
    // Unexpected drop while the heartbeat was still running.
    if (heartbeatTimer != null) {
      reconnectWebSocket();
    }
  }

  /// Closes the connection and clears the cached connection arguments.
  Future close() async {
    _clean();
    _stopHeartbeat();
    final closing = channel;
    // Detach the channel first so that its onDone callback is ignored.
    channel = null;
    _lastMessage = null;
    await closing?.sink.close();
  }

  /// Clears the cached connection arguments and the message listener.
  void _clean() {
    url = null;
    params = null;
    headers = null;
    onReceivedMessage = null;
  }

  /// Cancels the heartbeat timer, if any.
  void _stopHeartbeat() {
    heartbeatTimer?.cancel();
    heartbeatTimer = null;
  }

  /// Starts sending a heartbeat frame every [heartbeatInterval].
  ///
  /// A heartbeat that is already running is cancelled first, so calling this
  /// repeatedly never leaves more than one timer active.
  void startHeartbeat() {
    heartbeatTimer?.cancel();
    heartbeatTimer = Timer.periodic(heartbeatInterval, (timer) {
      final active = channel;
      if (active == null) {
        return;
      }
      log('Sending heartbeat');
      active.sink.add('{"type":1}');
    });
  }

  /// Drops the current connection and reconnects after one second using the
  /// arguments cached by the latest [connect] call.
  ///
  /// If the attempt fails with an [IOException] it is logged and another
  /// attempt is scheduled, as long as the cached arguments have not been
  /// cleared in the meantime.
  void reconnectWebSocket() {
    _stopHeartbeat();
    final stale = channel;
    channel = null;
    unawaited(stale?.sink.close());

    unawaited(Future.delayed(const Duration(seconds: 1), () async {
      final target = url;
      if (target == null || target.isEmpty) {
        // The cached arguments were cleared while waiting; stay disconnected.
        return;
      }
      try {
        await connect(target, params ?? {}, headers ?? {});
      } on IOException catch (error, stackTrace) {
        log(
          'WebSocket reconnect failed: $error',
          error: error,
          stackTrace: stackTrace,
        );
        if (url != null) {
          reconnectWebSocket();
        }
      }
    }));
  }

  /// Returns [baseUrl] with [params] merged into its query parameters.
  ///
  /// Entries in [params] override query parameters of the same name that are
  /// already present in [baseUrl]. The resulting URI is also logged.
  static Uri generateUrlWithParams({
    required String baseUrl,
    required Map<String, String> params,
  }) {
    final baseUri = Uri.parse(baseUrl);
    final merged = baseUri.replace(queryParameters: {
      ...baseUri.queryParameters,
      ...params,
    });
    log(merged.toString());
    return merged;
  }
}