科技创新网 我要投稿   登录   注册
科技创新网
 
  VR之家您当前位置:首页 > VR之家  
开源项目:用环信MQTT实现"世界频道"只需5分钟【附源码】
发布时间:2022-04-26 15:28:41  来源:环信

说到“世界频道”想必大家都不陌生,常见的如王者荣耀的世界广播摇人组队以及最近兴起的Discord社区交友等等。究其目的就是在应用内让海量用户可以实时互动。有些开发者为了实现这种场景会选择聊天室方案来实现,但是这种方式存在一定的局限性,比如聊天室人数上限、海量消息处理等各种情况。

当然如果有钱有颜,可以直接选择云厂商产品(比如环信的聊天室方案和超级社区),如果有才有time,也可以选择平替版MQTT实现方案。今天小猿将介绍用环信MQTT消息云实现应用内的世界频道,满满干货,不要错过~~

使用MQTT实现世界频道-Demo效果演示

协议优势:

在介绍具体方案之前,我们先唠一唠为啥选择MQTT协议。

轻量级:MQTT本身是物联网的连接协议,专为受限设备和低带宽场景使用。所以其代码占用空间较小,同样适用于注重SDK大小的移动应用领域(比如:游戏领域)。

易集成:MQTT作为标准开放的消息协议,经过多年演进,已支持30多种开发语言,10余种SDK,无论何种开发环境,都可以快速找到开源SDK。

高并发:MQTT是轻量级的消息传输协议,2字节心跳报文,最小化传输和连接成本,云厂商broker产品都可支持千万级并发接入,适用于高并发连接场景。

低成本:MQTT是基于客户端-服务器的订阅/发布模型,通过服务器中间件实现消息分发,减少消息复制成本,快速实现一对多在线推送。

灵活性:MQTT协议支持多种消息特性,包括:topic主题层级、消息分级(QoS0,1,2)、遗嘱消息、保留消息等,可以灵活实现多种业务场景。

衍生功能:随着MQTT云服务的发展,部分服务器厂商已支持消息存储、获取在线设备列表、查看历史消息等衍生功能,降低开发工作量与消息存储成本。

实现方案:

言归正传,上干货。本次技术实现方案包含:移动客户端(Android)、后端服务(Java)以及MQTT服务器。这里提一下,MQTT服务器使用环信MQTT消息云,使用三方云服务比较省心,既节省开发时间,产品性能也不需要担心,现在注册可以直接使用环信MQTT消息云超高额度的免费版:每月100并发连接、300万消息,完全满足功能开发使用。

客户端实现:

客户端实现主要包含以下两部分:

底层MQTT业务集成:包含引入SDK、MQTT方法封装、业务交互(消息收发)。

APP上层交互:在APP首页提供世界频道入口,实现心情弹幕飘窗(接收)和发送。

接下来上底层MQTT业务集成代码。

引入SDK:

这一步环信官方文档比较明确,就是根据自己的平台引入相应的mqtt客户端sdk,这里简单贴一下AndroidStudio的引入配置

1// 在根目录 build.gradle repositories 下加入配置
2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
3...
4// 然后加入 MQTT 依赖
5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk
6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

方法封装

这里贴一下对mqtt相关方法的简单封装,代码在vmmqtt模块儿的MQTTHelper类下:

