log_producer_sender.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
#include "log_producer_sender.h"
#include "log_api.h"
#include "log_producer_manager.h"
#include "inner_log.h"
#include "lz4.h"
#include "sds.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/syscall.h>
#endif
  15. long LOG_GET_TIME() { return time(NULL); }
/*
 * Error-code strings, presumably the codes the log service reports in a response's
 * error message (TODO confirm against the server API).
 * Only LOGE_TIME_EXPIRED is referenced in this file: AosStatusToResult() maps a
 * result whose errorMessage contains it to LOG_SEND_TIME_ERROR. The other strings
 * are not referenced in this file.
 */
const char* LOGE_SERVER_BUSY = "ServerBusy";
const char* LOGE_INTERNAL_SERVER_ERROR = "InternalServerError";
const char* LOGE_UNAUTHORIZED = "Unauthorized";
const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
const char* LOGE_TIME_EXPIRED = "RequestTimeExpired";
/* Slice length (ms) of the retry sleep in log_producer_send_fun(); shutdown and
 * network-recovery flags are checked after every slice. */
#define SEND_SLEEP_INTERVAL_MS 50
/* Back-off for network and server errors (ms): starts at BASE and doubles on each
 * consecutive failure while it is below MAX. log_producer_send_fun() sleeps half
 * of the value returned by log_producer_on_send_done(). */
#define MAX_NETWORK_ERROR_SLEEP_MS 3000
#define BASE_NETWORK_ERROR_SLEEP_MS 300
/* Back-off for quota errors (HTTP 403), in ms; same scheme as above. */
#define MAX_QUOTA_ERROR_SLEEP_MS 10000
#define BASE_QUOTA_ERROR_SLEEP_MS 500
/* Retry delay (ms) after LOG_SEND_TIME_ERROR; only used when SEND_TIME_INVALID_FIX is defined. */
#define INVALID_TIME_TRY_INTERVAL 500
/* A buffer that keeps failing with quota/network errors for longer than this many
 * seconds (86400 s = 24 h) since the first error of the streak is dropped. */
