DemoProtocol.hpp 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #pragma once
  2. #include <Cmnpp/IProtocol.h>
  3. #include "Cmnpp/Proto/HandleRef.h"
  4. #include "MqttTopic.h"
  5. #include "SnowflakeIdGenerator.hpp"
  6. #include "DataStorageMgmt.hpp"
  7. #include "ntool/cxx/chrono/passive_timer.hpp"
  8. #include <memepp/native.hpp>
  9. #include <spdlog/logger.h>
  10. #include <nlohmann/json.hpp>
  11. #include "AlgorithmDB.h"
  12. #include <map>
  13. #include <string>
  14. class DemoProtocol : public Cmnpp::IProtocol
  15. {
  16. public:
  17. void parameters_get(struct cmnproto_params* _param);
  18. cmnec_t initialize_cb(cmnproto_handle _handle);
  19. cmnec_t process_cb() override;
  20. cmnec_t terminate_cb();
  21. cmnerrno_t timer_cb() override;
  22. cmnec_t link_new_cb(cmnlink_ref _link);
  23. cmnec_t link_del_cb(cmnlink_ref _link);
  24. cmnec_t message_cb_response(memepp::string_view topic, memepp::string_view data);
  25. cmnec_t message_cb_request(memepp::string_view topic, memepp::string_view data);
  26. virtual cmnec_t message_cb(cmnlink_ref _link, cmniface_msgbody_const _msg) override;
  27. private:
  28. // 发送mqtt消息
  29. void pack_send_mqtt(const memepp::string& topic, const memepp::string& type, const nlohmann::json& data);
  30. // 发送mqtt响应
  31. void pack_send_mqtt_reply(const memepp::string& topic, const memepp::string& type, const nlohmann::json& data);
  32. // 发送mqtt消息
  33. void pack_send_mqtt_binary(const memepp::string& topic, const uint8_t* buf, size_t len);
  34. // 发送文本消息
  35. void pack_send_mqtt_text(const memepp::string& topic, const std::string &text);
  36. // 发送认证
  37. void pack_send_token();
  38. // 解包token
  39. void unpack_data_token(const nlohmann::json params);
  40. // 发送注册
  41. void pack_send_register();
  42. // 解包注册
  43. void unpack_data_register(const nlohmann::json params);
  44. // 发送校时
  45. void pack_send_ntp();
  46. // 解包校时
  47. void unpack_data_ntp(const nlohmann::json params);
  48. // 数据上报-发送实时数据
  49. void pack_send_real_data();
  50. // 数据上报-发送增量数据
  51. void pack_send_incremental_data();
  52. // 断点续传-发送离线数据
  53. void pack_send_offline_data();
  54. // 发送总召数据
  55. void pack_send_up_call();
  56. // 发送点位桥查询
  57. void query_send_bridge(std::vector<std::string> dotNumbers);
  58. // 解包点位桥结果
  59. void unpack_bridge_data(memepp::string_view data);
  60. // 解析总召指令
  61. void unpack_data_up_all(const nlohmann::json params);
  62. // 查询指令-状态查询
  63. void unpack_query_machine_status(const nlohmann::json params);
  64. // 查询指令-日志查询
  65. void unpack_query_log(const nlohmann::json params);
  66. // 算法下发解包
  67. void unpack_algorithm_data(const nlohmann::json params);
  68. // 发送算法报告
  69. void pack_send_algorithm_report();
  70. // 算法移除
  71. void unpack_algorithm_delete(const nlohmann::json params);
  72. //从全量包或者增量包中获取时间戳
  73. long get_time_from_string(const std::string dataTime);
  74. //发送数据定时任务,包括全量数据和增量数据
  75. void send_data_timer_task();
  76. // 处理事务任务, 主要是类似于每天某个时间点定时运行
  77. void run_service_task();
  78. //发送数据准备
  79. void pre_real_send_data();
  80. //计算磁盘利用率,最大值100
  81. double calc_disk_ratio();
  82. //平台下发点位号转换成网关平台测点ID
  83. void PointNo2PointId(std::vector<std::string> pointNo, std::vector<std::string> &pointId);
  84. // 初始化topic列表
  85. void init_topic_map();
  86. // 订阅topic
  87. void subscribe_topic();
  88. bool readLinesFromFile(const memepp::native_string& filePath, int startLine, int numLines, uint64_t& totalLines, std::vector<nlohmann::json>& result);
  89. //根据遥测遥信数量,计算发送队列长度
  90. size_t cacl_real_data_buff_len(const uint16_t& yc_count, const uint16_t& yx_count);
  91. //根据遥测遥信数量,计算增量数据最大发送队列长度
  92. size_t cacl_inc_real_data_buff_len(const uint16_t& yc_count, const uint16_t& yx_count);
  93. //发送算法报告定时任务
  94. void send_algorithm_report_timer_task();
  95. //从数据库中读取已经部署的算法信息
  96. void load_algorithm_from_db(memepp::native_string filePath);
  97. private:
  98. typedef enum
  99. {
  100. gateway_init,
  101. gateway_online,
  102. gateway_offline
  103. }ET_gateway_status;
  104. typedef struct
  105. {
  106. AlgorithmDB::Record AlgotithmInfo;
  107. Cmnpp::Calc::Expr exprObject;
  108. }FormulasInfo;
  109. private:
  110. ET_gateway_status gateway_status;
  111. Cmnpp::Proto::HandleRef m_handle;
  112. Cmnpp::Proto::LineRef m_line;
  113. Cmnpp::Link m_currentLink;
  114. //std::shared_ptr<spdlog::logger> logger_;
  115. // 令牌
  116. std::string m_token;
  117. // 当前目录
  118. memepp::native_string currentDir;
  119. char sm4_key_screct[33] = "9dd2ecbe933c80a7afbba5403b2b5626";
  120. // 设备序号
  121. std::string m_device_sn;
  122. SnowflakeIdGenerator idGenerator = SnowflakeIdGenerator(1, 2);
  123. bool m_registered = false;
  124. //当前时间窗口(5分钟)时间戳。1970 年 1 月 1 日(08:00:00 GMT)至当前时间的总毫秒数。
  125. uint64_t CurrentRealDataTime;
  126. //增量包时间窗口内的数据包的顺序号
  127. uint16_t IncDataPackIndex;
  128. //全量数据缓存。
  129. std::vector<uint8_t> vec_real_data_buff;
  130. //增量数据缓存。
  131. std::vector<uint8_t> vec_inc_real_data_buff;
  132. // topic定义
  133. std::map<std::string, mqtt::MqttTopic> topic_map_;
  134. // 事件mid缓存
  135. std::map<std::string, std::string> mid_map_;
  136. // 点位缓存,key 点位号,value 测点号
  137. std::map<std::string, std::string> point_bridge_map_;
  138. // 采集点缓存,key 采集点号,value 点位号
  139. std::map<std::string, std::string> m_collect_point_map;
  140. //存储数据类
  141. DataStorageMgmt m_StorageData;
  142. //历史数据缓存,用于定时发送
  143. std::vector<ghc::filesystem::path> m_his_data_file_list;
  144. //总召数据发送序号
  145. size_t m_upcall_send_index;
  146. //离线数据文件清单
  147. std::vector<ghc::filesystem::path> m_offline_data_file_list;
  148. //当前发送离线数据存储文件的序号
  149. size_t m_offline_file_index;
  150. //离线数据缓存,用于定时发送
  151. std::vector<std::vector<uint8_t>> m_offline_data_cache;
  152. //发送离线数据序号
  153. size_t m_offline_data_send_index;
  154. //该次断点续传标识
  155. uint64_t m_offline_compensationId;
  156. //标识是否正在发送上一次离线缓存数据。如果发送离线数据过程中,又出现了离线,回重新缓存数据,但上次读取的缓存数据还没发完
  157. //所以需要先完缓存数据,并且文件需要不要加1
  158. bool m_is_sending_cache_data;
  159. // 读取文件大小
  160. double getFileSizeInMB(const std::string& filename);
  161. // 注册定时器
  162. ntool::chrono::passive_timer auth_passive_timer_;
  163. // 发送全量数据定时器
  164. ntool::chrono::passive_timer m_send_total_data_timer;
  165. // 增量数据检查定时器
  166. ntool::chrono::passive_timer m_send_incremental_data_timer;
  167. // 发送离线数据定时器
  168. ntool::chrono::passive_timer m_send_offline_data_timer;
  169. // 发送总召数据定时器
  170. ntool::chrono::passive_timer m_up_call_timer;
  171. // 常用事务定时器
  172. ntool::chrono::passive_timer service_task_timer;
  173. // 算法结果上报定时器
  174. ntool::chrono::passive_timer m_algorithm_task_timer;
  175. // 事务定时器运行时间
  176. uint64_t run_service_timestamp = 0;
  177. // 上次获取token时间
  178. uint64_t last_get_token_timestamp = 0;
  179. //全量遥信数据缓存列表,每个转发设备一个换成,用于比较数据是否有变化
  180. std::vector<std::vector<float>> YC_data_cache_list;
  181. //全量遥测数据缓存,用于比较数据是否有变化
  182. std::vector<std::vector<uint8_t>> YX_data_cache_list;
  183. std::map<std::string, FormulasInfo> m_mapFormulasInfo;
  184. AlgorithmDB m_algorithmDB;
  185. };