Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ddb77e71e | ||
|
|
519f37e385 | ||
|
|
a80b1a2293 | ||
|
|
a4749501f4 | ||
|
|
7a70c0d6a9 |
@@ -8,7 +8,7 @@
|
|||||||
#include <variant>
|
#include <variant>
|
||||||
#include "database_utils.hpp"
|
#include "database_utils.hpp"
|
||||||
namespace processing_utils {
|
namespace processing_utils {
|
||||||
|
constexpr std::size_t MAX_BATCH_ROWS = 500;
|
||||||
bool parse_date_from_filename(std::string_view path,
|
bool parse_date_from_filename(std::string_view path,
|
||||||
int& year, int& month, int& day)
|
int& year, int& month, int& day)
|
||||||
{
|
{
|
||||||
@@ -31,17 +31,24 @@ bool parse_date_from_filename(std::string_view path,
|
|||||||
bool parse_time_from_line(std::string_view line,
|
bool parse_time_from_line(std::string_view line,
|
||||||
int& hour, int& min, int& sec)
|
int& hour, int& min, int& sec)
|
||||||
{
|
{
|
||||||
// Expect: [HH:MM:SS]
|
// Match [HH:MM:SS] anywhere in the line
|
||||||
if (line.size() < 10 || line[0] != '[')
|
static const std::regex re(
|
||||||
|
R"(\[(\d{2}):(\d{2}):(\d{2})\])",
|
||||||
|
std::regex::optimize
|
||||||
|
);
|
||||||
|
|
||||||
|
std::cmatch m;
|
||||||
|
if (!std::regex_search(line.begin(), line.end(), m, re))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
hour = (line[1] - '0') * 10 + (line[2] - '0');
|
hour = (m[1].str()[0] - '0') * 10 + (m[1].str()[1] - '0');
|
||||||
min = (line[4] - '0') * 10 + (line[5] - '0');
|
min = (m[2].str()[0] - '0') * 10 + (m[2].str()[1] - '0');
|
||||||
sec = (line[7] - '0') * 10 + (line[8] - '0');
|
sec = (m[3].str()[0] - '0') * 10 + (m[3].str()[1] - '0');
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#include <ctime>
|
#include <ctime>
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
|
|
||||||
@@ -104,7 +111,7 @@ time_t extract_timestamp_utc(std::string_view filename,
|
|||||||
Route classify_line(std::string_view line) {
|
Route classify_line(std::string_view line) {
|
||||||
|
|
||||||
// ---- Client lifecycle ----
|
// ---- Client lifecycle ----
|
||||||
if (line.find("Starting Minecraft client") != std::string_view::npos)
|
if (line.find("Loading Minecraft") != std::string_view::npos)
|
||||||
return Route::CLIENT_START;
|
return Route::CLIENT_START;
|
||||||
|
|
||||||
if (line.find("Stopping!") != std::string_view::npos)
|
if (line.find("Stopping!") != std::string_view::npos)
|
||||||
@@ -128,22 +135,34 @@ time_t extract_timestamp_utc(std::string_view filename,
|
|||||||
return Route::NONE;
|
return Route::NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<std::string, std::string>
|
std::pair<std::string, std::string>
|
||||||
parse_chat(std::string_view line) {
|
parse_chat(std::string_view line)
|
||||||
// [CHAT] <Steve> hello world
|
{
|
||||||
|
constexpr std::string_view tag = "[CHAT]";
|
||||||
|
|
||||||
auto start = line.find("[CHAT] <");
|
auto tag_pos = line.find(tag);
|
||||||
start += 8;
|
if (tag_pos == std::string_view::npos)
|
||||||
|
return {"", ""};
|
||||||
|
|
||||||
auto end = line.find('>', start);
|
std::size_t pos = tag_pos + tag.size();
|
||||||
auto msg = end + 2;
|
if (pos >= line.size())
|
||||||
|
return {"", ""};
|
||||||
|
|
||||||
|
// Skip whitespace after [CHAT]
|
||||||
|
while (pos < line.size() && line[pos] == ' ')
|
||||||
|
++pos;
|
||||||
|
|
||||||
|
if (pos >= line.size())
|
||||||
|
return {"", ""};
|
||||||
|
|
||||||
|
// Entire remainder is "message"
|
||||||
return {
|
return {
|
||||||
std::string(line.substr(start, end - start)),
|
"", // sender unknown / system / to be inferred elsewhere
|
||||||
std::string(line.substr(msg))
|
std::string(line.substr(pos))
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id){
|
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id){
|
||||||
spdlog::info("Processing file {} ", path);
|
spdlog::info("Processing file {} ", path);
|
||||||
gzFile gz_file_log = gzopen(path.c_str(), "rb");
|
gzFile gz_file_log = gzopen(path.c_str(), "rb");
|
||||||
@@ -180,14 +199,17 @@ parse_chat(std::string_view line) {
|
|||||||
std::pair<std::string, std::string> chat_message_pair = parse_chat(current_line);
|
std::pair<std::string, std::string> chat_message_pair = parse_chat(current_line);
|
||||||
chat_events.add_row(target_user_id, log_timestamp, chat_message_pair.first, chat_message_pair.second) ;
|
chat_events.add_row(target_user_id, log_timestamp, chat_message_pair.first, chat_message_pair.second) ;
|
||||||
}
|
}
|
||||||
|
current_line.clear();
|
||||||
}
|
}
|
||||||
activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client);
|
activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client);
|
||||||
current_line.clear();
|
current_line.clear();
|
||||||
}
|
}
|
||||||
void process_blocking(std::string path,
|
void process_blocking(std::string path,
|
||||||
std::shared_ptr<clickhouse::Client> client,
|
std::shared_ptr<clickhouse::Client> client,
|
||||||
std::string target_user_id)
|
std::string target_user_id)
|
||||||
{
|
{
|
||||||
|
constexpr uint64_t MAX_BATCH_ROWS = 500;
|
||||||
|
|
||||||
spdlog::info("Processing (blocking) file {}", path);
|
spdlog::info("Processing (blocking) file {}", path);
|
||||||
|
|
||||||
std::ifstream input_file(path);
|
std::ifstream input_file(path);
|
||||||
@@ -196,68 +218,123 @@ parse_chat(std::string_view line) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek to the end like tail -f
|
// tail -f semantics
|
||||||
input_file.seekg(0, std::ios::end);
|
input_file.seekg(0, std::ios::end);
|
||||||
|
|
||||||
std::string current_line;
|
std::string current_line;
|
||||||
db_utils::ClientEventsBatch client_events{};
|
|
||||||
db_utils::ChatEventsBatch chat_events{};
|
db_utils::ClientEventsBatch client_events;
|
||||||
db_utils::ActivityEventsBatch activity_events{};
|
db_utils::ChatEventsBatch chat_events;
|
||||||
|
db_utils::ActivityEventsBatch activity_events;
|
||||||
|
|
||||||
|
uint64_t row_count = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
// Read all available lines
|
bool read_any = false;
|
||||||
while (std::getline(input_file, current_line)) {
|
|
||||||
if (current_line.empty()) continue;
|
|
||||||
|
|
||||||
std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
|
while (std::getline(input_file, current_line)) {
|
||||||
|
read_any = true;
|
||||||
|
if (current_line.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
std::time_t ts = extract_timestamp_utc(path, current_line);
|
||||||
|
if (ts == static_cast<std::time_t>(-1))
|
||||||
|
continue;
|
||||||
|
|
||||||
switch (classify_line(current_line)) {
|
switch (classify_line(current_line)) {
|
||||||
|
|
||||||
case Route::CLIENT_START:
|
case Route::CLIENT_START:
|
||||||
client_events.add_row(target_user_id, log_timestamp,
|
client_events.add_row(
|
||||||
std::variant<std::string, uint8_t>{"start"}, path);
|
target_user_id, ts,
|
||||||
|
std::variant<std::string, uint8_t>{"start"},
|
||||||
|
path
|
||||||
|
);
|
||||||
|
++row_count;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Route::CLIENT_END:
|
case Route::CLIENT_END:
|
||||||
client_events.add_row(target_user_id, log_timestamp,
|
client_events.add_row(
|
||||||
std::variant<std::string, uint8_t>{"end"}, path);
|
target_user_id, ts,
|
||||||
|
std::variant<std::string, uint8_t>{"end"},
|
||||||
|
path
|
||||||
|
);
|
||||||
|
++row_count;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Route::ACT_COMMAND:
|
case Route::ACT_COMMAND:
|
||||||
activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
|
activity_events.add_row(
|
||||||
|
target_user_id, ts,
|
||||||
|
"command", 1.0f
|
||||||
|
);
|
||||||
|
++row_count;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Route::ACT_DEATH:
|
case Route::ACT_DEATH:
|
||||||
activity_events.add_row(target_user_id, log_timestamp,
|
activity_events.add_row(
|
||||||
std::variant<std::string, uint8_t>{"death"}, 0.5f);
|
target_user_id, ts,
|
||||||
|
std::variant<std::string, uint8_t>{"death"},
|
||||||
|
0.5f
|
||||||
|
);
|
||||||
|
++row_count;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Route::ACT_DIMENSION:
|
case Route::ACT_DIMENSION:
|
||||||
activity_events.add_row(target_user_id, log_timestamp,
|
activity_events.add_row(
|
||||||
std::variant<std::string, uint8_t>{"dimension"}, 0.3f);
|
target_user_id, ts,
|
||||||
break;
|
std::variant<std::string, uint8_t>{"dimension"},
|
||||||
case Route::ACT_OTHER:
|
0.3f
|
||||||
activity_events.add_row(target_user_id, log_timestamp,
|
);
|
||||||
std::variant<std::string, uint8_t>{"other"}, 0.2f);
|
++row_count;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Route::CHAT: {
|
case Route::CHAT: {
|
||||||
auto chat_pair = parse_chat(current_line);
|
auto chat = parse_chat(current_line);
|
||||||
chat_events.add_row(target_user_id, log_timestamp,
|
if (!chat.second.empty()) {
|
||||||
chat_pair.first, chat_pair.second);
|
chat_events.add_row(
|
||||||
|
target_user_id, ts,
|
||||||
|
chat.first, chat.second
|
||||||
|
);
|
||||||
|
++row_count;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HARD FLUSH CONDITION
|
||||||
|
if (row_count >= MAX_BATCH_ROWS) {
|
||||||
|
activity_events.insert_as_batch(client);
|
||||||
|
chat_events.insert_as_batch(client);
|
||||||
|
client_events.insert_as_batch(client);
|
||||||
|
|
||||||
|
// Destroy and recreate — ONLY valid reset
|
||||||
|
client_events = db_utils::ClientEventsBatch{};
|
||||||
|
chat_events = db_utils::ChatEventsBatch{};
|
||||||
|
activity_events = db_utils::ActivityEventsBatch{};
|
||||||
|
|
||||||
|
row_count = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert accumulated batches into ClickHouse
|
// Flush partial batch if we read anything
|
||||||
activity_events.insert_as_batch(client);
|
if (read_any && row_count > 0) {
|
||||||
chat_events.insert_as_batch(client);
|
activity_events.insert_as_batch(client);
|
||||||
client_events.insert_as_batch(client);
|
chat_events.insert_as_batch(client);
|
||||||
|
client_events.insert_as_batch(client);
|
||||||
|
|
||||||
current_line.clear();
|
client_events = db_utils::ClientEventsBatch{};
|
||||||
|
chat_events = db_utils::ChatEventsBatch{};
|
||||||
|
activity_events = db_utils::ActivityEventsBatch{};
|
||||||
|
|
||||||
// Clear EOF state so we can continue reading
|
row_count = 0;
|
||||||
input_file.clear();
|
}
|
||||||
|
|
||||||
// Sleep a bit before polling for new data
|
input_file.clear(); // clear EOF only
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user