// socket_io.dart
import 'dart:async';
import 'dart:developer';
import 'dart:io';

import 'package:web_socket_channel/io.dart';
import 'socket_message.dart';

/// Signature of the callback that receives the payload of an incoming message.
///
/// The argument is untyped (`dynamic`); its concrete shape depends on the
/// sender and is not checked here.
typedef MessageCallBack = void Function(dynamic message);

/// Singleton WebSocket client with a periodic heartbeat and automatic
/// reconnection.
///
/// Obtain the shared instance with [getInstance], open a connection with
/// [connect], and shut it down with [close]. The arguments of the most recent
/// [connect] call are cached so that [reconnectWebSocket] can replay them.
///
/// Message types as used by this class:
/// * `1` — heartbeat, sent by the client every [heartbeatInterval].
/// * `2` — inspected when the connection closes; its
///   `content['canReconnect']` decides whether a reconnect is attempted.
/// * `3` — forwarded to [onReceivedMessage].
class Socket {
  /// Private constructor; use [getInstance].
  Socket._internal();

  /// The lazily created shared instance.
  static Socket? _instance;

  /// Returns the shared [Socket], creating it on first use.
  static Socket getInstance() => _instance ??= Socket._internal();

  /// The channel of the current connection, or `null` when disconnected.
  IOWebSocketChannel? channel;

  /// The most recent message received on the current connection.
  ///
  /// Reset to `null` by an empty-string frame and whenever a new connection
  /// is established.
  SocketMessageBody? _lastMessage;

  /// The timer driving the heartbeat, or `null` when no heartbeat is running.
  Timer? heartbeatTimer;

  /// How often a heartbeat frame is sent.
  Duration heartbeatInterval = const Duration(seconds: 15);

  /// How long [reconnectWebSocket] waits before opening a new connection.
  Duration reconnectDelay = const Duration(seconds: 1);

  /// Invoked with the `content` of every received message whose type is `3`.
  MessageCallBack? onReceivedMessage;

  /// The URL passed to the latest [connect] call; `null` once cleaned up.
  String? url;

  /// Query parameters passed to the latest [connect] call.
  Map<String, String>? params;

  /// Handshake headers passed to the latest [connect] call.
  Map<String, String>? headers;

  /// The pending delayed reconnect, if any, so that it can be cancelled.
  Timer? _reconnectTimer;

  /// Incremented by every [connect] and [close] call.
  ///
  /// Lets a handshake that finishes late detect that it has been superseded.
  int _connectAttempt = 0;

  /// Opens a WebSocket to [url] with [params] merged into the query string
  /// and [headers] sent with the handshake.
  ///
  /// Does nothing when [url] is empty. Any connection, heartbeat or pending
  /// reconnect left over from an earlier call is released first, so at most
  /// one connection is alive at a time.
  ///
  /// Errors thrown by [WebSocket.connect] propagate to the caller.
  Future<void> connect(
    String url,
    Map<String, String> params,
    Map<String, String> headers,
  ) async {
    if (url.isEmpty) {
      return;
    }
    // Cache the arguments so that reconnectWebSocket can replay them.
    this.url = url;
    this.params = params;
    this.headers = headers;

    // This attempt supersedes whatever is still alive or scheduled.
    final attempt = ++_connectAttempt;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _dropConnection();

    final uri = generateUrlWithParams(baseUrl: url, params: params);
    final socket = await WebSocket.connect(uri.toString(), headers: headers);
    if (attempt != _connectAttempt) {
      // close() or a newer connect() ran while the handshake was pending;
      // discard this socket instead of installing it.
      socket.close();
      return;
    }

    final current = IOWebSocketChannel(socket);
    channel = current;
    // A farewell message from a previous connection must not influence how
    // the end of this one is interpreted.
    _lastMessage = null;
    startHeartbeat();
    current.stream.listen(
      (message) {
        log("connectId: ${params['connectId']} Received message: $message");
        _handleMessage(message);
      },
      onError: (error) {
        log("WebSocket Error: $error");
      },
      onDone: () {
        log("WebSocket connection closed");
        _handleDone(current);
      },
    );
  }

