#include "log_producer_sender.h"
#include "log_api.h"
#include "log_producer_manager.h"
#include "inner_log.h"
#include "lz4.h"
#include "sds.h"
// NOTE(review): the names of the system headers below were missing from the
// reviewed text (only bare `#include` tokens remained). They are reconstructed
// from the functions this file calls (malloc/free, memcpy/strcmp/strstr, Sleep,
// usleep, time) -- confirm against the real build before merging.
#include <stdlib.h>
#include <string.h>
#include "baseFun.h"
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <time.h>
#endif

const char* LOGE_SERVER_BUSY = "ServerBusy";
const char* LOGE_INTERNAL_SERVER_ERROR = "InternalServerError";
const char* LOGE_UNAUTHORIZED = "Unauthorized";
const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
const char* LOGE_TIME_EXPIRED = "RequestTimeExpired";

// Granularity of the interruptible sleep between send retries.
#define SEND_SLEEP_INTERVAL_MS 50
// Back-off bounds used for network / server errors.
#define MAX_NETWORK_ERROR_SLEEP_MS 3000
#define BASE_NETWORK_ERROR_SLEEP_MS 300
// Back-off bounds used for quota errors.
#define MAX_QUOTA_ERROR_SLEEP_MS 10000
#define BASE_QUOTA_ERROR_SLEEP_MS 500
// Retry interval used for "request time expired" errors (SEND_TIME_INVALID_FIX only).
#define INVALID_TIME_TRY_INTERVAL 500
// After this many seconds of continuous failure the buffer is dropped.
#define DROP_FAIL_DATA_TIME_SECOND 86400

// TODO(Gifur@4/27/2023): this approach does not work on Linux!!
// Upload statistics, counted in logs, per log type.
// NOTE(review): these counters are updated without synchronization from
// log_producer_send_fun -- confirm whether several sender threads can run.
unsigned long g_upload_TerminalSys_Suc = 0;
unsigned long g_upload_TerminalUser_Suc = 0;
unsigned long g_upload_BussinessSys_Suc = 0;
unsigned long g_upload_BussinessUser_Suc = 0;
unsigned long g_upload_beidou_Suc = 0;
unsigned long g_upload_vtmsdk_Suc = 0;
unsigned long g_upload_TerminalSys_Err = 0;
unsigned long g_upload_TerminalUser_Err = 0;
unsigned long g_upload_BussinessSys_Err = 0;
unsigned long g_upload_BussinessUser_Err = 0;
unsigned long g_upload_beidou_Err = 0;
unsigned long g_upload_vtmsdk_Err = 0;
unsigned long g_discardMsgNum_since_full = 0;
unsigned long g_discardMsgNum_since_serverRet_RTI1002 = 0;
unsigned long g_notUploadLogNum = 0;

//#define SEND_TIME_INVALID_FIX

// Retry state kept for one buffer across the attempts made by log_producer_send_fun.
typedef struct _send_error_info
{
    log_producer_send_result last_send_error; // error class of the previous attempt (0 = none yet)
    int32_t last_sleep_ms;                    // back-off returned for the previous attempt
    int32_t first_error_time;                 // time(NULL) when the current error class was first seen
}send_error_info;

int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);

#ifdef SEND_TIME_INVALID_FIX
/**
 * Decompresses lz4_buf and compresses it again into a newly malloc'ed buffer.
 *
 * On success *new_lz4_buf points to the new buffer (the caller frees it with
 * free()); on any failure *new_lz4_buf is left untouched.
 *
 * NOTE(review): the call that would rewrite the log time is commented out, so
 * the payload is currently re-compressed unchanged. Only `length`,
 * `raw_length` and `data` of the new buffer are filled in here; any other
 * fields of lz4_log_buf stay uninitialized -- confirm before enabling
 * SEND_TIME_INVALID_FIX.
 */
