12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027 |
- #include "DemoProtocol.hpp"
- #include "CommonFunction.hpp"
- #include "Functions.hpp"
- #include <cmn/proto/params.h>
- #include <cmn/proto/ui/option_list.h>
- #include <megopp/util/scope_cleanup.h>
- #include <cmn/plugin_framework/plugin.h>
- #include <Cmnpp/Link.h>
- #include <chrono>
- #include <Cmnpp/IFace/SubPubMessage.h>
- #include <Cmnpp/Proto/Point.h>
- #include <Cmnpp/Forward/Point.h>
- #include <string>
- #include "cmn/proto/log.h"
- #include <memepp/convert/common_def.hpp>
- #include <memepp/convert/std/wstring.hpp>
- #include <memepp/convert/std/string.hpp>
- #include <memepp/convert/self.hpp>
- #include <memepp/convert/fmt.hpp>
- #include <memepp/native.hpp>
- #include <mego/hardware/memory/memory.h>
- #include <mego/util/os/windows/windows_simplify.h>
- #include <mego/hardware/disk/disk.h>
- #include "sm4.hpp"
- #include <spdlog/spdlog.h>
- #include <spdlog/async.h>
- #include <spdlog/sinks/daily_file_sink.h>
- #include <fmt/format.h>
- #define DEBUG // NOTE(review): defined unconditionally, so every `#ifdef DEBUG` branch in this file (the ones that log full message payloads) is always compiled in
- using namespace std;
- static int testindex = 0; // not referenced in the visible part of this file
- #define TIME_1_SECOND 1000 // timer interval passed to set_interval(); presumably milliseconds - TODO confirm
- #define TIME_2_SECOND 2000 // not referenced in the visible part of this file
- #define TIME_5_MINUTES 300000 // 5*60*1000, interval of the full-data upload timer
- #define MAX_DATA_LEN 160000 // not referenced in the visible part of this file
- #define INVAILD_SEND_OL_INDEX 0xFFFFFFFFFFFFFFFF // sentinel stored in m_offline_data_send_index by initialize_cb() and tested in unpack_data_register()
- #if MG_OS__WIN_AVAIL
/// Computes the busy fraction of the CPU since the previous call.
///
/// The function remembers the tick counters of the last call in function-local
/// statics, so it is stateful: the very first call measures against zero.
///
/// @param idleTicks   cumulative idle tick counter.
/// @param totalTicks  cumulative total tick counter.
/// @return 1 - idleDelta / totalDelta, or 1.0 when totalDelta is zero.
static float __CalculateCPULoad(unsigned long long idleTicks, unsigned long long totalTicks)
{
    static unsigned long long s_lastTotalTicks = 0;
    static unsigned long long s_lastIdleTicks = 0;

    const unsigned long long totalDelta = totalTicks - s_lastTotalTicks;
    const unsigned long long idleDelta = idleTicks - s_lastIdleTicks;

    // Remember the counters for the next call before leaving.
    s_lastTotalTicks = totalTicks;
    s_lastIdleTicks = idleTicks;

    if (totalDelta == 0) {
        // No ticks elapsed: the idle share is taken as 0, i.e. the result is 1.
        return 1.0f;
    }
    return 1.0f - static_cast<float>(idleDelta) / totalDelta;
}
- #endif
- #if MG_OS__WIN_AVAIL
- static unsigned long long __FileTimeToInt64(const FILETIME& ft)
- {
- return (((unsigned long long)(ft.dwHighDateTime)) << 32) | ((unsigned long long)ft.dwLowDateTime);
- }
- #endif
- void DemoProtocol::parameters_get(cmnproto_params* _param)
- {
- cmnproto_params_set(
- _param,
- u8"MQTT",
- CMNPROTO_VER, //< SDK版本
- MEGO__MAKE_VERSION_NUMBER(0, 0, 1), //< 本协议驱动版本
- cmnenum_duplex_full, //< 指定协议驱动的工作模式为全双工
- cmnenum_data_process_forward, //< 指定协议驱动的数据处理模式为采集
- cmnenum_linkmode_network, //< 指定协议驱动的连接模式为网络连接
- cmnenum_linktype_mqttv3, //< 指定协议驱动的连接类型为MQTT
- cmnenum_linkquantity_singleline_singlelink, //< 指定协议驱动的连接数量为单通道单连接
- u8"南网网关MQTT协议"
- );
- cmnproto_params_forwardpage_append(
- _param,
- "PAGE1",
- u8"遥测",
- NULL,
- NULL
- );
- auto optlist1 = cmnproto_ui_optlist__create_v2(
- "PAGE2PARAM1",
- u8"转发页的2参数1",
- u8"转发页的2参数1的描述",
- 0,
- 0
- );
- MEGOPP_UTIL__ON_SCOPE_CLEANUP([&] { cmnproto_ui_optlist_destruct(&optlist1); });
- // 为转发页2里的转发变量添加参数
- cmnproto_params_forwardpage_append(
- _param,
- "PAGE2",
- u8"遥信",
- NULL,
- optlist1
- );
- // 设置参数
- auto optList = cmnproto_ui_optlist__create_by_str_v2(
- u8"PLAT_UID",
- u8"平台授权用户ID",
- u8"云平台提供的授权用户ID",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"PLAT_PW",
- u8"平台授权用户密码",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"SM2_PUB_KEY",
- u8"平台公钥SM2",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"SM4_SECRTY_KEY",
- u8"平台SM4密钥",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"STATION_ID",
- u8"站点ID",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"STATION_NICK",
- u8"站点昵称",
- u8"用于点位桥协议,yw、fr、lb",
- u8"");
-
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"deviceSN",
- u8"设备编号",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"deviceName",
- u8"设备名称",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"deviceType",
- u8"设备类型",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"model",
- u8"产品型号",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"location",
- u8"安装位置描述",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"version",
- u8"程序版本号",
- u8"",
- u8"");
- cmnproto_ui_optlist__append_by_str_v2(
- optList,
- u8"ENABLE_NTP",
- u8"平台校时",
- u8"1:启用 0: 禁用",
- u8"1");
- MEGOPP_UTIL__ON_SCOPE_CLEANUP([&optList] { cmnproto_ui_optlist_destruct(&optList); });
- cmnproto_params_lineopt_append(
- _param,
- u8"通信参数",
- optList
- );
- // 该协议支持的TCP链接类型
- cmnproto_params_support_linkmode_append(_param, cmnenum_linkmode_network, cmnenum_linktype_tcp_client);
- cmnproto_params_support_linkmode_append(_param, cmnenum_linkmode_network, cmnenum_linktype_tcp_server);
- }
- cmnec_t DemoProtocol::initialize_cb(cmnproto_handle _handle)
- {
- m_handle = _handle;
- cmnproto_log_debug(m_handle.native(), "Seq 1");
- //初始化历史数据存储文件
- auto folderTmp = m_handle.lineDataStoreDirectoryPath();
- cmnproto_log_debug(m_handle.native(), "Seq 2");
- auto folderPath = mm_to<memepp::native_string>(folderTmp);
- cmnproto_log_debug(m_handle.native(), "Seq 3");
-
- auto logDirPath = mm_to<memepp::native_string>(folderTmp + memepp::string_view{ "/logs" });
- cmnproto_log_debug(m_handle.native(), "Seq 4");
- auto logFilePath = mm_to<memepp::native_string>(folderTmp + memepp::string_view{ "/logs/proto.log" });
- cmnproto_log_debug(m_handle.native(), "Seq 5");
- ghc::filesystem::create_directories(logDirPath);
- cmnproto_log_debug(m_handle.native(), "Seq 6");
-
- //spdlog::init_thread_pool(2048, 1);
- cmnproto_log_debug(m_handle.native(), "Seq 7");
- //spdlog::flush_every(std::chrono::seconds(3));
- cmnproto_log_debug(m_handle.native(), "Seq 8");
- //auto logger = spdlog::create_async<spdlog::sinks::daily_file_sink_mt>(
- // "proto", logFilePath, 0, 0, false, 30);
- cmnproto_log_debug(m_handle.native(), "Seq 9");
- //logger->set_formatter(
- // std::unique_ptr<spdlog::formatter>(new spdlog::pattern_formatter("[%Y-%m-%d %T.%f %z] [tid %t] [%^%l%$] [%n] %v")));
- cmnproto_log_debug(m_handle.native(), "Seq 10");
- //logger_ = logger;
- cmnproto_log_debug(m_handle.native(), "Seq 11");
- gateway_status = gateway_init;
- cmnproto_log_debug(m_handle.native(), "Seq 12");
- m_handle.timerStart(100);
- cmnproto_log_debug(m_handle.native(), "Seq 13");
- // 设置每一个定时器的时间设置
- m_send_total_data_timer.set_interval(TIME_5_MINUTES);
- cmnproto_log_debug(m_handle.native(), "Seq 14");
- //m_send_total_data_timer.set_interval(10000);
- m_send_incremental_data_timer.set_interval(TIME_1_SECOND);
- cmnproto_log_debug(m_handle.native(), "Seq 15");
- auth_passive_timer_.set_interval(5 * 1000);
- cmnproto_log_debug(m_handle.native(), "Seq 16");
- m_send_offline_data_timer.set_interval(300);
- cmnproto_log_debug(m_handle.native(), "Seq 17");
- m_up_call_timer.set_interval(1000);
- cmnproto_log_debug(m_handle.native(), "Seq 18");
- service_task_timer.set_interval(5 * 1000);
- cmnproto_log_debug(m_handle.native(), "Seq 19");
- m_algorithm_task_timer.set_interval(TIME_1_SECOND);
- cmnproto_log_debug(m_handle.native(), "Seq 20");
- cmnerrno_t ret = 0;
- ret = m_handle.lineGet(m_line);
- if (ret) {
- //logger_->debug("Channel get failed! ");
- return CMNERRNO_ERROR;
- }
- currentDir = getAppDir(folderPath);
- m_StorageData.init_data_storage(folderPath,1);
- m_offline_data_send_index = INVAILD_SEND_OL_INDEX;
- m_offline_file_index = 0;
- return 0;
- }
- cmnec_t DemoProtocol::process_cb()
- {
- return CMNERRNO_OK;
- }
- void DemoProtocol::init_topic_map()
- {
- const memepp::string station_id = m_line.getOptionString("STATION_ID");
- const memepp::string station_nick = m_line.getOptionString("STATION_NICK");
- // 请求token
- topic_map_.insert(std::make_pair("token", mqtt::MqttTopic("/oauth/token", station_id.c_str())));
- // 注册
- topic_map_.insert(std::make_pair("register", mqtt::MqttTopic("/register", station_id.c_str())));
- // 校时
- topic_map_.insert(std::make_pair("timecheck", mqtt::MqttTopic("/timecheck", station_id.c_str())));
- // 实时上传
- topic_map_.insert(std::make_pair("data_up", mqtt::MqttTopic("/data/up", station_id.c_str(), false)));
- // 补发上传
- topic_map_.insert(std::make_pair("data_compensation",mqtt::MqttTopic("/data/up-compensation", station_id.c_str(), false)));
- // 总召
- topic_map_.insert(std::make_pair("up_call", mqtt::MqttTopic("/data/up-call", station_id.c_str(), true, false)));
- // 设备查询
- topic_map_.insert(std::make_pair("status", mqtt::MqttTopic("/command/status", station_id.c_str(), true, false)));
- // 日志
- topic_map_.insert(std::make_pair("log", mqtt::MqttTopic("/command/log", station_id.c_str(), true, false)));
- // 算法下发
- topic_map_.insert(std::make_pair("algorithm_send", mqtt::MqttTopic("/command/algorithm_send", station_id.c_str(), true, false)));
- // 算法报告上送
- topic_map_.insert(std::make_pair("algorithm_data", mqtt::MqttTopic("/report/algorithm_data", station_id.c_str(), false)));
- // 阈值算法移除
- topic_map_.insert(std::make_pair("algorithm_del", mqtt::MqttTopic("/command/algorithm_del", station_id.c_str(), true, false)));
- // 点位桥
- topic_map_.insert(std::make_pair("bridge", mqtt::MqttTopic("bridge", station_nick.c_str())));
- }
- //离线信号
- cmnec_t DemoProtocol::terminate_cb()
- {
-
- return 0;
- }
- cmnerrno_t DemoProtocol::timer_cb()
- {
- if (auth_passive_timer_.timing_continue(100))
- return 0;
- if (service_task_timer.timing_continue(100))
- return 0;
- if (m_send_total_data_timer.timing_continue(100))
- return 0;
- if (m_send_incremental_data_timer.timing_continue(100))
- return 0;
-
- if (m_up_call_timer.timing_continue(100))
- return 0;
- if (m_send_offline_data_timer.timing_continue(100))
- return 0;
- if (m_algorithm_task_timer.timing_continue(100))
- return 0;
- return CMNERRNO_OK;
- }
- cmnec_t DemoProtocol::link_new_cb(cmnlink_ref _link)
- {
- cmnproto_log_debug(m_handle.native(), "MQTT Channel connected!");
- m_currentLink = _link;
- // 初始化topic列表
- init_topic_map();
- // 订阅topic列表
- subscribe_topic();
- // 定时发送认证
- auth_passive_timer_.start();
- auth_passive_timer_.on([this]
- {
- auto now = getNow();
- if (m_token.empty() || last_get_token_timestamp == 0 || ((now - last_get_token_timestamp) > (1000*60*20)))
- {
- pack_send_token();
- }
- else
- {
- if (m_device_sn.empty())
- {
- pack_send_register();
- }
- }
- });
- service_task_timer.start();
- service_task_timer.on([this]
- {
- run_service_task();
- });
- return 0;
- }
- cmnec_t DemoProtocol::link_del_cb(cmnlink_ref _link)
- {
- if (m_currentLink == _link)
- {
- //m_send_offline_data_timer.cancel();
- m_token = "";
- m_device_sn = "";
- gateway_status = gateway_offline;
- m_StorageData.create_offline_data_file();
- cmnproto_log_debug(m_handle.native(), "Gateway offline");
- m_currentLink.reset();
- }
- return 0;
- }
- cmnec_t DemoProtocol::message_cb_response(memepp::string_view topic, memepp::string_view data)
- {
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== %s ", topic.to_string().c_str(), data.to_string().c_str());
- nlohmann::json data_json = nlohmann::json::parse(data);
- auto code = data_json["code"].get<int>();
- auto msg = data_json["msg"].get<std::string>();
- auto mid = std::to_string(data_json["mid"].get<uint64_t>()) ;
- auto type = data_json["type"].get<std::string>();
- // 获取当前缓存中的对应mid
- if (mid_map_.find(type) == mid_map_.end())
- {
- return CMNERRNO_OK;
- }
- if (mid_map_[type] != mid)
- {
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== this mid= %s, that:%s ", type.c_str(), mid_map_[type].c_str(),mid.c_str());
- return CMNERRNO_OK;
- }
- // 移除mid
- mid_map_.erase(type);
- if (code != 200)
- {
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== Message Error,Error Code:%d, Tip:%s ", topic.to_string().c_str(), code,msg.c_str());
- return CMNERRNO_ERROR;
- }
- auto param = data_json["param"].get<std::string>();
- // 解密
- auto decryptParam = sm4::sm4decodehexstr(param, std::string(sm4_key_screct));
- #ifdef DEBUG
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== %s ", topic.to_string().c_str(), data.to_string().c_str());
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]<== %s ", topic.to_string().c_str(), decryptParam.c_str());
- #else
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]<==", topic.to_string().c_str());
- #endif
- nlohmann::json params_json = nlohmann::json::parse(decryptParam);
- if (topic.to_string() == topic_map_["token"].get_subscribe_topic())
- {
- unpack_data_token(params_json);
- }
- else if (topic.to_string() == topic_map_["register"].get_subscribe_topic())
- {
- unpack_data_register(params_json);
- }
- else if (topic.to_string() == topic_map_["timecheck"].get_subscribe_topic())
- {
- unpack_data_ntp(params_json);
- }
- return CMNERRNO_OK;
- }
- cmnec_t DemoProtocol::message_cb_request(memepp::string_view topic, memepp::string_view data)
- {
- nlohmann::json data_json = nlohmann::json::parse(data.begin(), data.end());
- auto mid = std::to_string(data_json["mid"].get<uint64_t>()) ;
- auto type = data_json["type"].get<std::string>();
- // 缓存mid
- mid_map_[type] = mid;
- // 解密param
- auto param = data_json["param"];
- // 解密
- auto decryptParam = sm4::sm4decodehexstr(param, std::string(sm4_key_screct));
- if(decryptParam.empty())
- {
- decryptParam = "{}";
- }
- #ifdef DEBUG
- cmnproto_log_debug(m_handle.native(), "MQTT [%s] <= %s ", topic.to_string().c_str(), decryptParam.c_str());
- #else
- cmnproto_log_debug(m_handle.native(), "MQTT [%s] <= %s ", topic.to_string().c_str(), decryptParam.c_str());
- #endif
- nlohmann::json params_json = nlohmann::json::parse(decryptParam);
- // 针对不同的topic进行处理
- // 平台总召
- if (topic.to_string() == topic_map_["up_call"].get_publish_topic())
- {
- unpack_data_up_all(params_json);
- }
- // 平台发起状态查询
- else if (topic.to_string() == topic_map_["status"].get_publish_topic())
- {
- unpack_query_machine_status(params_json);
- }
- // 平台发起日志查询
- else if (topic.to_string() == topic_map_["log"].get_publish_topic())
- {
- unpack_query_log(params_json);
- }
- // 算法下发
- else if (topic.to_string() == topic_map_["algorithm_send"].get_publish_topic())
- {
- unpack_algorithm_data(params_json);
- }
- else if (topic.to_string() == topic_map_["algorithm_del"].get_publish_topic())
- {
- unpack_algorithm_delete(params_json);
- }
- return CMNERRNO_OK;
-
- }
- cmnec_t DemoProtocol::message_cb(cmnlink_ref _link, cmniface_msgbody_const _msg)
- {
- auto msg = Cmnpp::IFace::SubPubMessage::Ref(_msg);
- auto data = msg.dataView(); // 接收到的数据,其生命周期与msg相同
- auto topic = msg.topicView(); // 主题,其生命周期与msg
- //根据不同的topic,解析不同的平台下发报文
- try
- {
- // 网关发布,平台响应的
- if (topic.contains("reply"))
- {
- return this->message_cb_response(topic, data);
- }
- // 点位桥
- else if (topic.to_string() == topic_map_["bridge"].get_subscribe_topic()) {
- cmnproto_log_debug(m_handle.native(), "MQTT[TEXT] [%s]<= %s ", topic.to_string().c_str(), data.to_string().c_str());
- this->unpack_bridge_data(data);
- return CMNERRNO_OK;
- }
- // 平台发布,网关响应的
- else
- {
- return this->message_cb_request(topic, data);
- }
-
- }
- catch (nlohmann::json::parse_error& e)
- {
- Debug_Print_Error(e.what());
- }
- return CMNERRNO_OK;
- }
- // 发送mqtt消息
- void DemoProtocol::pack_send_mqtt(const memepp::string& topic, const memepp::string& type, const nlohmann::json& data)
- {
- auto time_stamp = getNow();
- auto mid = idGenerator.nextId();
- nlohmann::json request_data = {
- {u8"mid", mid},
- {u8"date", time_stamp},
- {u8"type", type.c_str()},
- {u8"stationId", m_line.getOptionString("STATION_ID").c_str()}
- };
- // 如果当前是注册,就采用国密2进行加密
- std::string todo_sign_str;
- if (type == "EVENT_PROVE")
- {
- char encrypted_str[5120] = {'\0'};
- const memepp::string sm2_pub_key = m_line.getOptionString("SM2_PUB_KEY");
- Functions::util_sm2_encrypt(data.dump().c_str(), sm2_pub_key.c_str(), encrypted_str);
- request_data[u8"param"] = encrypted_str;
- todo_sign_str.append(type.c_str()).append(request_data[u8"param"].get<std::string>()).append(std::to_string(time_stamp));
- }
- // 国密4加密
- else
- {
- std::string encryptString = sm4::sm4encodestrhex(data.dump(), std::string(sm4_key_screct));
- //char encrypted_str[5120] = { '\0' };
- // Functions::util_sm4_encrypt(data.dump().c_str(), sm4_key_screct, encrypted_str);
- request_data[u8"param"] = encryptString.c_str();
- request_data[u8"token"] = m_token.c_str();
- todo_sign_str.append(m_token).append(type.c_str()).append(request_data[u8"param"].get<std::string>()).append(std::to_string(time_stamp));
-
- }
- // 缓存mid
- mid_map_[type.c_str()] = std::to_string(mid);
-
- char sign[65];
- Functions::sm3_sign((uint8_t*)(todo_sign_str.c_str()), static_cast<int>(todo_sign_str.length()), sign);
- const std::string cppString(sign);
- request_data["sign"] = cppString.c_str();
- // 发送token获取
- Cmnpp::IFace::SubPubMessage msg;
- msg.cmd(cmniface_msgbody_command_publish);
- msg.data(mm_view(request_data.dump().c_str(), request_data.dump().length())); //< 数据
- msg.topic(topic);
- m_handle.send(m_currentLink, msg);
- #ifdef DEBUG
- cmnproto_log_debug(m_handle.native(), "MQTT [%s] => %s ", topic.c_str(), request_data.dump().c_str());
- #else
- cmnproto_log_debug(m_handle.native(), "MQTT [%s] => ", topic.c_str());
- #endif
- }
- void DemoProtocol::pack_send_mqtt_reply(const memepp::string& topic, const memepp::string& type,
- const nlohmann::json& data)
- {
- auto time_stamp = getNow();
- nlohmann::json request_data = {};
- request_data["mid"] = mid_map_[type.c_str()];
- request_data["date"] = time_stamp;
- request_data["type"] = type.c_str();
- request_data["stationId"] = m_line.getOptionString("STATION_ID").c_str();
- request_data["msg"] = "success";
- request_data["code"] = 200;
- auto originString = data.dump();
- const std::string encryptString = sm4::sm4encodestrhex(originString, std::string(sm4_key_screct));
- request_data[u8"param"] = encryptString.c_str();
- // 缓存mid
- mid_map_.erase(type.c_str());
- // 计算签名值
- std::string todo_sign_str;
- todo_sign_str.append(type.c_str())
- .append("200")
- .append(request_data[u8"msg"].get<std::string>())
- .append(request_data[u8"param"].get<std::string>())
- .append(std::to_string(time_stamp));
- char sign[65];
- Functions::sm3_sign((uint8_t*)(todo_sign_str.c_str()), static_cast<int>(todo_sign_str.length()), sign);
- const std::string cppString(sign);
- request_data["sign"] = cppString.c_str();
- // 发送token获取
- Cmnpp::IFace::SubPubMessage msg;
- msg.cmd(cmniface_msgbody_command_publish);
- msg.data(mm_view(request_data.dump().c_str(), request_data.dump().length())); //< 数据
- msg.topic(topic);
- m_handle.send(m_currentLink, msg);
- #ifdef DEBUG
- cmnproto_log_debug(m_handle.native(), "MQTT [%s]=> %s ", topic.c_str(), request_data.dump().c_str());
- #else
- cmnproto_log_debug(m_handle.native(), "MQTT [%s] => ", topic.c_str());
- #endif
- }
- //发送二进制格式数据
- void DemoProtocol::pack_send_mqtt_binary(const memepp::string& topic,const uint8_t* buf, size_t len)
- {
- auto data = mm_view(buf, len);
- Cmnpp::IFace::SubPubMessage msg;
- msg.cmd(cmniface_msgbody_command_publish);
- msg.data(data); //< 数据
- msg.topic(topic);
- m_handle.send(m_currentLink, msg);
- }
- //发送二进制格式数据
- void DemoProtocol::pack_send_mqtt_text(const memepp::string& topic, const std::string &text)
- {
- auto data = mm_view(text.c_str(), text.length());
- Cmnpp::IFace::SubPubMessage msg;
- msg.cmd(cmniface_msgbody_command_publish);
- msg.data(data); //< 数据
- msg.topic(topic);
- m_handle.send(m_currentLink, msg);
- #ifdef DEBUG
- cmnproto_log_debug(m_handle.native(), "MQTT[TEXT] [%s]=> %s ", topic.c_str(), text.c_str());
- #else
- cmnproto_log_debug(m_handle.native(), "MQTT[TEXT] [%s]=> ", topic.c_str());
- #endif
- }
- // 发送认证
- void DemoProtocol::pack_send_token()
- {
- // 平台用户id
- const memepp::string userID = m_line.getOptionString("PLAT_UID");
- // 平台用户密码
- const memepp::string pwd = m_line.getOptionString("PLAT_PW");
- // 生成sm4密钥
- // uint8_t t[32];
- // Functions::util_rand_bytes(t, 16);
- // char str_key[33];
- // Functions::uint8_to_hex_string(t, 16, sm4_key_screct);
- // sm4_key_screct[32] = '\0';
- const memepp::string sm4SecrtyKey = m_line.getOptionString("SM4_SECRTY_KEY");
- //if (!sm4SecrtyKey.empty()) {
- // memcpy(sm4_key_screct, sm4SecrtyKey.c_str(), 32);
- // sm4_key_screct[32] = '\0';
- //}
-
- const nlohmann::json data = {
- {u8"userID", userID.c_str()},
- {u8"pwd", pwd.c_str()},
- {u8"secretKey", sm4_key_screct},
- };
- pack_send_mqtt(topic_map_["token"].get_publish_topic(), u8"EVENT_PROVE", data);
- }
- /**
- * @brief 解析数据令牌
- * @param params 响应参数
- */
- void DemoProtocol::unpack_data_token(const nlohmann::json params)
- {
- // 从传入的参数中获取token并赋值给成员变量m_token
- this->m_token = params["token"].get<std::string>();
- cmnproto_log_debug(m_handle.native(), "Token get success! Token:%s ", m_token.c_str());
- auto time_stamp = getNow();
- this->last_get_token_timestamp = time_stamp;
- // std::vector<std::string> dotNums;
- // dotNums.push_back("3015");
- // 发送点位查询测试
- //this->query_send_bridge(dotNums);
- //// 发送日志查询测试
- //nlohmann::json logJson = {};
- //logJson["date"] = "2024-01-01";
- //logJson["index"] =1;
- //logJson["size"] = 50;
- //this->unpack_query_log(logJson);
- }
- //发送注册
- void DemoProtocol::pack_send_register()
- {
- const nlohmann::json data = {
- // {u8"deviceSN", m_line.getOptionString("deviceSN").c_str()},
- {u8"deviceSN", "uuhhgj-980760-88976"},
- {u8"deviceName", m_line.getOptionString("deviceName").c_str()},
- {u8"deviceType", m_line.getOptionString("deviceType").c_str()},
- {u8"model", m_line.getOptionString("model").c_str()},
- {u8"location", m_line.getOptionString("location").c_str()},
- {u8"version", m_line.getOptionString("version").c_str()},
- {u8"stationId", m_line.getOptionString("STATION_ID").c_str()}
- };
- pack_send_mqtt(topic_map_["register"].get_publish_topic(), u8"EVENT_REGISTER", data);
- }
- /**
- * @brief 解析设备注册
- * @param params 响应参数
- */
- void DemoProtocol::unpack_data_register(const nlohmann::json params)
- {
- this->m_device_sn = params["deviceSN"].get<std::string>();
- this->m_registered = true;
- // 重置业务记时器
- this->run_service_timestamp = 0;
- cmnproto_log_debug(m_handle.native(), "Device Register success device Number:%s ,Online....", m_device_sn.c_str());
- //如果之前是离线状态,则停止存储离线数据
- if (gateway_status == gateway_offline)
- {
- m_StorageData.stop_save_offline();
- }
- gateway_status = gateway_online;
- pre_real_send_data();
- // 加载存储的算法
- if (m_mapFormulasInfo.size() == 0 )
- {
- auto folderTmp = m_handle.lineDataStoreDirectoryPath();
- auto folderPath = mm_to<memepp::native_string>(folderTmp);
- load_algorithm_from_db(folderPath);
- }
- send_algorithm_report_timer_task();
- //有离线数据文件,说明曾经离线了,需要重新开始补发。
- m_offline_file_index = 0;
- m_offline_data_file_list.clear();
- m_StorageData.get_offline_flie_list(m_offline_data_file_list);
- if (m_offline_data_file_list.size() > 0)
- {
- //说明上次数据还没发送完成
- if (m_offline_data_send_index != INVAILD_SEND_OL_INDEX)
- {
- m_is_sending_cache_data = true;
- }
- pack_send_offline_data();
- }
- }
- //发送数据准备
- void DemoProtocol::pre_real_send_data()
- {
- YC_data_cache_list.clear();
- YX_data_cache_list.clear();
- //初始化实时数据变化上送缓存
- for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
- {
- auto table = (*eqptIt).source().forwardTable();
- if (!table) {
- continue;
- }
- auto page1 = table.pageGet("PAGE1");
- if (page1.pointSize() > 0)
- {
- //创建全量数据缓存,用于比较数据变化
- vector<float> vecYCData;
- YC_data_cache_list.push_back(vecYCData);
- }
- auto page2 = table.pageGet("PAGE2");
- if (page2.pointSize() > 0)
- {
- //创建全量数据缓存,用于比较数据变化
- vector<uint8_t> vecYXData;
- YX_data_cache_list.push_back(vecYXData);
- }
- }
- pack_send_real_data();
- send_data_timer_task();
- }
- //发送算法报告定时器
- void DemoProtocol::send_algorithm_report_timer_task()
- {
- m_algorithm_task_timer.start();
- m_algorithm_task_timer.on([this]
- {
- if (gateway_status == gateway_online)
- {
- pack_send_algorithm_report();
- }
- return;
- });
- }
- //发送校时
- void DemoProtocol::pack_send_ntp()
- {
- const nlohmann::json data = {
- {u8"deviceSN", m_line.getOptionString("deviceSN").c_str()}
- };
- pack_send_mqtt(topic_map_["timecheck"].get_publish_topic(), u8"EVENT_TIMECHECK", data);
- }
- // 解包校时
- void DemoProtocol::unpack_data_ntp(const nlohmann::json params)
- {
- auto recvDeviceSN = params["deviceSN"].get<std::string>();
- auto timestamp = params["timestamp"].get<std::string>();
- // 判断是否需要校时,
- auto enableNtp = m_line.getOptionString("ENABLE_NTP").c_str();
- if (strcmp(enableNtp, "1") == 0) {
- // 调用校时接口,写到网关
- cmnproto__set_system_time(m_handle.native(), atoll(timestamp.c_str()), 1);
- }
- }
- //发送全量数据定时任务
- void DemoProtocol::send_data_timer_task()
- {
- m_send_total_data_timer.start();
- m_send_total_data_timer.on([this]
- {
- pack_send_real_data();
- return;
- });
-
- m_send_incremental_data_timer.start();
- m_send_incremental_data_timer.on([this]
- {
- pack_send_incremental_data();
- return;
- });
- }
- void DemoProtocol::run_service_task()
- {
- // 校时处理,如果是跨天,即凌晨0点0分,则执行校时任务
- uint64_t day = 1000 * 60 * 60 * 24;
- auto now = getNow();
- if (!this->m_token.empty()) {
- if ((run_service_timestamp / day) != (now / day))
- {
- this->pack_send_ntp();
- m_StorageData.to_new_day();
- }
- }
- run_service_timestamp = now;
- }
- size_t DemoProtocol::cacl_real_data_buff_len(const uint16_t &yc_count, const uint16_t &yx_count)
- {
- int databuff_len = 1 + 4 + 8 + yc_count * 4 + (yx_count / 8) + ((yx_count % 8 == 0) ? 0 : 1);
- return databuff_len;
- }
- size_t DemoProtocol::cacl_inc_real_data_buff_len(const uint16_t& yc_count, const uint16_t& yx_count)
- {
- int databuff_len = 1 + 8 + 2 + 8+ 2+ yc_count * 8 + 2 + yx_count * 5;
- return databuff_len;
- }
- double DemoProtocol::getFileSizeInMB(const std::string& filename)
- {
- std::ifstream file(filename, std::ios::binary | std::ios::ate);
- if (!file.is_open()) {
- return -1; // 返回-1表示无法打开文件
- }
- std::streampos fileSize = file.tellg();
- file.close();
- // 转换为MB并返回
- return static_cast<double>(fileSize) / (1024 * 1024);
- }
- // 数据上报-发送实时数据,值发送一个设备
- void DemoProtocol::pack_send_real_data()
- {
- // 按HASH字典顺序获取第一个转发设备的迭代器
- size_t eqptIt_index = 0;
- for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
- {
- auto table = (*eqptIt).source().forwardTable();
- if (!table) {
- continue;
- }
- // 通过遥测转发页的KEY获取转发页
- auto page1 = table.pageGet("PAGE1");
- // 通过遥信转发页的KEY获取转发页
- auto page2 = table.pageGet("PAGE2");
- uint16_t yc_count = page1.pointSize(); //遥测总数
- uint16_t yx_count = page2.pointSize(); //遥信总数
- if (yc_count == 0 && yx_count == 0) {
- continue;
- }
- auto databuff_len = cacl_real_data_buff_len(yc_count, yx_count);
- vec_real_data_buff.resize(databuff_len);
- databuff_len = cacl_inc_real_data_buff_len(yc_count, yx_count);
- vec_inc_real_data_buff.resize(databuff_len);
- //缓存队列不为空时,需要清空缓存,存储新的数据
- if (YC_data_cache_list.size() > eqptIt_index &&
- YC_data_cache_list[eqptIt_index].size() > 0)
- {
- YC_data_cache_list[eqptIt_index].clear();
- }
- if (YX_data_cache_list.size() > eqptIt_index &&
- YX_data_cache_list[eqptIt_index].size() > 0)
- {
- YX_data_cache_list[eqptIt_index].clear();
- }
- //实时数据数组
- memset(&vec_real_data_buff[0], 0, vec_real_data_buff.size());
- size_t RD_index = 0;
- uint8_t RS_byte_value = 0; //遥信数据项,填充满8位后再给遥信数组赋值
- uint8_t RS_bit_index = 0; //标识正在填充字节的位序号
- //包头,固定为0x00
- vec_real_data_buff[RD_index++] = 0x00;
- //填充遥信和遥测数量,高位在前
- WriteDataToBufferWithNetworkByteOrder(yx_count, &vec_real_data_buff[RD_index]);
- RD_index += 2;
- WriteDataToBufferWithNetworkByteOrder(yc_count, &vec_real_data_buff[RD_index]);
- RD_index += 2;
- //数据生成时间,8字节,自 1970 年 1 月 1 日(08:00:00 GMT)至当前时间的总毫秒数
- CurrentRealDataTime = getNow();
- WriteDataToBufferWithNetworkByteOrder(CurrentRealDataTime, &vec_real_data_buff[RD_index]);
- RD_index += 8;
- // 获取该页所有转发变量
- for (auto index = 0; index < page1.pointSize(); ++index)
- {
- auto fp = page1.pointAt(index);
- if (!fp) {
- continue;
- }
- // 获取关联采集变量
- auto cp = fp.related().release<Cmnpp::Proto::Point>();
- if (!cp) {
- continue;
- }
- auto value = static_cast<float>(cp.source().processNumberValue());
- WriteDataToBufferWithNetworkByteOrder(value, &vec_real_data_buff[RD_index]);
- RD_index += 4;
- //缓存遥测数据,用于比较是否有变化
- YC_data_cache_list[eqptIt_index].push_back(value);
- }
- // 获取该页所有转发变量
- for (auto index = 0; index < page2.pointSize(); ++index)
- {
- auto fp = page2.pointAt(index);
- if (!fp) {
- continue;
- }
- // 获取关联采集变量
- auto cp = fp.related().release<Cmnpp::Proto::Point>();
- if (!cp) {
- continue;
- }
- uint8_t yx = (int8_t)cp.source().processNumberValue();
- if (yx != 0)
- {
- RS_byte_value |= (1 << RS_bit_index);
- }
- //已经填充满了8个遥信信号,填充到数组,并临时遥信数据项清零
- if (++RS_bit_index >= 8)
- {
- vec_real_data_buff[RD_index++] = RS_byte_value;
- RS_byte_value = 0;
- RS_bit_index = 0;
- }
- YX_data_cache_list[eqptIt_index].push_back(yx);
- }
- //最后一个值没计算满8个,也需要放入发送缓存
- if (RS_bit_index < 8)
- {
- vec_real_data_buff[RD_index++] = RS_byte_value;
- }
- if (gateway_status == gateway_online)
- {
- pack_send_mqtt_binary(topic_map_["data_up"].get_publish_topic(), &vec_real_data_buff[0], RD_index);
- m_StorageData.save_data_to_file(&vec_real_data_buff[0], RD_index, CurrentRealDataTime);
- cmnproto_log_debug(m_handle.native(), "Total Data,size:%d,ycCount:%d,yxCount:%d", RD_index, yc_count, yx_count);
- #ifdef DEBUG
- std::string strData = HexArrayToString(&vec_real_data_buff[0], RD_index);
- cmnproto_log_debug(m_handle.native(), "data:%s", strData.c_str());
- #endif
- //发送实时数据后,增量包时间窗口内的序号清零,重新开始计数
- IncDataPackIndex = 0;
- }
- else if (gateway_status == gateway_offline)
- {
- m_StorageData.save_offline_data_to_file(&vec_real_data_buff[0], RD_index);
- }
- ++eqptIt_index;
- }
- }
- // 数据上报-发送增量数据
- // 五分钟发送一个全量包,在接下来的五分钟内,定时扫描值有变化的点上送
- void DemoProtocol::pack_send_incremental_data()
- {
- try
- {
- // 按HASH字典顺序获取第一个转发设备的迭代器
- int eqptIt_index = 0;
- for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
- {
- auto eqpt = *eqptIt;
- auto table = eqpt.source().forwardTable();
- if (!table) {
- continue;
- }
- // 通过遥测转发页的KEY获取转发页
- auto page1 = table.pageGet("PAGE1");
- // 通过遥信转发页的KEY获取转发页
- auto page2 = table.pageGet("PAGE2");
- uint16_t yc_count = page1.pointSize(); //遥测总数
- uint16_t yx_count = page2.pointSize(); //遥信总数
- if (yc_count == 0 && yx_count == 0) {
- continue;
- }
- if (YC_data_cache_list.at(eqptIt_index).size() == 0
- || YX_data_cache_list.at(eqptIt_index).size() == 0)
- {
- continue;
- }
- //增量包数据数组
- memset(&vec_inc_real_data_buff[0], 0, vec_inc_real_data_buff.size());
- int RD_index = 0;
- uint16_t telemetering_count = 0; //遥测总数
- uint16_t remote_signal_count = 0; //遥信总数
- //增量包包头,固定为0x01
- vec_inc_real_data_buff[RD_index++] = 0x01;
- //时间戳,8字节,用于标识本增量包所属的全量包
- WriteDataToBufferWithNetworkByteOrder(CurrentRealDataTime, &vec_inc_real_data_buff[RD_index]);
- RD_index += 8;
- //代表时间窗口内的数据包的顺序号
- WriteDataToBufferWithNetworkByteOrder(IncDataPackIndex, &vec_inc_real_data_buff[RD_index]);
- RD_index += 2;
- ++IncDataPackIndex;
- //数据产生时间,8字节
- auto timestamp = getNow();
- WriteDataToBufferWithNetworkByteOrder(timestamp, &vec_inc_real_data_buff[RD_index]);
- RD_index += 8;
- //跳过遥测总数,统计后再填充
- uint16_t yc_count_Index = RD_index;
- RD_index += 2;
- // 获取该页所有转发变量
- auto yc_point_index = 0;
- for (auto index = 0; index < page1.pointSize(); ++index)
- {
- auto fp = page1.pointAt(index);
- if (!fp) {
- continue;
- }
- // 获取关联采集变量
- auto cp = fp.related().release<Cmnpp::Proto::Point>();
- if (!cp) {
- continue;
- }
- auto yc_value = static_cast<float> (cp.source().processNumberValue());
- if (YC_data_cache_list.at(eqptIt_index).at(yc_point_index) != yc_value)
- {
- // 点表序号
- uint16_t yc_idx = cmnforward_point_no(fp.native());
- WriteDataToBufferWithNetworkByteOrder(yc_idx, &vec_inc_real_data_buff[RD_index]);
- RD_index += 2;
- // 采集时刻,2个字节,数据产生时间”的偏移毫秒数,= 数据采集时间-数据产生时间
- int16_t collect_time = CurrentRealDataTime - timestamp;
- WriteDataToBufferWithNetworkByteOrder(collect_time, &vec_inc_real_data_buff[RD_index]);
- RD_index += 2;
- //数据项值
- WriteDataToBufferWithNetworkByteOrder(yc_value, &vec_inc_real_data_buff[RD_index]);
- RD_index += 4;
- ++telemetering_count;
- //更新缓存队列的数据
- YC_data_cache_list.at(eqptIt_index).at(yc_point_index) = yc_value;
- }
- ++yc_point_index;
- }
- //填充遥测总数
- WriteDataToBufferWithNetworkByteOrder(telemetering_count, &vec_inc_real_data_buff[yc_count_Index]);
- uint16_t yx_count_index = RD_index;
- RD_index += 2; //跳过遥信总数,统计后再填充
- // 获取该页所有转发变量
- auto yx_point_index = 0;
- for (auto index = 0; index < page2.pointSize(); ++index)
- {
- auto fp = page2.pointAt(index);
- if (!fp) {
- continue;
- }
- // 获取关联采集变量
- auto cp = fp.related().release<Cmnpp::Proto::Point>();
- if (!cp) {
- continue;
- }
- uint8_t yx_value = (uint8_t)cp.source().processNumberValue();
- if (YX_data_cache_list.at(eqptIt_index).at(yx_point_index) != yx_value)
- {
- // 点表序号
- uint16_t yx_idx = cmnforward_point_no(fp.native());
- WriteDataToBufferWithNetworkByteOrder(yx_idx, &vec_inc_real_data_buff[RD_index]);
- RD_index += 2;
- // 采集时刻,2个字节,数据产生时间”的偏移毫秒数,= 数据采集时间*1000 - 数据产生时间
- int16_t collect_time = CurrentRealDataTime - timestamp;
- WriteDataToBufferWithNetworkByteOrder(collect_time, &vec_inc_real_data_buff[RD_index]);
- RD_index += 2;
- if (yx_value != 0)
- {
- vec_inc_real_data_buff[RD_index] |= (1 << 7);
- }
- RD_index += 1;
- ++remote_signal_count;
- //更新缓存队列的数据
- YX_data_cache_list.at(eqptIt_index).at(yx_point_index) = yx_value;
- }
- ++yx_point_index;
- }
- //填充遥信总数
- WriteDataToBufferWithNetworkByteOrder(remote_signal_count, &vec_inc_real_data_buff[yx_count_index]);
- //数据有变化,开始发送增量数据
- if (remote_signal_count > 0 || telemetering_count > 0)
- {
- if (gateway_status == gateway_online)
- {
- pack_send_mqtt_binary(topic_map_["data_up"].get_publish_topic(), &vec_inc_real_data_buff[0], RD_index);
- cmnproto_log_debug(m_handle.native(), "Inc Data,size:%d,ycCount:%d,yxCount:%d", RD_index, telemetering_count, remote_signal_count);
- #ifdef DEBUG
- std::string strData = HexArrayToString(&vec_inc_real_data_buff[0], RD_index);
- cmnproto_log_debug(m_handle.native(), "data:%s", strData.c_str());
- #endif
- }
- else if (gateway_status == gateway_offline)
- {
- m_StorageData.save_offline_data_to_file(&vec_inc_real_data_buff[0], RD_index);
- }
- //TEST
- //++testindex;
- //if (testindex == 10)
- //{
- // gateway_status = gateway_offline;
- // m_StorageData.create_offline_data_file();
- // cmnproto_log_debug(m_handle.native(), "Gateway offline test");
- //}
- //else if (testindex == 100)
- //{
- // nlohmann::json data = {
- // {u8"deviceSN", "1"},
- // };
- // unpack_data_register(data);
- //}
- //else if (testindex == 200)
- //{
- // testindex = 0;
- //}
- //else if (index == 20)
- //{
- // nlohmann::json data1 = {
- // {u8"callid", 1},
- // {u8"starttime", 1704301950560},
- // {u8"endtime", 1704381847444}
- // };
- // unpack_data_up_all(data1);
- //}
- }
- eqptIt_index++;
- }
- }
- catch (const std::exception& errorinfo)
- {
- cmnproto_log_debug(m_handle.native(), "Send inc data exception:%s, count: %d, index: %d", errorinfo.what());
- }
- }
- void DemoProtocol::pack_send_offline_data()
- {
- m_send_offline_data_timer.start();
- m_send_offline_data_timer.on([this]
- {
- //没有文件或者,已经读取完最后一个文件,或者离线状态,不发送补发数据
- if (m_offline_file_index >= m_offline_data_file_list.size()
- || gateway_status == gateway_offline)
- {
- return;
- }
- if (m_offline_data_send_index == INVAILD_SEND_OL_INDEX)//发送缓存的数据发送完了
- {
- //从离线文件列表中获取离线文件名,读取离线数据
- m_offline_data_cache.clear();
- auto strFilePath = m_offline_data_file_list[m_offline_file_index];
- m_StorageData.read_offline_data(strFilePath, m_offline_data_cache);
- auto cache_data_count = m_offline_data_cache.size();
- cmnproto_log_debug(m_handle.native(), "read offline file:%s,count:%d", strFilePath.string(), cache_data_count, m_offline_data_send_index);
- if (cache_data_count > 0)
- {
- m_offline_data_send_index = 0;//发送序号重新0开始
- m_offline_compensationId = getNow();
- uint64_t starttime = get_timestamp_from_vec(&m_offline_data_cache[0][0], m_offline_data_cache[0].size());
- uint64_t endtime = get_timestamp_from_vec(&m_offline_data_cache[cache_data_count - 1][0], m_offline_data_cache[cache_data_count - 1].size());
- const nlohmann::json data = {
- {u8"compensationid", m_offline_compensationId},
- {u8"starttime", starttime},
- {u8"endtime", endtime},
- {u8"size", cache_data_count},
- {u8"type",0}
- };
- pack_send_mqtt(topic_map_["data_compensation"].get_publish_topic(), u8"0", data);
- }
- else
- {
- //当前文件为空,读下一个文件
- ++m_offline_file_index;
- }
- }
- else
- {
- auto cache_data_count = m_offline_data_cache.size();
- if (m_offline_data_send_index < cache_data_count)
- {
- size_t data_len = m_offline_data_cache[m_offline_data_send_index].size();
- uint8_t data_arr[MAX_DATA_LEN] = { 0 };
- std::copy(m_offline_data_cache[m_offline_data_send_index].begin(), m_offline_data_cache[m_offline_data_send_index].end(), data_arr);
- pack_send_mqtt_binary(topic_map_["data_compensation"].get_publish_topic(), data_arr, data_len);
- cmnproto_log_debug(m_handle.native(), "Send offline data size: %d, count: %d, index: %d", data_len, cache_data_count, m_offline_data_send_index);
- #ifdef DEBUG
- std::string strData = HexArrayToString(data_arr, data_len);
- cmnproto_log_debug(m_handle.native(), "data: %s", strData.c_str());
- #endif
- //一个离线文件发送完了,到下一个文件。
- if (++m_offline_data_send_index >= cache_data_count)
- {
- m_offline_data_send_index = INVAILD_SEND_OL_INDEX;
- if (m_is_sending_cache_data == false)
- {
- ++m_offline_file_index;
- }
- else //如果发送的是缓存数据,文件需要不增加
- {
- m_is_sending_cache_data = true;
- }
- }
- }
- }
- });
- }
- // 发送总召数据,定时器调用
- void DemoProtocol::pack_send_up_call()
- {
- m_up_call_timer.start();
- m_up_call_timer.on([this]
- {
- if (m_upcall_send_index < m_his_data_file_list.size())
- {
- std::vector<uint8_t> historyData;
- m_StorageData.read_history_data(m_his_data_file_list[m_upcall_send_index], historyData);
- auto data_len = historyData.size();
- pack_send_mqtt_binary(topic_map_["up_call"].get_publish_topic(), &historyData[0], data_len);
- cmnproto_log_debug(m_handle.native(), "Send up call data:%d", data_len);
- #ifdef DEBUG
- std::string strData = HexArrayToString(&historyData[0], data_len);
- cmnproto_log_debug(m_handle.native(), "data:%s", strData.c_str());
- #endif
- ++m_upcall_send_index;
- }
- else
- {
- m_upcall_send_index = 0;
- m_up_call_timer.cancel();
- }
- });
- }
- void DemoProtocol::query_send_bridge(std::vector<std::string> dotNumbers)
- {
- nlohmann::json dots = {};
- // 将所有的点号加进入
- for (std::vector<std::string>::iterator it = dotNumbers.begin(); it != dotNumbers.end(); ++it) {
- dots[*it] = "%{" + *it + "}";
- }
- const memepp::string station_id = m_line.getOptionString("STATION_ID");
- const auto sendText = station_id.c_str() + dots.dump();
- this->pack_send_mqtt_text(topic_map_["bridge"].get_publish_topic(), sendText);
- }
- void DemoProtocol::unpack_bridge_data(memepp::string_view data)
- {
- // 只有是当前token才处理
- const memepp::string station_id = m_line.getOptionString("STATION_ID");
- if (data.starts_with(station_id)) {
- auto jsonStr = data.substr(station_id.size());
- // 将剩下的内容转换为json
- nlohmann::json jsonObj = nlohmann::json::parse(jsonStr);
- for (const auto& entry : jsonObj.items()) {
- auto value = jsonObj[entry.key()].get<std::string>();
- if (!value.empty())
- {
- value = removeBraces(value);
- }
- point_bridge_map_[entry.key()] = value;
- cmnproto_log_debug(m_handle.native(), "bridge data: %s = %s", entry.key().c_str(), value.c_str());
- }
- }
- }
- // 解析总召指令
- void DemoProtocol::unpack_data_up_all(const nlohmann::json params)
- {
- auto callid = params["callid"].get<uint64_t>(); //总召标记,时间戳
- auto starttime = params["starttime"].get<uint64_t>(); //数据开始时间戳
- auto endtime = params["endtime"].get<uint64_t>(); //时间结束时间戳
- m_his_data_file_list.clear();
- //从文件系统读取数据
- m_StorageData.get_history_file_list(starttime, endtime, m_his_data_file_list);
- long vecSize = m_his_data_file_list.size();
- if (vecSize > 0)
- {
- auto timeMSCount = getNow();
- // 发送应答数据开始包
- const nlohmann::json data = {
- {u8"replyid", timeMSCount},
- {u8"callid", callid},
- {u8"starttime", starttime},
- {u8"endtime", endtime},
- {u8"size", vecSize},
- {u8"type",0},
- };
- pack_send_mqtt(topic_map_["up-call"].get_publish_topic(), u8"0", data);
- m_upcall_send_index = 0;
- pack_send_up_call();
- }
- }
- // 解包设备状态应答
- void DemoProtocol::unpack_query_machine_status(const nlohmann::json params)
- {
- // 内存利用率
- double memRatio = 0;
- mghw_memory_status memStatus = { sizeof(mghw_memory_status), 0 };
- if (mghw_get_memory_status(&memStatus) == 0) {
- // 计算内存使用率
- memRatio = (((double)(memStatus.total_physical - memStatus.available_physical)) / memStatus.total_physical) * 100;
-
- }
- // cpu利用率
- double cpuRatio = 0;
- {
- #ifdef COMM_OS_WINDOWS
- FILETIME idleTime, kernelTime, userTime;
- cpuRatio = (GetSystemTimes(&idleTime, &kernelTime, &userTime) ?
- __CalculateCPULoad(__FileTimeToInt64(idleTime), __FileTimeToInt64(kernelTime) + __FileTimeToInt64(userTime)) : 1.0f) * 100.0;
- #else
- char buf[256] = {0};
- char name[64] = {0};
- FILE* fd = fopen("/proc/stat","r");
- if (fd) {
- }
- uint32_t user = 0, nice = 0, system = 0, idle = 0;
- fgets(buf,sizeof(buf), fd);
- fclose(fd);
- if (sscanf(buf, "%s %u %u %u %u", name, &user, &nice, &system, &idle) == 5) {
- cpuRatio = (1 - (static_cast<double>(idle) / static_cast<double>(user + nice + system + idle))) * 100;
- }
- #endif
- }
- // 磁盘利用率
- double diskRatio = calc_disk_ratio();
- const nlohmann::json data = {
- {u8"cpuUsage", cpuRatio}, //CPU使用率
- {u8"memUsage", memRatio}, //内存使用率
- {u8"storageUsage", diskRatio}, //存储使用率
- };
- pack_send_mqtt_reply(topic_map_["status"].get_subscribe_topic(), u8"CMN_MACHINE_STATUS", data);
- }
- //计算磁盘利用率,最大值100
- double DemoProtocol::calc_disk_ratio()
- {
- mghw_harddisk_freespace hardDiskFreeSpace = { sizeof(mghw_harddisk_freespace), 0 };
- mghw_get_harddisk_freespace_by_path(m_handle.lineDataStoreDirectoryPath().c_str(), -1, &hardDiskFreeSpace);
- auto freeHardDiskMb = hardDiskFreeSpace.free / (1024 * 1024);
- double diskRatio = (((double)hardDiskFreeSpace.total - (double)hardDiskFreeSpace.free) / (double)hardDiskFreeSpace.total) * 100;
- return diskRatio;
- }
- // 查询指令-日志查询
- void DemoProtocol::unpack_query_log(const nlohmann::json params)
- {
- // MYS
- auto strDate = params["date"].get<std::string>(); //查询日期,格式yyyy-MM-dd
- auto index = params["index"].get<int>(); //当前页数(从1开始)
- auto size = params["size"].get<int>(); //每页条数
- // 获取文件路径
- //auto runDir = getRunDir();
- // 当前日志文件
- std::string fileName("/proto_" + strDate + ".log");
- // 完整路径
- //std::string fullPath;
- //char separator = fs::path::preferred_separator;
- //fullPath.append(std::string(currentDir.begin(), currentDir.end())).append("logs").append(fileName);
- // 日志总行数
- uint64_t totalCount = 0;
- // 读取结果
- std::vector<nlohmann::json> lines;
- // 读取日志文件
- auto dirPath = m_handle.lineDataStoreDirectoryPath();
- auto fullPath = mm_from(fmt::format("{}/logs/{}", dirPath, fileName));
- readLinesFromFile(mm_to<memepp::native_string>(fullPath), (index - 1) * size, size, totalCount, lines);
- const nlohmann::json data = {
- {u8"total", totalCount},
- {u8"logs", lines},
- };
- pack_send_mqtt_reply(topic_map_["log"].get_subscribe_topic(), u8"CMN_RUNNING_LOG", data);
- }
- void DemoProtocol::load_algorithm_from_db(memepp::native_string filePath)
- {
- //加载已经部署的算法信息
- m_algorithmDB.InitAlgorithmDB(filePath);
- auto algoVec = m_algorithmDB.GetAllRecords();
- if (algoVec.size() > 0)
- {
- for (auto element : algoVec)
- {
- memepp::string_view exprId = mm_view(element.strAlgorithmId.c_str(), element.strAlgorithmId.length());
- auto expr = m_handle.createExpr(exprId);
- auto isValid = expr.isObjectValid();
- if (isValid)
- {
- //编译算法
- memepp::string_view strExpr = mm_view(element.strNewThresholdFormula.c_str(), element.strNewThresholdFormula.length());
- expr.setString(strExpr);
- expr.setResultMode(nbpp::dlli::Expr::WithMidres);
- expr.compile();
- if (!expr.hasCompileError())
- {
- m_mapFormulasInfo[element.strAlgorithmId].AlgotithmInfo = element;
- m_mapFormulasInfo[element.strAlgorithmId].exprObject = expr;
- std::vector<std::string> pointNoVec = extractValueFromExpression(element.strThresholdFormula);
- //通过点位桥查询
- for (int i = 0; i < pointNoVec.size(); ++i)
- {
- //缓存对应,为了后面通过 map[采集点号] = 点位号
- m_collect_point_map[element.collectPointVec[i]] = pointNoVec[i];
- }
- query_send_bridge(pointNoVec);
- }
- }
- }
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] load Formula: %d ", algoVec.size());
- }
- }
- //算法下发解包
- void DemoProtocol::unpack_algorithm_data(const nlohmann::json params)
- {
- // 解包算法数据
- auto strDeviceSN = params["deviceSN"].get<std::string>(); //网关编号
- auto isDeploy = params["deploy"].get<bool>(); //是否部署:false-仅部署,true-部署
- auto strlistFormulas = params["formulas"].get<vector<nlohmann::json>>();
- for (auto element : strlistFormulas)
- {
- nlohmann::json data = {};
- FormulasInfo formulasInfo;
- formulasInfo.AlgotithmInfo.strAlgorithmId = element["algorithmId"].get<std::string>(); //算法ID
- formulasInfo.AlgotithmInfo.strEquipId = element["equipId"].get<std::string>(); //设备ID
- formulasInfo.AlgotithmInfo.strThresholdFormula = element["thresholdFormula"].get<std::string>(); //阈值公式
- data["deviceSN"] = strDeviceSN;
- data["algorithmId"] = formulasInfo.AlgotithmInfo.strAlgorithmId;
- data["equipId"] = formulasInfo.AlgotithmInfo.strEquipId;
- data["errorCode"] = 0;
- data["message"] = "ok";
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] unpack Formula: %s ", formulasInfo.AlgotithmInfo.strThresholdFormula.c_str());
- //获取公式中所有%{X},X点位号
- std::vector<std::string> pointNoVec = extractValueFromExpression(formulasInfo.AlgotithmInfo.strThresholdFormula);
- //点位号转换成采集点号
- formulasInfo.AlgotithmInfo.collectPointVec = std::vector<std::string>(pointNoVec.size());
- PointNo2PointId(pointNoVec, formulasInfo.AlgotithmInfo.collectPointVec);
- //TOTEST
- //formulasInfo.AlgotithmInfo.collectPointVec[0] = "260223241487560715";
- string strcollectPointTmp = "";
- bool isPointNoValid = true;
- for (auto element: formulasInfo.AlgotithmInfo.collectPointVec)
- {
- if (element.empty())
- {
- isPointNoValid = false;
- data["message"] = "The formula contains invalid measurement points";
- break;
- }
- strcollectPointTmp = strcollectPointTmp + element + ",";
- }
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] pointNo =>collectPoint: %s ", strcollectPointTmp.c_str());
- //采集点号替换到公式中
- formulasInfo.AlgotithmInfo.strNewThresholdFormula = replacePlaceholder(formulasInfo.AlgotithmInfo.strThresholdFormula, formulasInfo.AlgotithmInfo.collectPointVec);
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] NewThresholdFormula: %s ", formulasInfo.AlgotithmInfo.strNewThresholdFormula.c_str());
- //语法校验
- memepp::string_view exprId = mm_view(formulasInfo.AlgotithmInfo.strAlgorithmId.c_str(), formulasInfo.AlgotithmInfo.strAlgorithmId.length());
- auto expr = m_handle.createExpr(exprId);
- auto isValid = expr.isObjectValid();
- if (isValid && isPointNoValid)
- {
- //编译算法
- memepp::string_view strExpr = mm_view(formulasInfo.AlgotithmInfo.strNewThresholdFormula.c_str(), formulasInfo.AlgotithmInfo.strNewThresholdFormula.length());
- expr.setString(strExpr);
- expr.setResultMode(nbpp::dlli::Expr::WithMidres);
- expr.compile();
- if (expr.hasCompileError())
- {
- data["errorCode"] = expr.firstCompileErrorCode();
- data["message"] = expr.firstCompileErrorMessage().c_str();
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] CompileError,error code:%d ,error message:%s", expr.firstCompileErrorCode(), expr.firstCompileErrorMessage().c_str());
- }
- else
- {
- //编译无措,且如果需要部署,则保存。
- if (isDeploy)
- {
- //通过点位桥查询
- for (int i = 0; i < pointNoVec.size(); ++i)
- {
- //缓存对应,为了后面通过 map[采集点号] = 点位号
- m_collect_point_map[formulasInfo.AlgotithmInfo.collectPointVec[i]] = pointNoVec[i];
- }
- query_send_bridge(pointNoVec);
- string strKey = formulasInfo.AlgotithmInfo.strAlgorithmId;
- //保存算式
- formulasInfo.exprObject = expr;
- //如果已经存在,则需要更新,否则就新增一条记录
- if (m_mapFormulasInfo.count(strKey) > 0)
- {
- m_algorithmDB.UpdateRecord(strKey, formulasInfo.AlgotithmInfo);
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] Compile success and UPDATA Formula, count:%d ", m_mapFormulasInfo.size());
- }
- else
- {
- m_algorithmDB.AddRecord(formulasInfo.AlgotithmInfo);
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] Compile success and ADD a Formula, count:%d ", m_mapFormulasInfo.size());
- }
- m_mapFormulasInfo[strKey] = formulasInfo;
- }
- }
- }
- else
- {
- data["errorCode"] = 10000;
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM] Compile isInValid!");
- }
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPLY] 1");
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPLY] 2");
- pack_send_mqtt_reply(topic_map_["algorithm_send"].get_subscribe_topic(), u8"CMD_ALGORITHM_SEND", data);
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPLY] 3");
- }
- }
- //平台下发点位号转换成网关平台测点ID
- void DemoProtocol::PointNo2PointId(std::vector<std::string> pointNo, std::vector<std::string>& pointId)
- {
- size_t vec_len = pointNo.size();
- std::vector<int64_t> vecpointNoNum;
- for (int i = 0; i < vec_len; ++i)
- {
- vecpointNoNum.push_back(std::stoull(pointNo[i]));
- }
- for (auto eqptIt = m_line.begin();eqptIt != m_line.end();++eqptIt)
- {
- auto table = (*eqptIt).source().forwardTable();
- if (!table) {
- continue;
- }
- auto page1 = table.pageGet("PAGE1");
- for (auto index = 0; index < page1.pointSize(); ++index)
- {
- auto fp = page1.pointAt(index);
- if (!fp) {
- continue;
- }
- auto fpNo = fp.no();
- for (int i = 0; i < vec_len;++i)
- {
- if (fp.no() == vecpointNoNum[i])
- {
- auto cp = fp.related().release<Cmnpp::Proto::Point>();
- auto pointid = cp.source().id();
- pointId[i] = std::string(pointid.c_str(), pointid.size());
- break;
- }
- }
- }
- auto page2 = table.pageGet("PAGE2");
- for (auto index = 0; index < page2.pointSize(); ++index)
- {
- auto fp = page2.pointAt(index);
- if (!fp) {
- continue;
- }
- auto fpNo = fp.no();
- for (int i = 0; i < vec_len;++i)
- {
- if (fpNo == vecpointNoNum[i])
- {
- auto cp = fp.related().release<Cmnpp::Proto::Point>();
- auto pointid = cp.source().id();
- pointId[i] = std::string(pointid.c_str(), pointid.size());
- break;
- }
- }
- }
- }
- }
- //算法结果上报
- void DemoProtocol::pack_send_algorithm_report()
- {
- //没有部署算法,不执行
- if (m_mapFormulasInfo.size() == 0)
- return;
- auto CurrentRealDataTime = getNow();
- std::vector<nlohmann::json> vec_Formulas;
-
- for (auto keyValue : m_mapFormulasInfo)
- {
- FormulasInfo formulasInfo = keyValue.second;
- if (!formulasInfo.exprObject.isCompiled())
- {
- formulasInfo.exprObject.setResultMode(nbpp::dlli::Expr::WithMidres);
- formulasInfo.exprObject.compile();
- }
- auto exprResult = formulasInfo.exprObject.calc();
- nlohmann::json data;
- if (exprResult.isObjectValid())
- {
- auto result = exprResult.numberValue();
- if (result == 0)//算法结果 "正常"
- {
- data = {
- {u8"result", 0},
- };
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] calc result:0, no alarm! ");
- }
- else//算法结果 "预警" a
- {
- nlohmann::json points;
- nlohmann::json variables;
-
- string strpointsTmp = "";
- //获取参与运算测点的值
- auto pointSize = exprResult.pointStatementSize();
- vector<string> pointNoVecTmp;
- for (int i = 0; i < pointSize; ++i)
- {
- memepp::string key;
- memepp::variant val;
- exprResult.pointStatementKeyValue(i, key, val);
- string strKey = string(key.c_str(), key.size());//采集点号
- //cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] engine key = %s", strKey.c_str());
- //点位桥协议转换
- //采集点号->点位号->设备号
- if (m_collect_point_map.count(strKey) != 0)
- {
- //采集点号->点位号
- string strPointNo = m_collect_point_map[strKey];
- // cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] point key = %s", strPointNo.c_str());
- if (point_bridge_map_.count(strPointNo) != 0)
- {
- //点位号->设备号
- strKey = point_bridge_map_[strPointNo];
- // cmnproto_log_debug(m_handle.native(), "[ALGORITHM-SEND] device key = %s", strPointNo.c_str());
- }
- else
- {
- //重发点位桥协议报文
- pointNoVecTmp.push_back(strPointNo);
- }
- }
- if (pointNoVecTmp.size() > 0)
- {
- //部分点位号没有对应设备号,重发点位桥协议报文
- query_send_bridge(pointNoVecTmp);
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] resend bridge :%d", pointNoVecTmp.size());
- }
- double valTmp;
- val.try_get(valTmp);
- points[strKey] = valTmp;
- strpointsTmp = strpointsTmp + strKey + ":" + std::to_string(valTmp) + ",";
- }
- //cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] algorithm report points:%s ", strpointsTmp.c_str());
- string strvariablesTmp = "";
- //获取算法定义中间变量的计算值
- auto variableSize = exprResult.assignStatementSize();
- for (int i = 0; i < variableSize; ++i)
- {
- memepp::string key;
- memepp::variant val;
- exprResult.assignStatementKeyValue(i, key, val);
- string strKey = string(key.c_str(), key.size());
- double valTmp;
- val.try_get(valTmp);
- variables[strKey] = valTmp;
- strvariablesTmp = strpointsTmp + strKey + ":" + std::to_string(valTmp) + ",";
- }
- //cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] algorithm report variables:%s ", strvariablesTmp.c_str());
-
- data = {
- {u8"result", result},
- {u8"points", points},
- {u8"variables", variables},
- };
- }
- }
- else
- {
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] calc error , object invalid ");
- }
- const nlohmann::json an_formula = {
- {u8"equipId", formulasInfo.AlgotithmInfo.strEquipId},
- {u8"algorithmId", formulasInfo.AlgotithmInfo.strAlgorithmId},
- {u8"data", data},
- };
- vec_Formulas.push_back(an_formula);
- }
-
- const nlohmann::json data_send = {
- {u8"deviceSN", m_device_sn},
- {u8"date", CurrentRealDataTime},
- {u8"formulas", vec_Formulas},
- };
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-REPORT] %s ", data_send.dump().c_str());
- pack_send_mqtt(topic_map_["algorithm_data"].get_publish_topic(), u8"REP_ALGORITHM_DATA", data_send);
- }
- //阈值算法移除
- void DemoProtocol::unpack_algorithm_delete(const nlohmann::json params)
- {
- auto strDeviceSN = params["deviceSN"].get<std::string>(); //网关编号
- auto strDelFormulas = params["formulas"].get<vector<nlohmann::json>>();
- auto nErrCode = 0;
- std::vector<nlohmann::json> dels;
- for (auto element : strDelFormulas)
- {
- auto algorithmId = element["algorithmId"].get<std::string>();
- auto equipId = element["equipId"].get<std::string>();
- if (m_mapFormulasInfo.count(algorithmId) > 0)
- {
- m_mapFormulasInfo.erase(algorithmId);
- m_algorithmDB.DeleteRecord(algorithmId);
- const nlohmann::json delformulas = {
- {u8"algorithmId", algorithmId},
- {u8"equipId", equipId},
- };
- dels.push_back(delformulas);
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-DEL] Delete algorithmId:%s, equipId:%s,", algorithmId.c_str(), equipId);
- }
- }
- cmnproto_log_debug(m_handle.native(), "[ALGORITHM-DEL] mapFormulasInfo count:%d", m_mapFormulasInfo.size());
- const nlohmann::json data = {
- {u8"deviceSN", strDeviceSN},
- {u8"formulas", dels},
- };
- pack_send_mqtt_reply(topic_map_["algorithm_del"].get_subscribe_topic(), u8"CMD_ALGORITHM_DEL", data);
- }
- long DemoProtocol::get_time_from_string(const std::string dataTime)
- {
- long decimalTime = 0;
- std::string timestamp;
- if(dataTime[1] == '0')
- {
- timestamp = dataTime.substr(4, 8); // 获取时间戳数据
- }
- else if (dataTime[1] == '1')
- {
- timestamp = dataTime.substr(11, 8); // 获取时间戳数据
- }
- decimalTime = std::stoi(timestamp, nullptr, 16);
- return decimalTime;
- }
- void DemoProtocol::subscribe_topic()
- {
- for (auto iter = topic_map_.begin(); iter != topic_map_.end(); ++iter)
- {
- auto value = iter->second;
- auto key = iter->first;
- if (value.hasReply_)
- {
- //< 链接一建立,就订阅消息
- Cmnpp::IFace::SubPubMessage msg;
- msg.cmd(cmniface_msgbody_command_subscribe);
- //< 数据
- msg.data("");
- auto topic = value.is_gateway_active_ ? value.get_subscribe_topic() : value.get_publish_topic();
- msg.topic(topic);
- m_handle.send(m_currentLink, msg);
- cmnproto_log_debug(m_handle.native(), "Subcribe Topic:%s", topic.c_str());
- }
- }
- }
- // 获取文件内容
- bool DemoProtocol::readLinesFromFile(
- const memepp::native_string& filePath,
- int startLine, int numLines, uint64_t& totalLines, std::vector<nlohmann::json>& result)
- {
- std::ifstream file(filePath, std::ios::in );
- file.imbue(std::locale("chs"));
- if (!file.is_open()) {
- std::cerr << "Open file failed: " << filePath.c_str() << std::endl;
- return false;
- }
- totalLines = 0;
- result.clear();
- std::string line;
- while (std::getline(file, line)) {
- totalLines++;
- if (totalLines >= startLine && numLines > 0) {
- auto info = parseLog(line);
- result.push_back(info);
- numLines--;
- }
- // 行数超过文件总行数,提前退出循环
- if (file.eof()) {
- break;
- }
- }
- if (file.fail() && !file.eof()) {
- std::cerr << "Read File error" << std::endl;
- file.close();
- return false;
- }
- file.close();
- return true;
- }
- // 该函数是插件的退出函数,该函数会被插件加载器调用
- CMN_PLUGIN_API int64_t cmnplugin_exit()
- {
- return CMNERRNO_OK;
- }
- // 该函数是插件的入口函数,该函数会被插件加载器调用
- CMN_PLUGIN_API int64_t cmnplugin_initialize(
- const cmnplugin_init_params* _params, rsize_t _structSize, cmnplugin_exit_func_t* _func)
- {
- Cmnpp::IProtocolRegistrar registrar(_params, _structSize);
- auto ret = registrar.initialize();
- if (ret < 0)
- return CMNERRNO_ERROR;
- ret = registrar.registerObject<DemoProtocol>("NW-MQTT");
- if (ret < 0)
- return CMNERRNO_ERROR;
- *_func = cmnplugin_exit;
- return 0;
- }
|