  /// Processes one incoming frame; non-string frames are ignored.
  void _handleMessage(dynamic message) {
    if (message is! String) {
      return;
    }
    if (message == '') {
      _lastMessage = null;
      return;
    }
    final msg = _tryParse(message);
    if (msg == null) {
      return;
    }
    if (msg.type == 3) {
      onReceivedMessage?.call(msg.content);
    }
    _lastMessage = msg;
  }

  /// Parses [raw], returning `null` (after logging) when parsing throws an
  /// [Exception], so one malformed frame does not escape the stream listener.
  SocketMessageBody? _tryParse(String raw) {
    try {
      return SocketMessageBody.fromJson(raw);
    } on Exception catch (error) {
      log('WebSocket message could not be parsed: $error');
      return null;
    }
  }

  /// Reacts to the end of [closed]'s stream.
  void _handleDone(IOWebSocketChannel closed) {
    // Ignore channels that close(), connect() or reconnectWebSocket() have
    // already replaced or released.
    if (!identical(channel, closed)) {
      return;
    }
    if (_lastMessage?.type == 2) {
      // The server announced the disconnect; reconnect only if it allows it.
      if (_lastMessage?.content?['canReconnect'] == 1) {
        reconnectWebSocket();
      } else {
        _clean();
        // Stop the heartbeat as well, otherwise it keeps firing on a dead
        // channel.
        _dropConnection();
      }
    } else if (heartbeatTimer != null) {
      // Unannounced drop while the heartbeat is still running.
      reconnectWebSocket();
    }
  }

  /// Closes the connection and forgets the cached connection arguments and
  /// [onReceivedMessage].
  ///
  /// Also cancels a pending reconnect and invalidates a handshake that is
  /// still in flight.
  Future close() async {
    _connectAttempt++;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _clean();
    heartbeatTimer?.cancel();
    heartbeatTimer = null;
    // Detach the channel before closing so that its onDone is treated as
    // stale and cannot trigger a reconnect.
    final closing = channel;
    channel = null;
    await closing?.sink.close();
  }

  /// Clears the cached connection arguments and the message callback.
  void _clean() {
    url = null;
    params = null;
    headers = null;
    onReceivedMessage = null;
  }

  /// Stops the heartbeat and closes the current channel, if any, without
  /// waiting for the close to complete.
  void _dropConnection() {
    heartbeatTimer?.cancel();
    heartbeatTimer = null;
    final stale = channel;
    channel = null;
    stale?.sink.close();
  }

  /// Starts the heartbeat, replacing any heartbeat that is already running.
  ///
  /// Every [heartbeatInterval] a `{"type":1}` frame is sent while [channel]
  /// is set.
  void startHeartbeat() {
    heartbeatTimer?.cancel();
    heartbeatTimer = Timer.periodic(heartbeatInterval, (timer) {
      if (channel != null) {
        log('Sending heartbeat');
        channel?.sink.add('{"type":1}');
      }
    });
  }

  /// Drops the current connection and opens a new one after [reconnectDelay],
  /// using the arguments cached by the latest [connect] call.
  ///
  /// If the attempt fails with an [IOException] it is logged and another
  /// attempt is scheduled, until [close] clears the cached URL.
  void reconnectWebSocket() {
    _dropConnection();
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(reconnectDelay, () async {
      _reconnectTimer = null;
      final target = url;
      if (target == null || target.isEmpty) {
        // Cleaned up while the reconnect was pending.
        return;
      }
      try {
        await connect(target, params ?? {}, headers ?? {});
      } on IOException catch (error) {
        log('WebSocket reconnect failed: $error');
        if (url != null) {
          reconnectWebSocket();
        }
      }
    });
  }

  /// Returns [baseUrl] with [params] merged into its query string.
  ///
  /// Entries in [params] override query parameters already in [baseUrl].
  /// The resulting URI is also written to the log.
  static Uri generateUrlWithParams({
    required String baseUrl,
    required Map<String, String> params,
  }) {
    final baseUri = Uri.parse(baseUrl);
    final merged = {
      ...baseUri.queryParameters,
      ...params,
    };
    // Replacing the query with an empty map would leave a dangling `?`.
    final newUri =
        merged.isEmpty ? baseUri : baseUri.replace(queryParameters: merged);
    log(newUri.toString());
    return newUri;
  }
}