log_producer_sender.cpp 21 KB


  1. #include "log_producer_sender.h"
  2. #include "log_api.h"
  3. #include "log_producer_manager.h"
  4. #include "inner_log.h"
  5. #include "lz4.h"
  6. #include "sds.h"
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #include "baseFun.h"
  10. #ifdef WIN32
  11. #include <windows.h>
  12. #else
  13. #include <unistd.h>
  14. #include <sys/syscall.h>
  15. #endif
// Error-code strings of the log service. Within this file only LOGE_TIME_EXPIRED is used:
// AosStatusToResult searches result->errorMessage for it. The others are non-static,
// presumably so other translation units can refer to them.
const char* LOGE_SERVER_BUSY = "ServerBusy";
const char* LOGE_INTERNAL_SERVER_ERROR = "InternalServerError";
const char* LOGE_UNAUTHORIZED = "Unauthorized";
const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
const char* LOGE_TIME_EXPIRED = "RequestTimeExpired";
// Granularity (ms) of the retry sleep loop in log_producer_send_fun; shutdown and
// networkRecover are re-checked after every interval.
#define SEND_SLEEP_INTERVAL_MS 50
// Back-off for network / server errors: start at BASE, double while still below MAX
// (the doubled value may overshoot MAX).
#define MAX_NETWORK_ERROR_SLEEP_MS 3000
#define BASE_NETWORK_ERROR_SLEEP_MS 300
// Back-off for quota errors (HTTP 403): start at BASE, double while still below MAX.
#define MAX_QUOTA_ERROR_SLEEP_MS 10000
#define BASE_QUOTA_ERROR_SLEEP_MS 500
// Retry delay (ms) after a time-expired error; only used when SEND_TIME_INVALID_FIX is defined.
#define INVALID_TIME_TRY_INTERVAL 500
// A buffer that keeps failing with the same retryable error class for longer than this many
// seconds (86400 s = one day) is dropped instead of retried.
#define DROP_FAIL_DATA_TIME_SECOND 86400
///**TODO(Gifur@4/27/2023): this setup does not work on Linux!! */
// Per-log-type upload statistics. log_producer_send_fun adds the number of logs in each sent
// buffer: *_Suc when the server answered HTTP 200, *_Err otherwise.
// NOTE(review): plain non-atomic globals; concurrent send threads would race on them.
unsigned long g_upload_TerminalSys_Suc = 0;
unsigned long g_upload_TerminalUser_Suc = 0;
unsigned long g_upload_BussinessSys_Suc = 0;
unsigned long g_upload_BussinessUser_Suc = 0;
unsigned long g_upload_beidou_Suc = 0;
unsigned long g_upload_vtmsdk_Suc = 0;
unsigned long g_upload_TerminalSys_Err = 0;
unsigned long g_upload_TerminalUser_Err = 0;
unsigned long g_upload_BussinessSys_Err = 0;
unsigned long g_upload_BussinessUser_Err = 0;
unsigned long g_upload_beidou_Err = 0;
unsigned long g_upload_vtmsdk_Err = 0;
// Not referenced in this file; presumably maintained elsewhere.
unsigned long g_discardMsgNum_since_full = 0;
// Single-log buffers tagged as dirty after the server answered HTTP 300
// (incremented in log_producer_send_fun).
unsigned long g_discardMsgNum_since_serverRet_RTI1002 = 0;
// Not referenced in this file; presumably maintained elsewhere.
unsigned long g_notUploadLogNum = 0;
// Define to rebuild and retry buffers rejected with a time-expired error (see _rebuild_time).
// When undefined, such buffers are handled like other non-retryable failures: retried at most
// once, then dropped.
//#define SEND_TIME_INVALID_FIX
  46. typedef struct _send_error_info
  47. {
  48. log_producer_send_result last_send_error;
  49. int32_t last_sleep_ms;
  50. int32_t first_error_time;
  51. }send_error_info;
  52. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
  53. #ifdef SEND_TIME_INVALID_FIX
  54. void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
  55. {
  56. char* buf;
  57. long nowTime;
  58. int compress_bound;
  59. char* compress_data;
  60. int compressed_size;
  61. aos_debug_log("rebuild log.");
  62. buf = (char *)malloc(lz4_buf->raw_length);
  63. if (LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
  64. {
  65. free(buf);
  66. aos_fatal_log("LZ4_decompress_safe error");
  67. return;
  68. }
  69. nowTime = LOG_GET_TIME();
  70. //fix_log_group_time(buf, lz4_buf->raw_length, nowTime);
  71. compress_bound = LZ4_compressBound(lz4_buf->raw_length);
  72. compress_data = (char *)malloc(compress_bound);
  73. compressed_size = LZ4_compress_default((char *)buf, compress_data, lz4_buf->raw_length, compress_bound);
  74. if(compressed_size <= 0)
  75. {
  76. aos_fatal_log("LZ4_compress_default error");
  77. free(buf);
  78. free(compress_data);
  79. return;
  80. }
  81. *new_lz4_buf = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
  82. (*new_lz4_buf)->length = compressed_size;
  83. (*new_lz4_buf)->raw_length = lz4_buf->raw_length;
  84. memcpy((*new_lz4_buf)->data, compress_data, compressed_size);
  85. free(buf);
  86. free(compress_data);
  87. return;
  88. }
  89. #endif
  90. #ifdef WIN32
  91. DWORD WINAPI log_producer_send_thread(LPVOID param)
  92. #else
  93. void * log_producer_send_thread(void * param)
  94. #endif
  95. {
  96. log_producer_manager * producer_manager = (log_producer_manager *)param;
  97. if (producer_manager->sender_data_queue == NULL)
  98. {
  99. return 0;
  100. }
  101. Sleep(producer_manager->producer_config->sendThreadWaitMs);
  102. while (!producer_manager->shutdown)
  103. {
  104. // change from 30ms to 1000ms, reduce wake up when app switch to back
  105. void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
  106. if (send_param != NULL)
  107. {
  108. int32_t begin;
  109. ATOMICINT_INC(&producer_manager->multi_thread_send_count);
  110. begin = time(NULL);
  111. log_producer_send_fun(send_param);
  112. aos_info_log((LB, "log_producer_send_fun cost %d", time(NULL) - begin));
  113. ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
  114. }
  115. }
  116. return 0;
  117. }
  118. void send_log_data(const char* url, const char* body)
  119. {
  120. RvcLogSdkManager::getInstance().LOG_OS_TestLogPost(url, body);
  121. }
  122. void * log_producer_send_fun(void * param)
  123. {
  124. log_producer_manager* producer_manager;
  125. log_producer_config* config;
  126. send_error_info error_info;
  127. log_producer_send_param * send_param = (log_producer_send_param *)param;
  128. if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
  129. {
  130. //aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
  131. producer_manager = (log_producer_manager *)send_param->producer_manager;
  132. if (producer_manager && producer_manager->send_done_function != NULL)
  133. {
  134. producer_manager->send_done_function(LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
  135. NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
  136. }
  137. if (producer_manager && producer_manager->uuid_send_done_function != NULL)
  138. {
  139. producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
  140. send_param->log_buf->raw_length,
  141. send_param->log_buf->length,
  142. NULL,
  143. "invalid send param, magic num not found",
  144. send_param->log_buf->data,
  145. producer_manager->uuid_user_param,
  146. send_param->log_buf->n_logs,
  147. send_param->log_buf->uuid,
  148. send_param->log_buf->modular);
  149. }
  150. return NULL;
  151. }
  152. config = send_param->producer_config;
  153. memset(&error_info, 0, sizeof(error_info));
  154. producer_manager = (log_producer_manager *)send_param->producer_manager;
  155. do
  156. {
  157. lz4_log_buf* send_buf;
  158. log_post_option option;
  159. sds accessKeyId = NULL;
  160. sds accessKey = NULL;
  161. sds stsToken = NULL;
  162. post_log_result* rst;
  163. int32_t sleepMs;
  164. int i = 0;
  165. char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "", terminalno[MAX_TOKEN_LEN] = "", reserve1[MAX_TOKEN_LEN] = "";
  166. if (producer_manager->shutdown)
  167. {
  168. aos_info_log((LB, "send fail but shutdown signal received, force exit"));
  169. if (producer_manager->send_done_function != NULL)
  170. {
  171. producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
  172. NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
  173. }
  174. break;
  175. }
  176. send_buf = send_param->log_buf;
  177. #ifdef SEND_TIME_INVALID_FIX
  178. nowTime = LOG_GET_TIME();
  179. if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
  180. {
  181. _rebuild_time(send_param->log_buf, &send_buf);
  182. send_param->builder_time = nowTime;
  183. }
  184. #endif
  185. memset(&option, 0, sizeof(log_post_option));
  186. option.connect_timeout = config->connectTimeoutSec;
  187. option.operation_timeout = config->sendTimeoutSec;
  188. option.compress_type = config->compressType;
  189. option.using_https = config->using_https;
  190. option.ntp_time_offset = config->ntpTimeOffset;
  191. if(config->tokenFun != NULL)
  192. config->tokenFun(channelId, token, terminalno, reserve1);
  193. rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token, terminalno, reserve1); //通过http发送logs
  194. aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));
  195. sdsfree(accessKeyId);
  196. sdsfree(accessKey);
  197. sdsfree(stsToken);
  198. if(rst->statusCode == 200)
  199. {
  200. switch(send_buf->type)
  201. {
  202. case LOG_TYPE_USER_SKYEYE:
  203. g_upload_TerminalUser_Suc += send_buf->n_logs;
  204. break;
  205. case LOG_TYPE_SYS_SKYEYE:
  206. g_upload_TerminalSys_Suc += send_buf->n_logs;
  207. break;
  208. case LOG_TYPE_BEIDOU:
  209. g_upload_beidou_Suc += send_buf->n_logs;
  210. break;
  211. case LOG_TYPE_USER_BUSINESS:
  212. g_upload_BussinessUser_Suc += send_buf->n_logs;
  213. break;
  214. case LOG_TYPE_SYS_BUSINESS:
  215. g_upload_BussinessSys_Suc += send_buf->n_logs;
  216. break;
  217. case LOG_TYPE_WEBSDK:
  218. g_upload_vtmsdk_Suc += send_buf->n_logs;
  219. break;
  220. default:
  221. break;
  222. }
  223. }
  224. else
  225. {
  226. switch (send_buf->type)
  227. {
  228. case LOG_TYPE_USER_SKYEYE:
  229. g_upload_TerminalUser_Err += send_buf->n_logs;
  230. break;
  231. case LOG_TYPE_SYS_SKYEYE:
  232. g_upload_TerminalSys_Err += send_buf->n_logs;
  233. break;
  234. case LOG_TYPE_BEIDOU:
  235. g_upload_beidou_Err += send_buf->n_logs;
  236. break;
  237. case LOG_TYPE_USER_BUSINESS:
  238. g_upload_BussinessUser_Err += send_buf->n_logs;
  239. break;
  240. case LOG_TYPE_SYS_BUSINESS:
  241. g_upload_BussinessSys_Err += send_buf->n_logs;
  242. break;
  243. case LOG_TYPE_WEBSDK:
  244. g_upload_vtmsdk_Err += send_buf->n_logs;
  245. break;
  246. default:
  247. break;
  248. }
  249. if(rst->statusCode == 300)
  250. {
  251. /*
  252. (1) if multi logs,split and send one by one again, all give uuid LOG_SEND_ONE.
  253. - if uuid equal to LOG_SEND_ONE, don't not delete uuid from db
  254. - if uuid not equal to LOG_SEND_ONE, delete uuid from db
  255. (2) if single log, means dirty data, encrypt it and give uuid LOG_SEND_DIRTY.
  256. - record the nums of dirty logs
  257. */
  258. if(send_buf->n_logs == 1)//单条log, 已经尝试过重发,不再重发
  259. {
  260. if(STR_LOG_SEND_DIRTY != send_buf->src_logs[0].uuid)
  261. {
  262. memcpy(send_buf->src_logs[0].uuid, STR_LOG_SEND_DIRTY, strlen(STR_LOG_SEND_DIRTY)+1);
  263. log_group_builder* builder = log_group_create(config);
  264. strcpy(builder->modular, send_buf->modular);
  265. add_log_raw2(builder, &(send_buf->src_logs[0]));
  266. RvcLogSdkManager::getInstance().getResendFrameList().push_back(builder);
  267. g_discardMsgNum_since_serverRet_RTI1002 += send_buf->n_logs;
  268. }
  269. }
  270. else
  271. {
  272. //try resend, push to deque
  273. for(int i = 0; i < send_buf->n_logs; i++)
  274. {
  275. memcpy(send_buf->src_logs[i].uuid, STR_LOG_SEND_ONE, strlen(STR_LOG_SEND_ONE) + 1);
  276. log_group_builder* builder = log_group_create(config);
  277. strcpy(builder->modular, send_buf->modular);
  278. add_log_raw2(builder, &(send_buf->src_logs[i]));
  279. RvcLogSdkManager::getInstance().getResendFrameList().push_back(builder);
  280. }
  281. }
  282. }
  283. }
  284. sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;//执行senddone,删除数据库中内容
  285. post_log_result_destroy(rst);
  286. // tmp buffer, free
  287. if (send_buf != send_param->log_buf)
  288. {
  289. free(send_buf);
  290. }
  291. if (sleepMs <= 0)
  292. {
  293. break;
  294. }
  295. i =0;
  296. for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
  297. {
  298. #ifdef WIN32
  299. Sleep(SEND_SLEEP_INTERVAL_MS);
  300. #else
  301. usleep(SEND_SLEEP_INTERVAL_MS * 1000);
  302. #endif
  303. if (producer_manager->shutdown || producer_manager->networkRecover)
  304. {
  305. break;
  306. }
  307. }
  308. if (producer_manager->networkRecover)
  309. {
  310. producer_manager->networkRecover = 0;
  311. }
  312. }while(1);
  313. // at last, free all buffer
  314. free_lz4_log_buf(send_param->log_buf);
  315. free(send_param);
  316. return NULL;
  317. }
  318. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
  319. {
  320. log_producer_send_result send_result = AosStatusToResult(result);
  321. log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
  322. if (producer_manager->send_done_function != NULL)
  323. {
  324. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  325. LOG_PRODUCER_OK :
  326. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  327. producer_manager->send_done_function(callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
  328. }
  329. if (producer_manager->uuid_send_done_function != NULL)
  330. {
  331. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  332. LOG_PRODUCER_OK :
  333. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  334. producer_manager->uuid_send_done_function(callback_result,
  335. send_param->log_buf->raw_length,
  336. send_param->log_buf->length,
  337. result->requestID,
  338. result->errorMessage,
  339. send_param->log_buf->data,
  340. producer_manager->uuid_user_param,
  341. send_param->log_buf->n_logs,
  342. send_param->log_buf->uuid,
  343. send_param->log_buf->modular);
  344. }
  345. if (send_result == LOG_SEND_UNAUTHORIZED)
  346. {
  347. // if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
  348. send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
  349. }
  350. switch (send_result)
  351. {
  352. case LOG_SEND_OK:
  353. break;
  354. case LOG_SEND_TIME_ERROR:
  355. // if no this marco, drop data
  356. #ifdef SEND_TIME_INVALID_FIX
  357. error_info->last_send_error = LOG_SEND_TIME_ERROR;
  358. error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
  359. return error_info->last_sleep_ms;
  360. #else
  361. break;
  362. #endif
  363. case LOG_SEND_QUOTA_EXCEED:
  364. if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
  365. {
  366. error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
  367. error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
  368. error_info->first_error_time = time(NULL);
  369. }
  370. else
  371. {
  372. if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
  373. {
  374. error_info->last_sleep_ms *= 2;
  375. }
  376. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  377. {
  378. break;
  379. }
  380. }
  381. aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  382. (int)send_param->log_buf->length,
  383. (int)send_param->log_buf->raw_length,
  384. result->statusCode,
  385. result->errorMessage));
  386. return error_info->last_sleep_ms;
  387. case LOG_SEND_SERVER_ERROR :
  388. case LOG_SEND_NETWORK_ERROR:
  389. if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
  390. {
  391. error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
  392. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  393. error_info->first_error_time = time(NULL);
  394. }
  395. else
  396. {
  397. if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
  398. {
  399. error_info->last_sleep_ms *= 2;
  400. }
  401. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  402. {
  403. break;
  404. }
  405. }
  406. aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  407. (int)send_param->log_buf->length,
  408. (int)send_param->log_buf->raw_length,
  409. result->statusCode,
  410. result->errorMessage));
  411. return error_info->last_sleep_ms;
  412. default:
  413. // discard data
  414. break;
  415. }
  416. // always try once when discard error
  417. if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
  418. {
  419. error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
  420. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  421. error_info->first_error_time = time(NULL);
  422. aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  423. (int)send_param->log_buf->length,
  424. (int)send_param->log_buf->raw_length,
  425. (int)producer_manager->totalBufferSize,
  426. result->statusCode,
  427. result->errorMessage));
  428. return BASE_NETWORK_ERROR_SLEEP_MS;
  429. }
  430. CS_ENTER(producer_manager->lock);
  431. producer_manager->totalBufferSize -= send_param->log_buf->length;
  432. CS_LEAVE(producer_manager->lock);
  433. if (send_result == LOG_SEND_OK)
  434. {
  435. aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  436. (int)send_param->log_buf->length,
  437. (int)send_param->log_buf->raw_length,
  438. (int)producer_manager->totalBufferSize,
  439. result->statusCode,
  440. result->errorMessage));
  441. }
  442. else
  443. {
  444. aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  445. (int)send_param->log_buf->length,
  446. (int)send_param->log_buf->raw_length,
  447. (int)producer_manager->totalBufferSize,
  448. result->statusCode,
  449. result->errorMessage));
  450. if (producer_manager->send_done_function != NULL)
  451. {
  452. producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
  453. send_param->log_buf->raw_length,
  454. send_param->log_buf->length,
  455. result->requestID,
  456. result->errorMessage,
  457. send_param->log_buf->data,
  458. producer_manager->user_param);
  459. }
  460. if (producer_manager->uuid_send_done_function != NULL)
  461. {
  462. producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
  463. send_param->log_buf->raw_length,
  464. send_param->log_buf->length,
  465. result->requestID,
  466. result->errorMessage,
  467. send_param->log_buf->data,
  468. producer_manager->uuid_user_param,
  469. send_param->log_buf->n_logs,
  470. send_param->log_buf->uuid,
  471. send_param->log_buf->modular);
  472. }
  473. }
  474. return 0;
  475. }
  476. log_producer_result log_producer_send_data(log_producer_send_param * send_param)
  477. {
  478. log_producer_send_fun(send_param);
  479. return LOG_PRODUCER_OK;
  480. }
  481. log_producer_send_result AosStatusToResult(post_log_result * result)
  482. {
  483. if (result->statusCode == LOG_HTTP_ERRPARAM)
  484. return LOG_SEND_OK;
  485. if (result->statusCode / 100 == 2)
  486. {
  487. return LOG_SEND_OK;
  488. }
  489. if (result->statusCode <= 0)
  490. {
  491. return LOG_SEND_NETWORK_ERROR;
  492. }
  493. if (result->statusCode >= 500)
  494. {
  495. return LOG_SEND_SERVER_ERROR;
  496. }
  497. if (result->statusCode == 403)
  498. {
  499. return LOG_SEND_QUOTA_EXCEED;
  500. }
  501. if (result->statusCode == 401 || result->statusCode == 404)
  502. {
  503. return LOG_SEND_UNAUTHORIZED;
  504. }
  505. if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
  506. {
  507. return LOG_SEND_TIME_ERROR;
  508. }
  509. return LOG_SEND_OK;
  510. }
  511. log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
  512. void * producer_manager,
  513. lz4_log_buf* log_buf,
  514. log_group_builder * builder)
  515. {
  516. log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
  517. param->producer_config = producer_config;
  518. param->producer_manager = producer_manager;
  519. param->log_buf = log_buf;
  520. param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
  521. if (builder != NULL)
  522. {
  523. param->builder_time = builder->builder_time;
  524. }
  525. else
  526. {
  527. param->builder_time = time(NULL);
  528. }
  529. return param;
  530. }