1 /**
  2 * Create by lzan13 on 2022/3/22
  3 * 描述:MQTT 帮助类
  4 */
  5 object MQTTHelper {
  6
  7    private var mqttClient: MqttAndroidClient? = null
  8
  9    // 缓存主题集合
 10    private val topicList = mutableListOf<String>()
 11
 12    /**
 13     * 链接MQTT
 14     * @param id 用户 Id
 15     * @param token 用户链接 MQTT 的 Token
 16     * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅
 17     */
 18    fun connect(id: String, token: String, topic: String = "") {
 19        // 处理订阅主题
 20        if (topic.isNotEmpty()) topicList.add(topic)
 21
 22        // 拼接链接地址
 23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
 24        // 拼接 clientId
 25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
 26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
 27
 28        //连接参数
 29        val options = MqttConnectOptions()
 30        options.isAutomaticReconnect = true //设置自动重连
 31        options.isCleanSession = true // 缓存
 32        options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒
 33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒
 34        options.userName = id // 用户名
 35        options.password = token.toCharArray() // 密码
 36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
 37        // 设置MQTT监听
 38        mqttClient?.setCallback(object : MqttCallback {
 39            override fun connectionLost(t: Throwable) {
 40                // 通知链接断开
 41                VMLog.d("MQTT 链接断开 $t")
 42            }
 43
 44            @Throws(Exception::class)
 45            override fun messageArrived(topic: String, message: MqttMessage) {
 46                // 通知收到消息
 47                VMLog.d("MQTT 收到消息:$message")
 48                // 如果未订阅则直接丢弃
 49                if (!topicList.contains(topic)) return
 50                notifyEvent(topic, String(message.payload))
 51            }
 52
 53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
 54        })
 55        //进行连接
 56        mqttClient?.connect(options, null, object : IMqttActionListener {
 57            override fun onSuccess(token: IMqttToken) {
 58                VMLog.d("MQTT 链接成功")
 59                // 链接成功,循环订阅缓存的主题
 60                topicList.forEach { subscribe(it) }
 61            }
 62
 63            override fun onFailure(token: IMqttToken, t: Throwable) {
 64                VMLog.d("MQTT 链接失败 $t")
 65            }
 66        })
 67    }
 68
 69    /**
 70     * 订阅主题
 71     * @param topic 主题
 72     */
 73    fun subscribe(topic: String) {
 74        if (!topicList.contains(topic)) {
 75            topicList.add(topic)
 76        }
 77        try {
 78            //连接成功后订阅主题
 79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
 80                override fun onSuccess(token: IMqttToken) {
 81                    VMLog.d("MQTT 订阅成功 $topic")
 82                }
 83
 84                override fun onFailure(token: IMqttToken, t: Throwable) {
 85                    VMLog.d("MQTT 订阅失败 $topic $t")
 86                }
 87            })
 88        } catch (e: MqttException) {
 89            e.printStackTrace()
 90        }
 91    }
 92
 93    /**
 94     * 取消订阅
 95     * @param topic 主题
 96     */
 97    fun unsubscribe(topic: String) {
 98        if (topicList.contains(topic)) {
 99            topicList.remove(topic)
100        }
101        try {
102            mqttClient?.unsubscribe(topic)
103        } catch (e: MqttException) {
104            e.printStackTrace()
105        }
106    }
107
108    /**
109     * 发送 MQTT 消息
110     * @param topic 主题
111     * @param content 内容
112     */
113    fun sendMsg(topic: String, content: String) {
114        val msg = MqttMessage()
115        msg.payload = content.encodeToByteArray() // 设置消息内容
116        msg.qos = 0 //设置消息发送质量,可为0,1,2.
117        // 设置消息的topic,并发送。
118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119            override fun onSuccess(asyncActionToken: IMqttToken) {
120                VMLog.d("MQTT 消息发送成功")
121            }
122
123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124                VMLog.d("MQTT 消息发送失败 ${exception.message}")
125            }
126        })
127    }
128
129    /**
130     * 通知 MQTT 事件
131     */
132    private fun notifyEvent(topic: String, data: String) {
133        LDEventBus.post(topic, data)
134    }
135 }

业务交互

和业务相关的就是在启动APP后,使用后端服务器返回的鉴权token信息及连接封装接口登录环信通MQTT服务器,登录成功后订阅主题并监听消息。

1// 请求 token 成功后,调用MQTTHelper.connect()链接 MQTT 服务器,这里会同时传递监听的主题
 2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)
 3
 4/**
 5 * 发送匹配信息
 6 */
 7private fun sendMatchInfo() {
 8    if (selfMatch.user.nickname.isEmpty()) return
 9    // 提交自己的匹配信息到服务器
10    mViewModel.submitMatch(selfMatch)
11    val json = JSONObject()
12    json.put("content", selfMatch.content)
13    json.put("emotion", selfMatch.emotion)
14    json.put("gender", selfMatch.gender)
15    json.put("type", selfMatch.type)
16    val jsonUser = JSONObject()
17    jsonUser.put("avatar", mUser.avatar)
18    jsonUser.put("id", mUser.id)
19    jsonUser.put("nickname", mUser.nickname)
20    jsonUser.put("username", mUser.username)
21    json.put("user", jsonUser)
22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())
23}
24
25// 监听消息这里使用了一个事件总线进行通知,在上边封装 MQTTHelper 发送消息也使用了这个,
26// 订阅 MQTT 事件
27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {
28        val match = JsonUtils.fromJson<Match>(it, Match::class.java)
29        // 这里收到匹配信息之后就增加一条弹幕
30    addBarrage(match)
31}

后端服务实现

接下来介绍后端服务实现,主要包含以下两部分:

配置连接信息:配置环信MQTT消息云连接信息。

