Should be done now; wrote a new batched logger for the last thing.
This commit is contained in:
@@ -8,7 +8,7 @@
|
||||
#include <variant>
|
||||
#include "database_utils.hpp"
|
||||
namespace processing_utils {
|
||||
|
||||
constexpr std::size_t MAX_BATCH_ROWS = 500;
|
||||
bool parse_date_from_filename(std::string_view path,
|
||||
int& year, int& month, int& day)
|
||||
{
|
||||
@@ -204,10 +204,12 @@ parse_chat(std::string_view line)
|
||||
activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client);
|
||||
current_line.clear();
|
||||
}
|
||||
void process_blocking(std::string path,
|
||||
void process_blocking(std::string path,
|
||||
std::shared_ptr<clickhouse::Client> client,
|
||||
std::string target_user_id)
|
||||
{
|
||||
constexpr uint64_t MAX_BATCH_ROWS = 500;
|
||||
|
||||
spdlog::info("Processing (blocking) file {}", path);
|
||||
|
||||
std::ifstream input_file(path);
|
||||
@@ -216,68 +218,123 @@ parse_chat(std::string_view line)
|
||||
return;
|
||||
}
|
||||
|
||||
// Seek to the end like tail -f
|
||||
// tail -f semantics
|
||||
input_file.seekg(0, std::ios::end);
|
||||
|
||||
std::string current_line;
|
||||
db_utils::ClientEventsBatch client_events{};
|
||||
db_utils::ChatEventsBatch chat_events{};
|
||||
db_utils::ActivityEventsBatch activity_events{};
|
||||
|
||||
db_utils::ClientEventsBatch client_events;
|
||||
db_utils::ChatEventsBatch chat_events;
|
||||
db_utils::ActivityEventsBatch activity_events;
|
||||
|
||||
uint64_t row_count = 0;
|
||||
|
||||
while (true) {
|
||||
// Read all available lines
|
||||
while (std::getline(input_file, current_line)) {
|
||||
if (current_line.empty()) continue;
|
||||
bool read_any = false;
|
||||
|
||||
std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
|
||||
while (std::getline(input_file, current_line)) {
|
||||
read_any = true;
|
||||
if (current_line.empty())
|
||||
continue;
|
||||
|
||||
std::time_t ts = extract_timestamp_utc(path, current_line);
|
||||
if (ts == static_cast<std::time_t>(-1))
|
||||
continue;
|
||||
|
||||
switch (classify_line(current_line)) {
|
||||
|
||||
case Route::CLIENT_START:
|
||||
client_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"start"}, path);
|
||||
client_events.add_row(
|
||||
target_user_id, ts,
|
||||
std::variant<std::string, uint8_t>{"start"},
|
||||
path
|
||||
);
|
||||
++row_count;
|
||||
break;
|
||||
|
||||
case Route::CLIENT_END:
|
||||
client_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"end"}, path);
|
||||
client_events.add_row(
|
||||
target_user_id, ts,
|
||||
std::variant<std::string, uint8_t>{"end"},
|
||||
path
|
||||
);
|
||||
++row_count;
|
||||
break;
|
||||
|
||||
case Route::ACT_COMMAND:
|
||||
activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
|
||||
activity_events.add_row(
|
||||
target_user_id, ts,
|
||||
"command", 1.0f
|
||||
);
|
||||
++row_count;
|
||||
break;
|
||||
|
||||
case Route::ACT_DEATH:
|
||||
activity_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"death"}, 0.5f);
|
||||
activity_events.add_row(
|
||||
target_user_id, ts,
|
||||
std::variant<std::string, uint8_t>{"death"},
|
||||
0.5f
|
||||
);
|
||||
++row_count;
|
||||
break;
|
||||
|
||||
case Route::ACT_DIMENSION:
|
||||
activity_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"dimension"}, 0.3f);
|
||||
break;
|
||||
case Route::ACT_OTHER:
|
||||
activity_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"other"}, 0.2f);
|
||||
activity_events.add_row(
|
||||
target_user_id, ts,
|
||||
std::variant<std::string, uint8_t>{"dimension"},
|
||||
0.3f
|
||||
);
|
||||
++row_count;
|
||||
break;
|
||||
|
||||
case Route::CHAT: {
|
||||
auto chat_pair = parse_chat(current_line);
|
||||
chat_events.add_row(target_user_id, log_timestamp,
|
||||
chat_pair.first, chat_pair.second);
|
||||
auto chat = parse_chat(current_line);
|
||||
if (!chat.second.empty()) {
|
||||
chat_events.add_row(
|
||||
target_user_id, ts,
|
||||
chat.first, chat.second
|
||||
);
|
||||
++row_count;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Insert accumulated batches into ClickHouse
|
||||
// HARD FLUSH CONDITION
|
||||
if (row_count >= MAX_BATCH_ROWS) {
|
||||
activity_events.insert_as_batch(client);
|
||||
chat_events.insert_as_batch(client);
|
||||
client_events.insert_as_batch(client);
|
||||
|
||||
current_line.clear();
|
||||
// Destroy and recreate — ONLY valid reset
|
||||
client_events = db_utils::ClientEventsBatch{};
|
||||
chat_events = db_utils::ChatEventsBatch{};
|
||||
activity_events = db_utils::ActivityEventsBatch{};
|
||||
|
||||
// Clear EOF state so we can continue reading
|
||||
input_file.clear();
|
||||
row_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Sleep a bit before polling for new data
|
||||
// Flush partial batch if we read anything
|
||||
if (read_any && row_count > 0) {
|
||||
activity_events.insert_as_batch(client);
|
||||
chat_events.insert_as_batch(client);
|
||||
client_events.insert_as_batch(client);
|
||||
|
||||
client_events = db_utils::ClientEventsBatch{};
|
||||
chat_events = db_utils::ChatEventsBatch{};
|
||||
activity_events = db_utils::ActivityEventsBatch{};
|
||||
|
||||
row_count = 0;
|
||||
}
|
||||
|
||||
input_file.clear(); // clear EOF only
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user