// NOTE(review): the copy of this file that was reviewed had lost everything
// written between angle brackets (standard #include targets and template
// argument lists). The standard headers below are the ones this file actually
// uses; reconstructed template arguments are flagged individually further down.
#include "processing_utils.hpp"

#include "spdlog/spdlog.h"
#include "zlib.h"

#include <chrono>
#include <cstddef>
#include <ctime>
#include <fstream>
#include <memory>
#include <regex>
#include <string>
#include <string_view>
#include <thread>
#include <type_traits>
#include <utility>
#include <variant>

#include "database_utils.hpp"

namespace processing_utils {

/// @brief Extracts the first `YYYY-MM-DD` date found anywhere in @p path.
/// @param path  File name or full path to search.
/// @param year  Receives the four-digit year on success.
/// @param month Receives the month exactly as written (not range-checked).
/// @param day   Receives the day exactly as written (not range-checked).
/// @return true when a date was found; the out-parameters are untouched otherwise.
bool parse_date_from_filename(std::string_view path, int& year, int& month, int& day) {
    if (path.empty()) return false;

    // Match YYYY-MM-DD anywhere in the path
    static const std::regex re(R"((\d{4})-(\d{2})-(\d{2}))", std::regex::optimize);

    // std::cmatch needs `const char*` iterators; string_view iterators are not
    // guaranteed to be raw pointers, so go through data() explicitly.
    const char* const first = path.data();
    const char* const last = first + path.size();

    std::cmatch m;
    if (!std::regex_search(first, last, m, re)) return false;

    year = std::stoi(m[1].str());
    month = std::stoi(m[2].str());
    day = std::stoi(m[3].str());
    return true;
}

/// @brief Parses the `[HH:MM:SS` prefix at the very start of a log line.
///
/// The character after the seconds is deliberately not checked, so both
/// `[12:34:56] ...` and `[12:34:56 INFO]: ...` are accepted.
///
/// @param line Log line to inspect.
/// @param hour Receives the hour (0-23) on success.
/// @param min  Receives the minute (0-59) on success.
/// @param sec  Receives the second (0-60, allowing a leap second) on success.
/// @return true when the prefix is well formed; the out-parameters are only
///         written on success.
bool parse_time_from_line(std::string_view line, int& hour, int& min, int& sec) {
    // Expect: [HH:MM:SS]
    if (line.size() < 10 || line[0] != '[') return false;
    if (line[3] != ':' || line[6] != ':') return false;

    // Reads two decimal digits starting at `pos`; fails on any non-digit so a
    // line such as "[CHAT] ..." is rejected instead of yielding a bogus time.
    const auto two_digits = [line](std::size_t pos, int& out) {
        const char tens = line[pos];
        const char ones = line[pos + 1];
        if (tens < '0' || tens > '9' || ones < '0' || ones > '9') return false;
        out = (tens - '0') * 10 + (ones - '0');
        return true;
    };

    int h = 0;
    int m = 0;
    int s = 0;
    if (!two_digits(1, h) || !two_digits(4, m) || !two_digits(7, s)) return false;
    if (h > 23 || m > 59 || s > 60) return false;

    hour = h;
    min = m;
    sec = s;
    return true;
}

/// @brief Builds a UTC timestamp from the date in @p filename and the time in @p line.
///
/// When the file name carries no date (e.g. `latest.log`) today's UTC date is
/// used instead.
///
/// @param filename Path whose name may contain `YYYY-MM-DD`.
/// @param line     Log line starting with `[HH:MM:SS`.
/// @return Seconds since the epoch, or `static_cast<std::time_t>(-1)` when the
///         line has no parsable time prefix.
time_t extract_timestamp_utc(std::string_view filename, std::string_view line) {
    int hour = 0;
    int min = 0;
    int sec = 0;
    // Checked first: it is the cheap test and the only one that can fail.
    if (!parse_time_from_line(line, hour, min, sec)) {
        return static_cast<std::time_t>(-1);
    }

    int year = 0;
    int month = 0;
    int day = 0;
    if (!parse_date_from_filename(filename, year, month, day)) {
        // latest.log → use today's UTC date
        std::time_t now = std::time(nullptr);
        std::tm utc{};
        gmtime_r(&now, &utc);
        year = utc.tm_year + 1900;
        month = utc.tm_mon + 1;
        day = utc.tm_mday;
    }

    std::tm tm{};
    tm.tm_year = year - 1900;
    tm.tm_mon = month - 1;
    tm.tm_mday = day;
    tm.tm_hour = hour;
    tm.tm_min = min;
    tm.tm_sec = sec;
    tm.tm_isdst = 0;

#if defined(__linux__)
    return timegm(&tm);
#else
    // Portable fallback: mktime() reads `tm` as local standard time (tm_isdst
    // is 0), so removing the zone offset yields the UTC interpretation.
    std::time_t local = std::mktime(&tm);
    return local - timezone;
#endif
}

/// Destination of a log line after classification.
enum class Route {
    NONE,
    CLIENT_START,
    CLIENT_END,
    CHAT,
    ACT_COMMAND,
    ACT_DEATH,
    ACT_DIMENSION,
    ACT_OTHER  // handled by the processing functions but never returned by classify_line()
};

/// @brief Decides which event table, if any, a log line belongs to.
///
/// Chat markers are tested before the lifecycle markers so that a player
/// typing "Stopping!" or "Starting Minecraft client" in chat is recorded as
/// chat and cannot fake a client start/stop event.
///
/// @param line One log line without its line terminator.
/// @return The matching route, or Route::NONE when nothing matched.
Route classify_line(std::string_view line) {
    constexpr auto npos = std::string_view::npos;

    // ---- Chat ----
    if (line.find("[CHAT] <") != npos) return Route::CHAT;

    // ---- Commands echoed to chat ----
    if (line.find("[CHAT] /") != npos) return Route::ACT_COMMAND;

    // ---- Client lifecycle ----
    if (line.find("Starting Minecraft client") != npos) return Route::CLIENT_START;
    if (line.find("Stopping!") != npos) return Route::CLIENT_END;

    // ---- Activity ----
    if (line.find(" was slain by ") != npos || line.find(" fell from ") != npos)
        return Route::ACT_DEATH;
    if (line.find(" moved to the ") != npos) return Route::ACT_DIMENSION;

    return Route::NONE;
}

/// @brief Splits a `[CHAT] <player> message` line into player and message.
/// @param line Log line containing the `[CHAT] <` marker.
/// @return {player, message}. Both strings are empty when the marker or the
///         closing `>` is missing; the message is empty when nothing follows
///         the player name.
std::pair<std::string, std::string> parse_chat(std::string_view line) {
    // [CHAT] <player> hello world
    constexpr std::string_view marker = "[CHAT] <";

    const auto marker_pos = line.find(marker);
    if (marker_pos == std::string_view::npos) return {};

    const auto name_begin = marker_pos + marker.size();
    const auto name_end = line.find('>', name_begin);
    if (name_end == std::string_view::npos) return {};

    // Skip the '>' and the single separating space when it is present; never
    // step past the end of the line (substr would throw std::out_of_range).
    auto msg_begin = name_end + 1;
    if (msg_begin < line.size() && line[msg_begin] == ' ') ++msg_begin;

    return {
        std::string(line.substr(name_begin, name_end - name_begin)),
        std::string(line.substr(msg_begin))
    };
}

namespace {

/// Closes a zlib stream when the owning handle goes out of scope.
struct GzCloser {
    void operator()(gzFile file) const noexcept {
        if (file != nullptr) gzclose(file);
    }
};
using GzHandle = std::unique_ptr<std::remove_pointer_t<gzFile>, GzCloser>;

/// Removes trailing '\n' / '\r' so CRLF logs do not leak a CR into stored text.
void strip_line_ending(std::string& line) {
    while (!line.empty() && (line.back() == '\n' || line.back() == '\r')) {
        line.pop_back();
    }
}

/// Classifies one complete log line and appends it to the matching batch.
/// Returns true when a row was added.
bool record_line(const std::string& line,
                 const std::string& path,
                 const std::string& target_user_id,
                 db_utils::ClientEventsBatch& client_events,
                 db_utils::ChatEventsBatch& chat_events,
                 db_utils::ActivityEventsBatch& activity_events) {
    const Route route = classify_line(line);
    if (route == Route::NONE) return false;

    // Only routed lines need a timestamp; this keeps the date regex off the
    // hot path for the (vast) majority of lines that are ignored.
    std::time_t log_timestamp = extract_timestamp_utc(path, line);
    if (log_timestamp == static_cast<std::time_t>(-1)) {
        // e.g. a continuation line of a multi-line message: storing it with a
        // -1 timestamp would put a meaningless date into the table.
        spdlog::debug("Skipping line without a parsable timestamp in {}", path);
        return false;
    }

    // NOTE(review): most event labels were originally wrapped in an explicit
    // std::variant whose alternative list was lost. They are passed as plain
    // literals here, exactly like the pre-existing "command" call — confirm
    // that every add_row() overload accepts that.
    switch (route) {
        case Route::CLIENT_START:
            client_events.add_row(target_user_id, log_timestamp, "start", path);
            return true;
        case Route::CLIENT_END:
            client_events.add_row(target_user_id, log_timestamp, "end", path);
            return true;
        case Route::ACT_COMMAND:
            activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
            return true;
        case Route::ACT_DEATH:
            activity_events.add_row(target_user_id, log_timestamp, "death", 0.5f);
            return true;
        case Route::ACT_DIMENSION:
            activity_events.add_row(target_user_id, log_timestamp, "dimension", 0.3f);
            return true;
        case Route::ACT_OTHER:
            activity_events.add_row(target_user_id, log_timestamp, "other", 0.2f);
            return true;
        case Route::CHAT: {
            auto chat_pair = parse_chat(line);
            chat_events.add_row(target_user_id, log_timestamp, chat_pair.first, chat_pair.second);
            return true;
        }
        case Route::NONE:
            break;
    }
    return false;
}

}  // namespace

/// @brief Reads a (possibly gzip-compressed) log file to its end and inserts
///        every recognised event through @p client.
///
/// @param path           File to read; zlib reads plain files transparently.
/// @param client         Connection handed to each batch's insert_as_batch().
/// @param target_user_id User id stored with every row.
///
/// NOTE(review): the pointee type of @p client was lost from the reviewed
/// source; `clickhouse::Client` is assumed — confirm against processing_utils.hpp.
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client> client, std::string target_user_id) {
    spdlog::info("Processing file {} ", path);

    GzHandle gz_file_log{gzopen(path.c_str(), "rb")};
    if (!gz_file_log) {
        spdlog::error("Failed to open file {}", path);
        return;
    }

    char buffer[8192];
    std::string current_line;
    db_utils::ClientEventsBatch client_events{};
    db_utils::ChatEventsBatch chat_events{};
    db_utils::ActivityEventsBatch activity_events{};

    while (gzgets(gz_file_log.get(), buffer, static_cast<int>(sizeof(buffer))) != nullptr) {
        current_line.append(buffer);
        // A line longer than the buffer arrives in pieces: keep accumulating
        // until the terminating newline shows up.
        if (current_line.empty() || current_line.back() != '\n') continue;

        strip_line_ending(current_line);
        if (!current_line.empty()) {
            record_line(current_line, path, target_user_id, client_events, chat_events, activity_events);
        }
        current_line.clear();
    }

    // The last line of a file is not required to end with a newline.
    strip_line_ending(current_line);
    if (!current_line.empty()) {
        record_line(current_line, path, target_user_id, client_events, chat_events, activity_events);
    }

    activity_events.insert_as_batch(client);
    chat_events.insert_as_batch(client);
    client_events.insert_as_batch(client);
}