获取鉴权信息:获取客户端连接需要的鉴权信息。

配置连接信息

配置部分只需要按照环信后台配置信息进行替换就好,配置在config目录下的config.xxx.json文件内

1/**
 2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService
 3 */
 4config.mqtt = {
 5    host: 'mqtt host', // MQTT 链接地址
 6  appId: 'appId', // MQTT AppId
 7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)
 8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服务 API 地址
 9  clientId: 'client id', // 替换环信后台 clientId
10  clientSecret: 'client secret', // 替换环信后台 clientSecret
11};

获取鉴权信息

这里主要是获取客户端连接所需要的鉴权信息token,为了安全token肯定是要放在服务器端生成的,废话不多说,上代码:

1/**
  2 * Create by lzan13 on 2022/3/22
  3 * 描述:MQTT 帮助类
  4 */
  5object MQTTHelper {
  6
  7    private var mqttClient: MqttAndroidClient? = null
  8
  9    // 缓存主题集合
 10    private val topicList = mutableListOf<String>()
 11
 12    /**
 13     * 链接MQTT
 14     * @param id 用户 Id
 15     * @param token 用户链接 MQTT 的 Token
 16     * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅
 17     */
 18    fun connect(id: String, token: String, topic: String = "") {
 19        // 处理订阅主题
 20        if (topic.isNotEmpty()) topicList.add(topic)
 21
 22        // 拼接链接地址
 23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
 24        // 拼接 clientId
 25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
 26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
 27
 28        //连接参数
 29        val options = MqttConnectOptions()
 30        options.isAutomaticReconnect = true //设置自动重连
 31        options.isCleanSession = true // 缓存
 32        options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒
 33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒
 34        options.userName = id // 用户名
 35        options.password = token.toCharArray() // 密码
 36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
 37        // 设置MQTT监听
 38        mqttClient?.setCallback(object : MqttCallback {
 39            override fun connectionLost(t: Throwable) {
 40                // 通知链接断开
 41                VMLog.d("MQTT 链接断开 $t")
 42            }
 43
 44            @Throws(Exception::class)
 45            override fun messageArrived(topic: String, message: MqttMessage) {
 46                // 通知收到消息
 47                VMLog.d("MQTT 收到消息:$message")
 48                // 如果未订阅则直接丢弃
 49                if (!topicList.contains(topic)) return
 50                notifyEvent(topic, String(message.payload))
 51            }
 52
 53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
 54        })
 55        //进行连接
 56        mqttClient?.connect(options, null, object : IMqttActionListener {
 57            override fun onSuccess(token: IMqttToken) {
 58                VMLog.d("MQTT 链接成功")
 59                // 链接成功,循环订阅缓存的主题
 60                topicList.forEach { subscribe(it) }
 61            }
 62
 63            override fun onFailure(token: IMqttToken, t: Throwable) {
 64                VMLog.d("MQTT 链接失败 $t")
 65            }
 66        })
 67    }
 68
 69    /**
 70     * 订阅主题
 71     * @param topic 主题
 72     */
 73    fun subscribe(topic: String) {
 74        if (!topicList.contains(topic)) {
 75            topicList.add(topic)
 76        }
 77        try {
 78            //连接成功后订阅主题
 79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
 80                override fun onSuccess(token: IMqttToken) {
 81                    VMLog.d("MQTT 订阅成功 $topic")
 82                }
 83
 84                override fun onFailure(token: IMqttToken, t: Throwable) {
 85                    VMLog.d("MQTT 订阅失败 $topic $t")
 86                }
 87            })
 88        } catch (e: MqttException) {
 89            e.printStackTrace()
 90        }
 91    }
 92
 93    /**
 94     * 取消订阅
 95     * @param topic 主题
 96     */
 97    fun unsubscribe(topic: String) {
 98        if (topicList.contains(topic)) {
 99            topicList.remove(topic)
100        }
101        try {
102            mqttClient?.unsubscribe(topic)
103        } catch (e: MqttException) {
104            e.printStackTrace()
105        }
106    }
107
108    /**
109     * 发送 MQTT 消息
110     * @param topic 主题
111     * @param content 内容
112     */
113    fun sendMsg(topic: String, content: String) {
114        val msg = MqttMessage()
115        msg.payload = content.encodeToByteArray() // 设置消息内容
116        msg.qos = 0 //设置消息发送质量,可为0,1,2.
117        // 设置消息的topic,并发送。
118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119            override fun onSuccess(asyncActionToken: IMqttToken) {
120                VMLog.d("MQTT 消息发送成功")
121            }
122
123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124                VMLog.d("MQTT 消息发送失败 ${exception.message}")
125            }
126        })
127    }
128
129    /**
130     * 通知 MQTT 事件
131     */
132    private fun notifyEvent(topic: String, data: String) {
133        LDEventBus.post(topic, data)
134    }
135}

