DemoProtocol.cpp 71 KB


  1. #include "DemoProtocol.hpp"
  2. #include "CommonFunction.hpp"
  3. #include "Functions.hpp"
  4. #include <cmn/proto/params.h>
  5. #include <cmn/proto/ui/option_list.h>
  6. #include <megopp/util/scope_cleanup.h>
  7. #include <cmn/plugin_framework/plugin.h>
  8. #include <Cmnpp/Link.h>
  9. #include <chrono>
  10. #include <Cmnpp/IFace/SubPubMessage.h>
  11. #include <Cmnpp/Proto/Point.h>
  12. #include <Cmnpp/Forward/Point.h>
  13. #include <string>
  14. #include "cmn/proto/log.h"
  15. #include <memepp/convert/common_def.hpp>
  16. #include <memepp/convert/std/wstring.hpp>
  17. #include <memepp/convert/std/string.hpp>
  18. #include <memepp/convert/self.hpp>
  19. #include <memepp/convert/fmt.hpp>
  20. #include <memepp/native.hpp>
  21. #include <mego/hardware/memory/memory.h>
  22. #include <mego/util/os/windows/windows_simplify.h>
  23. #include <mego/hardware/disk/disk.h>
  24. #include "sm4.hpp"
  25. #include <spdlog/spdlog.h>
  26. #include <spdlog/async.h>
  27. #include <spdlog/sinks/daily_file_sink.h>
  28. #include <fmt/format.h>
  29. #define DEBUG
  30. using namespace std;
  31. static int testindex = 0;
  32. #define TIME_1_SECOND 1000
  33. #define TIME_2_SECOND 2000
  34. #define TIME_5_MINUTES 300000 //5*60*1000
  35. #define MAX_DATA_LEN 160000
  36. #define INVAILD_SEND_OL_INDEX 0xFFFFFFFFFFFFFFFF
  37. #if MG_OS__WIN_AVAIL
  38. static float __CalculateCPULoad(unsigned long long idleTicks, unsigned long long totalTicks)
  39. {
  40. static unsigned long long _previousTotalTicks = 0;
  41. static unsigned long long _previousIdleTicks = 0;
  42. unsigned long long totalTicksSinceLastTime = totalTicks - _previousTotalTicks;
  43. unsigned long long idleTicksSinceLastTime = idleTicks - _previousIdleTicks;
  44. float ret = 1.0f - ((totalTicksSinceLastTime > 0) ? ((float)idleTicksSinceLastTime) / totalTicksSinceLastTime : 0);
  45. _previousTotalTicks = totalTicks;
  46. _previousIdleTicks = idleTicks;
  47. return ret;
  48. }
  49. #endif
  50. #if MG_OS__WIN_AVAIL
  51. static unsigned long long __FileTimeToInt64(const FILETIME& ft)
  52. {
  53. return (((unsigned long long)(ft.dwHighDateTime)) << 32) | ((unsigned long long)ft.dwLowDateTime);
  54. }
  55. #endif
  56. void DemoProtocol::parameters_get(cmnproto_params* _param)
  57. {
  58. cmnproto_params_set(
  59. _param,
  60. u8"MQTT",
  61. CMNPROTO_VER, //< SDK版本
  62. MEGO__MAKE_VERSION_NUMBER(0, 0, 1), //< 本协议驱动版本
  63. cmnenum_duplex_full, //< 指定协议驱动的工作模式为全双工
  64. cmnenum_data_process_forward, //< 指定协议驱动的数据处理模式为采集
  65. cmnenum_linkmode_network, //< 指定协议驱动的连接模式为网络连接
  66. cmnenum_linktype_mqttv3, //< 指定协议驱动的连接类型为MQTT
  67. cmnenum_linkquantity_singleline_singlelink, //< 指定协议驱动的连接数量为单通道单连接
  68. u8"南网网关MQTT协议"
  69. );
  70. cmnproto_params_forwardpage_append(
  71. _param,
  72. "PAGE1",
  73. u8"遥测",
  74. NULL,
  75. NULL
  76. );
  77. auto optlist1 = cmnproto_ui_optlist__create_v2(
  78. "PAGE2PARAM1",
  79. u8"转发页的2参数1",
  80. u8"转发页的2参数1的描述",
  81. 0,
  82. 0
  83. );
  84. MEGOPP_UTIL__ON_SCOPE_CLEANUP([&] { cmnproto_ui_optlist_destruct(&optlist1); });
  85. // 为转发页2里的转发变量添加参数
  86. cmnproto_params_forwardpage_append(
  87. _param,
  88. "PAGE2",
  89. u8"遥信",
  90. NULL,
  91. optlist1
  92. );
  93. // 设置参数
  94. auto optList = cmnproto_ui_optlist__create_by_str_v2(
  95. u8"PLAT_UID",
  96. u8"平台授权用户ID",
  97. u8"云平台提供的授权用户ID",
  98. u8"");
  99. cmnproto_ui_optlist__append_by_str_v2(
  100. optList,
  101. u8"PLAT_PW",
  102. u8"平台授权用户密码",
  103. u8"",
  104. u8"");
  105. cmnproto_ui_optlist__append_by_str_v2(
  106. optList,
  107. u8"SM2_PUB_KEY",
  108. u8"平台公钥SM2",
  109. u8"",
  110. u8"");
  111. cmnproto_ui_optlist__append_by_str_v2(
  112. optList,
  113. u8"SM4_SECRTY_KEY",
  114. u8"平台SM4密钥",
  115. u8"",
  116. u8"");
  117. cmnproto_ui_optlist__append_by_str_v2(
  118. optList,
  119. u8"STATION_ID",
  120. u8"站点ID",
  121. u8"",
  122. u8"");
  123. cmnproto_ui_optlist__append_by_str_v2(
  124. optList,
  125. u8"STATION_NICK",
  126. u8"站点昵称",
  127. u8"用于点位桥协议,yw、fr、lb",
  128. u8"");
  129. cmnproto_ui_optlist__append_by_str_v2(
  130. optList,
  131. u8"deviceSN",
  132. u8"设备编号",
  133. u8"",
  134. u8"");
  135. cmnproto_ui_optlist__append_by_str_v2(
  136. optList,
  137. u8"deviceName",
  138. u8"设备名称",
  139. u8"",
  140. u8"");
  141. cmnproto_ui_optlist__append_by_str_v2(
  142. optList,
  143. u8"deviceType",
  144. u8"设备类型",
  145. u8"",
  146. u8"");
  147. cmnproto_ui_optlist__append_by_str_v2(
  148. optList,
  149. u8"model",
  150. u8"产品型号",
  151. u8"",
  152. u8"");
  153. cmnproto_ui_optlist__append_by_str_v2(
  154. optList,
  155. u8"location",
  156. u8"安装位置描述",
  157. u8"",
  158. u8"");
  159. cmnproto_ui_optlist__append_by_str_v2(
  160. optList,
  161. u8"version",
  162. u8"程序版本号",
  163. u8"",
  164. u8"");
  165. cmnproto_ui_optlist__append_by_str_v2(
  166. optList,
  167. u8"ENABLE_NTP",
  168. u8"平台校时",
  169. u8"1:启用 0: 禁用",
  170. u8"1");
  171. MEGOPP_UTIL__ON_SCOPE_CLEANUP([&optList] { cmnproto_ui_optlist_destruct(&optList); });
  172. cmnproto_params_lineopt_append(
  173. _param,
  174. u8"通信参数",
  175. optList
  176. );
  177. // 该协议支持的TCP链接类型
  178. cmnproto_params_support_linkmode_append(_param, cmnenum_linkmode_network, cmnenum_linktype_tcp_client);
  179. cmnproto_params_support_linkmode_append(_param, cmnenum_linkmode_network, cmnenum_linktype_tcp_server);
  180. }
  181. cmnec_t DemoProtocol::initialize_cb(cmnproto_handle _handle)
  182. {
  183. m_handle = _handle;
  184. cmnproto_log_debug(m_handle.native(), "Seq 1");
  185. //初始化历史数据存储文件
  186. auto folderTmp = m_handle.lineDataStoreDirectoryPath();
  187. cmnproto_log_debug(m_handle.native(), "Seq 2");
  188. auto folderPath = mm_to<memepp::native_string>(folderTmp);
  189. cmnproto_log_debug(m_handle.native(), "Seq 3");
  190. auto logDirPath = mm_to<memepp::native_string>(folderTmp + memepp::string_view{ "/logs" });
  191. cmnproto_log_debug(m_handle.native(), "Seq 4");
  192. auto logFilePath = mm_to<memepp::native_string>(folderTmp + memepp::string_view{ "/logs/proto.log" });
  193. cmnproto_log_debug(m_handle.native(), "Seq 5");
  194. ghc::filesystem::create_directories(logDirPath);
  195. cmnproto_log_debug(m_handle.native(), "Seq 6");
  196. //spdlog::init_thread_pool(2048, 1);
  197. cmnproto_log_debug(m_handle.native(), "Seq 7");
  198. //spdlog::flush_every(std::chrono::seconds(3));
  199. cmnproto_log_debug(m_handle.native(), "Seq 8");
  200. //auto logger = spdlog::create_async<spdlog::sinks::daily_file_sink_mt>(
  201. // "proto", logFilePath, 0, 0, false, 30);
  202. cmnproto_log_debug(m_handle.native(), "Seq 9");
  203. //logger->set_formatter(
  204. // std::unique_ptr<spdlog::formatter>(new spdlog::pattern_formatter("[%Y-%m-%d %T.%f %z] [tid %t] [%^%l%$] [%n] %v")));
  205. cmnproto_log_debug(m_handle.native(), "Seq 10");
  206. //logger_ = logger;
  207. cmnproto_log_debug(m_handle.native(), "Seq 11");
  208. gateway_status = gateway_init;
  209. cmnproto_log_debug(m_handle.native(), "Seq 12");
  210. m_handle.timerStart(100);
  211. cmnproto_log_debug(m_handle.native(), "Seq 13");
  212. // 设置每一个定时器的时间设置
  213. m_send_total_data_timer.set_interval(TIME_5_MINUTES);
  214. cmnproto_log_debug(m_handle.native(), "Seq 14");
  215. //m_send_total_data_timer.set_interval(10000);
  216. m_send_incremental_data_timer.set_interval(TIME_1_SECOND);
  217. cmnproto_log_debug(m_handle.native(), "Seq 15");
  218. auth_passive_timer_.set_interval(5 * 1000);
  219. cmnproto_log_debug(m_handle.native(), "Seq 16");
  220. m_send_offline_data_timer.set_interval(300);
  221. cmnproto_log_debug(m_handle.native(), "Seq 17");
  222. m_up_call_timer.set_interval(1000);
  223. cmnproto_log_debug(m_handle.native(), "Seq 18");
  224. service_task_timer.set_interval(5 * 1000);
  225. cmnproto_log_debug(m_handle.native(), "Seq 19");
  226. m_algorithm_task_timer.set_interval(TIME_1_SECOND);
  227. cmnproto_log_debug(m_handle.native(), "Seq 20");
  228. cmnerrno_t ret = 0;
  229. ret = m_handle.lineGet(m_line);
  230. if (ret) {
  231. //logger_->debug("Channel get failed! ");
  232. return CMNERRNO_ERROR;
  233. }
  234. currentDir = getAppDir(folderPath);
  235. m_StorageData.init_data_storage(folderPath,1);
  236. m_offline_data_send_index = INVAILD_SEND_OL_INDEX;
  237. m_offline_file_index = 0;
  238. return 0;
  239. }
  240. cmnec_t DemoProtocol::process_cb()
  241. {
  242. return CMNERRNO_OK;
  243. }
  244. void DemoProtocol::init_topic_map()
  245. {
  246. const memepp::string station_id = m_line.getOptionString("STATION_ID");
  247. const memepp::string station_nick = m_line.getOptionString("STATION_NICK");
  248. // 请求token
  249. topic_map_.insert(std::make_pair("token", mqtt::MqttTopic("/oauth/token", station_id.c_str())));
  250. // 注册
  251. topic_map_.insert(std::make_pair("register", mqtt::MqttTopic("/register", station_id.c_str())));
  252. // 校时
  253. topic_map_.insert(std::make_pair("timecheck", mqtt::MqttTopic("/timecheck", station_id.c_str())));
  254. // 实时上传
  255. topic_map_.insert(std::make_pair("data_up", mqtt::MqttTopic("/data/up", station_id.c_str(), false)));
  256. // 补发上传
  257. topic_map_.insert(std::make_pair("data_compensation",mqtt::MqttTopic("/data/up-compensation", station_id.c_str(), false)));
  258. // 总召
  259. topic_map_.insert(std::make_pair("up_call", mqtt::MqttTopic("/data/up-call", station_id.c_str(), true, false)));
  260. // 设备查询
  261. topic_map_.insert(std::make_pair("status", mqtt::MqttTopic("/command/status", station_id.c_str(), true, false)));
  262. // 日志
  263. topic_map_.insert(std::make_pair("log", mqtt::MqttTopic("/command/log", station_id.c_str(), true, false)));
  264. // 算法下发
  265. topic_map_.insert(std::make_pair("algorithm_send", mqtt::MqttTopic("/command/algorithm_send", station_id.c_str(), true, false)));
  266. // 算法报告上送
  267. topic_map_.insert(std::make_pair("algorithm_data", mqtt::MqttTopic("/report/algorithm_data", station_id.c_str(), false)));
  268. // 阈值算法移除
  269. topic_map_.insert(std::make_pair("algorithm_del", mqtt::MqttTopic("/command/algorithm_del", station_id.c_str(), true, false)));
  270. // 点位桥
  271. topic_map_.insert(std::make_pair("bridge", mqtt::MqttTopic("bridge", station_nick.c_str())));
  272. }
  273. //离线信号
  274. cmnec_t DemoProtocol::terminate_cb()
  275. {
  276. return 0;
  277. }
  278. cmnerrno_t DemoProtocol::timer_cb()
  279. {
  280. if (auth_passive_timer_.timing_continue(100))
  281. return 0;
  282. if (service_task_timer.timing_continue(100))
  283. return 0;
  284. if (m_send_total_data_timer.timing_continue(100))
  285. return 0;
  286. if (m_send_incremental_data_timer.timing_continue(100))
  287. return 0;
  288. if (m_up_call_timer.timing_continue(100))
  289. return 0;
  290. if (m_send_offline_data_timer.timing_continue(100))
  291. return 0;
  292. if (m_algorithm_task_timer.timing_continue(100))
  293. return 0;
  294. return CMNERRNO_OK;
  295. }
  296. cmnec_t DemoProtocol::link_new_cb(cmnlink_ref _link)
  297. {
  298. cmnproto_log_debug(m_handle.native(), "MQTT Channel connected!");
  299. m_currentLink = _link;
  300. // 初始化topic列表
  301. init_topic_map();
  302. // 订阅topic列表
  303. subscribe_topic();
  304. // 定时发送认证
  305. auth_passive_timer_.start();
  306. auth_passive_timer_.on([this]
  307. {
  308. auto now = getNow();
  309. if (m_token.empty() || last_get_token_timestamp == 0 || ((now - last_get_token_timestamp) > (1000*60*20)))
  310. {
  311. pack_send_token();
  312. }
  313. else
  314. {
  315. if (m_device_sn.empty())
  316. {
  317. pack_send_register();
  318. }
  319. }
  320. });
  321. service_task_timer.start();
  322. service_task_timer.on([this]
  323. {
  324. run_service_task();
  325. });
  326. return 0;
  327. }
  328. cmnec_t DemoProtocol::link_del_cb(cmnlink_ref _link)
  329. {
  330. if (m_currentLink == _link)
  331. {
  332. //m_send_offline_data_timer.cancel();
  333. m_token = "";
  334. m_device_sn = "";
  335. gateway_status = gateway_offline;
  336. m_StorageData.create_offline_data_file();
  337. cmnproto_log_debug(m_handle.native(), "Gateway offline");
  338. m_currentLink.reset();
  339. }
  340. return 0;
  341. }
  342. cmnec_t DemoProtocol::message_cb_response(memepp::string_view topic, memepp::string_view data)
  343. {
  344. cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== %s ", topic.to_string().c_str(), data.to_string().c_str());
  345. nlohmann::json data_json = nlohmann::json::parse(data);
  346. auto code = data_json["code"].get<int>();
  347. auto msg = data_json["msg"].get<std::string>();
  348. auto mid = std::to_string(data_json["mid"].get<uint64_t>()) ;
  349. auto type = data_json["type"].get<std::string>();
  350. // 获取当前缓存中的对应mid
  351. if (mid_map_.find(type) == mid_map_.end())
  352. {
  353. return CMNERRNO_OK;
  354. }
  355. if (mid_map_[type] != mid)
  356. {
  357. cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== this mid= %s, that:%s ", type.c_str(), mid_map_[type].c_str(),mid.c_str());
  358. return CMNERRNO_OK;
  359. }
  360. // 移除mid
  361. mid_map_.erase(type);
  362. if (code != 200)
  363. {
  364. cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== Message Error,Error Code:%d, Tip:%s ", topic.to_string().c_str(), code,msg.c_str());
  365. return CMNERRNO_ERROR;
  366. }
  367. auto param = data_json["param"].get<std::string>();
  368. // 解密
  369. auto decryptParam = sm4::sm4decodehexstr(param, std::string(sm4_key_screct));
  370. #ifdef DEBUG
  371. cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== %s ", topic.to_string().c_str(), data.to_string().c_str());
  372. cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== %s ", topic.to_string().c_str(), decryptParam.c_str());
  373. #else
  374. cmnproto_log_debug(m_handle.native(), "MQTT [%s]<==", topic.to_string().c_str());
  375. #endif
  376. nlohmann::json params_json = nlohmann::json::parse(decryptParam);
  377. if (topic.to_string() == topic_map_["token"].get_subscribe_topic())
  378. {
  379. unpack_data_token(params_json);
  380. }
  381. else if (topic.to_string() == topic_map_["register"].get_subscribe_topic())
  382. {
  383. unpack_data_register(params_json);
  384. }
  385. else if (topic.to_string() == topic_map_["timecheck"].get_subscribe_topic())
  386. {
  387. unpack_data_ntp(params_json);
  388. }
  389. return CMNERRNO_OK;
  390. }
  391. cmnec_t DemoProtocol::message_cb_request(memepp::string_view topic, memepp::string_view data)
  392. {
  393. nlohmann::json data_json = nlohmann::json::parse(data.begin(), data.end());
  394. auto mid = std::to_string(data_json["mid"].get<uint64_t>()) ;
  395. auto type = data_json["type"].get<std::string>();
  396. // 缓存mid
  397. mid_map_[type] = mid;
  398. // 解密param
  399. auto param = data_json["param"];
  400. // 解密
  401. auto decryptParam = sm4::sm4decodehexstr(param, std::string(sm4_key_screct));
  402. if(decryptParam.empty())
  403. {
  404. decryptParam = "{}";
  405. }
  406. #ifdef DEBUG
  407. cmnproto_log_debug(m_handle.native(), "MQTT [%s] <= %s ", topic.to_string().c_str(), decryptParam.c_str());
  408. #else
  409. cmnproto_log_debug(m_handle.native(), "MQTT [%s] <= %s ", topic.to_string().c_str(), decryptParam.c_str());
  410. #endif
  411. nlohmann::json params_json = nlohmann::json::parse(decryptParam);
  412. // 针对不同的topic进行处理
  413. // 平台总召
  414. if (topic.to_string() == topic_map_["up_call"].get_publish_topic())
  415. {
  416. unpack_data_up_all(params_json);
  417. }
  418. // 平台发起状态查询
  419. else if (topic.to_string() == topic_map_["status"].get_publish_topic())
  420. {
  421. unpack_query_machine_status(params_json);
  422. }
  423. // 平台发起日志查询
  424. else if (topic.to_string() == topic_map_["log"].get_publish_topic())
  425. {
  426. unpack_query_log(params_json);
  427. }
  428. // 算法下发
  429. else if (topic.to_string() == topic_map_["algorithm_send"].get_publish_topic())
  430. {
  431. unpack_algorithm_data(params_json);
  432. }
  433. else if (topic.to_string() == topic_map_["algorithm_del"].get_publish_topic())
  434. {
  435. unpack_algorithm_delete(params_json);
  436. }
  437. return CMNERRNO_OK;
  438. }
  439. cmnec_t DemoProtocol::message_cb(cmnlink_ref _link, cmniface_msgbody_const _msg)
  440. {
  441. auto msg = Cmnpp::IFace::SubPubMessage::Ref(_msg);
  442. auto data = msg.dataView(); // 接收到的数据,其生命周期与msg相同
  443. auto topic = msg.topicView(); // 主题,其生命周期与msg
  444. //根据不同的topic,解析不同的平台下发报文
  445. try
  446. {
  447. // 网关发布,平台响应的
  448. if (topic.contains("reply"))
  449. {
  450. return this->message_cb_response(topic, data);
  451. }
  452. // 点位桥
  453. else if (topic.to_string() == topic_map_["bridge"].get_subscribe_topic()) {
  454. cmnproto_log_debug(m_handle.native(), "MQTT[TEXT] [%s]<= %s ", topic.to_string().c_str(), data.to_string().c_str());
  455. this->unpack_bridge_data(data);
  456. return CMNERRNO_OK;
  457. }
  458. // 平台发布,网关响应的
  459. else
  460. {
  461. return this->message_cb_request(topic, data);
  462. }
  463. }
  464. catch (nlohmann::json::parse_error& e)
  465. {
  466. Debug_Print_Error(e.what());
  467. }
  468. return CMNERRNO_OK;
  469. }
  470. // 发送mqtt消息
  471. void DemoProtocol::pack_send_mqtt(const memepp::string& topic, const memepp::string& type, const nlohmann::json& data)
  472. {
  473. auto time_stamp = getNow();
  474. auto mid = idGenerator.nextId();
  475. nlohmann::json request_data = {
  476. {u8"mid", mid},
  477. {u8"date", time_stamp},
  478. {u8"type", type.c_str()},
  479. {u8"stationId", m_line.getOptionString("STATION_ID").c_str()}
  480. };
  481. // 如果当前是注册,就采用国密2进行加密
  482. std::string todo_sign_str;
  483. if (type == "EVENT_PROVE")
  484. {
  485. char encrypted_str[5120] = {'\0'};
  486. const memepp::string sm2_pub_key = m_line.getOptionString("SM2_PUB_KEY");
  487. Functions::util_sm2_encrypt(data.dump().c_str(), sm2_pub_key.c_str(), encrypted_str);
  488. request_data[u8"param"] = encrypted_str;
  489. todo_sign_str.append(type.c_str()).append(request_data[u8"param"].get<std::string>()).append(std::to_string(time_stamp));
  490. }
  491. // 国密4加密
  492. else
  493. {
  494. std::string encryptString = sm4::sm4encodestrhex(data.dump(), std::string(sm4_key_screct));
  495. //char encrypted_str[5120] = { '\0' };
  496. // Functions::util_sm4_encrypt(data.dump().c_str(), sm4_key_screct, encrypted_str);
  497. request_data[u8"param"] = encryptString.c_str();
  498. request_data[u8"token"] = m_token.c_str();
  499. todo_sign_str.append(m_token).append(type.c_str()).append(request_data[u8"param"].get<std::string>()).append(std::to_string(time_stamp));
  500. }
  501. // 缓存mid
  502. mid_map_[type.c_str()] = std::to_string(mid);
  503. char sign[65];
  504. Functions::sm3_sign((uint8_t*)(todo_sign_str.c_str()), static_cast<int>(todo_sign_str.length()), sign);
  505. const std::string cppString(sign);
  506. request_data["sign"] = cppString.c_str();
  507. // 发送token获取
  508. Cmnpp::IFace::SubPubMessage msg;
  509. msg.cmd(cmniface_msgbody_command_publish);
  510. msg.data(mm_view(request_data.dump().c_str(), request_data.dump().length())); //< 数据
  511. msg.topic(topic);
  512. m_handle.send(m_currentLink, msg);
  513. #ifdef DEBUG
  514. cmnproto_log_debug(m_handle.native(), "MQTT [%s] => %s ", topic.c_str(), request_data.dump().c_str());
  515. #else
  516. cmnproto_log_debug(m_handle.native(), "MQTT [%s] => ", topic.c_str());
  517. #endif
  518. }
  519. void DemoProtocol::pack_send_mqtt_reply(const memepp::string& topic, const memepp::string& type,
  520. const nlohmann::json& data)
  521. {
  522. auto time_stamp = getNow();
  523. nlohmann::json request_data = {};
  524. request_data["mid"] = mid_map_[type.c_str()];
  525. request_data["date"] = time_stamp;
  526. request_data["type"] = type.c_str();
  527. request_data["stationId"] = m_line.getOptionString("STATION_ID").c_str();
  528. request_data["msg"] = "success";
  529. request_data["code"] = 200;
  530. auto originString = data.dump();
  531. const std::string encryptString = sm4::sm4encodestrhex(originString, std::string(sm4_key_screct));
  532. request_data[u8"param"] = encryptString.c_str();
  533. // 缓存mid
  534. mid_map_.erase(type.c_str());
  535. // 计算签名值
  536. std::string todo_sign_str;
  537. todo_sign_str.append(type.c_str())
  538. .append("200")
  539. .append(request_data[u8"msg"].get<std::string>())
  540. .append(request_data[u8"param"].get<std::string>())
  541. .append(std::to_string(time_stamp));
  542. char sign[65];
  543. Functions::sm3_sign((uint8_t*)(todo_sign_str.c_str()), static_cast<int>(todo_sign_str.length()), sign);
  544. const std::string cppString(sign);
  545. request_data["sign"] = cppString.c_str();
  546. // 发送token获取
  547. Cmnpp::IFace::SubPubMessage msg;
  548. msg.cmd(cmniface_msgbody_command_publish);
  549. msg.data(mm_view(request_data.dump().c_str(), request_data.dump().length())); //< 数据
  550. msg.topic(topic);
  551. m_handle.send(m_currentLink, msg);
  552. #ifdef DEBUG
  553. cmnproto_log_debug(m_handle.native(), "MQTT [%s]=> %s ", topic.c_str(), request_data.dump().c_str());
  554. #else
  555. cmnproto_log_debug(m_handle.native(), "MQTT [%s] => ", topic.c_str());
  556. #endif
  557. }
  558. //发送二进制格式数据
  559. void DemoProtocol::pack_send_mqtt_binary(const memepp::string& topic,const uint8_t* buf, size_t len)
  560. {
  561. auto data = mm_view(buf, len);
  562. Cmnpp::IFace::SubPubMessage msg;
  563. msg.cmd(cmniface_msgbody_command_publish);
  564. msg.data(data); //< 数据
  565. msg.topic(topic);
  566. m_handle.send(m_currentLink, msg);
  567. }
  568. //发送二进制格式数据
  569. void DemoProtocol::pack_send_mqtt_text(const memepp::string& topic, const std::string &text)
  570. {
  571. auto data = mm_view(text.c_str(), text.length());
  572. Cmnpp::IFace::SubPubMessage msg;
  573. msg.cmd(cmniface_msgbody_command_publish);
  574. msg.data(data); //< 数据
  575. msg.topic(topic);
  576. m_handle.send(m_currentLink, msg);
  577. #ifdef DEBUG
  578. cmnproto_log_debug(m_handle.native(), "MQTT[TEXT] [%s]=> %s ", topic.c_str(), text.c_str());
  579. #else
  580. cmnproto_log_debug(m_handle.native(), "MQTT[TEXT] [%s]=> ", topic.c_str());
  581. #endif
  582. }
  583. // 发送认证
  584. void DemoProtocol::pack_send_token()
  585. {
  586. // 平台用户id
  587. const memepp::string userID = m_line.getOptionString("PLAT_UID");
  588. // 平台用户密码
  589. const memepp::string pwd = m_line.getOptionString("PLAT_PW");
  590. // 生成sm4密钥
  591. // uint8_t t[32];
  592. // Functions::util_rand_bytes(t, 16);
  593. // char str_key[33];
  594. // Functions::uint8_to_hex_string(t, 16, sm4_key_screct);
  595. // sm4_key_screct[32] = '\0';
  596. const memepp::string sm4SecrtyKey = m_line.getOptionString("SM4_SECRTY_KEY");
  597. //if (!sm4SecrtyKey.empty()) {
  598. // memcpy(sm4_key_screct, sm4SecrtyKey.c_str(), 32);
  599. // sm4_key_screct[32] = '\0';
  600. //}
  601. const nlohmann::json data = {
  602. {u8"userID", userID.c_str()},
  603. {u8"pwd", pwd.c_str()},
  604. {u8"secretKey", sm4_key_screct},
  605. };
  606. pack_send_mqtt(topic_map_["token"].get_publish_topic(), u8"EVENT_PROVE", data);
  607. }
  608. /**
  609. * @brief 解析数据令牌
  610. * @param params 响应参数
  611. */
  612. void DemoProtocol::unpack_data_token(const nlohmann::json params)
  613. {
  614. // 从传入的参数中获取token并赋值给成员变量m_token
  615. this->m_token = params["token"].get<std::string>();
  616. cmnproto_log_debug(m_handle.native(), "Token get success! Token:%s ", m_token.c_str());
  617. auto time_stamp = getNow();
  618. this->last_get_token_timestamp = time_stamp;
  619. // std::vector<std::string> dotNums;
  620. // dotNums.push_back("3015");
  621. // 发送点位查询测试
  622. //this->query_send_bridge(dotNums);
  623. //// 发送日志查询测试
  624. //nlohmann::json logJson = {};
  625. //logJson["date"] = "2024-01-01";
  626. //logJson["index"] =1;
  627. //logJson["size"] = 50;
  628. //this->unpack_query_log(logJson);
  629. }
  630. //发送注册
  631. void DemoProtocol::pack_send_register()
  632. {
  633. const nlohmann::json data = {
  634. // {u8"deviceSN", m_line.getOptionString("deviceSN").c_str()},
  635. {u8"deviceSN", "uuhhgj-980760-88976"},
  636. {u8"deviceName", m_line.getOptionString("deviceName").c_str()},
  637. {u8"deviceType", m_line.getOptionString("deviceType").c_str()},
  638. {u8"model", m_line.getOptionString("model").c_str()},
  639. {u8"location", m_line.getOptionString("location").c_str()},
  640. {u8"version", m_line.getOptionString("version").c_str()},
  641. {u8"stationId", m_line.getOptionString("STATION_ID").c_str()}
  642. };
  643. pack_send_mqtt(topic_map_["register"].get_publish_topic(), u8"EVENT_REGISTER", data);
  644. }
  645. /**
  646. * @brief 解析设备注册
  647. * @param params 响应参数
  648. */
  649. void DemoProtocol::unpack_data_register(const nlohmann::json params)
  650. {
  651. this->m_device_sn = params["deviceSN"].get<std::string>();
  652. this->m_registered = true;
  653. // 重置业务记时器
  654. this->run_service_timestamp = 0;
  655. cmnproto_log_debug(m_handle.native(), "Device Register success device Number:%s ,Online....", m_device_sn.c_str());
  656. //如果之前是离线状态,则停止存储离线数据
  657. if (gateway_status == gateway_offline)
  658. {
  659. m_StorageData.stop_save_offline();
  660. }
  661. gateway_status = gateway_online;
  662. pre_real_send_data();
  663. // 加载存储的算法
  664. if (m_mapFormulasInfo.size() == 0 )
  665. {
  666. auto folderTmp = m_handle.lineDataStoreDirectoryPath();
  667. auto folderPath = mm_to<memepp::native_string>(folderTmp);
  668. load_algorithm_from_db(folderPath);
  669. }
  670. send_algorithm_report_timer_task();
  671. //有离线数据文件,说明曾经离线了,需要重新开始补发。
  672. m_offline_file_index = 0;
  673. m_offline_data_file_list.clear();
  674. m_StorageData.get_offline_flie_list(m_offline_data_file_list);
  675. if (m_offline_data_file_list.size() > 0)
  676. {
  677. //说明上次数据还没发送完成
  678. if (m_offline_data_send_index != INVAILD_SEND_OL_INDEX)
  679. {
  680. m_is_sending_cache_data = true;
  681. }
  682. pack_send_offline_data();
  683. }
  684. }
  685. //发送数据准备
  686. void DemoProtocol::pre_real_send_data()
  687. {
  688. YC_data_cache_list.clear();
  689. YX_data_cache_list.clear();
  690. //初始化实时数据变化上送缓存
  691. for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
  692. {
  693. auto table = (*eqptIt).source().forwardTable();
  694. if (!table) {
  695. continue;
  696. }
  697. auto page1 = table.pageGet("PAGE1");
  698. if (page1.pointSize() > 0)
  699. {
  700. //创建全量数据缓存,用于比较数据变化
  701. vector<float> vecYCData;
  702. YC_data_cache_list.push_back(vecYCData);
  703. }
  704. auto page2 = table.pageGet("PAGE2");
  705. if (page2.pointSize() > 0)
  706. {
  707. //创建全量数据缓存,用于比较数据变化
  708. vector<uint8_t> vecYXData;
  709. YX_data_cache_list.push_back(vecYXData);
  710. }
  711. }
  712. pack_send_real_data();
  713. send_data_timer_task();
  714. }
  715. //发送算法报告定时器
  716. void DemoProtocol::send_algorithm_report_timer_task()
  717. {
  718. m_algorithm_task_timer.start();
  719. m_algorithm_task_timer.on([this]
  720. {
  721. if (gateway_status == gateway_online)
  722. {
  723. pack_send_algorithm_report();
  724. }
  725. return;
  726. });
  727. }
  728. //发送校时
  729. void DemoProtocol::pack_send_ntp()
  730. {
  731. const nlohmann::json data = {
  732. {u8"deviceSN", m_line.getOptionString("deviceSN").c_str()}
  733. };
  734. pack_send_mqtt(topic_map_["timecheck"].get_publish_topic(), u8"EVENT_TIMECHECK", data);
  735. }
  736. // 解包校时
  737. void DemoProtocol::unpack_data_ntp(const nlohmann::json params)
  738. {
  739. auto recvDeviceSN = params["deviceSN"].get<std::string>();
  740. auto timestamp = params["timestamp"].get<std::string>();
  741. // 判断是否需要校时,
  742. auto enableNtp = m_line.getOptionString("ENABLE_NTP").c_str();
  743. if (strcmp(enableNtp, "1") == 0) {
  744. // 调用校时接口,写到网关
  745. cmnproto__set_system_time(m_handle.native(), atoll(timestamp.c_str()), 1);
  746. }
  747. }
  748. //发送全量数据定时任务
  749. void DemoProtocol::send_data_timer_task()
  750. {
  751. m_send_total_data_timer.start();
  752. m_send_total_data_timer.on([this]
  753. {
  754. pack_send_real_data();
  755. return;
  756. });
  757. m_send_incremental_data_timer.start();
  758. m_send_incremental_data_timer.on([this]
  759. {
  760. pack_send_incremental_data();
  761. return;
  762. });
  763. }
  764. void DemoProtocol::run_service_task()
  765. {
  766. // 校时处理,如果是跨天,即凌晨0点0分,则执行校时任务
  767. uint64_t day = 1000 * 60 * 60 * 24;
  768. auto now = getNow();
  769. if (!this->m_token.empty()) {
  770. if ((run_service_timestamp / day) != (now / day))
  771. {
  772. this->pack_send_ntp();
  773. m_StorageData.to_new_day();
  774. }
  775. }
  776. run_service_timestamp = now;
  777. }
  778. size_t DemoProtocol::cacl_real_data_buff_len(const uint16_t &yc_count, const uint16_t &yx_count)
  779. {
  780. int databuff_len = 1 + 4 + 8 + yc_count * 4 + (yx_count / 8) + ((yx_count % 8 == 0) ? 0 : 1);
  781. return databuff_len;
  782. }
  783. size_t DemoProtocol::cacl_inc_real_data_buff_len(const uint16_t& yc_count, const uint16_t& yx_count)
  784. {
  785. int databuff_len = 1 + 8 + 2 + 8+ 2+ yc_count * 8 + 2 + yx_count * 5;
  786. return databuff_len;
  787. }
  788. double DemoProtocol::getFileSizeInMB(const std::string& filename)
  789. {
  790. std::ifstream file(filename, std::ios::binary | std::ios::ate);
  791. if (!file.is_open()) {
  792. return -1; // 返回-1表示无法打开文件
  793. }
  794. std::streampos fileSize = file.tellg();
  795. file.close();
  796. // 转换为MB并返回
  797. return static_cast<double>(fileSize) / (1024 * 1024);
  798. }
  799. // 数据上报-发送实时数据,值发送一个设备
  800. void DemoProtocol::pack_send_real_data()
  801. {
  802. // 按HASH字典顺序获取第一个转发设备的迭代器
  803. size_t eqptIt_index = 0;
  804. for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
  805. {
  806. auto table = (*eqptIt).source().forwardTable();
  807. if (!table) {
  808. continue;
  809. }
  810. // 通过遥测转发页的KEY获取转发页
  811. auto page1 = table.pageGet("PAGE1");
  812. // 通过遥信转发页的KEY获取转发页
  813. auto page2 = table.pageGet("PAGE2");
  814. uint16_t yc_count = page1.pointSize(); //遥测总数
  815. uint16_t yx_count = page2.pointSize(); //遥信总数
  816. if (yc_count == 0 && yx_count == 0) {
  817. continue;
  818. }
  819. auto databuff_len = cacl_real_data_buff_len(yc_count, yx_count);
  820. vec_real_data_buff.resize(databuff_len);
  821. databuff_len = cacl_inc_real_data_buff_len(yc_count, yx_count);
  822. vec_inc_real_data_buff.resize(databuff_len);
  823. //缓存队列不为空时,需要清空缓存,存储新的数据
  824. if (YC_data_cache_list.size() > eqptIt_index &&
  825. YC_data_cache_list[eqptIt_index].size() > 0)
  826. {
  827. YC_data_cache_list[eqptIt_index].clear();
  828. }
  829. if (YX_data_cache_list.size() > eqptIt_index &&
  830. YX_data_cache_list[eqptIt_index].size() > 0)
  831. {
  832. YX_data_cache_list[eqptIt_index].clear();
  833. }
  834. //实时数据数组
  835. memset(&vec_real_data_buff[0], 0, vec_real_data_buff.size());
  836. size_t RD_index = 0;
  837. uint8_t RS_byte_value = 0; //遥信数据项,填充满8位后再给遥信数组赋值
  838. uint8_t RS_bit_index = 0; //标识正在填充字节的位序号
  839. //包头,固定为0x00
  840. vec_real_data_buff[RD_index++] = 0x00;
  841. //填充遥信和遥测数量,高位在前
  842. WriteDataToBufferWithNetworkByteOrder(yx_count, &vec_real_data_buff[RD_index]);
  843. RD_index += 2;
  844. WriteDataToBufferWithNetworkByteOrder(yc_count, &vec_real_data_buff[RD_index]);
  845. RD_index += 2;
  846. //数据生成时间,8字节,自 1970 年 1 月 1 日(08:00:00 GMT)至当前时间的总毫秒数
  847. CurrentRealDataTime = getNow();
  848. WriteDataToBufferWithNetworkByteOrder(CurrentRealDataTime, &vec_real_data_buff[RD_index]);
  849. RD_index += 8;
  850. // 获取该页所有转发变量
  851. for (auto index = 0; index < page1.pointSize(); ++index)
  852. {
  853. auto fp = page1.pointAt(index);
  854. if (!fp) {
  855. continue;
  856. }
  857. // 获取关联采集变量
  858. auto cp = fp.related().release<Cmnpp::Proto::Point>();
  859. if (!cp) {
  860. continue;
  861. }
  862. auto value = static_cast<float>(cp.source().processNumberValue());
  863. WriteDataToBufferWithNetworkByteOrder(value, &vec_real_data_buff[RD_index]);
  864. RD_index += 4;
  865. //缓存遥测数据,用于比较是否有变化
  866. YC_data_cache_list[eqptIt_index].push_back(value);
  867. }
  868. // 获取该页所有转发变量
  869. for (auto index = 0; index < page2.pointSize(); ++index)
  870. {
  871. auto fp = page2.pointAt(index);
  872. if (!fp) {
  873. continue;
  874. }
  875. // 获取关联采集变量
  876. auto cp = fp.related().release<Cmnpp::Proto::Point>();
  877. if (!cp) {
  878. continue;
  879. }
  880. uint8_t yx = (int8_t)cp.source().processNumberValue();
  881. if (yx != 0)
  882. {
  883. RS_byte_value |= (1 << RS_bit_index);
  884. }
  885. //已经填充满了8个遥信信号,填充到数组,并临时遥信数据项清零
  886. if (++RS_bit_index >= 8)
  887. {
  888. vec_real_data_buff[RD_index++] = RS_byte_value;
  889. RS_byte_value = 0;
  890. RS_bit_index = 0;
  891. }
  892. YX_data_cache_list[eqptIt_index].push_back(yx);
  893. }
  894. //最后一个值没计算满8个,也需要放入发送缓存
  895. if (RS_bit_index < 8)
  896. {
  897. vec_real_data_buff[RD_index++] = RS_byte_value;
  898. }
  899. if (gateway_status == gateway_online)
  900. {
  901. pack_send_mqtt_binary(topic_map_["data_up"].get_publish_topic(), &vec_real_data_buff[0], RD_index);
  902. m_StorageData.save_data_to_file(&vec_real_data_buff[0], RD_index, CurrentRealDataTime);
  903. cmnproto_log_debug(m_handle.native(), "Total Data,size:%d,ycCount:%d,yxCount:%d", RD_index, yc_count, yx_count);
  904. #ifdef DEBUG
  905. std::string strData = HexArrayToString(&vec_real_data_buff[0], RD_index);
  906. cmnproto_log_debug(m_handle.native(), "data:%s", strData.c_str());
  907. #endif
  908. //发送实时数据后,增量包时间窗口内的序号清零,重新开始计数
  909. IncDataPackIndex = 0;
  910. }
  911. else if (gateway_status == gateway_offline)
  912. {
  913. m_StorageData.save_offline_data_to_file(&vec_real_data_buff[0], RD_index);
  914. }
  915. ++eqptIt_index;
  916. }
  917. }
  918. // 数据上报-发送增量数据
  919. // 五分钟发送一个全量包,在接下来的五分钟内,定时扫描值有变化的点上送
  920. void DemoProtocol::pack_send_incremental_data()
  921. {
  922. try
  923. {
  924. // 按HASH字典顺序获取第一个转发设备的迭代器
  925. int eqptIt_index = 0;
  926. for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
  927. {
  928. auto eqpt = *eqptIt;
  929. auto table = eqpt.source().forwardTable();
  930. if (!table) {
  931. continue;
  932. }
  933. // 通过遥测转发页的KEY获取转发页
  934. auto page1 = table.pageGet("PAGE1");
  935. // 通过遥信转发页的KEY获取转发页
  936. auto page2 = table.pageGet("PAGE2");
  937. uint16_t yc_count = page1.pointSize(); //遥测总数
  938. uint16_t yx_count = page2.pointSize(); //遥信总数
  939. if (yc_count == 0 && yx_count == 0) {
  940. continue;
  941. }
  942. if (YC_data_cache_list.at(eqptIt_index).size() == 0
  943. || YX_data_cache_list.at(eqptIt_index).size() == 0)
  944. {
  945. continue;
  946. }
  947. //增量包数据数组
  948. memset(&vec_inc_real_data_buff[0], 0, vec_inc_real_data_buff.size());
  949. int RD_index = 0;
  950. uint16_t telemetering_count = 0; //遥测总数
  951. uint16_t remote_signal_count = 0; //遥信总数
  952. //增量包包头,固定为0x01
  953. vec_inc_real_data_buff[RD_index++] = 0x01;
  954. //时间戳,8字节,用于标识本增量包所属的全量包
  955. WriteDataToBufferWithNetworkByteOrder(CurrentRealDataTime, &vec_inc_real_data_buff[RD_index]);
  956. RD_index += 8;
  957. //代表时间窗口内的数据包的顺序号
  958. WriteDataToBufferWithNetworkByteOrder(IncDataPackIndex, &vec_inc_real_data_buff[RD_index]);
  959. RD_index += 2;
  960. ++IncDataPackIndex;
  961. //数据产生时间,8字节
  962. auto timestamp = getNow();
  963. WriteDataToBufferWithNetworkByteOrder(timestamp, &vec_inc_real_data_buff[RD_index]);
  964. RD_index += 8;
  965. //跳过遥测总数,统计后再填充
  966. uint16_t yc_count_Index = RD_index;
  967. RD_index += 2;
  968. // 获取该页所有转发变量
  969. auto yc_point_index = 0;
  970. for (auto index = 0; index < page1.pointSize(); ++index)
  971. {
  972. auto fp = page1.pointAt(index);
  973. if (!fp) {
  974. continue;
  975. }
  976. // 获取关联采集变量
  977. auto cp = fp.related().release<Cmnpp::Proto::Point>();
  978. if (!cp) {
  979. continue;
  980. }
  981. auto yc_value = static_cast<float> (cp.source().processNumberValue());
  982. if (YC_data_cache_list.at(eqptIt_index).at(yc_point_index) != yc_value)
  983. {
  984. // 点表序号
  985. uint16_t yc_idx = cmnforward_point_no(fp.native());
  986. WriteDataToBufferWithNetworkByteOrder(yc_idx, &vec_inc_real_data_buff[RD_index]);
  987. RD_index += 2;
  988. // 采集时刻,2个字节,数据产生时间”的偏移毫秒数,= 数据采集时间-数据产生时间
  989. int16_t collect_time = CurrentRealDataTime - timestamp;
  990. WriteDataToBufferWithNetworkByteOrder(collect_time, &vec_inc_real_data_buff[RD_index]);
  991. RD_index += 2;
  992. //数据项值
  993. WriteDataToBufferWithNetworkByteOrder(yc_value, &vec_inc_real_data_buff[RD_index]);
  994. RD_index += 4;
  995. ++telemetering_count;
  996. //更新缓存队列的数据
  997. YC_data_cache_list.at(eqptIt_index).at(yc_point_index) = yc_value;
  998. }
  999. ++yc_point_index;
  1000. }
  1001. //填充遥测总数
  1002. WriteDataToBufferWithNetworkByteOrder(telemetering_count, &vec_inc_real_data_buff[yc_count_Index]);
  1003. uint16_t yx_count_index = RD_index;
  1004. RD_index += 2; //跳过遥信总数,统计后再填充
  1005. // 获取该页所有转发变量
  1006. auto yx_point_index = 0;
  1007. for (auto index = 0; index < page2.pointSize(); ++index)
  1008. {
  1009. auto fp = page2.pointAt(index);
  1010. if (!fp) {
  1011. continue;
  1012. }
  1013. // 获取关联采集变量
  1014. auto cp = fp.related().release<Cmnpp::Proto::Point>();
  1015. if (!cp) {
  1016. continue;
  1017. }
  1018. uint8_t yx_value = (uint8_t)cp.source().processNumberValue();
  1019. if (YX_data_cache_list.at(eqptIt_index).at(yx_point_index) != yx_value)
  1020. {
  1021. // 点表序号
  1022. uint16_t yx_idx = cmnforward_point_no(fp.native());
  1023. WriteDataToBufferWithNetworkByteOrder(yx_idx, &vec_inc_real_data_buff[RD_index]);
  1024. RD_index += 2;
  1025. // 采集时刻,2个字节,数据产生时间”的偏移毫秒数,= 数据采集时间*1000 - 数据产生时间
  1026. int16_t collect_time = CurrentRealDataTime - timestamp;
  1027. WriteDataToBufferWithNetworkByteOrder(collect_time, &vec_inc_real_data_buff[RD_index]);
  1028. RD_index += 2;
  1029. if (yx_value != 0)
  1030. {
  1031. vec_inc_real_data_buff[RD_index] |= (1 << 7);
  1032. }
  1033. RD_index += 1;
  1034. ++remote_signal_count;
  1035. //更新缓存队列的数据
  1036. YX_data_cache_list.at(eqptIt_index).at(yx_point_index) = yx_value;
  1037. }
  1038. ++yx_point_index;
  1039. }
  1040. //填充遥信总数
  1041. WriteDataToBufferWithNetworkByteOrder(remote_signal_count, &vec_inc_real_data_buff[yx_count_index]);
  1042. //数据有变化,开始发送增量数据
  1043. if (remote_signal_count > 0 || telemetering_count > 0)
  1044. {
  1045. if (gateway_status == gateway_online)
  1046. {
  1047. pack_send_mqtt_binary(topic_map_["data_up"].get_publish_topic(), &vec_inc_real_data_buff[0], RD_index);
  1048. cmnproto_log_debug(m_handle.native(), "Inc Data,size:%d,ycCount:%d,yxCount:%d", RD_index, telemetering_count, remote_signal_count);
  1049. #ifdef DEBUG
  1050. std::string strData = HexArrayToString(&vec_inc_real_data_buff[0], RD_index);
  1051. cmnproto_log_debug(m_handle.native(), "data:%s", strData.c_str());
  1052. #endif
  1053. }
  1054. else if (gateway_status == gateway_offline)
  1055. {
  1056. m_StorageData.save_offline_data_to_file(&vec_inc_real_data_buff[0], RD_index);
  1057. }
  1058. //TEST
  1059. //++testindex;
  1060. //if (testindex == 10)
  1061. //{
  1062. // gateway_status = gateway_offline;
  1063. // m_StorageData.create_offline_data_file();
  1064. // cmnproto_log_debug(m_handle.native(), "Gateway offline test");
  1065. //}
  1066. //else if (testindex == 100)
  1067. //{
  1068. // nlohmann::json data = {
  1069. // {u8"deviceSN", "1"},
  1070. // };
  1071. // unpack_data_register(data);
  1072. //}
  1073. //else if (testindex == 200)
  1074. //{
  1075. // testindex = 0;
  1076. //}
  1077. //else if (index == 20)
  1078. //{
  1079. // nlohmann::json data1 = {
  1080. // {u8"callid", 1},
  1081. // {u8"starttime", 1704301950560},
  1082. // {u8"endtime", 1704381847444}
  1083. // };
  1084. // unpack_data_up_all(data1);
  1085. //}
  1086. }
  1087. eqptIt_index++;
  1088. }
  1089. }
  1090. catch (const std::exception& errorinfo)
  1091. {
  1092. cmnproto_log_debug(m_handle.native(), "Send inc data exception:%s, count: %d, index: %d", errorinfo.what());
  1093. }
  1094. }
  1095. void DemoProtocol::pack_send_offline_data()
  1096. {
  1097. m_send_offline_data_timer.start();
  1098. m_send_offline_data_timer.on([this]
  1099. {
  1100. //没有文件或者,已经读取完最后一个文件,或者离线状态,不发送补发数据
  1101. if (m_offline_file_index >= m_offline_data_file_list.size()
  1102. || gateway_status == gateway_offline)
  1103. {
  1104. return;
  1105. }
  1106. if (m_offline_data_send_index == INVAILD_SEND_OL_INDEX)//发送缓存的数据发送完了
  1107. {
  1108. //从离线文件列表中获取离线文件名,读取离线数据
  1109. m_offline_data_cache.clear();
  1110. auto strFilePath = m_offline_data_file_list[m_offline_file_index];
  1111. m_StorageData.read_offline_data(strFilePath, m_offline_data_cache);
  1112. auto cache_data_count = m_offline_data_cache.size();
  1113. cmnproto_log_debug(m_handle.native(), "read offline file:%s,count:%d", strFilePath.string(), cache_data_count, m_offline_data_send_index);
  1114. if (cache_data_count > 0)
  1115. {
  1116. m_offline_data_send_index = 0;//发送序号重新0开始
  1117. m_offline_compensationId = getNow();
  1118. uint64_t starttime = get_timestamp_from_vec(&m_offline_data_cache[0][0], m_offline_data_cache[0].size());
  1119. uint64_t endtime = get_timestamp_from_vec(&m_offline_data_cache[cache_data_count - 1][0], m_offline_data_cache[cache_data_count - 1].size());
  1120. const nlohmann::json data = {
  1121. {u8"compensationid", m_offline_compensationId},
  1122. {u8"starttime", starttime},
  1123. {u8"endtime", endtime},
  1124. {u8"size", cache_data_count},
  1125. {u8"type",0}
  1126. };
  1127. pack_send_mqtt(topic_map_["data_compensation"].get_publish_topic(), u8"0", data);
  1128. }
  1129. else
  1130. {
  1131. //当前文件为空,读下一个文件
  1132. ++m_offline_file_index;
  1133. }
  1134. }
  1135. else
  1136. {
  1137. auto cache_data_count = m_offline_data_cache.size();
  1138. if (m_offline_data_send_index < cache_data_count)
  1139. {
  1140. size_t data_len = m_offline_data_cache[m_offline_data_send_index].size();
  1141. uint8_t data_arr[MAX_DATA_LEN] = { 0 };
  1142. std::copy(m_offline_data_cache[m_offline_data_send_index].begin(), m_offline_data_cache[m_offline_data_send_index].end(), data_arr);
  1143. pack_send_mqtt_binary(topic_map_["data_compensation"].get_publish_topic(), data_arr, data_len);
  1144. cmnproto_log_debug(m_handle.native(), "Send offline data size: %d, count: %d, index: %d", data_len, cache_data_count, m_offline_data_send_index);
  1145. #ifdef DEBUG
  1146. std::string strData = HexArrayToString(data_arr, data_len);
  1147. cmnproto_log_debug(m_handle.native(), "data: %s", strData.c_str());
  1148. #endif
  1149. //一个离线文件发送完了,到下一个文件。
  1150. if (++m_offline_data_send_index >= cache_data_count)
  1151. {
  1152. m_offline_data_send_index = INVAILD_SEND_OL_INDEX;
  1153. if (m_is_sending_cache_data == false)
  1154. {
  1155. ++m_offline_file_index;
  1156. }
  1157. else //如果发送的是缓存数据,文件需要不增加
  1158. {
  1159. m_is_sending_cache_data = true;
  1160. }
  1161. }
  1162. }
  1163. }
  1164. });
  1165. }
  1166. // 发送总召数据,定时器调用
  1167. void DemoProtocol::pack_send_up_call()
  1168. {
  1169. m_up_call_timer.start();
  1170. m_up_call_timer.on([this]
  1171. {
  1172. if (m_upcall_send_index < m_his_data_file_list.size())
  1173. {
  1174. std::vector<uint8_t> historyData;
  1175. m_StorageData.read_history_data(m_his_data_file_list[m_upcall_send_index], historyData);
  1176. auto data_len = historyData.size();
  1177. pack_send_mqtt_binary(topic_map_["up_call"].get_publish_topic(), &historyData[0], data_len);
  1178. cmnproto_log_debug(m_handle.native(), "Send up call data:%d", data_len);
  1179. #ifdef DEBUG
  1180. std::string strData = HexArrayToString(&historyData[0], data_len);
  1181. cmnproto_log_debug(m_handle.native(), "data:%s", strData.c_str());
  1182. #endif
  1183. ++m_upcall_send_index;
  1184. }
  1185. else
  1186. {
  1187. m_upcall_send_index = 0;
  1188. m_up_call_timer.cancel();
  1189. }
  1190. });
  1191. }
  1192. void DemoProtocol::query_send_bridge(std::vector<std::string> dotNumbers)
  1193. {
  1194. nlohmann::json dots = {};
  1195. // 将所有的点号加进入
  1196. for (std::vector<std::string>::iterator it = dotNumbers.begin(); it != dotNumbers.end(); ++it) {
  1197. dots[*it] = "%{" + *it + "}";
  1198. }
  1199. const memepp::string station_id = m_line.getOptionString("STATION_ID");
  1200. const auto sendText = station_id.c_str() + dots.dump();
  1201. this->pack_send_mqtt_text(topic_map_["bridge"].get_publish_topic(), sendText);
  1202. }
  1203. void DemoProtocol::unpack_bridge_data(memepp::string_view data)
  1204. {
  1205. // 只有是当前token才处理
  1206. const memepp::string station_id = m_line.getOptionString("STATION_ID");
  1207. if (data.starts_with(station_id)) {
  1208. auto jsonStr = data.substr(station_id.size());
  1209. // 将剩下的内容转换为json
  1210. nlohmann::json jsonObj = nlohmann::json::parse(jsonStr);
  1211. for (const auto& entry : jsonObj.items()) {
  1212. auto value = jsonObj[entry.key()].get<std::string>();
  1213. if (!value.empty())
  1214. {
  1215. value = removeBraces(value);
  1216. }
  1217. point_bridge_map_[entry.key()] = value;
  1218. cmnproto_log_debug(m_handle.native(), "bridge data: %s = %s", entry.key().c_str(), value.c_str());
  1219. }
  1220. }
  1221. }
  1222. // 解析总召指令
  1223. void DemoProtocol::unpack_data_up_all(const nlohmann::json params)
  1224. {
  1225. auto callid = params["callid"].get<uint64_t>(); //总召标记,时间戳
  1226. auto starttime = params["starttime"].get<uint64_t>(); //数据开始时间戳
  1227. auto endtime = params["endtime"].get<uint64_t>(); //时间结束时间戳
  1228. m_his_data_file_list.clear();
  1229. //从文件系统读取数据
  1230. m_StorageData.get_history_file_list(starttime, endtime, m_his_data_file_list);
  1231. long vecSize = m_his_data_file_list.size();
  1232. if (vecSize > 0)
  1233. {
  1234. auto timeMSCount = getNow();
  1235. // 发送应答数据开始包
  1236. const nlohmann::json data = {
  1237. {u8"replyid", timeMSCount},
  1238. {u8"callid", callid},
  1239. {u8"starttime", starttime},
  1240. {u8"endtime", endtime},
  1241. {u8"size", vecSize},
  1242. {u8"type",0},
  1243. };
  1244. pack_send_mqtt(topic_map_["up-call"].get_publish_topic(), u8"0", data);
  1245. m_upcall_send_index = 0;
  1246. pack_send_up_call();
  1247. }
  1248. }
  1249. // 解包设备状态应答
  1250. void DemoProtocol::unpack_query_machine_status(const nlohmann::json params)
  1251. {
  1252. // 内存利用率
  1253. double memRatio = 0;
  1254. mghw_memory_status memStatus = { sizeof(mghw_memory_status), 0 };
  1255. if (mghw_get_memory_status(&memStatus) == 0) {
  1256. // 计算内存使用率
  1257. memRatio = (((double)(memStatus.total_physical - memStatus.available_physical)) / memStatus.total_physical) * 100;
  1258. }
  1259. // cpu利用率
  1260. double cpuRatio = 0;
  1261. {
  1262. #ifdef COMM_OS_WINDOWS
  1263. FILETIME idleTime, kernelTime, userTime;
  1264. cpuRatio = (GetSystemTimes(&idleTime, &kernelTime, &userTime) ?
  1265. __CalculateCPULoad(__FileTimeToInt64(idleTime), __FileTimeToInt64(kernelTime) + __FileTimeToInt64(userTime)) : 1.0f) * 100.0;
  1266. #else
  1267. char buf[256] = {0};
  1268. char name[64] = {0};
  1269. FILE* fd = fopen("/proc/stat","r");
  1270. if (fd) {
  1271. }
  1272. uint32_t user = 0, nice = 0, system = 0, idle = 0;
  1273. fgets(buf,sizeof(buf), fd);
  1274. fclose(fd);
  1275. if (sscanf(buf, "%s %u %u %u %u", name, &user, &nice, &system, &idle) == 5) {
  1276. cpuRatio = (1 - (static_cast<double>(idle) / static_cast<double>(user + nice + system + idle))) * 100;
  1277. }
  1278. #endif
  1279. }
  1280. // 磁盘利用率
  1281. double diskRatio = calc_disk_ratio();
  1282. const nlohmann::json data = {
  1283. {u8"cpuUsage", cpuRatio}, //CPU使用率
  1284. {u8"memUsage", memRatio}, //内存使用率
  1285. {u8"storageUsage", diskRatio}, //存储使用率
  1286. };
  1287. pack_send_mqtt_reply(topic_map_["status"].get_subscribe_topic(), u8"CMN_MACHINE_STATUS", data);
  1288. }
  1289. //计算磁盘利用率,最大值100
  1290. double DemoProtocol::calc_disk_ratio()
  1291. {
  1292. mghw_harddisk_freespace hardDiskFreeSpace = { sizeof(mghw_harddisk_freespace), 0 };
  1293. mghw_get_harddisk_freespace_by_path(m_handle.lineDataStoreDirectoryPath().c_str(), -1, &hardDiskFreeSpace);
  1294. auto freeHardDiskMb = hardDiskFreeSpace.free / (1024 * 1024);
  1295. double diskRatio = (((double)hardDiskFreeSpace.total - (double)hardDiskFreeSpace.free) / (double)hardDiskFreeSpace.total) * 100;
  1296. return diskRatio;
  1297. }
  1298. // 查询指令-日志查询
  1299. void DemoProtocol::unpack_query_log(const nlohmann::json params)
  1300. {
  1301. // MYS
  1302. auto strDate = params["date"].get<std::string>(); //查询日期,格式yyyy-MM-dd
  1303. auto index = params["index"].get<int>(); //当前页数(从1开始)
  1304. auto size = params["size"].get<int>(); //每页条数
  1305. // 获取文件路径
  1306. //auto runDir = getRunDir();
  1307. // 当前日志文件
  1308. std::string fileName("/proto_" + strDate + ".log");
  1309. // 完整路径
  1310. //std::string fullPath;
  1311. //char separator = fs::path::preferred_separator;
  1312. //fullPath.append(std::string(currentDir.begin(), currentDir.end())).append("logs").append(fileName);
  1313. // 日志总行数
  1314. uint64_t totalCount = 0;
  1315. // 读取结果
  1316. std::vector<nlohmann::json> lines;
  1317. // 读取日志文件
  1318. auto dirPath = m_handle.lineDataStoreDirectoryPath();
  1319. auto fullPath = mm_from(fmt::format("{}/logs/{}", dirPath, fileName));
  1320. readLinesFromFile(mm_to<memepp::native_string>(fullPath), (index - 1) * size, size, totalCount, lines);
  1321. const nlohmann::json data = {
  1322. {u8"total", totalCount},
  1323. {u8"logs", lines},
  1324. };
  1325. pack_send_mqtt_reply(topic_map_["log"].get_subscribe_topic(), u8"CMN_RUNNING_LOG", data);
  1326. }
  1327. void DemoProtocol::load_algorithm_from_db(memepp::native_string filePath)
  1328. {
  1329. //加载已经部署的算法信息
  1330. m_algorithmDB.InitAlgorithmDB(filePath);
  1331. auto algoVec = m_algorithmDB.GetAllRecords();
  1332. if (algoVec.size() > 0)
  1333. {
  1334. for (auto element : algoVec)
  1335. {
  1336. memepp::string_view exprId = mm_view(element.strAlgorithmId.c_str(), element.strAlgorithmId.length());
  1337. auto expr = m_handle.createExpr(exprId);
  1338. auto isValid = expr.isObjectValid();
  1339. if (isValid)
  1340. {
  1341. //编译算法
  1342. memepp::string_view strExpr = mm_view(element.strNewThresholdFormula.c_str(), element.strNewThresholdFormula.length());
  1343. expr.setString(strExpr);
  1344. expr.setResultMode(nbpp::dlli::Expr::WithMidres);
  1345. expr.compile();
  1346. if (!expr.hasCompileError())
  1347. {
  1348. m_mapFormulasInfo[element.strAlgorithmId].AlgotithmInfo = element;
  1349. m_mapFormulasInfo[element.strAlgorithmId].exprObject = expr;
  1350. std::vector<std::string> pointNoVec = extractValueFromExpression(element.strThresholdFormula);
  1351. //通过点位桥查询
  1352. for (int i = 0; i < pointNoVec.size(); ++i)
  1353. {
  1354. //缓存对应,为了后面通过 map[采集点号] = 点位号
  1355. m_collect_point_map[element.collectPointVec[i]] = pointNoVec[i];
  1356. }
  1357. query_send_bridge(pointNoVec);
  1358. }
  1359. }
  1360. }
  1361. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] load Formula: %d ", algoVec.size());
  1362. }
  1363. }
  1364. //算法下发解包
  1365. void DemoProtocol::unpack_algorithm_data(const nlohmann::json params)
  1366. {
  1367. // 解包算法数据
  1368. auto strDeviceSN = params["deviceSN"].get<std::string>(); //网关编号
  1369. auto isDeploy = params["deploy"].get<bool>(); //是否部署:false-仅部署,true-部署
  1370. auto strlistFormulas = params["formulas"].get<vector<nlohmann::json>>();
  1371. for (auto element : strlistFormulas)
  1372. {
  1373. nlohmann::json data = {};
  1374. FormulasInfo formulasInfo;
  1375. formulasInfo.AlgotithmInfo.strAlgorithmId = element["algorithmId"].get<std::string>(); //算法ID
  1376. formulasInfo.AlgotithmInfo.strEquipId = element["equipId"].get<std::string>(); //设备ID
  1377. formulasInfo.AlgotithmInfo.strThresholdFormula = element["thresholdFormula"].get<std::string>(); //阈值公式
  1378. data["deviceSN"] = strDeviceSN;
  1379. data["algorithmId"] = formulasInfo.AlgotithmInfo.strAlgorithmId;
  1380. data["equipId"] = formulasInfo.AlgotithmInfo.strEquipId;
  1381. data["errorCode"] = 0;
  1382. data["message"] = "ok";
  1383. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] unpack Formula: %s ", formulasInfo.AlgotithmInfo.strThresholdFormula.c_str());
  1384. //获取公式中所有%{X},X点位号
  1385. std::vector<std::string> pointNoVec = extractValueFromExpression(formulasInfo.AlgotithmInfo.strThresholdFormula);
  1386. //点位号转换成采集点号
  1387. formulasInfo.AlgotithmInfo.collectPointVec = std::vector<std::string>(pointNoVec.size());
  1388. PointNo2PointId(pointNoVec, formulasInfo.AlgotithmInfo.collectPointVec);
  1389. //TOTEST
  1390. //formulasInfo.AlgotithmInfo.collectPointVec[0] = "260223241487560715";
  1391. string strcollectPointTmp = "";
  1392. bool isPointNoValid = true;
  1393. for (auto element: formulasInfo.AlgotithmInfo.collectPointVec)
  1394. {
  1395. if (element.empty())
  1396. {
  1397. isPointNoValid = false;
  1398. data["message"] = "The formula contains invalid measurement points";
  1399. break;
  1400. }
  1401. strcollectPointTmp = strcollectPointTmp + element + ",";
  1402. }
  1403. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] pointNo =>collectPoint: %s ", strcollectPointTmp.c_str());
  1404. //采集点号替换到公式中
  1405. formulasInfo.AlgotithmInfo.strNewThresholdFormula = replacePlaceholder(formulasInfo.AlgotithmInfo.strThresholdFormula, formulasInfo.AlgotithmInfo.collectPointVec);
  1406. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] NewThresholdFormula: %s ", formulasInfo.AlgotithmInfo.strNewThresholdFormula.c_str());
  1407. //语法校验
  1408. memepp::string_view exprId = mm_view(formulasInfo.AlgotithmInfo.strAlgorithmId.c_str(), formulasInfo.AlgotithmInfo.strAlgorithmId.length());
  1409. auto expr = m_handle.createExpr(exprId);
  1410. auto isValid = expr.isObjectValid();
  1411. if (isValid && isPointNoValid)
  1412. {
  1413. //编译算法
  1414. memepp::string_view strExpr = mm_view(formulasInfo.AlgotithmInfo.strNewThresholdFormula.c_str(), formulasInfo.AlgotithmInfo.strNewThresholdFormula.length());
  1415. expr.setString(strExpr);
  1416. expr.setResultMode(nbpp::dlli::Expr::WithMidres);
  1417. expr.compile();
  1418. if (expr.hasCompileError())
  1419. {
  1420. data["errorCode"] = expr.firstCompileErrorCode();
  1421. data["message"] = expr.firstCompileErrorMessage().c_str();
  1422. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] CompileError,error code:%d ,error message:%s", expr.firstCompileErrorCode(), expr.firstCompileErrorMessage().c_str());
  1423. }
  1424. else
  1425. {
  1426. //编译无措,且如果需要部署,则保存。
  1427. if (isDeploy)
  1428. {
  1429. //通过点位桥查询
  1430. for (int i = 0; i < pointNoVec.size(); ++i)
  1431. {
  1432. //缓存对应,为了后面通过 map[采集点号] = 点位号
  1433. m_collect_point_map[formulasInfo.AlgotithmInfo.collectPointVec[i]] = pointNoVec[i];
  1434. }
  1435. query_send_bridge(pointNoVec);
  1436. string strKey = formulasInfo.AlgotithmInfo.strAlgorithmId;
  1437. //保存算式
  1438. formulasInfo.exprObject = expr;
  1439. //如果已经存在,则需要更新,否则就新增一条记录
  1440. if (m_mapFormulasInfo.count(strKey) > 0)
  1441. {
  1442. m_algorithmDB.UpdateRecord(strKey, formulasInfo.AlgotithmInfo);
  1443. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] Compile success and UPDATA Formula, count:%d ", m_mapFormulasInfo.size());
  1444. }
  1445. else
  1446. {
  1447. m_algorithmDB.AddRecord(formulasInfo.AlgotithmInfo);
  1448. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] Compile success and ADD a Formula, count:%d ", m_mapFormulasInfo.size());
  1449. }
  1450. m_mapFormulasInfo[strKey] = formulasInfo;
  1451. }
  1452. }
  1453. }
  1454. else
  1455. {
  1456. data["errorCode"] = 10000;
  1457. cmnproto_log_debug(m_handle.native(), "[ALGORITHM] Compile isInValid!");
  1458. }
  1459. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPLY] 1");
  1460. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPLY] 2");
  1461. pack_send_mqtt_reply(topic_map_["algorithm_send"].get_subscribe_topic(), u8"CMD_ALGORITHM_SEND", data);
  1462. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPLY] 3");
  1463. }
  1464. }
  1465. //平台下发点位号转换成网关平台测点ID
  1466. void DemoProtocol::PointNo2PointId(std::vector<std::string> pointNo, std::vector<std::string>& pointId)
  1467. {
  1468. size_t vec_len = pointNo.size();
  1469. std::vector<int64_t> vecpointNoNum;
  1470. for (int i = 0; i < vec_len; ++i)
  1471. {
  1472. vecpointNoNum.push_back(std::stoull(pointNo[i]));
  1473. }
  1474. for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
  1475. {
  1476. auto table = (*eqptIt).source().forwardTable();
  1477. if (!table) {
  1478. continue;
  1479. }
  1480. auto page1 = table.pageGet("PAGE1");
  1481. for (auto index = 0; index < page1.pointSize(); ++index)
  1482. {
  1483. auto fp = page1.pointAt(index);
  1484. if (!fp) {
  1485. continue;
  1486. }
  1487. auto fpNo = fp.no();
  1488. for (int i = 0; i < vec_len;++i)
  1489. {
  1490. if (fp.no() == vecpointNoNum[i])
  1491. {
  1492. auto cp = fp.related().release<Cmnpp::Proto::Point>();
  1493. auto pointid = cp.source().id();
  1494. pointId[i] = std::string(pointid.c_str(), pointid.size());
  1495. break;
  1496. }
  1497. }
  1498. }
  1499. auto page2 = table.pageGet("PAGE2");
  1500. for (auto index = 0; index < page2.pointSize(); ++index)
  1501. {
  1502. auto fp = page2.pointAt(index);
  1503. if (!fp) {
  1504. continue;
  1505. }
  1506. auto fpNo = fp.no();
  1507. for (int i = 0; i < vec_len;++i)
  1508. {
  1509. if (fpNo == vecpointNoNum[i])
  1510. {
  1511. auto cp = fp.related().release<Cmnpp::Proto::Point>();
  1512. auto pointid = cp.source().id();
  1513. pointId[i] = std::string(pointid.c_str(), pointid.size());
  1514. break;
  1515. }
  1516. }
  1517. }
  1518. }
  1519. }
  1520. //算法结果上报
  1521. void DemoProtocol::pack_send_algorithm_report()
  1522. {
  1523. //没有部署算法,不执行
  1524. if (m_mapFormulasInfo.size() == 0)
  1525. return;
  1526. auto CurrentRealDataTime = getNow();
  1527. std::vector<nlohmann::json> vec_Formulas;
  1528. for (auto keyValue : m_mapFormulasInfo)
  1529. {
  1530. FormulasInfo formulasInfo = keyValue.second;
  1531. if (!formulasInfo.exprObject.isCompiled())
  1532. {
  1533. formulasInfo.exprObject.setResultMode(nbpp::dlli::Expr::WithMidres);
  1534. formulasInfo.exprObject.compile();
  1535. }
  1536. auto exprResult = formulasInfo.exprObject.calc();
  1537. nlohmann::json data;
  1538. if (exprResult.isObjectValid())
  1539. {
  1540. auto result = exprResult.numberValue();
  1541. if (result == 0)//算法结果 "正常"
  1542. {
  1543. data = {
  1544. {u8"result", 0},
  1545. };
  1546. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] calc result:0, no alarm! ");
  1547. }
  1548. else//算法结果 "预警" a
  1549. {
  1550. nlohmann::json points;
  1551. nlohmann::json variables;
  1552. string strpointsTmp = "";
  1553. //获取参与运算测点的值
  1554. auto pointSize = exprResult.pointStatementSize();
  1555. vector<string> pointNoVecTmp;
  1556. for (int i = 0; i < pointSize; ++i)
  1557. {
  1558. memepp::string key;
  1559. memepp::variant val;
  1560. exprResult.pointStatementKeyValue(i, key, val);
  1561. string strKey = string(key.c_str(), key.size());//采集点号
  1562. //cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] engine key = %s", strKey.c_str());
  1563. //点位桥协议转换
  1564. //采集点号->点位号->设备号
  1565. if (m_collect_point_map.count(strKey) != 0)
  1566. {
  1567. //采集点号->点位号
  1568. string strPointNo = m_collect_point_map[strKey];
  1569. // cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] point key = %s", strPointNo.c_str());
  1570. if (point_bridge_map_.count(strPointNo) != 0)
  1571. {
  1572. //点位号->设备号
  1573. strKey = point_bridge_map_[strPointNo];
  1574. // cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] device key = %s", strPointNo.c_str());
  1575. }
  1576. else
  1577. {
  1578. //重发点位桥协议报文
  1579. pointNoVecTmp.push_back(strPointNo);
  1580. }
  1581. }
  1582. if (pointNoVecTmp.size() > 0)
  1583. {
  1584. //部分点位号没有对应设备号,重发点位桥协议报文
  1585. query_send_bridge(pointNoVecTmp);
  1586. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] resend bridge :%d", pointNoVecTmp.size());
  1587. }
  1588. double valTmp;
  1589. val.try_get(valTmp);
  1590. points[strKey] = valTmp;
  1591. strpointsTmp = strpointsTmp + strKey + ":" + std::to_string(valTmp) + ",";
  1592. }
  1593. //cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] algorithm report points:%s ", strpointsTmp.c_str());
  1594. string strvariablesTmp = "";
  1595. //获取算法定义中间变量的计算值
  1596. auto variableSize = exprResult.assignStatementSize();
  1597. for (int i = 0; i < variableSize; ++i)
  1598. {
  1599. memepp::string key;
  1600. memepp::variant val;
  1601. exprResult.assignStatementKeyValue(i, key, val);
  1602. string strKey = string(key.c_str(), key.size());
  1603. double valTmp;
  1604. val.try_get(valTmp);
  1605. variables[strKey] = valTmp;
  1606. strvariablesTmp = strpointsTmp + strKey + ":" + std::to_string(valTmp) + ",";
  1607. }
  1608. //cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] algorithm report variables:%s ", strvariablesTmp.c_str());
  1609. data = {
  1610. {u8"result", result},
  1611. {u8"points", points},
  1612. {u8"variables", variables},
  1613. };
  1614. }
  1615. }
  1616. else
  1617. {
  1618. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] calc error , object invalid ");
  1619. }
  1620. const nlohmann::json an_formula = {
  1621. {u8"equipId", formulasInfo.AlgotithmInfo.strEquipId},
  1622. {u8"algorithmId", formulasInfo.AlgotithmInfo.strAlgorithmId},
  1623. {u8"data", data},
  1624. };
  1625. vec_Formulas.push_back(an_formula);
  1626. }
  1627. const nlohmann::json data_send = {
  1628. {u8"deviceSN", m_device_sn},
  1629. {u8"date", CurrentRealDataTime},
  1630. {u8"formulas", vec_Formulas},
  1631. };
  1632. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] %s ", data_send.dump().c_str());
  1633. pack_send_mqtt(topic_map_["algorithm_data"].get_publish_topic(), u8"REP_ALGORITHM_DATA", data_send);
  1634. }
  1635. //阈值算法移除
  1636. void DemoProtocol::unpack_algorithm_delete(const nlohmann::json params)
  1637. {
  1638. auto strDeviceSN = params["deviceSN"].get<std::string>(); //网关编号
  1639. auto strDelFormulas = params["formulas"].get<vector<nlohmann::json>>();
  1640. auto nErrCode = 0;
  1641. std::vector<nlohmann::json> dels;
  1642. for (auto element : strDelFormulas)
  1643. {
  1644. auto algorithmId = element["algorithmId"].get<std::string>();
  1645. auto equipId = element["equipId"].get<std::string>();
  1646. if (m_mapFormulasInfo.count(algorithmId) > 0)
  1647. {
  1648. m_mapFormulasInfo.erase(algorithmId);
  1649. m_algorithmDB.DeleteRecord(algorithmId);
  1650. const nlohmann::json delformulas = {
  1651. {u8"algorithmId", algorithmId},
  1652. {u8"equipId", equipId},
  1653. };
  1654. dels.push_back(delformulas);
  1655. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-DEL] Delete algorithmId:%s, equipId:%s,", algorithmId.c_str(), equipId);
  1656. }
  1657. }
  1658. cmnproto_log_debug(m_handle.native(), "[ALGORITHM-DEL] mapFormulasInfo count:%d", m_mapFormulasInfo.size());
  1659. const nlohmann::json data = {
  1660. {u8"deviceSN", strDeviceSN},
  1661. {u8"formulas", dels},
  1662. };
  1663. pack_send_mqtt_reply(topic_map_["algorithm_del"].get_subscribe_topic(), u8"CMD_ALGORITHM_DEL", data);
  1664. }
  1665. long DemoProtocol::get_time_from_string(const std::string dataTime)
  1666. {
  1667. long decimalTime = 0;
  1668. std::string timestamp;
  1669. if(dataTime[1] == '0')
  1670. {
  1671. timestamp = dataTime.substr(4, 8); // 获取时间戳数据
  1672. }
  1673. else if (dataTime[1] == '1')
  1674. {
  1675. timestamp = dataTime.substr(11, 8); // 获取时间戳数据
  1676. }
  1677. decimalTime = std::stoi(timestamp, nullptr, 16);
  1678. return decimalTime;
  1679. }
  1680. void DemoProtocol::subscribe_topic()
  1681. {
  1682. for (auto iter = topic_map_.begin(); iter != topic_map_.end(); ++iter)
  1683. {
  1684. auto value = iter->second;
  1685. auto key = iter->first;
  1686. if (value.hasReply_)
  1687. {
  1688. //< 链接一建立,就订阅消息
  1689. Cmnpp::IFace::SubPubMessage msg;
  1690. msg.cmd(cmniface_msgbody_command_subscribe);
  1691. //< 数据
  1692. msg.data("");
  1693. auto topic = value.is_gateway_active_ ? value.get_subscribe_topic() : value.get_publish_topic();
  1694. msg.topic(topic);
  1695. m_handle.send(m_currentLink, msg);
  1696. cmnproto_log_debug(m_handle.native(), "Subcribe Topic:%s", topic.c_str());
  1697. }
  1698. }
  1699. }
  1700. // 获取文件内容
  1701. bool DemoProtocol::readLinesFromFile(
  1702. const memepp::native_string& filePath,
  1703. int startLine, int numLines, uint64_t& totalLines, std::vector<nlohmann::json>& result)
  1704. {
  1705. std::ifstream file(filePath, std::ios::in );
  1706. file.imbue(std::locale("chs"));
  1707. if (!file.is_open()) {
  1708. std::cerr << "Open file failed: " << filePath.c_str() << std::endl;
  1709. return false;
  1710. }
  1711. totalLines = 0;
  1712. result.clear();
  1713. std::string line;
  1714. while (std::getline(file, line)) {
  1715. totalLines++;
  1716. if (totalLines >= startLine && numLines > 0) {
  1717. auto info = parseLog(line);
  1718. result.push_back(info);
  1719. numLines--;
  1720. }
  1721. // 行数超过文件总行数,提前退出循环
  1722. if (file.eof()) {
  1723. break;
  1724. }
  1725. }
  1726. if (file.fail() && !file.eof()) {
  1727. std::cerr << "Read File error" << std::endl;
  1728. file.close();
  1729. return false;
  1730. }
  1731. file.close();
  1732. return true;
  1733. }
  1734. // 该函数是插件的退出函数,该函数会被插件加载器调用
  1735. CMN_PLUGIN_API int64_t cmnplugin_exit()
  1736. {
  1737. return CMNERRNO_OK;
  1738. }
  1739. // 该函数是插件的入口函数,该函数会被插件加载器调用
  1740. CMN_PLUGIN_API int64_t cmnplugin_initialize(
  1741. const cmnplugin_init_params* _params, rsize_t _structSize, cmnplugin_exit_func_t* _func)
  1742. {
  1743. Cmnpp::IProtocolRegistrar registrar(_params, _structSize);
  1744. auto ret = registrar.initialize();
  1745. if (ret < 0)
  1746. return CMNERRNO_ERROR;
  1747. ret = registrar.registerObject<DemoProtocol>("NW-MQTT");
  1748. if (ret < 0)
  1749. return CMNERRNO_ERROR;
  1750. *_func = cmnplugin_exit;
  1751. return 0;
  1752. }