log_producer_sender.c 19 KB


  1. #include "log_producer_sender.h"
  2. #include "log_api.h"
  3. #include "log_producer_manager.h"
  4. #include "inner_log.h"
  5. #include "lz4.h"
  6. #include "sds.h"
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #ifdef WIN32
  10. #include <windows.h>
  11. #else
  12. #include <unistd.h>
  13. #include <sys/syscall.h>
  14. #endif
  /* Returns the current calendar time from time(NULL), converted to long. */
  long LOG_GET_TIME()
  {
      long now_sec = time(NULL);
      return now_sec;
  }
  /*
   * Error-code strings. Within this file only LOGE_TIME_EXPIRED is used: it is
   * searched for inside the response error message to detect a time error.
   */
  const char *LOGE_SERVER_BUSY              = "ServerBusy";
  const char *LOGE_INTERNAL_SERVER_ERROR    = "InternalServerError";
  const char *LOGE_UNAUTHORIZED             = "Unauthorized";
  const char *LOGE_WRITE_QUOTA_EXCEED       = "WriteQuotaExceed";
  const char *LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
  const char *LOGE_TIME_EXPIRED             = "RequestTimeExpired";

  /* Step (ms) of the interruptible sleep between two send attempts. */
  #define SEND_SLEEP_INTERVAL_MS 50

  /* Retry back-off for network/server errors: first delay and doubling threshold (ms). */
  #define BASE_NETWORK_ERROR_SLEEP_MS 300
  #define MAX_NETWORK_ERROR_SLEEP_MS 3000

  /* Retry back-off for quota errors: first delay and doubling threshold (ms). */
  #define BASE_QUOTA_ERROR_SLEEP_MS 500
  #define MAX_QUOTA_ERROR_SLEEP_MS 10000

  /* Retry delay (ms) after a time error; only used when SEND_TIME_INVALID_FIX is defined. */
  #define INVALID_TIME_TRY_INTERVAL 500

  /* A buffer that keeps failing for longer than this many seconds is dropped. */
  #define DROP_FAIL_DATA_TIME_SECOND 86400
  /*
   * Upload statistics: number of log entries sent successfully (*_Suc) or
   * unsuccessfully (*_Err), per log type. They are updated by the send loop
   * in this file after every post attempt.
   *
   * With MSVC the counters are placed in a named data section that the linker
   * marks read/write/shared. The pragmas are MSVC-specific, so they are only
   * emitted for that compiler; elsewhere the counters are ordinary globals.
   *
   * TODO(Gifur@4/27/2023): the shared-section approach does not work on Linux!
   */
  #ifdef _MSC_VER
  #pragma data_seg("LOG_SEND_INFO")
  #endif
  unsigned long g_upload_TerminalSys_Suc = 0;
  unsigned long g_upload_TerminalUser_Suc = 0;
  unsigned long g_upload_BussinessSys_Suc = 0;
  unsigned long g_upload_BussinessUser_Suc = 0;
  unsigned long g_upload_beidou_Suc = 0;
  unsigned long g_upload_TerminalSys_Err = 0;
  unsigned long g_upload_TerminalUser_Err = 0;
  unsigned long g_upload_BussinessSys_Err = 0;
  unsigned long g_upload_BussinessUser_Err = 0;
  unsigned long g_upload_beidou_Err = 0;
  #ifdef _MSC_VER
  #pragma data_seg()
  #pragma comment(linker,"/section:LOG_SEND_INFO,rws")
  #endif
  43. //#define SEND_TIME_INVALID_FIX
  44. typedef struct _send_error_info
  45. {
  46. log_producer_send_result last_send_error;
  47. int32_t last_sleep_ms;
  48. int32_t first_error_time;
  49. }send_error_info;
  50. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
  #ifdef SEND_TIME_INVALID_FIX
  /*
   * Decompresses `lz4_buf` and recompresses it into a freshly allocated
   * lz4_log_buf.
   *
   * On success `*new_lz4_buf` points to the new buffer, which the caller must
   * release with free(). On any failure (allocation, decompression or
   * compression) an error is logged and `*new_lz4_buf` is left untouched.
   *
   * NOTE(review): the call that would actually rewrite the log-group time
   * (fix_log_group_time) is commented out, so at present the payload is only
   * recompressed.
   */
  void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
  {
      char * raw_data;
      char * packed_data;
      lz4_log_buf * rebuilt;
      long nowTime;
      int packed_bound;
      int packed_size;

      aos_debug_log((LB, "rebuild log."));

      raw_data = (char *)malloc(lz4_buf->raw_length);
      if (raw_data == NULL)
      {
          aos_fatal_log((LB, "malloc raw buffer error"));
          return;
      }
      if (LZ4_decompress_safe((const char *)lz4_buf->data, raw_data, lz4_buf->length, lz4_buf->raw_length) <= 0)
      {
          free(raw_data);
          aos_fatal_log((LB, "LZ4_decompress_safe error"));
          return;
      }

      nowTime = LOG_GET_TIME();
      (void)nowTime; /* only needed once the time fix below is re-enabled */
      //fix_log_group_time(raw_data, lz4_buf->raw_length, nowTime);

      packed_bound = LZ4_compressBound(lz4_buf->raw_length);
      packed_data = (char *)malloc(packed_bound);
      if (packed_data == NULL)
      {
          aos_fatal_log((LB, "malloc compress buffer error"));
          free(raw_data);
          return;
      }
      packed_size = LZ4_compress_default(raw_data, packed_data, lz4_buf->raw_length, packed_bound);
      if (packed_size <= 0)
      {
          aos_fatal_log((LB, "LZ4_compress_default error"));
          free(raw_data);
          free(packed_data);
          return;
      }

      rebuilt = (lz4_log_buf *)malloc(sizeof(lz4_log_buf) + packed_size);
      if (rebuilt == NULL)
      {
          aos_fatal_log((LB, "malloc rebuilt buffer error"));
          free(raw_data);
          free(packed_data);
          return;
      }
      /*
       * Carry over the header fields (type, n_logs, uuid, ...) that the sender
       * reads from the buffer it posts; only the lengths and payload change.
       * NOTE(review): this is a shallow copy - assumes the original buffer
       * outlives the rebuilt one, which holds for the caller in this file.
       */
      memcpy(rebuilt, lz4_buf, sizeof(lz4_log_buf));
      rebuilt->length = packed_size;
      rebuilt->raw_length = lz4_buf->raw_length;
      memcpy(rebuilt->data, packed_data, packed_size);
      *new_lz4_buf = rebuilt;

      free(raw_data);
      free(packed_data);
  }
  #endif
  88. #ifdef WIN32
  89. DWORD WINAPI log_producer_send_thread(LPVOID param)
  90. #else
  91. void * log_producer_send_thread(void * param)
  92. #endif
  93. {
  94. log_producer_manager * producer_manager = (log_producer_manager *)param;
  95. if (producer_manager->sender_data_queue == NULL)
  96. {
  97. return 0;
  98. }
  99. Sleep(producer_manager->producer_config->sendThreadWaitMs);
  100. while (!producer_manager->shutdown)
  101. {
  102. // change from 30ms to 1000ms, reduce wake up when app switch to back
  103. void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
  104. if (send_param != NULL)
  105. {
  106. int32_t begin;
  107. ATOMICINT_INC(&producer_manager->multi_thread_send_count);
  108. begin = time(NULL);
  109. log_producer_send_fun(send_param);
  110. aos_info_log((LB, "log_producer_send_fun cost %d", time(NULL) - begin));
  111. ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
  112. }
  113. }
  114. return 0;
  115. }
  116. void * log_producer_send_fun(void * param)
  117. {
  118. log_producer_manager* producer_manager;
  119. log_producer_config* config;
  120. send_error_info error_info;
  121. log_producer_send_param * send_param = (log_producer_send_param *)param;
  122. if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
  123. {
  124. aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
  125. producer_manager = (log_producer_manager *)send_param->producer_manager;
  126. if (producer_manager && producer_manager->send_done_function != NULL)
  127. {
  128. producer_manager->send_done_function(LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
  129. NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
  130. }
  131. if (producer_manager && producer_manager->uuid_send_done_function != NULL)
  132. {
  133. producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
  134. send_param->log_buf->raw_length,
  135. send_param->log_buf->length,
  136. NULL,
  137. "invalid send param, magic num not found",
  138. send_param->log_buf->data,
  139. producer_manager->uuid_user_param,
  140. send_param->log_buf->n_logs,
  141. send_param->log_buf->uuid,
  142. send_param->log_buf->modular);
  143. }
  144. return NULL;
  145. }
  146. config = send_param->producer_config;
  147. memset(&error_info, 0, sizeof(error_info));
  148. producer_manager = (log_producer_manager *)send_param->producer_manager;
  149. do
  150. {
  151. lz4_log_buf* send_buf;
  152. log_post_option option;
  153. sds accessKeyId = NULL;
  154. sds accessKey = NULL;
  155. sds stsToken = NULL;
  156. post_log_result* rst;
  157. int32_t sleepMs;
  158. int i = 0;
  159. char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "";
  160. if (producer_manager->shutdown)
  161. {
  162. aos_info_log((LB, "send fail but shutdown signal received, force exit"));
  163. if (producer_manager->send_done_function != NULL)
  164. {
  165. producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
  166. NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
  167. }
  168. break;
  169. }
  170. send_buf = send_param->log_buf;
  171. #ifdef SEND_TIME_INVALID_FIX
  172. nowTime = LOG_GET_TIME();
  173. if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
  174. {
  175. _rebuild_time(send_param->log_buf, &send_buf);
  176. send_param->builder_time = nowTime;
  177. }
  178. #endif
  179. memset(&option, 0, sizeof(log_post_option));
  180. option.connect_timeout = config->connectTimeoutSec;
  181. option.operation_timeout = config->sendTimeoutSec;
  182. option.compress_type = config->compressType;
  183. option.using_https = config->using_https;
  184. option.ntp_time_offset = config->ntpTimeOffset;
  185. if(config->tokenFun != NULL)
  186. config->tokenFun(channelId, token);
  187. rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token); //通过http发送logs
  188. aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));
  189. sdsfree(accessKeyId);
  190. sdsfree(accessKey);
  191. sdsfree(stsToken);
  192. if(rst->statusCode == 200)
  193. {
  194. switch(send_buf->type)
  195. {
  196. case LOG_TYPE_USER_SKYEYE:
  197. g_upload_TerminalUser_Suc += send_buf->n_logs;
  198. break;
  199. case LOG_TYPE_SYS_SKYEYE:
  200. g_upload_TerminalSys_Suc += send_buf->n_logs;
  201. break;
  202. case LOG_TYPE_BEIDOU:
  203. g_upload_beidou_Suc += send_buf->n_logs;
  204. break;
  205. case LOG_TYPE_USER_BUSINESS:
  206. g_upload_BussinessUser_Suc += send_buf->n_logs;
  207. break;
  208. case LOG_TYPE_SYS_BUSINESS:
  209. g_upload_BussinessSys_Suc += send_buf->n_logs;
  210. break;
  211. default:
  212. break;
  213. }
  214. }
  215. else
  216. {
  217. switch (send_buf->type)
  218. {
  219. case LOG_TYPE_USER_SKYEYE:
  220. g_upload_TerminalUser_Err += send_buf->n_logs;
  221. break;
  222. case LOG_TYPE_SYS_SKYEYE:
  223. g_upload_TerminalSys_Err += send_buf->n_logs;
  224. break;
  225. case LOG_TYPE_BEIDOU:
  226. g_upload_beidou_Err += send_buf->n_logs;
  227. break;
  228. case LOG_TYPE_USER_BUSINESS:
  229. g_upload_BussinessUser_Err += send_buf->n_logs;
  230. break;
  231. case LOG_TYPE_SYS_BUSINESS:
  232. g_upload_BussinessSys_Err += send_buf->n_logs;
  233. break;
  234. default:
  235. break;
  236. }
  237. }
  238. sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;//执行senddone,删除数据库中内容
  239. post_log_result_destroy(rst);
  240. // tmp buffer, free
  241. if (send_buf != send_param->log_buf)
  242. {
  243. free(send_buf);
  244. }
  245. if (sleepMs <= 0)
  246. {
  247. break;
  248. }
  249. i =0;
  250. for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
  251. {
  252. #ifdef WIN32
  253. Sleep(SEND_SLEEP_INTERVAL_MS);
  254. #else
  255. usleep(SEND_SLEEP_INTERVAL_MS * 1000);
  256. #endif
  257. if (producer_manager->shutdown || producer_manager->networkRecover)
  258. {
  259. break;
  260. }
  261. }
  262. if (producer_manager->networkRecover)
  263. {
  264. producer_manager->networkRecover = 0;
  265. }
  266. }while(1);
  267. // at last, free all buffer
  268. free_lz4_log_buf(send_param->log_buf);
  269. free(send_param);
  270. return NULL;
  271. }
  272. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
  273. {
  274. log_producer_send_result send_result = AosStatusToResult(result);
  275. log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
  276. if (producer_manager->send_done_function != NULL)
  277. {
  278. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  279. LOG_PRODUCER_OK :
  280. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  281. producer_manager->send_done_function(callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
  282. }
  283. if (producer_manager->uuid_send_done_function != NULL)
  284. {
  285. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  286. LOG_PRODUCER_OK :
  287. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  288. producer_manager->uuid_send_done_function(callback_result,
  289. send_param->log_buf->raw_length,
  290. send_param->log_buf->length,
  291. result->requestID,
  292. result->errorMessage,
  293. send_param->log_buf->data,
  294. producer_manager->uuid_user_param,
  295. send_param->log_buf->n_logs,
  296. send_param->log_buf->uuid,
  297. send_param->log_buf->modular);
  298. }
  299. if (send_result == LOG_SEND_UNAUTHORIZED)
  300. {
  301. // if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
  302. send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
  303. }
  304. switch (send_result)
  305. {
  306. case LOG_SEND_OK:
  307. break;
  308. case LOG_SEND_TIME_ERROR:
  309. // if no this marco, drop data
  310. #ifdef SEND_TIME_INVALID_FIX
  311. error_info->last_send_error = LOG_SEND_TIME_ERROR;
  312. error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
  313. return error_info->last_sleep_ms;
  314. #else
  315. break;
  316. #endif
  317. case LOG_SEND_QUOTA_EXCEED:
  318. if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
  319. {
  320. error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
  321. error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
  322. error_info->first_error_time = time(NULL);
  323. }
  324. else
  325. {
  326. if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
  327. {
  328. error_info->last_sleep_ms *= 2;
  329. }
  330. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  331. {
  332. break;
  333. }
  334. }
  335. aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  336. (int)send_param->log_buf->length,
  337. (int)send_param->log_buf->raw_length,
  338. result->statusCode,
  339. result->errorMessage));
  340. return error_info->last_sleep_ms;
  341. case LOG_SEND_SERVER_ERROR :
  342. case LOG_SEND_NETWORK_ERROR:
  343. if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
  344. {
  345. error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
  346. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  347. error_info->first_error_time = time(NULL);
  348. }
  349. else
  350. {
  351. if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
  352. {
  353. error_info->last_sleep_ms *= 2;
  354. }
  355. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  356. {
  357. break;
  358. }
  359. }
  360. aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  361. (int)send_param->log_buf->length,
  362. (int)send_param->log_buf->raw_length,
  363. result->statusCode,
  364. result->errorMessage));
  365. return error_info->last_sleep_ms;
  366. default:
  367. // discard data
  368. break;
  369. }
  370. // always try once when discard error
  371. if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
  372. {
  373. error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
  374. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  375. error_info->first_error_time = time(NULL);
  376. aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  377. (int)send_param->log_buf->length,
  378. (int)send_param->log_buf->raw_length,
  379. (int)producer_manager->totalBufferSize,
  380. result->statusCode,
  381. result->errorMessage));
  382. return BASE_NETWORK_ERROR_SLEEP_MS;
  383. }
  384. CS_ENTER(producer_manager->lock);
  385. producer_manager->totalBufferSize -= send_param->log_buf->length;
  386. CS_LEAVE(producer_manager->lock);
  387. if (send_result == LOG_SEND_OK)
  388. {
  389. aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  390. (int)send_param->log_buf->length,
  391. (int)send_param->log_buf->raw_length,
  392. (int)producer_manager->totalBufferSize,
  393. result->statusCode,
  394. result->errorMessage));
  395. }
  396. else
  397. {
  398. aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  399. (int)send_param->log_buf->length,
  400. (int)send_param->log_buf->raw_length,
  401. (int)producer_manager->totalBufferSize,
  402. result->statusCode,
  403. result->errorMessage));
  404. if (producer_manager->send_done_function != NULL)
  405. {
  406. producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
  407. send_param->log_buf->raw_length,
  408. send_param->log_buf->length,
  409. result->requestID,
  410. result->errorMessage,
  411. send_param->log_buf->data,
  412. producer_manager->user_param);
  413. }
  414. if (producer_manager->uuid_send_done_function != NULL)
  415. {
  416. producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
  417. send_param->log_buf->raw_length,
  418. send_param->log_buf->length,
  419. result->requestID,
  420. result->errorMessage,
  421. send_param->log_buf->data,
  422. producer_manager->uuid_user_param,
  423. send_param->log_buf->n_logs,
  424. send_param->log_buf->uuid,
  425. send_param->log_buf->modular);
  426. }
  427. }
  428. return 0;
  429. }
  430. log_producer_result log_producer_send_data(log_producer_send_param * send_param)
  431. {
  432. log_producer_send_fun(send_param);
  433. return LOG_PRODUCER_OK;
  434. }
  435. log_producer_send_result AosStatusToResult(post_log_result * result)
  436. {
  437. if (result->statusCode == LOG_HTTP_ERRPARAM)
  438. return LOG_SEND_OK;
  439. if (result->statusCode / 100 == 2)
  440. {
  441. return LOG_SEND_OK;
  442. }
  443. if (result->statusCode <= 0)
  444. {
  445. return LOG_SEND_NETWORK_ERROR;
  446. }
  447. if (result->statusCode >= 500)
  448. {
  449. return LOG_SEND_SERVER_ERROR;
  450. }
  451. if (result->statusCode == 403)
  452. {
  453. return LOG_SEND_QUOTA_EXCEED;
  454. }
  455. if (result->statusCode == 401 || result->statusCode == 404)
  456. {
  457. return LOG_SEND_UNAUTHORIZED;
  458. }
  459. if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
  460. {
  461. return LOG_SEND_TIME_ERROR;
  462. }
  463. return LOG_SEND_OK;
  464. }
  465. log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
  466. void * producer_manager,
  467. lz4_log_buf* log_buf,
  468. log_group_builder * builder)
  469. {
  470. log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
  471. param->producer_config = producer_config;
  472. param->producer_manager = producer_manager;
  473. param->log_buf = log_buf;
  474. param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
  475. if (builder != NULL)
  476. {
  477. param->builder_time = builder->builder_time;
  478. }
  479. else
  480. {
  481. param->builder_time = time(NULL);
  482. }
  483. return param;
  484. }