#define DROP_FAIL_DATA_TIME_SECOND 86400
  29. ///**TODO(Gifur@4/27/2023): 这套在Linux下行不通!! */
  30. #pragma data_seg("LOG_SEND_INFO")
  31. unsigned long g_upload_TerminalSys_Suc = 0;
  32. unsigned long g_upload_TerminalUser_Suc = 0;
  33. unsigned long g_upload_BussinessSys_Suc = 0;
  34. unsigned long g_upload_BussinessUser_Suc = 0;
  35. unsigned long g_upload_beidou_Suc = 0;
  36. unsigned long g_upload_vtmsdk_Suc = 0;
  37. unsigned long g_upload_TerminalSys_Err = 0;
  38. unsigned long g_upload_TerminalUser_Err = 0;
  39. unsigned long g_upload_BussinessSys_Err = 0;
  40. unsigned long g_upload_BussinessUser_Err = 0;
  41. unsigned long g_upload_beidou_Err = 0;
  42. unsigned long g_upload_vtmsdk_Err = 0;
  43. #pragma data_seg()
  44. #pragma comment(linker,"/section:LOG_SEND_INFO,rws")
  45. //#define SEND_TIME_INVALID_FIX
  46. typedef struct _send_error_info
  47. {
  48. log_producer_send_result last_send_error;
  49. int32_t last_sleep_ms;
  50. int32_t first_error_time;
  51. }send_error_info;
  52. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
  53. #ifdef SEND_TIME_INVALID_FIX
  54. void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
  55. {
  56. char* buf;
  57. long nowTime;
  58. int compress_bound;
  59. char* compress_data;
  60. int compressed_size;
  61. aos_debug_log("rebuild log.");
  62. buf = (char *)malloc(lz4_buf->raw_length);
  63. if (LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
  64. {
  65. free(buf);
  66. aos_fatal_log("LZ4_decompress_safe error");
  67. return;
  68. }
  69. nowTime = LOG_GET_TIME();
  70. //fix_log_group_time(buf, lz4_buf->raw_length, nowTime);
  71. compress_bound = LZ4_compressBound(lz4_buf->raw_length);
  72. compress_data = (char *)malloc(compress_bound);
  73. compressed_size = LZ4_compress_default((char *)buf, compress_data, lz4_buf->raw_length, compress_bound);
  74. if(compressed_size <= 0)
  75. {
  76. aos_fatal_log("LZ4_compress_default error");
  77. free(buf);
  78. free(compress_data);
  79. return;
  80. }
  81. *new_lz4_buf = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
  82. (*new_lz4_buf)->length = compressed_size;
  83. (*new_lz4_buf)->raw_length = lz4_buf->raw_length;
  84. memcpy((*new_lz4_buf)->data, compress_data, compressed_size);
  85. free(buf);
  86. free(compress_data);
  87. return;
  88. }
  89. #endif
  90. #ifdef WIN32
  91. DWORD WINAPI log_producer_send_thread(LPVOID param)
  92. #else
  93. void * log_producer_send_thread(void * param)
  94. #endif
  95. {
  96. log_producer_manager * producer_manager = (log_producer_manager *)param;
  97. if (producer_manager->sender_data_queue == NULL)
  98. {
  99. return 0;
  100. }
  101. Sleep(producer_manager->producer_config->sendThreadWaitMs);
  102. while (!producer_manager->shutdown)
  103. {
  104. // change from 30ms to 1000ms, reduce wake up when app switch to back
  105. void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
  106. if (send_param != NULL)
  107. {
  108. int32_t begin;
  109. ATOMICINT_INC(&producer_manager->multi_thread_send_count);
  110. begin = time(NULL);
  111. log_producer_send_fun(send_param);
  112. aos_info_log((LB, "log_producer_send_fun cost %d", time(NULL) - begin));
  113. ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
  114. }
  115. }
  116. return 0;
  117. }
  118. void * log_producer_send_fun(void * param)
  119. {
  120. log_producer_manager* producer_manager;
  121. log_producer_config* config;
  122. send_error_info error_info;
  123. log_producer_send_param * send_param = (log_producer_send_param *)param;
  124. if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
  125. {
  126. aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
  127. producer_manager = (log_producer_manager *)send_param->producer_manager;
  128. if (producer_manager && producer_manager->send_done_function != NULL)
  129. {
  130. producer_manager->send_done_function(LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
  131. NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
  132. }
  133. if (producer_manager && producer_manager->uuid_send_done_function != NULL)
  134. {
  135. producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
  136. send_param->log_buf->raw_length,
  137. send_param->log_buf->length,
  138. NULL,
  139. "invalid send param, magic num not found",
  140. send_param->log_buf->data,
  141. producer_manager->uuid_user_param,
  142. send_param->log_buf->n_logs,
  143. send_param->log_buf->uuid,
  144. send_param->log_buf->modular);
  145. }
  146. return NULL;
  147. }
  148. config = send_param->producer_config;
  149. memset(&error_info, 0, sizeof(error_info));
  150. producer_manager = (log_producer_manager *)send_param->producer_manager;
  151. do
  152. {
  153. lz4_log_buf* send_buf;
  154. log_post_option option;
  155. sds accessKeyId = NULL;
  156. sds accessKey = NULL;
  157. sds stsToken = NULL;
  158. post_log_result* rst;
  159. int32_t sleepMs;
  160. int i = 0;
  161. char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "", terminalno[MAX_TOKEN_LEN] = "", reserve1[MAX_TOKEN_LEN] = "";
  162. if (producer_manager->shutdown)
  163. {
  164. aos_info_log((LB, "send fail but shutdown signal received, force exit"));
  165. if (producer_manager->send_done_function != NULL)
  166. {
  167. producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
  168. NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
  169. }
  170. break;
  171. }
  172. send_buf = send_param->log_buf;
  173. #ifdef SEND_TIME_INVALID_FIX
  174. nowTime = LOG_GET_TIME();
  175. if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
  176. {
  177. _rebuild_time(send_param->log_buf, &send_buf);
  178. send_param->builder_time = nowTime;
  179. }
  180. #endif
  181. memset(&option, 0, sizeof(log_post_option));
  182. option.connect_timeout = config->connectTimeoutSec;
  183. option.operation_timeout = config->sendTimeoutSec;
  184. option.compress_type = config->compressType;
  185. option.using_https = config->using_https;
  186. option.ntp_time_offset = config->ntpTimeOffset;
  187. if(config->tokenFun != NULL)
  188. config->tokenFun(channelId, token, terminalno, reserve1);
  189. rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token, terminalno, reserve1); //通过http发送logs
  190. aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));
  191. sdsfree(accessKeyId);
  192. sdsfree(accessKey);
  193. sdsfree(stsToken);
  194. if(rst->statusCode == 200)
  195. {
  196. switch(send_buf->type)
  197. {
  198. case LOG_TYPE_USER_SKYEYE:
  199. g_upload_TerminalUser_Suc += send_buf->n_logs;
  200. break;
  201. case LOG_TYPE_SYS_SKYEYE:
  202. g_upload_TerminalSys_Suc += send_buf->n_logs;
  203. break;
  204. case LOG_TYPE_BEIDOU:
  205. g_upload_beidou_Suc += send_buf->n_logs;
  206. break;
  207. case LOG_TYPE_USER_BUSINESS:
  208. g_upload_BussinessUser_Suc += send_buf->n_logs;
  209. break;
  210. case LOG_TYPE_SYS_BUSINESS:
  211. g_upload_BussinessSys_Suc += send_buf->n_logs;
  212. break;
  213. case LOG_TYPE_WEBSDK:
  214. g_upload_vtmsdk_Suc += send_buf->n_logs;
  215. break;
  216. default:
  217. break;
  218. }
  219. }
  220. else
  221. {
  222. switch (send_buf->type)
  223. {
  224. case LOG_TYPE_USER_SKYEYE:
  225. g_upload_TerminalUser_Err += send_buf->n_logs;
  226. break;
  227. case LOG_TYPE_SYS_SKYEYE:
  228. g_upload_TerminalSys_Err += send_buf->n_logs;
  229. break;
  230. case LOG_TYPE_BEIDOU:
  231. g_upload_beidou_Err += send_buf->n_logs;
  232. break;
  233. case LOG_TYPE_USER_BUSINESS:
  234. g_upload_BussinessUser_Err += send_buf->n_logs;
  235. break;
  236. case LOG_TYPE_SYS_BUSINESS:
  237. g_upload_BussinessSys_Err += send_buf->n_logs;
  238. break;
  239. case LOG_TYPE_WEBSDK:
  240. g_upload_vtmsdk_Err += send_buf->n_logs;
  241. break;
  242. default:
  243. break;
  244. }
  245. }
  246. sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;//执行senddone,删除数据库中内容
  247. post_log_result_destroy(rst);
  248. // tmp buffer, free
  249. if (send_buf != send_param->log_buf)
  250. {
  251. free(send_buf);
  252. }
  253. if (sleepMs <= 0)
  254. {
  255. break;
  256. }
  257. i =0;
  258. for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
  259. {
  260. #ifdef WIN32
  261. Sleep(SEND_SLEEP_INTERVAL_MS);
  262. #else
  263. usleep(SEND_SLEEP_INTERVAL_MS * 1000);
  264. #endif
  265. if (producer_manager->shutdown || producer_manager->networkRecover)
  266. {
  267. break;
  268. }
  269. }
  270. if (producer_manager->networkRecover)
  271. {
  272. producer_manager->networkRecover = 0;
  273. }
  274. }while(1);
  275. // at last, free all buffer
  276. free_lz4_log_buf(send_param->log_buf);
  277. free(send_param);
  278. return NULL;
  279. }
  280. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
  281. {
  282. log_producer_send_result send_result = AosStatusToResult(result);
  283. log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
  284. if (producer_manager->send_done_function != NULL)
  285. {
  286. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  287. LOG_PRODUCER_OK :
  288. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  289. producer_manager->send_done_function(callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
  290. }
  291. if (producer_manager->uuid_send_done_function != NULL)
  292. {
  293. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  294. LOG_PRODUCER_OK :
  295. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  296. producer_manager->uuid_send_done_function(callback_result,
  297. send_param->log_buf->raw_length,
  298. send_param->log_buf->length,
  299. result->requestID,
  300. result->errorMessage,
  301. send_param->log_buf->data,
  302. producer_manager->uuid_user_param,
  303. send_param->log_buf->n_logs,
  304. send_param->log_buf->uuid,
  305. send_param->log_buf->modular);
  306. }
  307. if (send_result == LOG_SEND_UNAUTHORIZED)
  308. {
  309. // if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
  310. send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
  311. }
  312. switch (send_result)
  313. {
  314. case LOG_SEND_OK:
  315. break;
  316. case LOG_SEND_TIME_ERROR:
  317. // if no this marco, drop data
  318. #ifdef SEND_TIME_INVALID_FIX
  319. error_info->last_send_error = LOG_SEND_TIME_ERROR;
  320. error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
  321. return error_info->last_sleep_ms;
  322. #else
  323. break;
  324. #endif
  325. case LOG_SEND_QUOTA_EXCEED:
  326. if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
  327. {
  328. error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
  329. error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
  330. error_info->first_error_time = time(NULL);
  331. }
  332. else
  333. {
  334. if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
  335. {
  336. error_info->last_sleep_ms *= 2;
  337. }
  338. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  339. {
  340. break;
  341. }
  342. }
  343. aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  344. (int)send_param->log_buf->length,
  345. (int)send_param->log_buf->raw_length,
  346. result->statusCode,
  347. result->errorMessage));
  348. return error_info->last_sleep_ms;
  349. case LOG_SEND_SERVER_ERROR :
  350. case LOG_SEND_NETWORK_ERROR:
  351. if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
  352. {
  353. error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
  354. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  355. error_info->first_error_time = time(NULL);
  356. }
  357. else
  358. {
  359. if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
  360. {
  361. error_info->last_sleep_ms *= 2;
  362. }
  363. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  364. {
  365. break;
  366. }
  367. }
  368. aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  369. (int)send_param->log_buf->length,
  370. (int)send_param->log_buf->raw_length,
  371. result->statusCode,
  372. result->errorMessage));
  373. return error_info->last_sleep_ms;
  374. default:
  375. // discard data
  376. break;
  377. }
  378. // always try once when discard error
  379. if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
  380. {
  381. error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
  382. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  383. error_info->first_error_time = time(NULL);
  384. aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  385. (int)send_param->log_buf->length,
  386. (int)send_param->log_buf->raw_length,
  387. (int)producer_manager->totalBufferSize,
  388. result->statusCode,
  389. result->errorMessage));
  390. return BASE_NETWORK_ERROR_SLEEP_MS;
  391. }
  392. CS_ENTER(producer_manager->lock);
  393. producer_manager->totalBufferSize -= send_param->log_buf->length;
  394. CS_LEAVE(producer_manager->lock);
  395. if (send_result == LOG_SEND_OK)
  396. {
  397. aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  398. (int)send_param->log_buf->length,
  399. (int)send_param->log_buf->raw_length,
  400. (int)producer_manager->totalBufferSize,
  401. result->statusCode,
  402. result->errorMessage));
  403. }
  404. else
  405. {
  406. aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  407. (int)send_param->log_buf->length,
  408. (int)send_param->log_buf->raw_length,
  409. (int)producer_manager->totalBufferSize,
  410. result->statusCode,
  411. result->errorMessage));
  412. if (producer_manager->send_done_function != NULL)
  413. {
  414. producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
  415. send_param->log_buf->raw_length,
  416. send_param->log_buf->length,
  417. result->requestID,
  418. result->errorMessage,
  419. send_param->log_buf->data,
  420. producer_manager->user_param);
  421. }
  422. if (producer_manager->uuid_send_done_function != NULL)
  423. {
  424. producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
  425. send_param->log_buf->raw_length,
  426. send_param->log_buf->length,
  427. result->requestID,
  428. result->errorMessage,
  429. send_param->log_buf->data,
  430. producer_manager->uuid_user_param,
  431. send_param->log_buf->n_logs,
  432. send_param->log_buf->uuid,
  433. send_param->log_buf->modular);
  434. }
  435. }
  436. return 0;
  437. }
  438. log_producer_result log_producer_send_data(log_producer_send_param * send_param)
  439. {
  440. log_producer_send_fun(send_param);
  441. return LOG_PRODUCER_OK;
  442. }
  443. log_producer_send_result AosStatusToResult(post_log_result * result)
  444. {
  445. if (result->statusCode == LOG_HTTP_ERRPARAM)
  446. return LOG_SEND_OK;
  447. if (result->statusCode / 100 == 2)
  448. {
  449. return LOG_SEND_OK;
  450. }
  451. if (result->statusCode <= 0)
  452. {
  453. return LOG_SEND_NETWORK_ERROR;
  454. }
  455. if (result->statusCode >= 500)
  456. {
  457. return LOG_SEND_SERVER_ERROR;
  458. }
  459. if (result->statusCode == 403)
  460. {
  461. return LOG_SEND_QUOTA_EXCEED;
  462. }
  463. if (result->statusCode == 401 || result->statusCode == 404)
  464. {
  465. return LOG_SEND_UNAUTHORIZED;
  466. }
  467. if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
  468. {
  469. return LOG_SEND_TIME_ERROR;
  470. }
  471. return LOG_SEND_OK;
  472. }
  473. log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
  474. void * producer_manager,
  475. lz4_log_buf* log_buf,
  476. log_group_builder * builder)
  477. {
  478. log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
  479. param->producer_config = producer_config;
  480. param->producer_manager = producer_manager;
  481. param->log_buf = log_buf;
  482. param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
  483. if (builder != NULL)
  484. {
  485. param->builder_time = builder->builder_time;
  486. }
  487. else
  488. {
  489. param->builder_time = time(NULL);
  490. }
  491. return param;
  492. }