log_producer_sender.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
#include "log_producer_sender.h"
#include "log_api.h"
#include "log_producer_manager.h"
#include "inner_log.h"
#include "lz4.h"
#include "sds.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/syscall.h>
#endif
  15. long LOG_GET_TIME() { return time(NULL); }
  16. const char* LOGE_SERVER_BUSY = "ServerBusy";
  17. const char* LOGE_INTERNAL_SERVER_ERROR = "InternalServerError";
  18. const char* LOGE_UNAUTHORIZED = "Unauthorized";
  19. const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
  20. const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
  21. const char* LOGE_TIME_EXPIRED = "RequestTimeExpired";
  22. #define SEND_SLEEP_INTERVAL_MS 50
  23. #define MAX_NETWORK_ERROR_SLEEP_MS 3000
  24. #define BASE_NETWORK_ERROR_SLEEP_MS 300
  25. #define MAX_QUOTA_ERROR_SLEEP_MS 10000
  26. #define BASE_QUOTA_ERROR_SLEEP_MS 500
  27. #define INVALID_TIME_TRY_INTERVAL 500
  28. #define DROP_FAIL_DATA_TIME_SECOND 86400
  29. ///**TODO(Gifur@4/27/2023): 这套在Linux下行不通!! */
  30. #pragma data_seg("LOG_SEND_INFO")
  31. unsigned long g_upload_TerminalSys_Suc = 0;
  32. unsigned long g_upload_TerminalUser_Suc = 0;
  33. unsigned long g_upload_BussinessSys_Suc = 0;
  34. unsigned long g_upload_BussinessUser_Suc = 0;
  35. unsigned long g_upload_beidou_Suc = 0;
  36. unsigned long g_upload_vtmsdk_Suc = 0;
  37. unsigned long g_upload_TerminalSys_Err = 0;
  38. unsigned long g_upload_TerminalUser_Err = 0;
  39. unsigned long g_upload_BussinessSys_Err = 0;
  40. unsigned long g_upload_BussinessUser_Err = 0;
  41. unsigned long g_upload_beidou_Err = 0;
  42. unsigned long g_upload_vtmsdk_Err = 0;
  43. #pragma data_seg()
  44. #pragma comment(linker,"/section:LOG_SEND_INFO,rws")
  45. //#define SEND_TIME_INVALID_FIX
  46. typedef struct _send_error_info
  47. {
  48. log_producer_send_result last_send_error;
  49. int32_t last_sleep_ms;
  50. int32_t first_error_time;
  51. }send_error_info;
  52. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
  53. #ifdef SEND_TIME_INVALID_FIX
  54. void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
  55. {
  56. char* buf;
  57. long nowTime;
  58. int compress_bound;
  59. char* compress_data;
  60. int compressed_size;
  61. aos_debug_log("rebuild log.");
  62. buf = (char *)malloc(lz4_buf->raw_length);
  63. if (LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
  64. {
  65. free(buf);
  66. aos_fatal_log("LZ4_decompress_safe error");
  67. return;
  68. }
  69. nowTime = LOG_GET_TIME();
  70. //fix_log_group_time(buf, lz4_buf->raw_length, nowTime);
  71. compress_bound = LZ4_compressBound(lz4_buf->raw_length);
  72. compress_data = (char *)malloc(compress_bound);
  73. compressed_size = LZ4_compress_default((char *)buf, compress_data, lz4_buf->raw_length, compress_bound);
  74. if(compressed_size <= 0)
  75. {
  76. aos_fatal_log("LZ4_compress_default error");
  77. free(buf);
  78. free(compress_data);
  79. return;
  80. }
  81. *new_lz4_buf = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
  82. (*new_lz4_buf)->length = compressed_size;
  83. (*new_lz4_buf)->raw_length = lz4_buf->raw_length;
  84. memcpy((*new_lz4_buf)->data, compress_data, compressed_size);
  85. free(buf);
  86. free(compress_data);
  87. return;
  88. }
  89. #endif
  90. #ifdef WIN32
  91. DWORD WINAPI log_producer_send_thread(LPVOID param)
  92. #else
  93. void * log_producer_send_thread(void * param)
  94. #endif
  95. {
  96. log_producer_manager * producer_manager = (log_producer_manager *)param;
  97. if (producer_manager->sender_data_queue == NULL)
  98. {
  99. return 0;
  100. }
  101. Sleep(producer_manager->producer_config->sendThreadWaitMs);
  102. while (!producer_manager->shutdown)
  103. {
  104. // change from 30ms to 1000ms, reduce wake up when app switch to back
  105. void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
  106. if (send_param != NULL)
  107. {
  108. int32_t begin;
  109. ATOMICINT_INC(&producer_manager->multi_thread_send_count);
  110. begin = time(NULL);
  111. log_producer_send_fun(send_param);
  112. aos_info_log((LB, "log_producer_send_fun cost %d", time(NULL) - begin));
  113. ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
  114. }
  115. }
  116. return 0;
  117. }
  118. void send_log_data(const char* url, const char* body)
  119. {
  120. LOG_OS_TestLogPost(url, body);
  121. }
  122. void * log_producer_send_fun(void * param)
  123. {
  124. log_producer_manager* producer_manager;
  125. log_producer_config* config;
  126. send_error_info error_info;
  127. log_producer_send_param * send_param = (log_producer_send_param *)param;
  128. if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
  129. {
  130. aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
  131. producer_manager = (log_producer_manager *)send_param->producer_manager;
  132. if (producer_manager && producer_manager->send_done_function != NULL)
  133. {
  134. producer_manager->send_done_function(LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
  135. NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
  136. }
  137. if (producer_manager && producer_manager->uuid_send_done_function != NULL)
  138. {
  139. producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
  140. send_param->log_buf->raw_length,
  141. send_param->log_buf->length,
  142. NULL,
  143. "invalid send param, magic num not found",
  144. send_param->log_buf->data,
  145. producer_manager->uuid_user_param,
  146. send_param->log_buf->n_logs,
  147. send_param->log_buf->uuid,
  148. send_param->log_buf->modular);
  149. }
  150. return NULL;
  151. }
  152. config = send_param->producer_config;
  153. memset(&error_info, 0, sizeof(error_info));
  154. producer_manager = (log_producer_manager *)send_param->producer_manager;
  155. do
  156. {
  157. lz4_log_buf* send_buf;
  158. log_post_option option;
  159. sds accessKeyId = NULL;
  160. sds accessKey = NULL;
  161. sds stsToken = NULL;
  162. post_log_result* rst;
  163. int32_t sleepMs;
  164. int i = 0;
  165. char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "", terminalno[MAX_TOKEN_LEN] = "", reserve1[MAX_TOKEN_LEN] = "";
  166. if (producer_manager->shutdown)
  167. {
  168. aos_info_log((LB, "send fail but shutdown signal received, force exit"));
  169. if (producer_manager->send_done_function != NULL)
  170. {
  171. producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
  172. NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
  173. }
  174. break;
  175. }
  176. send_buf = send_param->log_buf;
  177. #ifdef SEND_TIME_INVALID_FIX
  178. nowTime = LOG_GET_TIME();
  179. if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
  180. {
  181. _rebuild_time(send_param->log_buf, &send_buf);
  182. send_param->builder_time = nowTime;
  183. }
  184. #endif
  185. memset(&option, 0, sizeof(log_post_option));
  186. option.connect_timeout = config->connectTimeoutSec;
  187. option.operation_timeout = config->sendTimeoutSec;
  188. option.compress_type = config->compressType;
  189. option.using_https = config->using_https;
  190. option.ntp_time_offset = config->ntpTimeOffset;
  191. if(config->tokenFun != NULL)
  192. config->tokenFun(channelId, token, terminalno, reserve1);
  193. rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token, terminalno, reserve1); //通过http发送logs
  194. aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));
  195. sdsfree(accessKeyId);
  196. sdsfree(accessKey);
  197. sdsfree(stsToken);
  198. if(rst->statusCode == 200)
  199. {
  200. switch(send_buf->type)
  201. {
  202. case LOG_TYPE_USER_SKYEYE:
  203. g_upload_TerminalUser_Suc += send_buf->n_logs;
  204. break;
  205. case LOG_TYPE_SYS_SKYEYE:
  206. g_upload_TerminalSys_Suc += send_buf->n_logs;
  207. break;
  208. case LOG_TYPE_BEIDOU:
  209. g_upload_beidou_Suc += send_buf->n_logs;
  210. break;
  211. case LOG_TYPE_USER_BUSINESS:
  212. g_upload_BussinessUser_Suc += send_buf->n_logs;
  213. break;
  214. case LOG_TYPE_SYS_BUSINESS:
  215. g_upload_BussinessSys_Suc += send_buf->n_logs;
  216. break;
  217. case LOG_TYPE_WEBSDK:
  218. g_upload_vtmsdk_Suc += send_buf->n_logs;
  219. break;
  220. default:
  221. break;
  222. }
  223. }
  224. else
  225. {
  226. switch (send_buf->type)
  227. {
  228. case LOG_TYPE_USER_SKYEYE:
  229. g_upload_TerminalUser_Err += send_buf->n_logs;
  230. break;
  231. case LOG_TYPE_SYS_SKYEYE:
  232. g_upload_TerminalSys_Err += send_buf->n_logs;
  233. break;
  234. case LOG_TYPE_BEIDOU:
  235. g_upload_beidou_Err += send_buf->n_logs;
  236. break;
  237. case LOG_TYPE_USER_BUSINESS:
  238. g_upload_BussinessUser_Err += send_buf->n_logs;
  239. break;
  240. case LOG_TYPE_SYS_BUSINESS:
  241. g_upload_BussinessSys_Err += send_buf->n_logs;
  242. break;
  243. case LOG_TYPE_WEBSDK:
  244. g_upload_vtmsdk_Err += send_buf->n_logs;
  245. break;
  246. default:
  247. break;
  248. }
  249. }
  250. sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;//执行senddone,删除数据库中内容
  251. post_log_result_destroy(rst);
  252. // tmp buffer, free
  253. if (send_buf != send_param->log_buf)
  254. {
  255. free(send_buf);
  256. }
  257. if (sleepMs <= 0)
  258. {
  259. break;
  260. }
  261. i =0;
  262. for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
  263. {
  264. #ifdef WIN32
  265. Sleep(SEND_SLEEP_INTERVAL_MS);
  266. #else
  267. usleep(SEND_SLEEP_INTERVAL_MS * 1000);
  268. #endif
  269. if (producer_manager->shutdown || producer_manager->networkRecover)
  270. {
  271. break;
  272. }
  273. }
  274. if (producer_manager->networkRecover)
  275. {
  276. producer_manager->networkRecover = 0;
  277. }
  278. }while(1);
  279. // at last, free all buffer
  280. free_lz4_log_buf(send_param->log_buf);
  281. free(send_param);
  282. return NULL;
  283. }
  284. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
  285. {
  286. log_producer_send_result send_result = AosStatusToResult(result);
  287. log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
  288. if (producer_manager->send_done_function != NULL)
  289. {
  290. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  291. LOG_PRODUCER_OK :
  292. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  293. producer_manager->send_done_function(callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
  294. }
  295. if (producer_manager->uuid_send_done_function != NULL)
  296. {
  297. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  298. LOG_PRODUCER_OK :
  299. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  300. producer_manager->uuid_send_done_function(callback_result,
  301. send_param->log_buf->raw_length,
  302. send_param->log_buf->length,
  303. result->requestID,
  304. result->errorMessage,
  305. send_param->log_buf->data,
  306. producer_manager->uuid_user_param,
  307. send_param->log_buf->n_logs,
  308. send_param->log_buf->uuid,
  309. send_param->log_buf->modular);
  310. }
  311. if (send_result == LOG_SEND_UNAUTHORIZED)
  312. {
  313. // if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
  314. send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
  315. }
  316. switch (send_result)
  317. {
  318. case LOG_SEND_OK:
  319. break;
  320. case LOG_SEND_TIME_ERROR:
  321. // if no this marco, drop data
  322. #ifdef SEND_TIME_INVALID_FIX
  323. error_info->last_send_error = LOG_SEND_TIME_ERROR;
  324. error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
  325. return error_info->last_sleep_ms;
  326. #else
  327. break;
  328. #endif
  329. case LOG_SEND_QUOTA_EXCEED:
  330. if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
  331. {
  332. error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
  333. error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
  334. error_info->first_error_time = time(NULL);
  335. }
  336. else
  337. {
  338. if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
  339. {
  340. error_info->last_sleep_ms *= 2;
  341. }
  342. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  343. {
  344. break;
  345. }
  346. }
  347. aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  348. (int)send_param->log_buf->length,
  349. (int)send_param->log_buf->raw_length,
  350. result->statusCode,
  351. result->errorMessage));
  352. return error_info->last_sleep_ms;
  353. case LOG_SEND_SERVER_ERROR :
  354. case LOG_SEND_NETWORK_ERROR:
  355. if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
  356. {
  357. error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
  358. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  359. error_info->first_error_time = time(NULL);
  360. }
  361. else
  362. {
  363. if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
  364. {
  365. error_info->last_sleep_ms *= 2;
  366. }
  367. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  368. {
  369. break;
  370. }
  371. }
  372. aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  373. (int)send_param->log_buf->length,
  374. (int)send_param->log_buf->raw_length,
  375. result->statusCode,
  376. result->errorMessage));
  377. return error_info->last_sleep_ms;
  378. default:
  379. // discard data
  380. break;
  381. }
  382. // always try once when discard error
  383. if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
  384. {
  385. error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
  386. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  387. error_info->first_error_time = time(NULL);
  388. aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  389. (int)send_param->log_buf->length,
  390. (int)send_param->log_buf->raw_length,
  391. (int)producer_manager->totalBufferSize,
  392. result->statusCode,
  393. result->errorMessage));
  394. return BASE_NETWORK_ERROR_SLEEP_MS;
  395. }
  396. CS_ENTER(producer_manager->lock);
  397. producer_manager->totalBufferSize -= send_param->log_buf->length;
  398. CS_LEAVE(producer_manager->lock);
  399. if (send_result == LOG_SEND_OK)
  400. {
  401. aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  402. (int)send_param->log_buf->length,
  403. (int)send_param->log_buf->raw_length,
  404. (int)producer_manager->totalBufferSize,
  405. result->statusCode,
  406. result->errorMessage));
  407. }
  408. else
  409. {
  410. aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  411. (int)send_param->log_buf->length,
  412. (int)send_param->log_buf->raw_length,
  413. (int)producer_manager->totalBufferSize,
  414. result->statusCode,
  415. result->errorMessage));
  416. if (producer_manager->send_done_function != NULL)
  417. {
  418. producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
  419. send_param->log_buf->raw_length,
  420. send_param->log_buf->length,
  421. result->requestID,
  422. result->errorMessage,
  423. send_param->log_buf->data,
  424. producer_manager->user_param);
  425. }
  426. if (producer_manager->uuid_send_done_function != NULL)
  427. {
  428. producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
  429. send_param->log_buf->raw_length,
  430. send_param->log_buf->length,
  431. result->requestID,
  432. result->errorMessage,
  433. send_param->log_buf->data,
  434. producer_manager->uuid_user_param,
  435. send_param->log_buf->n_logs,
  436. send_param->log_buf->uuid,
  437. send_param->log_buf->modular);
  438. }
  439. }
  440. return 0;
  441. }
  442. log_producer_result log_producer_send_data(log_producer_send_param * send_param)
  443. {
  444. log_producer_send_fun(send_param);
  445. return LOG_PRODUCER_OK;
  446. }
  447. log_producer_send_result AosStatusToResult(post_log_result * result)
  448. {
  449. if (result->statusCode == LOG_HTTP_ERRPARAM)
  450. return LOG_SEND_OK;
  451. if (result->statusCode / 100 == 2)
  452. {
  453. return LOG_SEND_OK;
  454. }
  455. if (result->statusCode <= 0)
  456. {
  457. return LOG_SEND_NETWORK_ERROR;
  458. }
  459. if (result->statusCode >= 500)
  460. {
  461. return LOG_SEND_SERVER_ERROR;
  462. }
  463. if (result->statusCode == 403)
  464. {
  465. return LOG_SEND_QUOTA_EXCEED;
  466. }
  467. if (result->statusCode == 401 || result->statusCode == 404)
  468. {
  469. return LOG_SEND_UNAUTHORIZED;
  470. }
  471. if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
  472. {
  473. return LOG_SEND_TIME_ERROR;
  474. }
  475. return LOG_SEND_OK;
  476. }
  477. log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
  478. void * producer_manager,
  479. lz4_log_buf* log_buf,
  480. log_group_builder * builder)
  481. {
  482. log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
  483. param->producer_config = producer_config;
  484. param->producer_manager = producer_manager;
  485. param->log_buf = log_buf;
  486. param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
  487. if (builder != NULL)
  488. {
  489. param->builder_time = builder->builder_time;
  490. }
  491. else
  492. {
  493. param->builder_time = time(NULL);
  494. }
  495. return param;
  496. }