#include "CanHS.h" #include "SimMsg.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; /* 全局变量定义 */ static CanServiceContext g_can_ctx = { .fd_can = -1, .running = false, .send_id_on_o2 = 0x27, .stats = {0}, .telemetry_len = 0, .telemetry_request_pending = false, .listener_thread = 0 }; /* FastDDS相关全局变量 */ string topic_name_cmd = "Command"; string topic_name_tlm = "Telemetry"; SimMsg* CanHS_part = nullptr; string servername = "Can_Hardware_Service"; /* 内部函数声明 */ static bool can_device_init(const char* dev_can); static void* background_listener_thread(void* arg); static void background_listener(void); static void process_can_data(uint8_t serve_type, uint8_t *rcv_data, uint8_t data_len); static int process_multi_frame(uint8_t serve_type, uint8_t frame_seq, uint8_t *data, uint8_t data_len); static void reset_multi_frame_state(void); static bool is_multi_frame_timeout(void); static void process_telemetry_request(uint8_t *rcv_data, uint8_t data_len); static void process_telemetry_control(uint8_t *rcv_data, uint8_t data_len); /* ============================================================ FastDDS相关函数实现 ============================================================ */ void CanHSWriteLog(const std::string &msg) { std::cout << msg << std::endl; } void CanHS_init(uint8_t domainid, std::string appname) { std::vector parameters; string expression = "dest = '" + servername + "'"; if (nullptr == CanHS_part) { CanHS_part = new SimMsg(domainid, 3000, appname, CanHSWriteLog); CanHS_part->create_pub(topic_name_cmd); CanHS_part->create_pub(topic_name_tlm); CanHS_part->create_sub(topic_name_tlm, telemetry_callback, expression, parameters); } } /* 命令发布函数 */ void CanHS_command_Pub(uint8_t* data, std::string dest, uint16_t len) { if (CanHS_part && data && len > 0) { CanHS_part->publish(topic_name_cmd, servername, dest, "command", data, len); } } /* 遥测发布函数 */ void CanHS_telemetry_Pub(uint8_t* data, std::string dest, uint16_t len) { if (CanHS_part && data && len > 0) { CanHS_part->publish(topic_name_tlm, servername, dest, "telemetry", data, len); } } /* 遥测回调函数 */ void telemetry_callback(std::string src, std::string dest, std::string type, std::string reserve1, std::string reserve2, std::vector& data) { std::cout << "[INFO] CanHS telemetry received from " << src << " to " << dest << ", size=" << data.size() << std::endl; if (data.empty()) { std::cout << "[WARN] CanHS: Empty telemetry data received" << std::endl; return; } /* 存储遥测数据 */ can_set_telemetry_data(data.data(), data.size()); /* 检查是否有遥测请求待处理 */ if (g_can_ctx.telemetry_request_pending) { std::cout << "[INFO] CanHS: Sending telemetry response (pending request)" << std::endl; send_telemetry_response(data.data(), data.size()); g_can_ctx.telemetry_request_pending = false; } } /* ============================================================ CAN服务核心实现 - 使用Linux SocketCAN ============================================================ */ /* 初始化与销毁 */ bool can_service_init(const char* can_device) { /* 初始化互斥锁 */ pthread_mutex_init(&g_can_ctx.telemetry_mutex, NULL); pthread_mutex_init(&g_can_ctx.multi_frame_mutex, NULL); memset(&g_can_ctx.multi_frame_state, 0, sizeof(g_can_ctx.multi_frame_state)); /* 初始化CAN设备 */ if (!can_device_init(can_device)) { std::cerr << "[ERROR] CAN device init failed: " << can_device << std::endl; return false; } return true; } void can_service_destroy(void) { can_service_stop(); pthread_mutex_destroy(&g_can_ctx.telemetry_mutex); pthread_mutex_destroy(&g_can_ctx.multi_frame_mutex); if (g_can_ctx.fd_can >= 0) { close(g_can_ctx.fd_can); g_can_ctx.fd_can = -1; } /* 清理FastDDS资源 */ if (CanHS_part) { delete CanHS_part; CanHS_part = nullptr; } std::cout << "[INFO] CAN Service destroyed" << std::endl; } /* 运行控制 */ void can_service_start(void) { if (g_can_ctx.running) { std::cout << "[WARN] CAN Service already running" << std::endl; return; } g_can_ctx.running = true; /* 启动后台监听线程 */ if (pthread_create(&g_can_ctx.listener_thread, NULL, background_listener_thread, NULL) != 0) { std::cerr << "[ERROR] Failed to create background listener thread" << std::endl; g_can_ctx.running = false; return; } std::cout << "[INFO] CAN Service started" << std::endl; } void can_service_stop(void) { g_can_ctx.running = false; if (g_can_ctx.listener_thread) { pthread_join(g_can_ctx.listener_thread, NULL); g_can_ctx.listener_thread = 0; } std::cout << "[INFO] CAN Service stopped" << std::endl; } /* 统计数据获取 */ const CanStats* can_service_get_stats(void) { return &g_can_ctx.stats; } /* CAN设备初始化 - Linux SocketCAN版本 */ static bool can_device_init(const char* dev_can) { int s; struct sockaddr_can addr; struct ifreq ifr; /* 创建SocketCAN socket */ s = socket(PF_CAN, SOCK_RAW, CAN_RAW); if (s < 0) { perror("socket"); return false; } strcpy(ifr.ifr_name, dev_can); if (ioctl(s, SIOCGIFINDEX, &ifr) < 0) { perror("ioctl SIOCGIFINDEX"); close(s); return false; } addr.can_family = AF_CAN; addr.can_ifindex = ifr.ifr_ifindex; if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) { perror("bind"); close(s); return false; } g_can_ctx.fd_can = s; std::cout << "[OK] CAN SocketCAN initialized: " << dev_can << std::endl; g_can_ctx.stats.can_reset++; return true; } /* CAN单帧发送 */ int can_send_single(uint8_t serve_type, uint8_t *data, uint8_t data_len) { if (data_len > 8 || g_can_ctx.fd_can < 0) { return -1; } struct can_frame frame; memset(&frame, 0, sizeof(struct can_frame)); frame.can_id = g_can_ctx.send_id_on_o2 << 5 | serve_type << 2 | O2_SINGLE_FRAME; frame.can_dlc = data_len; if (data && data_len > 0) { memcpy(frame.data, data, data_len); } ssize_t ret = write(g_can_ctx.fd_can, &frame, sizeof(struct can_frame)); if (ret != sizeof(struct can_frame)) { std::cerr << "[ERROR] CAN send failed: " << strerror(errno) << std::endl; return -1; } if (serve_type == O2_NORMAL_TELEMETRY) { g_can_ctx.stats.send_single_frame++; } std::cout << "[INFO] CAN single frame sent: type=0x" << std::hex << (int)serve_type << ", len=" << std::dec << (int)data_len << std::endl; return 0; } /* CAN多帧发送 */ int can_send_multi(uint8_t serve_type, uint8_t *data, uint16_t data_len) { if (data_len <= 8 || g_can_ctx.fd_can < 0) { return -1; } struct can_frame frame; uint16_t remain_len = data_len; uint16_t frame_index = 0; /* 发送首帧 */ memset(&frame, 0, sizeof(struct can_frame)); frame.can_id = g_can_ctx.send_id_on_o2 << 5 | serve_type << 2 | O2_START_FRAME; frame.can_dlc = 8; frame.data[0] = (uint8_t)frame_index; uint16_t copy_len = (remain_len > 7) ? 7 : remain_len; memcpy(&frame.data[1], data, copy_len); if (write(g_can_ctx.fd_can, &frame, sizeof(struct can_frame)) != sizeof(struct can_frame)) { std::cerr << "[ERROR] Failed to send start frame" << std::endl; return -1; } remain_len -= copy_len; frame_index++; /* 发送中间帧 */ while (remain_len > 7) { memset(&frame, 0, sizeof(struct can_frame)); frame.can_id = g_can_ctx.send_id_on_o2 << 5 | serve_type << 2 | O2_MID_FRAME; frame.can_dlc = 8; frame.data[0] = (uint8_t)frame_index; memcpy(&frame.data[1], data + (frame_index * 7), 7); if (write(g_can_ctx.fd_can, &frame, sizeof(struct can_frame)) != sizeof(struct can_frame)) { std::cerr << "[ERROR] Failed to send middle frame " << frame_index << std::endl; return -1; } remain_len -= 7; frame_index++; } /* 发送末帧 */ memset(&frame, 0, sizeof(struct can_frame)); frame.can_id = g_can_ctx.send_id_on_o2 << 5 | serve_type << 2 | O2_END_FRAME; frame.can_dlc = remain_len + 1; frame.data[0] = (uint8_t)frame_index; if (remain_len > 0) { memcpy(&frame.data[1], data + (frame_index * 7), remain_len); } if (write(g_can_ctx.fd_can, &frame, sizeof(struct can_frame)) != sizeof(struct can_frame)) { std::cerr << "[ERROR] Failed to send end frame" << std::endl; return -1; } if (serve_type == O2_NORMAL_TELEMETRY) { g_can_ctx.stats.send_multi_frame++; } std::cout << "[INFO] Multi-frame sent: " << data_len << " bytes in " << (frame_index + 1) << " frames, type=0x" << std::hex << (int)serve_type << std::dec << std::endl; return 0; } /* 遥测数据管理 */ void can_set_telemetry_data(const uint8_t *data, size_t data_len) { pthread_mutex_lock(&g_can_ctx.telemetry_mutex); size_t copy_len = (data_len < sizeof(g_can_ctx.telemetry_buffer)) ? data_len : sizeof(g_can_ctx.telemetry_buffer); memcpy(g_can_ctx.telemetry_buffer, data, copy_len); g_can_ctx.telemetry_len = copy_len; std::cout << "[INFO] Telemetry data updated, size=" << copy_len << std::endl; pthread_mutex_unlock(&g_can_ctx.telemetry_mutex); } bool can_get_telemetry_data(uint8_t *buffer, size_t buffer_size, size_t *data_len) { pthread_mutex_lock(&g_can_ctx.telemetry_mutex); if (g_can_ctx.telemetry_len == 0) { pthread_mutex_unlock(&g_can_ctx.telemetry_mutex); return false; } size_t copy_len = (g_can_ctx.telemetry_len < buffer_size) ? g_can_ctx.telemetry_len : buffer_size; memcpy(buffer, g_can_ctx.telemetry_buffer, copy_len); if (data_len) *data_len = copy_len; pthread_mutex_unlock(&g_can_ctx.telemetry_mutex); return true; } /* 遥测响应发送 */ void send_telemetry_response(const uint8_t *data, size_t data_len) { uint8_t send_data[1024]; send_data[0] = 0x02; // 遥测响应标识 size_t copy_len = (data_len < sizeof(send_data) - 1) ? data_len : sizeof(send_data) - 1; memcpy(&send_data[1], data, copy_len); uint16_t send_data_len = copy_len + 1; if (send_data_len <= 8) { can_send_single(O2_NORMAL_TELEMETRY, send_data, (uint8_t)send_data_len); } else { can_send_multi(O2_NORMAL_TELEMETRY, send_data, send_data_len); } std::cout << "[INFO] Telemetry response sent, data_len=" << copy_len << ", total_len=" << send_data_len << std::endl; } /* 数据处理函数 */ static void process_telemetry_request(uint8_t *rcv_data, uint8_t data_len) { std::cout << "[INFO] Processing telemetry request, data=0x" << std::hex << (int)rcv_data[0] << ", len=" << std::dec << (int)data_len << std::endl; if (rcv_data[0] == 0x01 && data_len == 1) { g_can_ctx.stats.rec_normal_telemetry++; /* 检查是否有遥测数据 */ uint8_t telemetry_buffer[1024]; size_t telemetry_len = 0; if (can_get_telemetry_data(telemetry_buffer, sizeof(telemetry_buffer), &telemetry_len)) { /* 有遥测数据,发送响应 */ send_telemetry_response(telemetry_buffer, telemetry_len); } else { /* 无遥测数据,标记待处理 */ g_can_ctx.telemetry_request_pending = true; std::cout << "[WARN] No telemetry data available, request pending" << std::endl; } } } static void process_telemetry_control(uint8_t *rcv_data, uint8_t data_len) { std::cout << "[INFO] Processing telemetry control, len=" << (int)data_len << std::endl; if (data_len >= 4) { // 最小有效数据长度 g_can_ctx.stats.rec_telemetry_control++; /* 通过FastDDS转发到通信服务 */ string dest = "Com_Service"; CanHS_command_Pub(rcv_data, dest, data_len); std::cout << "[INFO] Telemetry control forwarded via FastDDS" << std::endl; } } /* 主数据处理函数 */ static void process_can_data(uint8_t serve_type, uint8_t *rcv_data, uint8_t data_len) { switch (serve_type) { case O2_NORMAL_TELEMETRY: process_telemetry_request(rcv_data, data_len); break; case O2_TELEMETRY_CONTROL: process_telemetry_control(rcv_data, data_len); break; default: std::cout << "[WARN] Unknown service type: 0x" << std::hex << (int)serve_type << std::dec << std::endl; break; } } /* 多帧重组 */ static bool is_multi_frame_timeout(void) { struct timeval current_time; gettimeofday(¤t_time, NULL); long elapsed_ms = (current_time.tv_sec - g_can_ctx.multi_frame_state.start_time.tv_sec) * 1000 + (current_time.tv_usec - g_can_ctx.multi_frame_state.start_time.tv_usec) / 1000; return (elapsed_ms > 10000); // 10秒超时 } static void reset_multi_frame_state(void) { pthread_mutex_lock(&g_can_ctx.multi_frame_mutex); g_can_ctx.multi_frame_state.active = 0; g_can_ctx.multi_frame_state.length = 0; g_can_ctx.multi_frame_state.expected_frame_index = 0; pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); } static int process_multi_frame(uint8_t serve_type, uint8_t frame_seq, uint8_t *data, uint8_t data_len) { pthread_mutex_lock(&g_can_ctx.multi_frame_mutex); /* 检查超时 */ if (g_can_ctx.multi_frame_state.active && is_multi_frame_timeout()) { std::cout << "[WARN] Multi-frame reassembly timeout" << std::endl; reset_multi_frame_state(); pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); return -1; } /* 如果是首帧 */ if (frame_seq == O2_START_FRAME) { g_can_ctx.multi_frame_state.active = 1; g_can_ctx.multi_frame_state.serve_type = serve_type; g_can_ctx.multi_frame_state.expected_frame_index = 1; g_can_ctx.multi_frame_state.length = 0; gettimeofday(&g_can_ctx.multi_frame_state.start_time, NULL); if (data_len > 1) { memcpy(g_can_ctx.multi_frame_state.buffer, &data[1], data_len - 1); g_can_ctx.multi_frame_state.length = data_len - 1; } pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); return 0; } /* 如果是中间帧或末帧 */ if (g_can_ctx.multi_frame_state.active && serve_type == g_can_ctx.multi_frame_state.serve_type) { if (data[0] == g_can_ctx.multi_frame_state.expected_frame_index) { if (data_len > 1) { memcpy(&g_can_ctx.multi_frame_state.buffer[g_can_ctx.multi_frame_state.length], &data[1], data_len - 1); g_can_ctx.multi_frame_state.length += data_len - 1; } g_can_ctx.multi_frame_state.expected_frame_index++; if (frame_seq == O2_END_FRAME) { g_can_ctx.multi_frame_state.active = 0; uint16_t final_length = g_can_ctx.multi_frame_state.length; pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); return final_length; // 返回完整数据长度 } pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); return 0; // 需要继续接收 } else { std::cout << "[WARN] Multi-frame sequence mismatch: expected=" << (int)g_can_ctx.multi_frame_state.expected_frame_index << ", received=" << (int)data[0] << std::endl; reset_multi_frame_state(); pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); return -1; } } pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); return -1; } /* 后台监听线程 */ static void* background_listener_thread(void* arg) { (void)arg; background_listener(); return NULL; } static void background_listener(void) { std::cout << "[INFO] CAN background listener started" << std::endl; while (g_can_ctx.running) { if (g_can_ctx.fd_can < 0) { sleep(1); continue; } fd_set readfds; struct timeval timeout = {0, 100000}; // 100ms FD_ZERO(&readfds); FD_SET(g_can_ctx.fd_can, &readfds); int ret = select(g_can_ctx.fd_can + 1, &readfds, NULL, NULL, &timeout); if (ret > 0 && FD_ISSET(g_can_ctx.fd_can, &readfds)) { struct can_frame recv_frame; ssize_t bytes = read(g_can_ctx.fd_can, &recv_frame, sizeof(struct can_frame)); if (bytes == sizeof(struct can_frame)) { uint8_t serve_type = (recv_frame.can_id >> 2) & 0x07; uint8_t frame_seq = recv_frame.can_id & 0x03; std::cout << "[INFO] CAN received: type=0x" << std::hex << (int)serve_type << ", seq=" << std::dec << (int)frame_seq << ", len=" << (int)recv_frame.can_dlc << std::endl; /* 处理多帧数据 */ if (frame_seq != O2_SINGLE_FRAME) { int result = process_multi_frame(serve_type, frame_seq, recv_frame.data, recv_frame.can_dlc); if (result > 0) { std::cout << "[SUCCESS] Multi-frame reassembly completed: type=0x" << std::hex << (int)serve_type << std::dec << ", bytes=" << result << std::endl; process_can_data(serve_type, g_can_ctx.multi_frame_state.buffer, result); } else if (result == 0) { std::cout << "[INFO] Waiting for more multi-frame data, type=0x" << std::hex << (int)serve_type << std::dec << std::endl; } else { std::cout << "[ERROR] Multi-frame reassembly failed: type=0x" << std::hex << (int)serve_type << std::dec << std::endl; } } else { process_can_data(serve_type, recv_frame.data, recv_frame.can_dlc); } } } /* 检查多帧超时 */ pthread_mutex_lock(&g_can_ctx.multi_frame_mutex); if (g_can_ctx.multi_frame_state.active && is_multi_frame_timeout()) { std::cout << "[WARN] Multi-frame reassembly timeout" << std::endl; reset_multi_frame_state(); } pthread_mutex_unlock(&g_can_ctx.multi_frame_mutex); usleep(10000); // 10ms } }