源码地址

核心代码就这么多,不超过500行,这里没有直接调用环信历史消息接口获取消息存储记录,后续可以在进行改良,简化实现流程。源码链接附上,配合使用效果更佳。

服务端github源码:

https://github.com/lzan13/vmtemplateserver

客户端github源码:

https://gitee.com/lzan13/VMTemplateAndroid

写在最后

MQTT协议资源占用小,并发连接高,集成简单,特别适用于高频数据交互场景,比如:游戏的世界广场、视频平台弹幕等等等等,欢迎各位小伙伴集思广益,基于MQTT服务实现更多的业务场景,享受技术带来的便利与快乐。




[正文结束]
版权与免责声明:
    1. 本网注明来源为科技创新网的稿件,版权均属于科技创新网,未经科技创新网授权,不得转载、摘编使用。
    2. 本网注明“来源:XXX(非科技创新网)”的作品,均转载自其它媒体,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。本网转载其他媒体之稿件,意在为公众提供免费服务。如稿件版权单位或个人不想在本网发布,可与本网联系,本网视情况可立即将其撤除。
    3. 如涉及作品内容、版权等其它问题,请在30日内同本网联系。邮箱:hnppxc @126.com
    特别提醒:本网刊发的所有商业信息,文章内容不代表本网观点,仅供参考。
上一篇:游戏人均支出344元,氪金王腾讯、网易现吃老本隐忧
下一篇:质量安全是“红线”是“底线”
  头条推荐
在虚拟世界探索中华文脉, VR互动在虚拟世界探索中华文脉, VR互动
“大美南阳”2015南阳书画界名家“大美南阳”2015南阳书画界名家
  智能汽车
勇探山林路漫漫,狂飙前行矫如虎勇探山林路漫漫,狂飙前行矫如虎
“虎啸”一出平四野,锐骐7虎啸重“虎啸”一出平四野,锐骐7虎啸重
中原战贫 小康梦圆丨拂来春风化中原战贫 小康梦圆丨拂来春风化
淅川:炎炎夏日战酷暑 浓浓关怀送淅川:炎炎夏日战酷暑 浓浓关怀送
驻马店:美不胜收稻田画 驻马店:美不胜收稻田画 
新华保险召开寿险与康养业务协同新华保险召开寿险与康养业务协同
荥阳企业多举并措开足马力加盟中荥阳企业多举并措开足马力加盟中
总投资7亿元!信阳新县将建万博城,总投资7亿元!信阳新县将建万博城,
河南一民企现金紧张要拿存货筹资河南一民企现金紧张要拿存货筹资
第25届郑交会开幕 海淘爆款多多 第25届郑交会开幕 海淘爆款多多
  热文排行
河南质量工程职业学院舞钢校区开工仪式隆重河南质量工程职业学院舞钢校区开工仪式隆重
黄河水利职业技术学院入选《2021中职教报告黄河水利职业技术学院入选《2021中职教报告
黄河水利职业技术学院召开2022年工作会议黄河水利职业技术学院召开2022年工作会议
深夜救人火苗中最靓的仔深夜救人火苗中最靓的仔
黄河水利职业技术学院入选第三批全国党建工黄河水利职业技术学院入选第三批全国党建工
黄河水利职业技术学院在党组织建设成效排名黄河水利职业技术学院在党组织建设成效排名
黄河水利职业技术学院党委书记周保平带队访黄河水利职业技术学院党委书记周保平带队访
“预案式”帮扶确保一个都不少 黄河水利职“预案式”帮扶确保一个都不少 黄河水利职
黄河水利职业技术学院的“校长茶座”黄河水利职业技术学院的“校长茶座”
黄河水利职业技术学院:聚焦机制打造特色就业黄河水利职业技术学院:聚焦机制打造特色就业
网站首页  关于我们  合作发展  广告服务  版权说明  联系我们  我要投稿
电话: 400-0820-315 E-mail: zlbg110@126.com 客服QQ:1849 189 600
科技创新网版权所有,未经书面授权禁止使用! Copyright © 2015-2023
主办单位:河南省豫南人众创空间有限公司   ICP备案号:豫ICP备2022015378号 技术支持:河南科加