void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
{
    char* buf;
    long nowTime;
    int compress_bound;
    char* compress_data;
    int compressed_size;
    lz4_log_buf * rebuilt;

    aos_debug_log((LB, "rebuild log."));
    buf = (char *)malloc(lz4_buf->raw_length);
    if (buf == NULL)
    {
        aos_fatal_log((LB, "malloc error"));
        return;
    }
    if (LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
    {
        free(buf);
        aos_fatal_log((LB, "LZ4_decompress_safe error"));
        return;
    }
    nowTime = LOG_GET_TIME();
    (void)nowTime; // only needed once the fix below is re-enabled
    //fix_log_group_time(buf, lz4_buf->raw_length, nowTime);

    compress_bound = LZ4_compressBound(lz4_buf->raw_length);
    compress_data = (char *)malloc(compress_bound);
    if (compress_data == NULL)
    {
        aos_fatal_log((LB, "malloc error"));
        free(buf);
        return;
    }
    compressed_size = LZ4_compress_default((char *)buf, compress_data, lz4_buf->raw_length, compress_bound);
    if (compressed_size <= 0)
    {
        aos_fatal_log((LB, "LZ4_compress_default error"));
        free(buf);
        free(compress_data);
        return;
    }

    rebuilt = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
    if (rebuilt != NULL)
    {
        rebuilt->length = compressed_size;
        rebuilt->raw_length = lz4_buf->raw_length;
        memcpy(rebuilt->data, compress_data, compressed_size);
        *new_lz4_buf = rebuilt;
    }
    else
    {
        aos_fatal_log((LB, "malloc error"));
    }
    free(buf);
    free(compress_data);
    return;
}
#endif

/**
 * Sender thread entry point.
 *
 * param is the log_producer_manager. The thread first waits
 * producer_config->sendThreadWaitMs, then pops send params from
 * sender_data_queue and hands each one to log_producer_send_fun until
 * producer_manager->shutdown is set.
 *
 * NOTE(review): Sleep() is called on every platform; on non-Windows builds it
 * must come from a project header (presumably baseFun.h) -- confirm.
 */
#ifdef WIN32
DWORD WINAPI log_producer_send_thread(LPVOID param)
#else
void * log_producer_send_thread(void * param)
#endif
{
    log_producer_manager * producer_manager = (log_producer_manager *)param;

    if (producer_manager == NULL || producer_manager->sender_data_queue == NULL)
    {
        return 0;
    }

    Sleep(producer_manager->producer_config->sendThreadWaitMs);

    while (!producer_manager->shutdown)
    {
        // change from 30ms to 1000ms, reduce wake up when app switch to back
        void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
        if (send_param != NULL)
        {
            time_t begin;
            ATOMICINT_INC(&producer_manager->multi_thread_send_count);
            begin = time(NULL);
            log_producer_send_fun(send_param);
            // time_t may be wider than int: cast so the value matches "%d".
            aos_info_log((LB, "log_producer_send_fun cost %d", (int)(time(NULL) - begin)));
            ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
        }
    }

    return 0;
}

/** Forwards url and body to RvcLogSdkManager::LOG_OS_TestLogPost. */
void send_log_data(const char* url, const char* body)
{
    RvcLogSdkManager::getInstance().LOG_OS_TestLogPost(url, body);
}

/** Adds buf->n_logs to the success or failure counter that matches buf->type. */
static void record_upload_stats(const lz4_log_buf * buf, int succeeded)
{
    unsigned long * counter = NULL;

    switch (buf->type)
    {
    case LOG_TYPE_USER_SKYEYE:
        counter = succeeded ? &g_upload_TerminalUser_Suc : &g_upload_TerminalUser_Err;
        break;
    case LOG_TYPE_SYS_SKYEYE:
        counter = succeeded ? &g_upload_TerminalSys_Suc : &g_upload_TerminalSys_Err;
        break;
    case LOG_TYPE_BEIDOU:
        counter = succeeded ? &g_upload_beidou_Suc : &g_upload_beidou_Err;
        break;
    case LOG_TYPE_USER_BUSINESS:
        counter = succeeded ? &g_upload_BussinessUser_Suc : &g_upload_BussinessUser_Err;
        break;
    case LOG_TYPE_SYS_BUSINESS:
        counter = succeeded ? &g_upload_BussinessSys_Suc : &g_upload_BussinessSys_Err;
        break;
    case LOG_TYPE_WEBSDK:
        counter = succeeded ? &g_upload_vtmsdk_Suc : &g_upload_vtmsdk_Err;
        break;
    default:
        break;
    }

    if (counter != NULL)
    {
        *counter += buf->n_logs;
    }
}

/**
 * Tags source log `index` of send_buf with uuid_tag, wraps it in a new
 * single-log group builder and appends that builder to the resend list.
 */
static void queue_log_for_resend(log_producer_config * config, lz4_log_buf * send_buf, int index, const char * uuid_tag)
{
    log_group_builder * builder;

    memcpy(send_buf->src_logs[index].uuid, uuid_tag, strlen(uuid_tag) + 1);
    builder = log_group_create(config);
    if (builder == NULL)
    {
        return;
    }
    strcpy(builder->modular, send_buf->modular);
    add_log_raw2(builder, &(send_buf->src_logs[index]));
    RvcLogSdkManager::getInstance().getResendFrameList().push_back(builder);
}

/**
 * Handles an HTTP 300 answer for send_buf (translated from the original notes):
 *  (1) several logs: split them and queue each one for resend with uuid
 *      STR_LOG_SEND_ONE.
 *      - if uuid equals LOG_SEND_ONE, do not delete the uuid from the db
 *      - if uuid does not equal LOG_SEND_ONE, delete the uuid from the db
 *  (2) a single log: it is dirty data; it is queued once more with uuid
 *      STR_LOG_SEND_DIRTY and counted as discarded.
 */
static void resend_rejected_logs(log_producer_config * config, lz4_log_buf * send_buf)
{
    if (send_buf->n_logs == 1)
    {
        // Single log: once it already carries the dirty tag a resend has been
        // attempted, so it is not queued again. The uuid CONTENT is compared;
        // comparing the pointers is always "different" and re-queues forever.
        if (strcmp(STR_LOG_SEND_DIRTY, send_buf->src_logs[0].uuid) != 0)
        {
            queue_log_for_resend(config, send_buf, 0, STR_LOG_SEND_DIRTY);
            g_discardMsgNum_since_serverRet_RTI1002 += send_buf->n_logs;
        }
    }
    else
    {
        // try resend, push to deque
        for (int logIndex = 0; logIndex < send_buf->n_logs; logIndex++)
        {
            queue_log_for_resend(config, send_buf, logIndex, STR_LOG_SEND_ONE);
        }
    }
}

/**
 * Sends one buffered log group and retries until it is delivered, dropped or
 * the producer shuts down.
 *
 * param is a log_producer_send_param. Except when its magic number is wrong,
 * the function takes ownership: send_param->log_buf and send_param itself are
 * freed before returning. Always returns NULL.
 */
void * log_producer_send_fun(void * param)
{
    log_producer_manager* producer_manager;
    log_producer_config* config;
    send_error_info error_info;
    log_producer_send_param * send_param = (log_producer_send_param *)param;

    if (send_param == NULL)
    {
        return NULL;
    }

    if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
    {
        //aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
        // Report the invalid param; nothing is freed because the memory cannot be trusted.
        producer_manager = (log_producer_manager *)send_param->producer_manager;
        if (producer_manager && producer_manager->send_done_function != NULL)
        {
            producer_manager->send_done_function(LOG_PRODUCER_INVALID,
                                                 send_param->log_buf->raw_length,
                                                 send_param->log_buf->length,
                                                 NULL,
                                                 "invalid send param, magic num not found",
                                                 send_param->log_buf->data,
                                                 producer_manager->user_param);
        }
        if (producer_manager && producer_manager->uuid_send_done_function != NULL)
        {
            producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
                                                      send_param->log_buf->raw_length,
                                                      send_param->log_buf->length,
                                                      NULL,
                                                      "invalid send param, magic num not found",
                                                      send_param->log_buf->data,
                                                      producer_manager->uuid_user_param,
                                                      send_param->log_buf->n_logs,
                                                      send_param->log_buf->uuid,
                                                      send_param->log_buf->modular);
        }
        return NULL;
    }

    config = send_param->producer_config;
    memset(&error_info, 0, sizeof(error_info));
    producer_manager = (log_producer_manager *)send_param->producer_manager;

    do
    {
        lz4_log_buf* send_buf;
        log_post_option option;
        // No credentials are set in this function; the three values are passed as NULL.
        sds accessKeyId = NULL;
        sds accessKey = NULL;
        sds stsToken = NULL;
        post_log_result* rst;
        int32_t sleepMs;
        int waitedMs;
        char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "", terminalno[MAX_TOKEN_LEN] = "", reserve1[MAX_TOKEN_LEN] = "";

        if (producer_manager->shutdown)
        {
            aos_info_log((LB, "send fail but shutdown signal received, force exit"));
            if (producer_manager->send_done_function != NULL)
            {
                producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED,
                                                     send_param->log_buf->raw_length,
                                                     send_param->log_buf->length,
                                                     NULL,
                                                     "producer is being destroyed, producer has no time to send this buffer out",
                                                     send_param->log_buf->data,
                                                     producer_manager->user_param);
            }
            break;
        }

        send_buf = send_param->log_buf;

#ifdef SEND_TIME_INVALID_FIX
        long nowTime = LOG_GET_TIME();
        if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
        {
            _rebuild_time(send_param->log_buf, &send_buf);
            send_param->builder_time = nowTime;
        }
#endif

        memset(&option, 0, sizeof(log_post_option));
        option.connect_timeout = config->connectTimeoutSec;
        option.operation_timeout = config->sendTimeoutSec;
        option.compress_type = config->compressType;
        option.using_https = config->using_https;
        option.ntp_time_offset = config->ntpTimeOffset;

        if (config->tokenFun != NULL)
        {
            config->tokenFun(channelId, token, terminalno, reserve1);
        }

        // send the logs over HTTP
        rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token, terminalno, reserve1);
        aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));

        sdsfree(accessKeyId);
        sdsfree(accessKey);
        sdsfree(stsToken);

        if (rst->statusCode == 200)
        {
            record_upload_stats(send_buf, 1);
        }
        else
        {
            record_upload_stats(send_buf, 0);
            if (rst->statusCode == 300)
            {
                resend_rejected_logs(config, send_buf);
            }
        }

        // Run the send-done handling; per the original comment this deletes the
        // entry from the database. Only half of the returned back-off is slept.
        sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;

        post_log_result_destroy(rst);

        // tmp buffer, free
        if (send_buf != send_param->log_buf)
        {
            free(send_buf);
        }

        if (sleepMs <= 0)
        {
            break;
        }

        // Sleep in small slices so shutdown / network recovery ends the wait early.
        for (waitedMs = 0; waitedMs < sleepMs; waitedMs += SEND_SLEEP_INTERVAL_MS)
        {
#ifdef WIN32
            Sleep(SEND_SLEEP_INTERVAL_MS);
#else
            usleep(SEND_SLEEP_INTERVAL_MS * 1000);
#endif
            if (producer_manager->shutdown || producer_manager->networkRecover)
            {
                break;
            }
        }

        if (producer_manager->networkRecover)
        {
            producer_manager->networkRecover = 0;
        }

    }while(1);

    // at last, free all buffer
    free_lz4_log_buf(send_param->log_buf);
    free(send_param);

    return NULL;
}

