Compare commits

5 Commits
v0.0.1 ... main

Author SHA1 Message Date
Mars Ultor
4ddb77e71e start signal 2025-12-24 20:49:59 -06:00
Mars Ultor
519f37e385 should be done now, wrote a new batched logger for the last thing 2025-12-24 20:11:34 -06:00
Mars Ultor
a80b1a2293 moved to regex in parse time from line 2025-12-24 20:04:14 -06:00
Mars Ultor
a4749501f4 UB fixed in parse_chat 2025-12-24 20:00:30 -06:00
Mars Ultor
7a70c0d6a9 shoudl work without dumpign all the time 2025-12-24 19:55:02 -06:00

View File

@@ -8,7 +8,7 @@
#include <variant> #include <variant>
#include "database_utils.hpp" #include "database_utils.hpp"
namespace processing_utils { namespace processing_utils {
constexpr std::size_t MAX_BATCH_ROWS = 500;
bool parse_date_from_filename(std::string_view path, bool parse_date_from_filename(std::string_view path,
int& year, int& month, int& day) int& year, int& month, int& day)
{ {
@@ -31,17 +31,24 @@ bool parse_date_from_filename(std::string_view path,
bool parse_time_from_line(std::string_view line, bool parse_time_from_line(std::string_view line,
int& hour, int& min, int& sec) int& hour, int& min, int& sec)
{ {
// Expect: [HH:MM:SS] // Match [HH:MM:SS] anywhere in the line
if (line.size() < 10 || line[0] != '[') static const std::regex re(
R"(\[(\d{2}):(\d{2}):(\d{2})\])",
std::regex::optimize
);
std::cmatch m;
if (!std::regex_search(line.begin(), line.end(), m, re))
return false; return false;
hour = (line[1] - '0') * 10 + (line[2] - '0'); hour = (m[1].str()[0] - '0') * 10 + (m[1].str()[1] - '0');
min = (line[4] - '0') * 10 + (line[5] - '0'); min = (m[2].str()[0] - '0') * 10 + (m[2].str()[1] - '0');
sec = (line[7] - '0') * 10 + (line[8] - '0'); sec = (m[3].str()[0] - '0') * 10 + (m[3].str()[1] - '0');
return true; return true;
} }
#include <ctime> #include <ctime>
#include <string_view> #include <string_view>
@@ -104,7 +111,7 @@ time_t extract_timestamp_utc(std::string_view filename,
Route classify_line(std::string_view line) { Route classify_line(std::string_view line) {
// ---- Client lifecycle ---- // ---- Client lifecycle ----
if (line.find("Starting Minecraft client") != std::string_view::npos) if (line.find("Loading Minecraft") != std::string_view::npos)
return Route::CLIENT_START; return Route::CLIENT_START;
if (line.find("Stopping!") != std::string_view::npos) if (line.find("Stopping!") != std::string_view::npos)
@@ -128,22 +135,34 @@ time_t extract_timestamp_utc(std::string_view filename,
return Route::NONE; return Route::NONE;
} }
std::pair<std::string, std::string> std::pair<std::string, std::string>
parse_chat(std::string_view line) { parse_chat(std::string_view line)
// [CHAT] <Steve> hello world {
constexpr std::string_view tag = "[CHAT]";
auto start = line.find("[CHAT] <"); auto tag_pos = line.find(tag);
start += 8; if (tag_pos == std::string_view::npos)
return {"", ""};
auto end = line.find('>', start); std::size_t pos = tag_pos + tag.size();
auto msg = end + 2; if (pos >= line.size())
return {"", ""};
// Skip whitespace after [CHAT]
while (pos < line.size() && line[pos] == ' ')
++pos;
if (pos >= line.size())
return {"", ""};
// Entire remainder is "message"
return { return {
std::string(line.substr(start, end - start)), "", // sender unknown / system / to be inferred elsewhere
std::string(line.substr(msg)) std::string(line.substr(pos))
}; };
} }
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id){ void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id){
spdlog::info("Processing file {} ", path); spdlog::info("Processing file {} ", path);
gzFile gz_file_log = gzopen(path.c_str(), "rb"); gzFile gz_file_log = gzopen(path.c_str(), "rb");
@@ -180,14 +199,17 @@ parse_chat(std::string_view line) {
std::pair<std::string, std::string> chat_message_pair = parse_chat(current_line); std::pair<std::string, std::string> chat_message_pair = parse_chat(current_line);
chat_events.add_row(target_user_id, log_timestamp, chat_message_pair.first, chat_message_pair.second) ; chat_events.add_row(target_user_id, log_timestamp, chat_message_pair.first, chat_message_pair.second) ;
} }
current_line.clear();
} }
activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client); activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client);
current_line.clear(); current_line.clear();
} }
void process_blocking(std::string path, void process_blocking(std::string path,
std::shared_ptr<clickhouse::Client> client, std::shared_ptr<clickhouse::Client> client,
std::string target_user_id) std::string target_user_id)
{ {
constexpr uint64_t MAX_BATCH_ROWS = 500;
spdlog::info("Processing (blocking) file {}", path); spdlog::info("Processing (blocking) file {}", path);
std::ifstream input_file(path); std::ifstream input_file(path);
@@ -196,68 +218,123 @@ parse_chat(std::string_view line) {
return; return;
} }
// Seek to the end like tail -f // tail -f semantics
input_file.seekg(0, std::ios::end); input_file.seekg(0, std::ios::end);
std::string current_line; std::string current_line;
db_utils::ClientEventsBatch client_events{};
db_utils::ChatEventsBatch chat_events{}; db_utils::ClientEventsBatch client_events;
db_utils::ActivityEventsBatch activity_events{}; db_utils::ChatEventsBatch chat_events;
db_utils::ActivityEventsBatch activity_events;
uint64_t row_count = 0;
while (true) { while (true) {
// Read all available lines bool read_any = false;
while (std::getline(input_file, current_line)) {
if (current_line.empty()) continue;
std::time_t log_timestamp = extract_timestamp_utc(path, current_line); while (std::getline(input_file, current_line)) {
read_any = true;
if (current_line.empty())
continue;
std::time_t ts = extract_timestamp_utc(path, current_line);
if (ts == static_cast<std::time_t>(-1))
continue;
switch (classify_line(current_line)) { switch (classify_line(current_line)) {
case Route::CLIENT_START: case Route::CLIENT_START:
client_events.add_row(target_user_id, log_timestamp, client_events.add_row(
std::variant<std::string, uint8_t>{"start"}, path); target_user_id, ts,
std::variant<std::string, uint8_t>{"start"},
path
);
++row_count;
break; break;
case Route::CLIENT_END: case Route::CLIENT_END:
client_events.add_row(target_user_id, log_timestamp, client_events.add_row(
std::variant<std::string, uint8_t>{"end"}, path); target_user_id, ts,
std::variant<std::string, uint8_t>{"end"},
path
);
++row_count;
break; break;
case Route::ACT_COMMAND: case Route::ACT_COMMAND:
activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f); activity_events.add_row(
target_user_id, ts,
"command", 1.0f
);
++row_count;
break; break;
case Route::ACT_DEATH: case Route::ACT_DEATH:
activity_events.add_row(target_user_id, log_timestamp, activity_events.add_row(
std::variant<std::string, uint8_t>{"death"}, 0.5f); target_user_id, ts,
std::variant<std::string, uint8_t>{"death"},
0.5f
);
++row_count;
break; break;
case Route::ACT_DIMENSION: case Route::ACT_DIMENSION:
activity_events.add_row(target_user_id, log_timestamp, activity_events.add_row(
std::variant<std::string, uint8_t>{"dimension"}, 0.3f); target_user_id, ts,
break; std::variant<std::string, uint8_t>{"dimension"},
case Route::ACT_OTHER: 0.3f
activity_events.add_row(target_user_id, log_timestamp, );
std::variant<std::string, uint8_t>{"other"}, 0.2f); ++row_count;
break; break;
case Route::CHAT: { case Route::CHAT: {
auto chat_pair = parse_chat(current_line); auto chat = parse_chat(current_line);
chat_events.add_row(target_user_id, log_timestamp, if (!chat.second.empty()) {
chat_pair.first, chat_pair.second); chat_events.add_row(
target_user_id, ts,
chat.first, chat.second
);
++row_count;
}
break; break;
} }
default: default:
break; break;
} }
}
// Insert accumulated batches into ClickHouse // HARD FLUSH CONDITION
if (row_count >= MAX_BATCH_ROWS) {
activity_events.insert_as_batch(client); activity_events.insert_as_batch(client);
chat_events.insert_as_batch(client); chat_events.insert_as_batch(client);
client_events.insert_as_batch(client); client_events.insert_as_batch(client);
current_line.clear(); // Destroy and recreate — ONLY valid reset
client_events = db_utils::ClientEventsBatch{};
chat_events = db_utils::ChatEventsBatch{};
activity_events = db_utils::ActivityEventsBatch{};
// Clear EOF state so we can continue reading row_count = 0;
input_file.clear(); }
}
// Sleep a bit before polling for new data // Flush partial batch if we read anything
if (read_any && row_count > 0) {
activity_events.insert_as_batch(client);
chat_events.insert_as_batch(client);
client_events.insert_as_batch(client);
client_events = db_utils::ClientEventsBatch{};
chat_events = db_utils::ChatEventsBatch{};
activity_events = db_utils::ActivityEventsBatch{};
row_count = 0;
}
input_file.clear(); // clear EOF only
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
} }
} }
} }