123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- #pragma once
- #include <Cmnpp/IProtocol.h>
- #include "Cmnpp/Proto/HandleRef.h"
- #include "MqttTopic.h"
- #include "SnowflakeIdGenerator.hpp"
- #include "DataStorageMgmt.hpp"
- #include "ntool/cxx/chrono/passive_timer.hpp"
- #include <memepp/native.hpp>
- #include <spdlog/logger.h>
- #include <nlohmann/json.hpp>
- #include "AlgorithmDB.h"
- #include <map>
- #include <string>
- class DemoProtocol : public Cmnpp::IProtocol
- {
- public:
- void parameters_get(struct cmnproto_params* _param);
- cmnec_t initialize_cb(cmnproto_handle _handle);
- cmnec_t process_cb() override;
- cmnec_t terminate_cb();
- cmnerrno_t timer_cb() override;
- cmnec_t link_new_cb(cmnlink_ref _link);
- cmnec_t link_del_cb(cmnlink_ref _link);
- cmnec_t message_cb_response(memepp::string_view topic, memepp::string_view data);
- cmnec_t message_cb_request(memepp::string_view topic, memepp::string_view data);
- virtual cmnec_t message_cb(cmnlink_ref _link, cmniface_msgbody_const _msg) override;
- private:
- // 发送mqtt消息
- void pack_send_mqtt(const memepp::string& topic, const memepp::string& type, const nlohmann::json& data);
- // 发送mqtt响应
- void pack_send_mqtt_reply(const memepp::string& topic, const memepp::string& type, const nlohmann::json& data);
- // 发送mqtt消息
- void pack_send_mqtt_binary(const memepp::string& topic, const uint8_t* buf, size_t len);
- // 发送文本消息
- void pack_send_mqtt_text(const memepp::string& topic, const std::string &text);
- // 发送认证
- void pack_send_token();
- // 解包token
- void unpack_data_token(const nlohmann::json params);
- // 发送注册
- void pack_send_register();
- // 解包注册
- void unpack_data_register(const nlohmann::json params);
- // 发送校时
- void pack_send_ntp();
- // 解包校时
- void unpack_data_ntp(const nlohmann::json params);
- // 数据上报-发送实时数据
- void pack_send_real_data();
- // 数据上报-发送增量数据
- void pack_send_incremental_data();
- // 断点续传-发送离线数据
- void pack_send_offline_data();
- // 发送总召数据
- void pack_send_up_call();
- // 发送点位桥查询
- void query_send_bridge(std::vector<std::string> dotNumbers);
- // 解包点位桥结果
- void unpack_bridge_data(memepp::string_view data);
- // 解析总召指令
- void unpack_data_up_all(const nlohmann::json params);
- // 查询指令-状态查询
- void unpack_query_machine_status(const nlohmann::json params);
- // 查询指令-日志查询
- void unpack_query_log(const nlohmann::json params);
- // 算法下发解包
- void unpack_algorithm_data(const nlohmann::json params);
- // 发送算法报告
- void pack_send_algorithm_report();
- // 算法移除
- void unpack_algorithm_delete(const nlohmann::json params);
- //从全量包或者增量包中获取时间戳
- long get_time_from_string(const std::string dataTime);
- //发送数据定时任务,包括全量数据和增量数据
- void send_data_timer_task();
- // 处理事务任务, 主要是类似于每天某个时间点定时运行
- void run_service_task();
- //发送数据准备
- void pre_real_send_data();
- //计算磁盘利用率,最大值100
- double calc_disk_ratio();
- //平台下发点位号转换成网关平台测点ID
- void PointNo2PointId(std::vector<std::string> pointNo, std::vector<std::string> &pointId);
- // 初始化topic列表
- void init_topic_map();
- // 订阅topic
- void subscribe_topic();
- bool readLinesFromFile(const memepp::native_string& filePath, int startLine, int numLines, uint64_t& totalLines, std::vector<nlohmann::json>& result);
- //根据遥测遥信数量,计算发送队列长度
- size_t cacl_real_data_buff_len(const uint16_t& yc_count, const uint16_t& yx_count);
- //根据遥测遥信数量,计算增量数据最大发送队列长度
- size_t cacl_inc_real_data_buff_len(const uint16_t& yc_count, const uint16_t& yx_count);
- //发送算法报告定时任务
- void send_algorithm_report_timer_task();
- //从数据库中读取已经部署的算法信息
- void load_algorithm_from_db(memepp::native_string filePath);
- private:
- typedef enum
- {
- gateway_init,
- gateway_online,
- gateway_offline
- }ET_gateway_status;
- typedef struct
- {
- AlgorithmDB::Record AlgotithmInfo;
- Cmnpp::Calc::Expr exprObject;
- }FormulasInfo;
- private:
- ET_gateway_status gateway_status;
- Cmnpp::Proto::HandleRef m_handle;
- Cmnpp::Proto::LineRef m_line;
- Cmnpp::Link m_currentLink;
- //std::shared_ptr<spdlog::logger> logger_;
- // 令牌
- std::string m_token;
- // 当前目录
- memepp::native_string currentDir;
- char sm4_key_screct[33] = "9dd2ecbe933c80a7afbba5403b2b5626";
- // 设备序号
- std::string m_device_sn;
- SnowflakeIdGenerator idGenerator = SnowflakeIdGenerator(1, 2);
- bool m_registered = false;
- //当前时间窗口(5分钟)时间戳。1970 年 1 月 1 日(08:00:00 GMT)至当前时间的总毫秒数。
- uint64_t CurrentRealDataTime;
- //增量包时间窗口内的数据包的顺序号
- uint16_t IncDataPackIndex;
- //全量数据缓存。
- std::vector<uint8_t> vec_real_data_buff;
- //增量数据缓存。
- std::vector<uint8_t> vec_inc_real_data_buff;
- // topic定义
- std::map<std::string, mqtt::MqttTopic> topic_map_;
- // 事件mid缓存
- std::map<std::string, std::string> mid_map_;
- // 点位缓存,key 点位号,value 测点号
- std::map<std::string, std::string> point_bridge_map_;
- // 采集点缓存,key 采集点号,value 点位号
- std::map<std::string, std::string> m_collect_point_map;
- //存储数据类
- DataStorageMgmt m_StorageData;
- //历史数据缓存,用于定时发送
- std::vector<ghc::filesystem::path> m_his_data_file_list;
- //总召数据发送序号
- size_t m_upcall_send_index;
- //离线数据文件清单
- std::vector<ghc::filesystem::path> m_offline_data_file_list;
- //当前发送离线数据存储文件的序号
- size_t m_offline_file_index;
- //离线数据缓存,用于定时发送
- std::vector<std::vector<uint8_t>> m_offline_data_cache;
- //发送离线数据序号
- size_t m_offline_data_send_index;
- //该次断点续传标识
- uint64_t m_offline_compensationId;
- //标识是否正在发送上一次离线缓存数据。如果发送离线数据过程中,又出现了离线,回重新缓存数据,但上次读取的缓存数据还没发完
- //所以需要先完缓存数据,并且文件需要不要加1
- bool m_is_sending_cache_data;
- // 读取文件大小
- double getFileSizeInMB(const std::string& filename);
- // 注册定时器
- ntool::chrono::passive_timer auth_passive_timer_;
- // 发送全量数据定时器
- ntool::chrono::passive_timer m_send_total_data_timer;
- // 增量数据检查定时器
- ntool::chrono::passive_timer m_send_incremental_data_timer;
- // 发送离线数据定时器
- ntool::chrono::passive_timer m_send_offline_data_timer;
- // 发送总召数据定时器
- ntool::chrono::passive_timer m_up_call_timer;
- // 常用事务定时器
- ntool::chrono::passive_timer service_task_timer;
- // 算法结果上报定时器
- ntool::chrono::passive_timer m_algorithm_task_timer;
- // 事务定时器运行时间
- uint64_t run_service_timestamp = 0;
- // 上次获取token时间
- uint64_t last_get_token_timestamp = 0;
- //全量遥信数据缓存列表,每个转发设备一个换成,用于比较数据是否有变化
- std::vector<std::vector<float>> YC_data_cache_list;
- //全量遥测数据缓存,用于比较数据是否有变化
- std::vector<std::vector<uint8_t>> YX_data_cache_list;
- std::map<std::string, FormulasInfo> m_mapFormulasInfo;
- AlgorithmDB m_algorithmDB;
- };
|