/**
 * Advances the exponential back-off stored in error_info for error_kind.
 *
 * The first occurrence of error_kind resets the state to base_ms; later
 * occurrences double last_sleep_ms while it is below max_ms.
 * Returns 1 when the same error has persisted for more than
 * DROP_FAIL_DATA_TIME_SECOND seconds (the data should be dropped), else 0.
 */
static int update_backoff(send_error_info * error_info, log_producer_send_result error_kind, int32_t base_ms, int32_t max_ms)
{
    if (error_info->last_send_error != error_kind)
    {
        error_info->last_send_error = error_kind;
        error_info->last_sleep_ms = base_ms;
        error_info->first_error_time = (int32_t)time(NULL);
        return 0;
    }

    if (error_info->last_sleep_ms < max_ms)
    {
        error_info->last_sleep_ms *= 2;
    }
    return (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND) ? 1 : 0;
}

/**
 * Evaluates the result of one post attempt.
 *
 * Invokes the registered send-done callbacks, updates the retry state in
 * error_info and, when the buffer is finished (sent or dropped), subtracts its
 * length from producer_manager->totalBufferSize.
 *
 * Returns the back-off in milliseconds before the next attempt, or 0 when the
 * buffer must not be retried.
 */
int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
{
    log_producer_send_result send_result = AosStatusToResult(result);
    log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
    // Maps the send result onto the callback result range.
    log_producer_result callback_result = send_result == LOG_SEND_OK ?
                                          LOG_PRODUCER_OK :
                                          (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);

    if (producer_manager->send_done_function != NULL)
    {
        producer_manager->send_done_function(callback_result,
                                             send_param->log_buf->raw_length,
                                             send_param->log_buf->length,
                                             result->requestID,
                                             result->errorMessage,
                                             send_param->log_buf->data,
                                             producer_manager->user_param);
    }
    if (producer_manager->uuid_send_done_function != NULL)
    {
        producer_manager->uuid_send_done_function(callback_result,
                                                  send_param->log_buf->raw_length,
                                                  send_param->log_buf->length,
                                                  result->requestID,
                                                  result->errorMessage,
                                                  send_param->log_buf->data,
                                                  producer_manager->uuid_user_param,
                                                  send_param->log_buf->n_logs,
                                                  send_param->log_buf->uuid,
                                                  send_param->log_buf->modular);
    }

    if (send_result == LOG_SEND_UNAUTHORIZED)
    {
        // Unauthorized logs are not dropped: handle them like a network error.
        // The LOG_SEND_* constant is used because send_result is a
        // log_producer_send_result, not a callback result code.
        send_result = LOG_SEND_NETWORK_ERROR;
    }

    switch (send_result)
    {
        case LOG_SEND_OK:
            break;
        case LOG_SEND_TIME_ERROR:
            // if no this macro, drop data
#ifdef SEND_TIME_INVALID_FIX
            error_info->last_send_error = LOG_SEND_TIME_ERROR;
            error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
            return error_info->last_sleep_ms;
#else
            break;
#endif
        case LOG_SEND_QUOTA_EXCEED:
            if (update_backoff(error_info, LOG_SEND_QUOTA_EXCEED, BASE_QUOTA_ERROR_SLEEP_MS, MAX_QUOTA_ERROR_SLEEP_MS))
            {
                break; // failing for too long: fall through to the discard path
            }
            aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
                          (int)send_param->log_buf->length,
                          (int)send_param->log_buf->raw_length,
                          result->statusCode,
                          result->errorMessage));
            return error_info->last_sleep_ms;
        case LOG_SEND_SERVER_ERROR :
        case LOG_SEND_NETWORK_ERROR:
            if (update_backoff(error_info, LOG_SEND_NETWORK_ERROR, BASE_NETWORK_ERROR_SLEEP_MS, MAX_NETWORK_ERROR_SLEEP_MS))
            {
                break; // failing for too long: fall through to the discard path
            }
            aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
                          (int)send_param->log_buf->length,
                          (int)send_param->log_buf->raw_length,
                          result->statusCode,
                          result->errorMessage));
            return error_info->last_sleep_ms;
        default:
            // discard data
            break;
    }

    // always try once when discard error
    if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
    {
        error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
        error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
        error_info->first_error_time = (int32_t)time(NULL);
        aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
                      (int)send_param->log_buf->length,
                      (int)send_param->log_buf->raw_length,
                      (int)producer_manager->totalBufferSize,
                      result->statusCode,
                      result->errorMessage));
        return BASE_NETWORK_ERROR_SLEEP_MS;
    }

    // The buffer is finished (sent or dropped): release its share of the total.
    CS_ENTER(producer_manager->lock);
    producer_manager->totalBufferSize -= send_param->log_buf->length;
    CS_LEAVE(producer_manager->lock);

    if (send_result == LOG_SEND_OK)
    {
        aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
                       (int)send_param->log_buf->length,
                       (int)send_param->log_buf->raw_length,
                       (int)producer_manager->totalBufferSize,
                       result->statusCode,
                       result->errorMessage));
    }
    else
    {
        aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
                      (int)send_param->log_buf->length,
                      (int)send_param->log_buf->raw_length,
                      (int)producer_manager->totalBufferSize,
                      result->statusCode,
                      result->errorMessage));
        if (producer_manager->send_done_function != NULL)
        {
            producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
                                                 send_param->log_buf->raw_length,
                                                 send_param->log_buf->length,
                                                 result->requestID,
                                                 result->errorMessage,
                                                 send_param->log_buf->data,
                                                 producer_manager->user_param);
        }
        if (producer_manager->uuid_send_done_function != NULL)
        {
            producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
                                                      send_param->log_buf->raw_length,
                                                      send_param->log_buf->length,
                                                      result->requestID,
                                                      result->errorMessage,
                                                      send_param->log_buf->data,
                                                      producer_manager->uuid_user_param,
                                                      send_param->log_buf->n_logs,
                                                      send_param->log_buf->uuid,
                                                      send_param->log_buf->modular);
        }
    }

    return 0;
}

