diff --git a/src/processing_utils.cpp b/src/processing_utils.cpp index f005ccd..ea0e3d5 100644 --- a/src/processing_utils.cpp +++ b/src/processing_utils.cpp @@ -8,7 +8,7 @@ #include #include "database_utils.hpp" namespace processing_utils { - + constexpr std::size_t MAX_BATCH_ROWS = 500; bool parse_date_from_filename(std::string_view path, int& year, int& month, int& day) { @@ -204,10 +204,12 @@ parse_chat(std::string_view line) activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client); current_line.clear(); } - void process_blocking(std::string path, +void process_blocking(std::string path, std::shared_ptr client, std::string target_user_id) { + constexpr uint64_t MAX_BATCH_ROWS = 500; + spdlog::info("Processing (blocking) file {}", path); std::ifstream input_file(path); @@ -216,68 +218,123 @@ parse_chat(std::string_view line) return; } - // Seek to the end like tail -f + // tail -f semantics input_file.seekg(0, std::ios::end); std::string current_line; - db_utils::ClientEventsBatch client_events{}; - db_utils::ChatEventsBatch chat_events{}; - db_utils::ActivityEventsBatch activity_events{}; + + db_utils::ClientEventsBatch client_events; + db_utils::ChatEventsBatch chat_events; + db_utils::ActivityEventsBatch activity_events; + + uint64_t row_count = 0; while (true) { - // Read all available lines - while (std::getline(input_file, current_line)) { - if (current_line.empty()) continue; + bool read_any = false; - std::time_t log_timestamp = extract_timestamp_utc(path, current_line); + while (std::getline(input_file, current_line)) { + read_any = true; + if (current_line.empty()) + continue; + + std::time_t ts = extract_timestamp_utc(path, current_line); + if (ts == static_cast(-1)) + continue; switch (classify_line(current_line)) { + case Route::CLIENT_START: - client_events.add_row(target_user_id, log_timestamp, - std::variant{"start"}, path); + client_events.add_row( + target_user_id, ts, + std::variant{"start"}, + path + ); + ++row_count; break; + case Route::CLIENT_END: - client_events.add_row(target_user_id, log_timestamp, - std::variant{"end"}, path); + client_events.add_row( + target_user_id, ts, + std::variant{"end"}, + path + ); + ++row_count; break; + case Route::ACT_COMMAND: - activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f); + activity_events.add_row( + target_user_id, ts, + "command", 1.0f + ); + ++row_count; break; + case Route::ACT_DEATH: - activity_events.add_row(target_user_id, log_timestamp, - std::variant{"death"}, 0.5f); + activity_events.add_row( + target_user_id, ts, + std::variant{"death"}, + 0.5f + ); + ++row_count; break; + case Route::ACT_DIMENSION: - activity_events.add_row(target_user_id, log_timestamp, - std::variant{"dimension"}, 0.3f); - break; - case Route::ACT_OTHER: - activity_events.add_row(target_user_id, log_timestamp, - std::variant{"other"}, 0.2f); + activity_events.add_row( + target_user_id, ts, + std::variant{"dimension"}, + 0.3f + ); + ++row_count; break; + case Route::CHAT: { - auto chat_pair = parse_chat(current_line); - chat_events.add_row(target_user_id, log_timestamp, - chat_pair.first, chat_pair.second); + auto chat = parse_chat(current_line); + if (!chat.second.empty()) { + chat_events.add_row( + target_user_id, ts, + chat.first, chat.second + ); + ++row_count; + } break; } + default: break; } + + // HARD FLUSH CONDITION + if (row_count >= MAX_BATCH_ROWS) { + activity_events.insert_as_batch(client); + chat_events.insert_as_batch(client); + client_events.insert_as_batch(client); + + // Destroy and recreate — ONLY valid reset + client_events = db_utils::ClientEventsBatch{}; + chat_events = db_utils::ChatEventsBatch{}; + activity_events = db_utils::ActivityEventsBatch{}; + + row_count = 0; + } } - // Insert accumulated batches into ClickHouse - activity_events.insert_as_batch(client); - chat_events.insert_as_batch(client); - client_events.insert_as_batch(client); + // Flush partial batch if we read anything + if (read_any && row_count > 0) { + activity_events.insert_as_batch(client); + chat_events.insert_as_batch(client); + client_events.insert_as_batch(client); - current_line.clear(); + client_events = db_utils::ClientEventsBatch{}; + chat_events = db_utils::ChatEventsBatch{}; + activity_events = db_utils::ActivityEventsBatch{}; - // Clear EOF state so we can continue reading - input_file.clear(); + row_count = 0; + } - // Sleep a bit before polling for new data + input_file.clear(); // clear EOF only std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } + + }