123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579 |
#include "log_producer_sender.h"
#include "log_api.h"
#include "log_producer_manager.h"
#include "inner_log.h"
#include "lz4.h"
#include "sds.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "baseFun.h"
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/syscall.h>
#endif
/*
 * Error identifiers as plain C strings.
 * Within this file only LOGE_TIME_EXPIRED is consulted: AosStatusToResult()
 * searches the response's error message for it.
 */
const char *LOGE_SERVER_BUSY              = "ServerBusy";
const char *LOGE_INTERNAL_SERVER_ERROR    = "InternalServerError";
const char *LOGE_UNAUTHORIZED             = "Unauthorized";
const char *LOGE_WRITE_QUOTA_EXCEED       = "WriteQuotaExceed";
const char *LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
const char *LOGE_TIME_EXPIRED             = "RequestTimeExpired";
/* Retry/backoff tuning used by the sender in this file. */

/* Slice length of the interruptible retry sleep in log_producer_send_fun (ms). */
#define SEND_SLEEP_INTERVAL_MS      50

/* Network/server errors: first retry delay, and the threshold at which the delay stops doubling (ms). */
#define MAX_NETWORK_ERROR_SLEEP_MS  3000
#define BASE_NETWORK_ERROR_SLEEP_MS 300

/* Quota errors: first retry delay, and the threshold at which the delay stops doubling (ms). */
#define MAX_QUOTA_ERROR_SLEEP_MS    10000
#define BASE_QUOTA_ERROR_SLEEP_MS   500

/* Retry delay returned for a time error when SEND_TIME_INVALID_FIX is defined (ms). */
#define INVALID_TIME_TRY_INTERVAL   500

/* A buffer whose error streak lasts longer than this many seconds is dropped. */
#define DROP_FAIL_DATA_TIME_SECOND  86400
// TODO(Gifur@4/27/2023): this approach does not work on Linux!
/*
 * Upload statistics.
 *
 * The *_Suc counters grow by the number of logs in a buffer when its post
 * returns HTTP 200; the *_Err counters grow by the same amount for any other
 * status (see log_producer_send_fun). They are plain globals and this file
 * updates them without taking a lock.
 */
unsigned long g_upload_TerminalSys_Suc   = 0;
unsigned long g_upload_TerminalUser_Suc  = 0;
unsigned long g_upload_BussinessSys_Suc  = 0;
unsigned long g_upload_BussinessUser_Suc = 0;
unsigned long g_upload_beidou_Suc        = 0;
unsigned long g_upload_vtmsdk_Suc        = 0;

unsigned long g_upload_TerminalSys_Err   = 0;
unsigned long g_upload_TerminalUser_Err  = 0;
unsigned long g_upload_BussinessSys_Err  = 0;
unsigned long g_upload_BussinessUser_Err = 0;
unsigned long g_upload_beidou_Err        = 0;
unsigned long g_upload_vtmsdk_Err        = 0;

/* Not modified anywhere in this file; presumably maintained by the queueing side -- TODO confirm. */
unsigned long g_discardMsgNum_since_full = 0;
/* Grows when a single-log buffer is answered with status 300 and gets tagged as dirty. */
unsigned long g_discardMsgNum_since_serverRet_RTI1002 = 0;
/* Not modified anywhere in this file. */
unsigned long g_notUploadLogNum = 0;
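// Opt-in switch (currently disabled): when defined, _rebuild_time() is compiled and a
// LOG_SEND_TIME_ERROR result is retried instead of falling through to the drop path.
//#define SEND_TIME_INVALID_FIX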
- typedef struct _send_error_info
- {
- log_producer_send_result last_send_error;
- int32_t last_sleep_ms;
- int32_t first_error_time;
- }send_error_info;
- int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
#ifdef SEND_TIME_INVALID_FIX
/**
 * Produces a freshly compressed copy of a log buffer.
 *
 * The payload of lz4_buf is decompressed and compressed again into a new heap
 * block. The step that would rewrite the log-group timestamp
 * (fix_log_group_time) is disabled in this file, so the payload content is
 * currently unchanged.
 *
 * @param lz4_buf      Source buffer; not modified.
 * @param new_lz4_buf  Receives the new buffer on success. On any failure it is
 *                     left untouched, so the caller keeps whatever it pointed
 *                     at before the call.
 *
 * The new buffer is a single malloc() block. Its header is a shallow copy of
 * the source header so that fields the sender reads afterwards (type, n_logs,
 * src_logs, uuid, modular) are valid; any pointers inside the header are
 * shared with the source, so release the copy with free(), not with
 * free_lz4_log_buf().
 */
void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
{
    char* raw_data = NULL;
    char* packed_data = NULL;
    lz4_log_buf* rebuilt = NULL;
    int packed_capacity;
    int packed_size;

    /* Log calls use the ((LB, ...)) form like the rest of this file. */
    aos_debug_log((LB, "rebuild log."));

    raw_data = (char *)malloc(lz4_buf->raw_length);
    if (raw_data == NULL)
    {
        aos_fatal_log((LB, "malloc for decompress buffer failed"));
        return;
    }
    if (LZ4_decompress_safe((const char* )lz4_buf->data, raw_data, lz4_buf->length, lz4_buf->raw_length) <= 0)
    {
        free(raw_data);
        aos_fatal_log((LB, "LZ4_decompress_safe error"));
        return;
    }

    /* Timestamp rewrite is disabled: */
    //fix_log_group_time(raw_data, lz4_buf->raw_length, LOG_GET_TIME());

    packed_capacity = LZ4_compressBound(lz4_buf->raw_length);
    packed_data = (char *)malloc(packed_capacity);
    if (packed_data == NULL)
    {
        aos_fatal_log((LB, "malloc for compress buffer failed"));
        free(raw_data);
        return;
    }
    packed_size = LZ4_compress_default(raw_data, packed_data, lz4_buf->raw_length, packed_capacity);
    if (packed_size <= 0)
    {
        aos_fatal_log((LB, "LZ4_compress_default error"));
        free(raw_data);
        free(packed_data);
        return;
    }

    rebuilt = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + packed_size);
    if (rebuilt != NULL)
    {
        /* Carry over every header field before overriding the sizes. */
        memcpy(rebuilt, lz4_buf, sizeof(lz4_log_buf));
        rebuilt->length = packed_size;
        rebuilt->raw_length = lz4_buf->raw_length;
        memcpy(rebuilt->data, packed_data, packed_size);
        *new_lz4_buf = rebuilt;
    }
    else
    {
        aos_fatal_log((LB, "malloc for rebuilt log buffer failed"));
    }

    free(raw_data);
    free(packed_data);
}
#endif
- #ifdef WIN32
- DWORD WINAPI log_producer_send_thread(LPVOID param)
- #else
- void * log_producer_send_thread(void * param)
- #endif
- {
- log_producer_manager * producer_manager = (log_producer_manager *)param;
- if (producer_manager->sender_data_queue == NULL)
- {
- return 0;
- }
- Sleep(producer_manager->producer_config->sendThreadWaitMs);
- while (!producer_manager->shutdown)
- {
- // change from 30ms to 1000ms, reduce wake up when app switch to back
- void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
- if (send_param != NULL)
- {
- int32_t begin;
- ATOMICINT_INC(&producer_manager->multi_thread_send_count);
- begin = time(NULL);
- log_producer_send_fun(send_param);
- aos_info_log((LB, "log_producer_send_fun cost %d", time(NULL) - begin));
- ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
- }
- }
- return 0;
- }
- void send_log_data(const char* url, const char* body)
- {
- RvcLogSdkManager::getInstance().LOG_OS_TestLogPost(url, body);
- }
- void * log_producer_send_fun(void * param)
- {
- log_producer_manager* producer_manager;
- log_producer_config* config;
- send_error_info error_info;
- log_producer_send_param * send_param = (log_producer_send_param *)param;
- if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
- {
- //aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
- producer_manager = (log_producer_manager *)send_param->producer_manager;
- if (producer_manager && producer_manager->send_done_function != NULL)
- {
- producer_manager->send_done_function(LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
- NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
- }
- if (producer_manager && producer_manager->uuid_send_done_function != NULL)
- {
- producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
- send_param->log_buf->raw_length,
- send_param->log_buf->length,
- NULL,
- "invalid send param, magic num not found",
- send_param->log_buf->data,
- producer_manager->uuid_user_param,
- send_param->log_buf->n_logs,
- send_param->log_buf->uuid,
- send_param->log_buf->modular);
- }
- return NULL;
- }
- config = send_param->producer_config;
- memset(&error_info, 0, sizeof(error_info));
- producer_manager = (log_producer_manager *)send_param->producer_manager;
- do
- {
- lz4_log_buf* send_buf;
- log_post_option option;
- sds accessKeyId = NULL;
- sds accessKey = NULL;
- sds stsToken = NULL;
- post_log_result* rst;
- int32_t sleepMs;
- int i = 0;
- char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "", terminalno[MAX_TOKEN_LEN] = "", reserve1[MAX_TOKEN_LEN] = "";
- if (producer_manager->shutdown)
- {
- aos_info_log((LB, "send fail but shutdown signal received, force exit"));
- if (producer_manager->send_done_function != NULL)
- {
- producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
- NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
- }
- break;
- }
- send_buf = send_param->log_buf;
- #ifdef SEND_TIME_INVALID_FIX
- nowTime = LOG_GET_TIME();
- if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
- {
- _rebuild_time(send_param->log_buf, &send_buf);
- send_param->builder_time = nowTime;
- }
- #endif
- memset(&option, 0, sizeof(log_post_option));
- option.connect_timeout = config->connectTimeoutSec;
- option.operation_timeout = config->sendTimeoutSec;
- option.compress_type = config->compressType;
- option.using_https = config->using_https;
- option.ntp_time_offset = config->ntpTimeOffset;
-
- if(config->tokenFun != NULL)
- config->tokenFun(channelId, token, terminalno, reserve1);
- rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token, terminalno, reserve1); //通过http发送logs
- aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));
- sdsfree(accessKeyId);
- sdsfree(accessKey);
- sdsfree(stsToken);
- if(rst->statusCode == 200)
- {
- switch(send_buf->type)
- {
- case LOG_TYPE_USER_SKYEYE:
- g_upload_TerminalUser_Suc += send_buf->n_logs;
- break;
- case LOG_TYPE_SYS_SKYEYE:
- g_upload_TerminalSys_Suc += send_buf->n_logs;
- break;
- case LOG_TYPE_BEIDOU:
- g_upload_beidou_Suc += send_buf->n_logs;
- break;
- case LOG_TYPE_USER_BUSINESS:
- g_upload_BussinessUser_Suc += send_buf->n_logs;
- break;
- case LOG_TYPE_SYS_BUSINESS:
- g_upload_BussinessSys_Suc += send_buf->n_logs;
- break;
- case LOG_TYPE_WEBSDK:
- g_upload_vtmsdk_Suc += send_buf->n_logs;
- break;
- default:
- break;
- }
- }
- else
- {
- switch (send_buf->type)
- {
- case LOG_TYPE_USER_SKYEYE:
- g_upload_TerminalUser_Err += send_buf->n_logs;
- break;
- case LOG_TYPE_SYS_SKYEYE:
- g_upload_TerminalSys_Err += send_buf->n_logs;
- break;
- case LOG_TYPE_BEIDOU:
- g_upload_beidou_Err += send_buf->n_logs;
- break;
- case LOG_TYPE_USER_BUSINESS:
- g_upload_BussinessUser_Err += send_buf->n_logs;
- break;
- case LOG_TYPE_SYS_BUSINESS:
- g_upload_BussinessSys_Err += send_buf->n_logs;
- break;
- case LOG_TYPE_WEBSDK:
- g_upload_vtmsdk_Err += send_buf->n_logs;
- break;
- default:
- break;
- }
- if(rst->statusCode == 300)
- {
- /*
- (1) if multi logs,split and send one by one again, all give uuid LOG_SEND_ONE.
- - if uuid equal to LOG_SEND_ONE, don't not delete uuid from db
- - if uuid not equal to LOG_SEND_ONE, delete uuid from db
- (2) if single log, means dirty data, encrypt it and give uuid LOG_SEND_DIRTY.
- - record the nums of dirty logs
- */
- if(send_buf->n_logs == 1)//单条log, 已经尝试过重发,不再重发
- {
- if(STR_LOG_SEND_DIRTY != send_buf->src_logs[0].uuid)
- {
- memcpy(send_buf->src_logs[0].uuid, STR_LOG_SEND_DIRTY, strlen(STR_LOG_SEND_DIRTY)+1);
- log_group_builder* builder = log_group_create(config);
- strcpy(builder->modular, send_buf->modular);
- add_log_raw2(builder, &(send_buf->src_logs[0]));
- RvcLogSdkManager::getInstance().getResendFrameList().push_back(builder);
- g_discardMsgNum_since_serverRet_RTI1002 += send_buf->n_logs;
- }
- }
- else
- {
- //try resend, push to deque
- for(int i = 0; i < send_buf->n_logs; i++)
- {
- memcpy(send_buf->src_logs[i].uuid, STR_LOG_SEND_ONE, strlen(STR_LOG_SEND_ONE) + 1);
- log_group_builder* builder = log_group_create(config);
- strcpy(builder->modular, send_buf->modular);
- add_log_raw2(builder, &(send_buf->src_logs[i]));
- RvcLogSdkManager::getInstance().getResendFrameList().push_back(builder);
- }
-
- }
- }
-
- }
- sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;//执行senddone,删除数据库中内容
- post_log_result_destroy(rst);
- // tmp buffer, free
- if (send_buf != send_param->log_buf)
- {
- free(send_buf);
- }
- if (sleepMs <= 0)
- {
- break;
- }
- i =0;
- for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
- {
- #ifdef WIN32
- Sleep(SEND_SLEEP_INTERVAL_MS);
- #else
- usleep(SEND_SLEEP_INTERVAL_MS * 1000);
- #endif
- if (producer_manager->shutdown || producer_manager->networkRecover)
- {
- break;
- }
- }
- if (producer_manager->networkRecover)
- {
- producer_manager->networkRecover = 0;
- }
- }while(1);
- // at last, free all buffer
- free_lz4_log_buf(send_param->log_buf);
- free(send_param);
- return NULL;
- }
- int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
- {
- log_producer_send_result send_result = AosStatusToResult(result);
- log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
- if (producer_manager->send_done_function != NULL)
- {
- log_producer_result callback_result = send_result == LOG_SEND_OK ?
- LOG_PRODUCER_OK :
- (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
- producer_manager->send_done_function(callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
- }
- if (producer_manager->uuid_send_done_function != NULL)
- {
- log_producer_result callback_result = send_result == LOG_SEND_OK ?
- LOG_PRODUCER_OK :
- (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
- producer_manager->uuid_send_done_function(callback_result,
- send_param->log_buf->raw_length,
- send_param->log_buf->length,
- result->requestID,
- result->errorMessage,
- send_param->log_buf->data,
- producer_manager->uuid_user_param,
- send_param->log_buf->n_logs,
- send_param->log_buf->uuid,
- send_param->log_buf->modular);
- }
- if (send_result == LOG_SEND_UNAUTHORIZED)
- {
- // if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
- send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
- }
- switch (send_result)
- {
- case LOG_SEND_OK:
- break;
- case LOG_SEND_TIME_ERROR:
- // if no this marco, drop data
- #ifdef SEND_TIME_INVALID_FIX
- error_info->last_send_error = LOG_SEND_TIME_ERROR;
- error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
- return error_info->last_sleep_ms;
- #else
- break;
- #endif
- case LOG_SEND_QUOTA_EXCEED:
- if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
- {
- error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
- error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
- error_info->first_error_time = time(NULL);
- }
- else
- {
- if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
- {
- error_info->last_sleep_ms *= 2;
- }
- if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
- {
- break;
- }
- }
- aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
- (int)send_param->log_buf->length,
- (int)send_param->log_buf->raw_length,
- result->statusCode,
- result->errorMessage));
- return error_info->last_sleep_ms;
- case LOG_SEND_SERVER_ERROR :
- case LOG_SEND_NETWORK_ERROR:
- if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
- {
- error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
- error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
- error_info->first_error_time = time(NULL);
- }
- else
- {
- if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
- {
- error_info->last_sleep_ms *= 2;
- }
- if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
- {
- break;
- }
- }
- aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
- (int)send_param->log_buf->length,
- (int)send_param->log_buf->raw_length,
- result->statusCode,
- result->errorMessage));
- return error_info->last_sleep_ms;
- default:
- // discard data
- break;
- }
- // always try once when discard error
- if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
- {
- error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
- error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
- error_info->first_error_time = time(NULL);
- aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
- (int)send_param->log_buf->length,
- (int)send_param->log_buf->raw_length,
- (int)producer_manager->totalBufferSize,
- result->statusCode,
- result->errorMessage));
- return BASE_NETWORK_ERROR_SLEEP_MS;
- }
- CS_ENTER(producer_manager->lock);
- producer_manager->totalBufferSize -= send_param->log_buf->length;
- CS_LEAVE(producer_manager->lock);
- if (send_result == LOG_SEND_OK)
- {
- aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
- (int)send_param->log_buf->length,
- (int)send_param->log_buf->raw_length,
- (int)producer_manager->totalBufferSize,
- result->statusCode,
- result->errorMessage));
- }
- else
- {
- aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
- (int)send_param->log_buf->length,
- (int)send_param->log_buf->raw_length,
- (int)producer_manager->totalBufferSize,
- result->statusCode,
- result->errorMessage));
- if (producer_manager->send_done_function != NULL)
- {
- producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
- send_param->log_buf->raw_length,
- send_param->log_buf->length,
- result->requestID,
- result->errorMessage,
- send_param->log_buf->data,
- producer_manager->user_param);
- }
- if (producer_manager->uuid_send_done_function != NULL)
- {
- producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
- send_param->log_buf->raw_length,
- send_param->log_buf->length,
- result->requestID,
- result->errorMessage,
- send_param->log_buf->data,
- producer_manager->uuid_user_param,
- send_param->log_buf->n_logs,
- send_param->log_buf->uuid,
- send_param->log_buf->modular);
- }
- }
- return 0;
- }
- log_producer_result log_producer_send_data(log_producer_send_param * send_param)
- {
- log_producer_send_fun(send_param);
- return LOG_PRODUCER_OK;
- }
- log_producer_send_result AosStatusToResult(post_log_result * result)
- {
- if (result->statusCode == LOG_HTTP_ERRPARAM)
- return LOG_SEND_OK;
- if (result->statusCode / 100 == 2)
- {
- return LOG_SEND_OK;
- }
- if (result->statusCode <= 0)
- {
- return LOG_SEND_NETWORK_ERROR;
- }
- if (result->statusCode >= 500)
- {
- return LOG_SEND_SERVER_ERROR;
- }
- if (result->statusCode == 403)
- {
- return LOG_SEND_QUOTA_EXCEED;
- }
- if (result->statusCode == 401 || result->statusCode == 404)
- {
- return LOG_SEND_UNAUTHORIZED;
- }
- if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
- {
- return LOG_SEND_TIME_ERROR;
- }
- return LOG_SEND_OK;
- }
- log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
- void * producer_manager,
- lz4_log_buf* log_buf,
- log_group_builder * builder)
- {
- log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
- param->producer_config = producer_config;
- param->producer_manager = producer_manager;
- param->log_buf = log_buf;
- param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
- if (builder != NULL)
- {
- param->builder_time = builder->builder_time;
- }
- else
- {
- param->builder_time = time(NULL);
- }
- return param;
- }
|