/** Sends send_param synchronously on the calling thread; always returns LOG_PRODUCER_OK. */
log_producer_result log_producer_send_data(log_producer_send_param * send_param)
{
    log_producer_send_fun(send_param);
    return LOG_PRODUCER_OK;
}

/**
 * Classifies a post result by its HTTP status code (checked in this order):
 *   LOG_HTTP_ERRPARAM or 2xx -> LOG_SEND_OK
 *   <= 0                     -> LOG_SEND_NETWORK_ERROR
 *   >= 500                   -> LOG_SEND_SERVER_ERROR
 *   403                      -> LOG_SEND_QUOTA_EXCEED
 *   401 / 404                -> LOG_SEND_UNAUTHORIZED
 *   error message containing LOGE_TIME_EXPIRED -> LOG_SEND_TIME_ERROR
 *   anything else            -> LOG_SEND_OK
 *
 * NOTE(review): every remaining status (other 3xx/4xx codes) is reported as
 * LOG_SEND_OK -- confirm this is intended rather than a discard error.
 */
log_producer_send_result AosStatusToResult(post_log_result * result)
{
    if (result->statusCode == LOG_HTTP_ERRPARAM)
    {
        return LOG_SEND_OK;
    }
    if (result->statusCode / 100 == 2)
    {
        return LOG_SEND_OK;
    }
    if (result->statusCode <= 0)
    {
        return LOG_SEND_NETWORK_ERROR;
    }
    if (result->statusCode >= 500)
    {
        return LOG_SEND_SERVER_ERROR;
    }
    if (result->statusCode == 403)
    {
        return LOG_SEND_QUOTA_EXCEED;
    }
    if (result->statusCode == 401 || result->statusCode == 404)
    {
        return LOG_SEND_UNAUTHORIZED;
    }
    if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
    {
        return LOG_SEND_TIME_ERROR;
    }
    return LOG_SEND_OK;
}

/**
 * Allocates a send param that bundles the config, the manager and the
 * compressed buffer.
 *
 * builder_time is taken from builder when one is given, otherwise from
 * time(NULL). Returns NULL when the allocation fails.
 */
log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
                                                         void * producer_manager,
                                                         lz4_log_buf* log_buf,
                                                         log_group_builder * builder)
{
    log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
    if (param == NULL)
    {
        return NULL;
    }

    param->producer_config = producer_config;
    param->producer_manager = producer_manager;
    param->log_buf = log_buf;
    param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
    if (builder != NULL)
    {
        param->builder_time = builder->builder_time;
    }
    else
    {
        param->builder_time = time(NULL);
    }
    return param;
}