/// @brief Follows a plain-text log file like `tail -f`, inserting new events
///        through @p client roughly every 200 ms. Never returns once the file
///        has been opened.
///
/// @param path           File to follow; existing content is skipped.
/// @param client         Connection handed to each batch's insert_as_batch().
/// @param target_user_id User id stored with every row.
///
/// NOTE(review): the pointee type of @p client was lost from the reviewed
/// source; `clickhouse::Client` is assumed — confirm against processing_utils.hpp.
void process_blocking(std::string path, std::shared_ptr<clickhouse::Client> client, std::string target_user_id) {
    spdlog::info("Processing (blocking) file {}", path);

    std::ifstream input_file(path);
    if (!input_file.is_open()) {
        spdlog::error("Failed to open file {}", path);
        return;
    }

    // Seek to the end like tail -f
    input_file.seekg(0, std::ios::end);

    std::string pending_line;  // text of a line whose newline has not been written yet
    std::string chunk;
    db_utils::ClientEventsBatch client_events{};
    db_utils::ChatEventsBatch chat_events{};
    db_utils::ActivityEventsBatch activity_events{};

    while (true) {
        bool rows_added = false;

        // Read all available lines
        while (std::getline(input_file, chunk)) {
            pending_line += chunk;
            // getline() stopping on EOF rather than on '\n' means the writer
            // is still in the middle of this line: keep it for the next poll.
            if (input_file.eof()) break;

            strip_line_ending(pending_line);
            if (!pending_line.empty()) {
                rows_added |= record_line(pending_line, path, target_user_id,
                                          client_events, chat_events, activity_events);
            }
            pending_line.clear();
        }

        // Insert accumulated batches into ClickHouse, but only when this poll
        // produced something, to avoid a no-op call every 200 ms.
        // NOTE(review): assumes insert_as_batch() empties the batch afterwards — confirm.
        if (rows_added) {
            activity_events.insert_as_batch(client);
            chat_events.insert_as_batch(client);
            client_events.insert_as_batch(client);
        }

        // Clear EOF state so we can continue reading
        input_file.clear();

        // Sleep a bit before polling for new data
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

}  // namespace processing_utils