From 1b6cf3d37ecc69959497ac5c1d399cd14624368a Mon Sep 17 00:00:00 2001
From: Mars Ultor
Date: Wed, 24 Dec 2025 19:41:07 -0600
Subject: [PATCH] nvim session file and zlib added

---
 src/main.cpp             |  42 ++++++
 src/processing_utils.cpp | 332 +++++++++++++++++++++++++++++++++++++--
 src/processing_utils.hpp |   4 +-
 3 files changed, 369 insertions(+), 9 deletions(-)

diff --git a/src/main.cpp b/src/main.cpp
index 5c43321..e0096c3 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,9 +1,49 @@
 #include "clickhouse/client.h"
 #include "spdlog/spdlog.h"
+#include
 #include
 #include
 #include
 #include
+#include "processing_utils.hpp"
+
+// Ingests the Minecraft logs found in dir_path: when `all` is set, every
+// *.log / *.log.gz file except latest.log is processed to EOF first;
+// latest.log is then followed (process_blocking does not return while tailing).
+void process_directory(const std::string& dir_path,
+                       std::shared_ptr client,
+                       const std::string& target_user_id,
+                       bool all)
+{
+    // directory_iterator throws on a missing path; report it and bail out instead.
+    if (!std::filesystem::is_directory(dir_path)) {
+        spdlog::error("Log path {} is not a directory", dir_path);
+        return;
+    }
+
+    if (all) {
+        for (auto& entry : std::filesystem::directory_iterator(dir_path)) {
+            if (!entry.is_regular_file()) continue;
+
+            std::string filename = entry.path().filename().string();
+
+            if (filename == "latest.log") continue;
+
+            if (filename.ends_with(".log") || filename.ends_with(".log.gz")) {
+                processing_utils::process_to_eof(entry.path().string(), client, target_user_id);
+            }
+        }
+    }
+
+    // Second pass: always follow latest.log
+    std::filesystem::path latest_path = std::filesystem::path(dir_path) / "latest.log";
+    if (std::filesystem::exists(latest_path) && std::filesystem::is_regular_file(latest_path)) {
+        processing_utils::process_blocking(latest_path.string(), client, target_user_id);
+    } else {
+        spdlog::warn("latest.log not found in {}", dir_path);
+    }
+}
+
 int main (int argc, char *argv[]) {
   auto config = toml::parse_file("config.toml");
 
@@ -22,5 +62,7 @@ int main (int argc, char *argv[]) {
 
   std::shared_ptr shared_client = std::make_shared(clickhouse::ClientOptions().SetPassword(clickouse_password).SetDefaultDatabase(clickhouse_db).SetUser(clickhouse_username).SetHost(clickhouse_host).SetPort(clickhouse_port));
   spdlog::info("Server connection: {}", shared_client->GetServerInfo().display_name);
+  spdlog::info("running on given path: {}", log_path);
+  process_directory(log_path, shared_client, user_id, ingest_all_logs);
   return 0;
 }
diff --git a/src/processing_utils.cpp b/src/processing_utils.cpp
index 0dfb889..bdc8483 100644
--- a/src/processing_utils.cpp
+++ b/src/processing_utils.cpp
@@ -1,11 +1,329 @@
 #include "processing_utils.hpp"
 #include "spdlog/spdlog.h"
-
+#include "zlib.h"
+#include
+#include
+#include
+#include
+#include
+#include "database_utils.hpp"
 namespace processing_utils {
-  void process_to_eof(std::string path, std::shared_ptrclient, bool compressed){
-    spdlog::info("Processing file {} ", path);
-  }
-  void process_blocking(std::string path, std::shared_ptr client){
-    spdlog::info("Processing file {}", path);
-  }
+
+// Extracts the first YYYY-MM-DD date found in path into year/month/day.
+// Returns false (outputs untouched) when the path carries no such date.
+bool parse_date_from_filename(std::string_view path,
+                              int& year, int& month, int& day)
+{
+    // Match YYYY-MM-DD anywhere in the path
+    static const std::regex re(R"((\d{4})-(\d{2})-(\d{2}))",
+                               std::regex::optimize);
+
+    // std::cmatch works on const char* iterators; string_view::begin() is only
+    // guaranteed to be *some* iterator type, so search the raw character range.
+    std::cmatch m;
+    if (!std::regex_search(path.data(), path.data() + path.size(), m, re))
+        return false;
+
+    year = std::stoi(m[1].str());
+    month = std::stoi(m[2].str());
+    day = std::stoi(m[3].str());
+
+    return true;
+}
+
+
+// Parses the leading "[HH:MM:SS]" stamp of a log line into hour/min/sec.
+// Returns false when the line does not start with a well-formed stamp.
+bool parse_time_from_line(std::string_view line,
+                          int& hour, int& min, int& sec)
+{
+    // Expect: [HH:MM:SS]
+    if (line.size() < 10 || line[0] != '[' || line[3] != ':' ||
+        line[6] != ':' || line[9] != ']')
+        return false;
+
+    // Every remaining position must be a digit, otherwise the arithmetic below
+    // would silently produce a garbage time.
+    const auto is_digit = [](char c) { return c >= '0' && c <= '9'; };
+    if (!is_digit(line[1]) || !is_digit(line[2]) || !is_digit(line[4]) ||
+        !is_digit(line[5]) || !is_digit(line[7]) || !is_digit(line[8]))
+        return false;
+
+    hour = (line[1] - '0') * 10 + (line[2] - '0');
+    min = (line[4] - '0') * 10 + (line[5] - '0');
+    sec = (line[7] - '0') * 10 + (line[8] - '0');
+
+    return true;
+}
+
+#include
+#include
+
+// Builds a UTC timestamp from the date in filename and the [HH:MM:SS] stamp of
+// line. Files without a date in their name fall back to today's UTC date.
+// Returns -1 (as time_t) when the line has no parsable time stamp.
+time_t extract_timestamp_utc(std::string_view filename,
+                             std::string_view line)
+{
+    int year, month, day;
+    int hour, min, sec;
+
+    // Cheap check first: skip the regex and clock work for unstamped lines.
+    if (!parse_time_from_line(line, hour, min, sec)) {
+        return static_cast(-1);
+    }
+
+    if (!parse_date_from_filename(filename, year, month, day)) {
+        // latest.log → use today's UTC date
+        std::time_t now = std::time(nullptr);
+        std::tm utc{};
+        gmtime_r(&now, &utc);
+
+        year = utc.tm_year + 1900;
+        month = utc.tm_mon + 1;
+        day = utc.tm_mday;
+    }
+
+    std::tm tm{};
+    tm.tm_year = year - 1900;
+    tm.tm_mon = month - 1;
+    tm.tm_mday = day;
+    tm.tm_hour = hour;
+    tm.tm_min = min;
+    tm.tm_sec = sec;
+    tm.tm_isdst = 0;
+
+#if defined(__linux__)
+    return timegm(&tm);
+#else
+    // Portable fallback
+    std::time_t local = std::mktime(&tm);
+    return local - timezone;
+#endif
+}
+
+
+// Category assigned to a log line by classify_line.
+enum class Route {
+    NONE,
+
+    CLIENT_START,
+    CLIENT_END,
+
+    CHAT,
+
+    ACT_COMMAND,
+    ACT_DEATH,
+    ACT_DIMENSION,
+    ACT_OTHER
+};
+
+// Maps a log line to a Route by substring matching; the first matching rule
+// wins. ACT_OTHER is never produced here.
+Route classify_line(std::string_view line) {
+
+    // ---- Client lifecycle ----
+    if (line.find("Starting Minecraft client") != std::string_view::npos)
+        return Route::CLIENT_START;
+
+    if (line.find("Stopping!") != std::string_view::npos)
+        return Route::CLIENT_END;
+
+    // ---- Chat ----
+    if (line.find("[CHAT] <") != std::string_view::npos)
+        return Route::CHAT;
+
+    // ---- Activity ----
+    if (line.find("[CHAT] /") != std::string_view::npos)
+        return Route::ACT_COMMAND;
+
+    if (line.find(" was slain by ") != std::string_view::npos ||
+        line.find(" fell from ") != std::string_view::npos)
+        return Route::ACT_DEATH;
+
+    if (line.find(" moved to the ") != std::string_view::npos)
+        return Route::ACT_DIMENSION;
+
+    return Route::NONE;
+}
+
+// Splits a "[CHAT] <player> message" line into {player, message}. Returns a
+// pair of empty strings when the line does not have that shape.
+std::pair
+parse_chat(std::string_view line) {
+    constexpr std::string_view marker = "[CHAT] <";
+
+    const auto marker_pos = line.find(marker);
+    if (marker_pos == std::string_view::npos)
+        return {};
+
+    const auto start = marker_pos + marker.size();
+    const auto end = line.find('>', start);
+    if (end == std::string_view::npos)
+        return {};
+
+    // The message starts after "> ". Clamp the offset so that a line ending
+    // right at '>' yields an empty message instead of substr() throwing.
+    const auto msg = (end + 2 <= line.size()) ? end + 2 : line.size();
+
+    return {
+        std::string(line.substr(start, end - start)),
+        std::string(line.substr(msg))
+    };
+}
+
+// Reads the (optionally gzip-compressed) log at path to EOF, classifies every
+// complete line and inserts the collected events as one batch per event kind.
+// A trailing line without a terminating newline is not processed.
+void process_to_eof(std::string path, std::shared_ptrclient, std::string target_user_id){
+    spdlog::info("Processing file {} ", path);
+
+    gzFile gz_file_log = gzopen(path.c_str(), "rb");
+    if (gz_file_log == nullptr) {
+        // gzgets on a null handle is invalid; nothing to ingest for this file.
+        spdlog::error("Failed to open file {}", path);
+        return;
+    }
+
+    char buffer[8192];
+    std::string current_line;
+    db_utils::ClientEventsBatch client_events{};
+    db_utils::ChatEventsBatch chat_events{};
+    db_utils::ActivityEventsBatch activity_events{};
+
+    while (gzgets(gz_file_log, buffer, sizeof(buffer)) != nullptr) {
+        // gzgets stops at the buffer size; keep appending until the newline
+        // arrives so that over-long lines are reassembled.
+        current_line.append(buffer);
+        if (current_line.empty() || current_line.back() != '\n') continue;
+        current_line.pop_back();
+
+        std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
+        switch (classify_line(current_line)) {
+            case Route::CLIENT_START:
+                client_events.add_row(target_user_id, log_timestamp, std::variant {"start"}, path);
+                break;
+            case Route::CLIENT_END:
+                client_events.add_row(target_user_id, log_timestamp, std::variant {"end"}, path);
+                break;
+            case Route::ACT_COMMAND:
+                activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
+                break;
+            case Route::ACT_DEATH:
+                activity_events.add_row(target_user_id, log_timestamp, std::variant {"death"}, 0.5f);
+                break;
+            case Route::ACT_DIMENSION:
+                activity_events.add_row(target_user_id, log_timestamp, std::variant {"dimension"}, 0.3f);
+                break;
+            case Route::ACT_OTHER:
+                activity_events.add_row(target_user_id, log_timestamp, std::variant {"other"}, 0.2f);
+                break;
+            case Route::CHAT: {
+                auto chat_pair = parse_chat(current_line);
+                chat_events.add_row(target_user_id, log_timestamp,
+                                    chat_pair.first, chat_pair.second);
+                break;
+            }
+            default:
+                break;
+        }
+
+        // Start the next line from an empty buffer; otherwise every line would
+        // be glued onto all the lines read before it.
+        current_line.clear();
+    }
+    gzclose(gz_file_log);
+
+    activity_events.insert_as_batch(client);
+    chat_events.insert_as_batch(client);
+    client_events.insert_as_batch(client);
+}
+
+// Follows the file at path like tail -f: starts at its current end, ingests
+// newly appended lines and flushes the batches every polling round. Never
+// returns once the file has been opened.
+// NOTE(review): truncation or replacement of the file is not detected here.
+void process_blocking(std::string path,
+                      std::shared_ptr client,
+                      std::string target_user_id)
+{
+    spdlog::info("Processing (blocking) file {}", path);
+
+    std::ifstream input_file(path);
+    if (!input_file.is_open()) {
+        spdlog::error("Failed to open file {}", path);
+        return;
+    }
+
+    // Seek to the end like tail -f
+    input_file.seekg(0, std::ios::end);
+
+    std::string chunk;
+    std::string current_line;
+    db_utils::ClientEventsBatch client_events{};
+    db_utils::ChatEventsBatch chat_events{};
+    db_utils::ActivityEventsBatch activity_events{};
+
+    while (true) {
+        // Read all available lines
+        while (std::getline(input_file, chunk)) {
+            current_line += chunk;
+
+            // Hitting EOF inside getline means the writer has not emitted the
+            // newline yet: keep the fragment and complete it on a later poll.
+            if (input_file.eof()) break;
+
+            if (current_line.empty()) continue;
+
+            std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
+
+            switch (classify_line(current_line)) {
+                case Route::CLIENT_START:
+                    client_events.add_row(target_user_id, log_timestamp,
+                                          std::variant{"start"}, path);
+                    break;
+                case Route::CLIENT_END:
+                    client_events.add_row(target_user_id, log_timestamp,
+                                          std::variant{"end"}, path);
+                    break;
+                case Route::ACT_COMMAND:
+                    activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
+                    break;
+                case Route::ACT_DEATH:
+                    activity_events.add_row(target_user_id, log_timestamp,
+                                            std::variant{"death"}, 0.5f);
+                    break;
+                case Route::ACT_DIMENSION:
+                    activity_events.add_row(target_user_id, log_timestamp,
+                                            std::variant{"dimension"}, 0.3f);
+                    break;
+                case Route::ACT_OTHER:
+                    activity_events.add_row(target_user_id, log_timestamp,
+                                            std::variant{"other"}, 0.2f);
+                    break;
+                case Route::CHAT: {
+                    auto chat_pair = parse_chat(current_line);
+                    chat_events.add_row(target_user_id, log_timestamp,
+                                        chat_pair.first, chat_pair.second);
+                    break;
+                }
+                default:
+                    break;
+            }
+
+            current_line.clear();
+        }
+
+        // Insert accumulated batches into ClickHouse
+        activity_events.insert_as_batch(client);
+        chat_events.insert_as_batch(client);
+        client_events.insert_as_batch(client);
+
+        // Clear EOF state so we can continue reading
+        input_file.clear();
+
+        // Sleep a bit before polling for new data
+        std::this_thread::sleep_for(std::chrono::milliseconds(200));
+    }
+}
 }
diff --git a/src/processing_utils.hpp b/src/processing_utils.hpp
index cf8d85f..d031e34 100644
--- a/src/processing_utils.hpp
+++ b/src/processing_utils.hpp
@@ -4,6 +4,6 @@
 #include
 #include
 namespace processing_utils {
-  void process_to_eof(std::string path, std::shared_ptrclient, bool compressed = true);
-  void process_blocking(std::string path, std::shared_ptr client);
+  void process_to_eof(std::string path, std::shared_ptrclient, std::string target_user_id);
+  void process_blocking(std::string path, std::shared_ptr client, std::string target_user_id);
 }