log_producer_sender.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. #include "log_producer_sender.h"
  2. #include "log_api.h"
  3. #include "log_producer_manager.h"
  4. #include "inner_log.h"
  5. #include "lz4.h"
  6. #include "sds.h"
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #ifdef WIN32
  10. #include <windows.h>
  11. #else
  12. #include <unistd.h>
  13. #include <sys/syscall.h>
  14. #endif
  15. long LOG_GET_TIME() { return time(NULL); }
  16. const char* LOGE_SERVER_BUSY = "ServerBusy";
  17. const char* LOGE_INTERNAL_SERVER_ERROR = "InternalServerError";
  18. const char* LOGE_UNAUTHORIZED = "Unauthorized";
  19. const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
  20. const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";
  21. const char* LOGE_TIME_EXPIRED = "RequestTimeExpired";
  22. #define SEND_SLEEP_INTERVAL_MS 50
  23. #define MAX_NETWORK_ERROR_SLEEP_MS 3000
  24. #define BASE_NETWORK_ERROR_SLEEP_MS 300
  25. #define MAX_QUOTA_ERROR_SLEEP_MS 10000
  26. #define BASE_QUOTA_ERROR_SLEEP_MS 500
  27. #define INVALID_TIME_TRY_INTERVAL 500
  28. #define DROP_FAIL_DATA_TIME_SECOND 86400
  29. ///**TODO(Gifur@4/27/2023): 这套在Linux下行不通!! */
  30. unsigned long g_upload_TerminalSys_Suc = 0;
  31. unsigned long g_upload_TerminalUser_Suc = 0;
  32. unsigned long g_upload_BussinessSys_Suc = 0;
  33. unsigned long g_upload_BussinessUser_Suc = 0;
  34. unsigned long g_upload_beidou_Suc = 0;
  35. unsigned long g_upload_vtmsdk_Suc = 0;
  36. unsigned long g_upload_TerminalSys_Err = 0;
  37. unsigned long g_upload_TerminalUser_Err = 0;
  38. unsigned long g_upload_BussinessSys_Err = 0;
  39. unsigned long g_upload_BussinessUser_Err = 0;
  40. unsigned long g_upload_beidou_Err = 0;
  41. unsigned long g_upload_vtmsdk_Err = 0;
  42. unsigned long g_discardMsgNum_since_full = 0;
  43. unsigned long g_discardMsgNum_since_serverRet_RTI1002 = 0;
  44. unsigned long g_notUploadLogNum = 0;
  45. //#define SEND_TIME_INVALID_FIX
  46. typedef struct _send_error_info
  47. {
  48. log_producer_send_result last_send_error;
  49. int32_t last_sleep_ms;
  50. int32_t first_error_time;
  51. }send_error_info;
  52. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info);
  53. #ifdef SEND_TIME_INVALID_FIX
  54. void _rebuild_time(lz4_log_buf * lz4_buf, lz4_log_buf ** new_lz4_buf)
  55. {
  56. char* buf;
  57. long nowTime;
  58. int compress_bound;
  59. char* compress_data;
  60. int compressed_size;
  61. aos_debug_log("rebuild log.");
  62. buf = (char *)malloc(lz4_buf->raw_length);
  63. if (LZ4_decompress_safe((const char* )lz4_buf->data, buf, lz4_buf->length, lz4_buf->raw_length) <= 0)
  64. {
  65. free(buf);
  66. aos_fatal_log("LZ4_decompress_safe error");
  67. return;
  68. }
  69. nowTime = LOG_GET_TIME();
  70. //fix_log_group_time(buf, lz4_buf->raw_length, nowTime);
  71. compress_bound = LZ4_compressBound(lz4_buf->raw_length);
  72. compress_data = (char *)malloc(compress_bound);
  73. compressed_size = LZ4_compress_default((char *)buf, compress_data, lz4_buf->raw_length, compress_bound);
  74. if(compressed_size <= 0)
  75. {
  76. aos_fatal_log("LZ4_compress_default error");
  77. free(buf);
  78. free(compress_data);
  79. return;
  80. }
  81. *new_lz4_buf = (lz4_log_buf*)malloc(sizeof(lz4_log_buf) + compressed_size);
  82. (*new_lz4_buf)->length = compressed_size;
  83. (*new_lz4_buf)->raw_length = lz4_buf->raw_length;
  84. memcpy((*new_lz4_buf)->data, compress_data, compressed_size);
  85. free(buf);
  86. free(compress_data);
  87. return;
  88. }
  89. #endif
  90. #ifdef WIN32
  91. DWORD WINAPI log_producer_send_thread(LPVOID param)
  92. #else
  93. void * log_producer_send_thread(void * param)
  94. #endif
  95. {
  96. log_producer_manager * producer_manager = (log_producer_manager *)param;
  97. if (producer_manager->sender_data_queue == NULL)
  98. {
  99. return 0;
  100. }
  101. Sleep(producer_manager->producer_config->sendThreadWaitMs);
  102. while (!producer_manager->shutdown)
  103. {
  104. // change from 30ms to 1000ms, reduce wake up when app switch to back
  105. void * send_param = log_queue_pop(producer_manager->sender_data_queue, 1000);
  106. if (send_param != NULL)
  107. {
  108. int32_t begin;
  109. ATOMICINT_INC(&producer_manager->multi_thread_send_count);
  110. begin = time(NULL);
  111. log_producer_send_fun(send_param);
  112. aos_info_log((LB, "log_producer_send_fun cost %d", time(NULL) - begin));
  113. ATOMICINT_DEC(&producer_manager->multi_thread_send_count);
  114. }
  115. }
  116. return 0;
  117. }
  118. void send_log_data(const char* url, const char* body)
  119. {
  120. LOG_OS_TestLogPost(url, body);
  121. }
  122. void * log_producer_send_fun(void * param)
  123. {
  124. log_producer_manager* producer_manager;
  125. log_producer_config* config;
  126. send_error_info error_info;
  127. log_producer_send_param * send_param = (log_producer_send_param *)param;
  128. if (send_param->magic_num != LOG_PRODUCER_SEND_MAGIC_NUM) // magic num not right
  129. {
  130. aos_fatal_log((LB, "invalid send param, magic num not found, num 0x%x", send_param->magic_num));
  131. producer_manager = (log_producer_manager *)send_param->producer_manager;
  132. if (producer_manager && producer_manager->send_done_function != NULL)
  133. {
  134. producer_manager->send_done_function(LOG_PRODUCER_INVALID, send_param->log_buf->raw_length, send_param->log_buf->length,
  135. NULL, "invalid send param, magic num not found", send_param->log_buf->data, producer_manager->user_param);
  136. }
  137. if (producer_manager && producer_manager->uuid_send_done_function != NULL)
  138. {
  139. producer_manager->uuid_send_done_function(LOG_PRODUCER_INVALID,
  140. send_param->log_buf->raw_length,
  141. send_param->log_buf->length,
  142. NULL,
  143. "invalid send param, magic num not found",
  144. send_param->log_buf->data,
  145. producer_manager->uuid_user_param,
  146. send_param->log_buf->n_logs,
  147. send_param->log_buf->uuid,
  148. send_param->log_buf->modular);
  149. }
  150. return NULL;
  151. }
  152. config = send_param->producer_config;
  153. memset(&error_info, 0, sizeof(error_info));
  154. producer_manager = (log_producer_manager *)send_param->producer_manager;
  155. do
  156. {
  157. lz4_log_buf* send_buf;
  158. log_post_option option;
  159. sds accessKeyId = NULL;
  160. sds accessKey = NULL;
  161. sds stsToken = NULL;
  162. post_log_result* rst;
  163. int32_t sleepMs;
  164. int i = 0;
  165. char channelId[MAX_TOKEN_LEN] = "", token[MAX_TOKEN_LEN] = "", terminalno[MAX_TOKEN_LEN] = "", reserve1[MAX_TOKEN_LEN] = "";
  166. if (producer_manager->shutdown)
  167. {
  168. aos_info_log((LB, "send fail but shutdown signal received, force exit"));
  169. if (producer_manager->send_done_function != NULL)
  170. {
  171. producer_manager->send_done_function(LOG_PRODUCER_SEND_EXIT_BUFFERED, send_param->log_buf->raw_length, send_param->log_buf->length,
  172. NULL, "producer is being destroyed, producer has no time to send this buffer out", send_param->log_buf->data, producer_manager->user_param);
  173. }
  174. break;
  175. }
  176. send_buf = send_param->log_buf;
  177. #ifdef SEND_TIME_INVALID_FIX
  178. nowTime = LOG_GET_TIME();
  179. if (nowTime - send_param->builder_time > 600 || send_param->builder_time - nowTime > 600 || error_info.last_send_error == LOG_SEND_TIME_ERROR)
  180. {
  181. _rebuild_time(send_param->log_buf, &send_buf);
  182. send_param->builder_time = nowTime;
  183. }
  184. #endif
  185. memset(&option, 0, sizeof(log_post_option));
  186. option.connect_timeout = config->connectTimeoutSec;
  187. option.operation_timeout = config->sendTimeoutSec;
  188. option.compress_type = config->compressType;
  189. option.using_https = config->using_https;
  190. option.ntp_time_offset = config->ntpTimeOffset;
  191. if(config->tokenFun != NULL)
  192. config->tokenFun(channelId, token, terminalno, reserve1);
  193. rst = post_logs(config->endpoint, accessKeyId, accessKey, stsToken, send_buf, &option, channelId, token, terminalno, reserve1); //通过http发送logs
  194. aos_info_log((LB, "post_logs, type:%d, %s, result:%d", send_buf->type, config->endpoint, rst->statusCode));
  195. sdsfree(accessKeyId);
  196. sdsfree(accessKey);
  197. sdsfree(stsToken);
  198. if(rst->statusCode == 200)
  199. {
  200. switch(send_buf->type)
  201. {
  202. case LOG_TYPE_USER_SKYEYE:
  203. g_upload_TerminalUser_Suc += send_buf->n_logs;
  204. break;
  205. case LOG_TYPE_SYS_SKYEYE:
  206. g_upload_TerminalSys_Suc += send_buf->n_logs;
  207. break;
  208. case LOG_TYPE_BEIDOU:
  209. g_upload_beidou_Suc += send_buf->n_logs;
  210. break;
  211. case LOG_TYPE_USER_BUSINESS:
  212. g_upload_BussinessUser_Suc += send_buf->n_logs;
  213. break;
  214. case LOG_TYPE_SYS_BUSINESS:
  215. g_upload_BussinessSys_Suc += send_buf->n_logs;
  216. break;
  217. case LOG_TYPE_WEBSDK:
  218. g_upload_vtmsdk_Suc += send_buf->n_logs;
  219. break;
  220. default:
  221. break;
  222. }
  223. }
  224. else
  225. {
  226. switch (send_buf->type)
  227. {
  228. case LOG_TYPE_USER_SKYEYE:
  229. g_upload_TerminalUser_Err += send_buf->n_logs;
  230. break;
  231. case LOG_TYPE_SYS_SKYEYE:
  232. g_upload_TerminalSys_Err += send_buf->n_logs;
  233. break;
  234. case LOG_TYPE_BEIDOU:
  235. g_upload_beidou_Err += send_buf->n_logs;
  236. break;
  237. case LOG_TYPE_USER_BUSINESS:
  238. g_upload_BussinessUser_Err += send_buf->n_logs;
  239. break;
  240. case LOG_TYPE_SYS_BUSINESS:
  241. g_upload_BussinessSys_Err += send_buf->n_logs;
  242. break;
  243. case LOG_TYPE_WEBSDK:
  244. g_upload_vtmsdk_Err += send_buf->n_logs;
  245. break;
  246. default:
  247. break;
  248. }
  249. if(rst->statusCode == 300)
  250. g_discardMsgNum_since_serverRet_RTI1002 += send_buf->n_logs;
  251. }
  252. sleepMs = log_producer_on_send_done(send_param, rst, &error_info) / 2;//执行senddone,删除数据库中内容
  253. post_log_result_destroy(rst);
  254. // tmp buffer, free
  255. if (send_buf != send_param->log_buf)
  256. {
  257. free(send_buf);
  258. }
  259. if (sleepMs <= 0)
  260. {
  261. break;
  262. }
  263. i =0;
  264. for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
  265. {
  266. #ifdef WIN32
  267. Sleep(SEND_SLEEP_INTERVAL_MS);
  268. #else
  269. usleep(SEND_SLEEP_INTERVAL_MS * 1000);
  270. #endif
  271. if (producer_manager->shutdown || producer_manager->networkRecover)
  272. {
  273. break;
  274. }
  275. }
  276. if (producer_manager->networkRecover)
  277. {
  278. producer_manager->networkRecover = 0;
  279. }
  280. }while(1);
  281. // at last, free all buffer
  282. free_lz4_log_buf(send_param->log_buf);
  283. free(send_param);
  284. return NULL;
  285. }
  286. int32_t log_producer_on_send_done(log_producer_send_param * send_param, post_log_result * result, send_error_info * error_info)
  287. {
  288. log_producer_send_result send_result = AosStatusToResult(result);
  289. log_producer_manager * producer_manager = (log_producer_manager *)send_param->producer_manager;
  290. if (producer_manager->send_done_function != NULL)
  291. {
  292. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  293. LOG_PRODUCER_OK :
  294. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  295. producer_manager->send_done_function(callback_result, send_param->log_buf->raw_length, send_param->log_buf->length, result->requestID, result->errorMessage, send_param->log_buf->data, producer_manager->user_param);
  296. }
  297. if (producer_manager->uuid_send_done_function != NULL)
  298. {
  299. log_producer_result callback_result = send_result == LOG_SEND_OK ?
  300. LOG_PRODUCER_OK :
  301. (LOG_PRODUCER_SEND_NETWORK_ERROR + send_result - LOG_SEND_NETWORK_ERROR);
  302. producer_manager->uuid_send_done_function(callback_result,
  303. send_param->log_buf->raw_length,
  304. send_param->log_buf->length,
  305. result->requestID,
  306. result->errorMessage,
  307. send_param->log_buf->data,
  308. producer_manager->uuid_user_param,
  309. send_param->log_buf->n_logs,
  310. send_param->log_buf->uuid,
  311. send_param->log_buf->modular);
  312. }
  313. if (send_result == LOG_SEND_UNAUTHORIZED)
  314. {
  315. // if do not drop unauthorized log, change the code to LOG_PRODUCER_SEND_NETWORK_ERROR
  316. send_result = LOG_PRODUCER_SEND_NETWORK_ERROR;
  317. }
  318. switch (send_result)
  319. {
  320. case LOG_SEND_OK:
  321. break;
  322. case LOG_SEND_TIME_ERROR:
  323. // if no this marco, drop data
  324. #ifdef SEND_TIME_INVALID_FIX
  325. error_info->last_send_error = LOG_SEND_TIME_ERROR;
  326. error_info->last_sleep_ms = INVALID_TIME_TRY_INTERVAL;
  327. return error_info->last_sleep_ms;
  328. #else
  329. break;
  330. #endif
  331. case LOG_SEND_QUOTA_EXCEED:
  332. if (error_info->last_send_error != LOG_SEND_QUOTA_EXCEED)
  333. {
  334. error_info->last_send_error = LOG_SEND_QUOTA_EXCEED;
  335. error_info->last_sleep_ms = BASE_QUOTA_ERROR_SLEEP_MS;
  336. error_info->first_error_time = time(NULL);
  337. }
  338. else
  339. {
  340. if (error_info->last_sleep_ms < MAX_QUOTA_ERROR_SLEEP_MS)
  341. {
  342. error_info->last_sleep_ms *= 2;
  343. }
  344. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  345. {
  346. break;
  347. }
  348. }
  349. aos_warn_log((LB, "send quota error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  350. (int)send_param->log_buf->length,
  351. (int)send_param->log_buf->raw_length,
  352. result->statusCode,
  353. result->errorMessage));
  354. return error_info->last_sleep_ms;
  355. case LOG_SEND_SERVER_ERROR :
  356. case LOG_SEND_NETWORK_ERROR:
  357. if (error_info->last_send_error != LOG_SEND_NETWORK_ERROR)
  358. {
  359. error_info->last_send_error = LOG_SEND_NETWORK_ERROR;
  360. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  361. error_info->first_error_time = time(NULL);
  362. }
  363. else
  364. {
  365. if (error_info->last_sleep_ms < MAX_NETWORK_ERROR_SLEEP_MS)
  366. {
  367. error_info->last_sleep_ms *= 2;
  368. }
  369. if (time(NULL) - error_info->first_error_time > DROP_FAIL_DATA_TIME_SECOND)
  370. {
  371. break;
  372. }
  373. }
  374. aos_warn_log((LB, "send network error, buffer len : %d, raw len : %d, code : %d, error msg : %s",
  375. (int)send_param->log_buf->length,
  376. (int)send_param->log_buf->raw_length,
  377. result->statusCode,
  378. result->errorMessage));
  379. return error_info->last_sleep_ms;
  380. default:
  381. // discard data
  382. break;
  383. }
  384. // always try once when discard error
  385. if (LOG_SEND_OK != send_result && error_info->last_send_error == 0)
  386. {
  387. error_info->last_send_error = LOG_SEND_DISCARD_ERROR;
  388. error_info->last_sleep_ms = BASE_NETWORK_ERROR_SLEEP_MS;
  389. error_info->first_error_time = time(NULL);
  390. aos_warn_log((LB, "send fail, the error is discard data, retry once, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  391. (int)send_param->log_buf->length,
  392. (int)send_param->log_buf->raw_length,
  393. (int)producer_manager->totalBufferSize,
  394. result->statusCode,
  395. result->errorMessage));
  396. return BASE_NETWORK_ERROR_SLEEP_MS;
  397. }
  398. CS_ENTER(producer_manager->lock);
  399. producer_manager->totalBufferSize -= send_param->log_buf->length;
  400. CS_LEAVE(producer_manager->lock);
  401. if (send_result == LOG_SEND_OK)
  402. {
  403. aos_debug_log((LB, "send success, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  404. (int)send_param->log_buf->length,
  405. (int)send_param->log_buf->raw_length,
  406. (int)producer_manager->totalBufferSize,
  407. result->statusCode,
  408. result->errorMessage));
  409. }
  410. else
  411. {
  412. aos_warn_log((LB, "send fail, discard data, buffer len : %d, raw len : %d, total buffer : %d,code : %d, error msg : %s",
  413. (int)send_param->log_buf->length,
  414. (int)send_param->log_buf->raw_length,
  415. (int)producer_manager->totalBufferSize,
  416. result->statusCode,
  417. result->errorMessage));
  418. if (producer_manager->send_done_function != NULL)
  419. {
  420. producer_manager->send_done_function(LOG_PRODUCER_DROP_ERROR,
  421. send_param->log_buf->raw_length,
  422. send_param->log_buf->length,
  423. result->requestID,
  424. result->errorMessage,
  425. send_param->log_buf->data,
  426. producer_manager->user_param);
  427. }
  428. if (producer_manager->uuid_send_done_function != NULL)
  429. {
  430. producer_manager->uuid_send_done_function(LOG_PRODUCER_DROP_ERROR,
  431. send_param->log_buf->raw_length,
  432. send_param->log_buf->length,
  433. result->requestID,
  434. result->errorMessage,
  435. send_param->log_buf->data,
  436. producer_manager->uuid_user_param,
  437. send_param->log_buf->n_logs,
  438. send_param->log_buf->uuid,
  439. send_param->log_buf->modular);
  440. }
  441. }
  442. return 0;
  443. }
  444. log_producer_result log_producer_send_data(log_producer_send_param * send_param)
  445. {
  446. log_producer_send_fun(send_param);
  447. return LOG_PRODUCER_OK;
  448. }
  449. log_producer_send_result AosStatusToResult(post_log_result * result)
  450. {
  451. if (result->statusCode == LOG_HTTP_ERRPARAM)
  452. return LOG_SEND_OK;
  453. if (result->statusCode / 100 == 2)
  454. {
  455. return LOG_SEND_OK;
  456. }
  457. if (result->statusCode <= 0)
  458. {
  459. return LOG_SEND_NETWORK_ERROR;
  460. }
  461. if (result->statusCode >= 500)
  462. {
  463. return LOG_SEND_SERVER_ERROR;
  464. }
  465. if (result->statusCode == 403)
  466. {
  467. return LOG_SEND_QUOTA_EXCEED;
  468. }
  469. if (result->statusCode == 401 || result->statusCode == 404)
  470. {
  471. return LOG_SEND_UNAUTHORIZED;
  472. }
  473. if (result->errorMessage != NULL && strstr(result->errorMessage, LOGE_TIME_EXPIRED) != NULL)
  474. {
  475. return LOG_SEND_TIME_ERROR;
  476. }
  477. return LOG_SEND_OK;
  478. }
  479. log_producer_send_param * create_log_producer_send_param(log_producer_config * producer_config,
  480. void * producer_manager,
  481. lz4_log_buf* log_buf,
  482. log_group_builder * builder)
  483. {
  484. log_producer_send_param * param = (log_producer_send_param *)malloc(sizeof(log_producer_send_param));
  485. param->producer_config = producer_config;
  486. param->producer_manager = producer_manager;
  487. param->log_buf = log_buf;
  488. param->magic_num = LOG_PRODUCER_SEND_MAGIC_NUM;
  489. if (builder != NULL)
  490. {
  491. param->builder_time = builder->builder_time;
  492. }
  493. else
  494. {
  495. param->builder_time = time(NULL);
  496. }
  497. return param;
  498. }