nvim session file and zlib added
This commit is contained in:
34
src/main.cpp
34
src/main.cpp
@@ -1,9 +1,41 @@
|
||||
#include "clickhouse/client.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <filesystem>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <sys/types.h>
|
||||
#include <toml++/toml.h>
|
||||
#include "processing_utils.hpp"
|
||||
|
||||
void process_directory(const std::string& dir_path,
|
||||
std::shared_ptr<clickhouse::Client> client,
|
||||
const std::string& target_user_id,
|
||||
bool all)
|
||||
{
|
||||
|
||||
if (all) {
|
||||
for (auto& entry : std::filesystem::directory_iterator(dir_path)) {
|
||||
if (!entry.is_regular_file()) continue;
|
||||
|
||||
std::string filename = entry.path().filename().string();
|
||||
|
||||
if (filename == "latest.log") continue;
|
||||
|
||||
if (filename.ends_with(".log") || filename.ends_with(".log.gz")) {
|
||||
processing_utils::process_to_eof(entry.path().string(), client, target_user_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: always follow latest.log
|
||||
std::filesystem::path latest_path = std::filesystem::path(dir_path) / "latest.log";
|
||||
if (std::filesystem::exists(latest_path) && std::filesystem::is_regular_file(latest_path)) {
|
||||
processing_utils::process_blocking(latest_path.string(), client, target_user_id);
|
||||
} else {
|
||||
spdlog::warn("latest.log not found in {}", dir_path);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int main (int argc, char *argv[]) {
|
||||
auto config = toml::parse_file("config.toml");
|
||||
@@ -22,5 +54,7 @@ int main (int argc, char *argv[]) {
|
||||
|
||||
std::shared_ptr<clickhouse::Client> shared_client = std::make_shared<clickhouse::Client>(clickhouse::ClientOptions().SetPassword(clickouse_password).SetDefaultDatabase(clickhouse_db).SetUser(clickhouse_username).SetHost(clickhouse_host).SetPort(clickhouse_port));
|
||||
spdlog::info("Server connection: {}", shared_client->GetServerInfo().display_name);
|
||||
spdlog::info("runnign on given path: {}", log_path);
|
||||
process_directory(log_path, shared_client, user_id, ingest_all_logs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -1,11 +1,263 @@
|
||||
#include "processing_utils.hpp"

#include <chrono>
#include <cstdint>
#include <ctime>
#include <fstream>
#include <memory>
#include <regex>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <variant>

#include "spdlog/spdlog.h"
#include "zlib.h"

#include "database_utils.hpp"
|
||||
namespace processing_utils {
|
||||
/// Overload of process_to_eof() taking a `compressed` flag.
///
/// As written here it only logs the path it was asked to process; `client`
/// and `compressed` are accepted but not used.
void process_to_eof(std::string path,
                    std::shared_ptr<clickhouse::Client> client,
                    bool compressed)
{
    const std::string& file_path = path;
    spdlog::info("Processing file {} ", file_path);
}
|
||||
void process_blocking(std::string path, std::shared_ptr<clickhouse::Client> client){
|
||||
|
||||
/// Extract the first `YYYY-MM-DD` date that appears anywhere in `path`.
///
/// @param path        File name or full path to search.
/// @param[out] year   Four-digit year; written only on success.
/// @param[out] month  Two-digit month as an integer; written only on success.
/// @param[out] day    Two-digit day as an integer; written only on success.
/// @return true when a date pattern was found, false otherwise.
///
/// The values are not range-checked: any `dddd-dd-dd` digit group matches.
bool parse_date_from_filename(std::string_view path,
                              int& year, int& month, int& day)
{
    // Match YYYY-MM-DD anywhere in the path
    static const std::regex re(R"((\d{4})-(\d{2})-(\d{2}))",
                               std::regex::optimize);

    if (path.empty())
        return false;

    // std::cmatch is match_results<const char*>. string_view iterators are
    // only raw pointers on some standard libraries, so search over data()
    // explicitly to stay portable.
    const char* const first = path.data();
    const char* const last = first + path.size();

    std::cmatch m;
    if (!std::regex_search(first, last, m, re))
        return false;

    year = std::stoi(m[1].str());
    month = std::stoi(m[2].str());
    day = std::stoi(m[3].str());

    return true;
}
|
||||
|
||||
|
||||
/// Parse the leading `[HH:MM:SS` time stamp of a log line.
///
/// The line must be at least 10 characters long, start with '[', carry two
/// decimal digits in each field and ':' between the fields. Hours above 23 and
/// minutes/seconds above 59 are rejected, so lines that merely start with '['
/// (for example "[Render thread/INFO]: ...") no longer yield garbage values.
///
/// @param line       Log line to inspect.
/// @param[out] hour  Parsed hour; written only on success.
/// @param[out] min   Parsed minute; written only on success.
/// @param[out] sec   Parsed second; written only on success.
/// @return true when a valid time stamp was parsed.
bool parse_time_from_line(std::string_view line,
                          int& hour, int& min, int& sec)
{
    // Expect: [HH:MM:SS]
    if (line.size() < 10 || line[0] != '[' || line[3] != ':' || line[6] != ':')
        return false;

    // Reads the two characters at `pos` as a decimal number; false if either
    // one is not a digit.
    const auto two_digits = [line](std::size_t pos, int& out) {
        const char tens = line[pos];
        const char ones = line[pos + 1];
        if (tens < '0' || tens > '9' || ones < '0' || ones > '9')
            return false;
        out = (tens - '0') * 10 + (ones - '0');
        return true;
    };

    int h = 0;
    int m = 0;
    int s = 0;
    if (!two_digits(1, h) || !two_digits(4, m) || !two_digits(7, s))
        return false;

    if (h > 23 || m > 59 || s > 59)
        return false;

    hour = h;
    min = m;
    sec = s;
    return true;
}
|
||||
|
||||
#include <ctime>
|
||||
#include <string_view>
|
||||
|
||||
time_t extract_timestamp_utc(std::string_view filename,
|
||||
std::string_view line)
|
||||
{
|
||||
int year, month, day;
|
||||
int hour, min, sec;
|
||||
if (!parse_date_from_filename(filename, year, month, day)) {
|
||||
// latest.log → use today's UTC date
|
||||
std::time_t now = std::time(nullptr);
|
||||
std::tm utc{};
|
||||
gmtime_r(&now, &utc);
|
||||
|
||||
year = utc.tm_year + 1900;
|
||||
month = utc.tm_mon + 1;
|
||||
day = utc.tm_mday;
|
||||
}
|
||||
|
||||
|
||||
if (!parse_time_from_line(line, hour, min, sec)) {
|
||||
return static_cast<time_t>(-1);
|
||||
}
|
||||
|
||||
|
||||
std::tm tm{};
|
||||
tm.tm_year = year - 1900;
|
||||
tm.tm_mon = month - 1;
|
||||
tm.tm_mday = day;
|
||||
tm.tm_hour = hour;
|
||||
tm.tm_min = min;
|
||||
tm.tm_sec = sec;
|
||||
tm.tm_isdst = 0;
|
||||
|
||||
|
||||
#if defined(__linux__)
|
||||
return timegm(&tm);
|
||||
#else
|
||||
// Portable fallback
|
||||
std::time_t local = std::mktime(&tm);
|
||||
return local - timezone;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/// Destination a log line is routed to.
enum class Route {
    NONE,           ///< No known marker found; the line is ignored.

    CLIENT_START,   ///< Client start-up line.
    CLIENT_END,     ///< Client shutdown line.

    CHAT,           ///< Player chat message ("[CHAT] <name> ...").

    ACT_COMMAND,    ///< Command echoed in chat ("[CHAT] /...").
    ACT_DEATH,      ///< Death message.
    ACT_DIMENSION,  ///< Dimension change message.
    ACT_OTHER       ///< Other activity; never returned by classify_line().
};

/// Decide which Route a log line belongs to by looking for fixed substrings.
///
/// Chat and command markers are tested before the lifecycle and activity
/// markers: the text after "[CHAT] <name>" or "[CHAT] /" is free-form player
/// input, so a chat message such as "<Steve> Stopping!" must stay a chat
/// message rather than be recorded as a client shutdown.
///
/// @param line  One log line (without trailing newline).
/// @return The first matching Route, or Route::NONE.
Route classify_line(std::string_view line) {
    const auto has = [line](std::string_view needle) {
        return line.find(needle) != std::string_view::npos;
    };

    // ---- Chat (player-authored text, checked first) ----
    if (has("[CHAT] <"))
        return Route::CHAT;

    if (has("[CHAT] /"))
        return Route::ACT_COMMAND;

    // ---- Client lifecycle ----
    if (has("Starting Minecraft client"))
        return Route::CLIENT_START;

    if (has("Stopping!"))
        return Route::CLIENT_END;

    // ---- Activity ----
    if (has(" was slain by ") || has(" fell from "))
        return Route::ACT_DEATH;

    if (has(" moved to the "))
        return Route::ACT_DIMENSION;

    return Route::NONE;
}
|
||||
|
||||
/// Split a chat log line into (player name, message).
///
/// Expected shape: `... [CHAT] <Steve> hello world`. The name is the text
/// between "[CHAT] <" and the next '>'; the message is everything after that
/// '>', minus one separating space if present.
///
/// @param line  One log line.
/// @return {name, message}. Both strings are empty when the line has no
///         "[CHAT] <" marker or no closing '>'. The message alone is empty when
///         nothing follows the name. The function never throws on short or
///         malformed lines.
std::pair<std::string, std::string>
parse_chat(std::string_view line) {
    // [CHAT] <Steve> hello world
    constexpr std::string_view kMarker = "[CHAT] <";

    const auto marker_pos = line.find(kMarker);
    if (marker_pos == std::string_view::npos)
        return {};

    const auto name_begin = marker_pos + kMarker.size();
    const auto name_end = line.find('>', name_begin);
    if (name_end == std::string_view::npos)
        return {};

    // name_end indexes a real character, so name_end + 1 <= line.size() and
    // substr() cannot throw here.
    std::string_view message = line.substr(name_end + 1);
    if (!message.empty() && message.front() == ' ')
        message.remove_prefix(1);

    return {
        std::string(line.substr(name_begin, name_end - name_begin)),
        std::string(message)
    };
}
|
||||
|
||||
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id){
|
||||
spdlog::info("Processing file {} ", path);
|
||||
gzFile gz_file_log = gzopen(path.c_str(), "rb");
|
||||
char buffer[8192];
|
||||
std::string current_line = std::string{};
|
||||
db_utils::ClientEventsBatch client_events = db_utils::ClientEventsBatch {};
|
||||
db_utils::ChatEventsBatch chat_events = db_utils::ChatEventsBatch {};
|
||||
db_utils::ActivityEventsBatch activity_events = db_utils::ActivityEventsBatch {};
|
||||
while(gzgets(gz_file_log, buffer, sizeof(buffer))!=nullptr){
|
||||
current_line.append(buffer);
|
||||
if(current_line.empty() || current_line.back()!='\n') continue;
|
||||
current_line.pop_back();
|
||||
std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
|
||||
switch (classify_line(current_line)) {
|
||||
case Route::CLIENT_START:
|
||||
client_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"start"}, path);
|
||||
break;
|
||||
case Route::CLIENT_END:
|
||||
client_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"end"}, path);
|
||||
break;
|
||||
case Route::ACT_COMMAND:
|
||||
activity_events.add_row(target_user_id, log_timestamp, "command", float {1});
|
||||
break;
|
||||
case Route::ACT_DEATH:
|
||||
activity_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"death"}, float {0.5});
|
||||
break;
|
||||
case Route::ACT_DIMENSION:
|
||||
activity_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"dimension"}, float {0.3});
|
||||
break;
|
||||
case Route::ACT_OTHER:
|
||||
activity_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"other"}, float {0.2});
|
||||
break;
|
||||
case Route::CHAT :
|
||||
std::pair<std::string, std::string> chat_message_pair = parse_chat(current_line);
|
||||
chat_events.add_row(target_user_id, log_timestamp, chat_message_pair.first, chat_message_pair.second) ;
|
||||
}
|
||||
}
|
||||
activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client);
|
||||
current_line.clear();
|
||||
}
|
||||
void process_blocking(std::string path,
|
||||
std::shared_ptr<clickhouse::Client> client,
|
||||
std::string target_user_id)
|
||||
{
|
||||
spdlog::info("Processing (blocking) file {}", path);
|
||||
|
||||
std::ifstream input_file(path);
|
||||
if (!input_file.is_open()) {
|
||||
spdlog::error("Failed to open file {}", path);
|
||||
return;
|
||||
}
|
||||
|
||||
// Seek to the end like tail -f
|
||||
input_file.seekg(0, std::ios::end);
|
||||
|
||||
std::string current_line;
|
||||
db_utils::ClientEventsBatch client_events{};
|
||||
db_utils::ChatEventsBatch chat_events{};
|
||||
db_utils::ActivityEventsBatch activity_events{};
|
||||
|
||||
while (true) {
|
||||
// Read all available lines
|
||||
while (std::getline(input_file, current_line)) {
|
||||
if (current_line.empty()) continue;
|
||||
|
||||
std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
|
||||
|
||||
switch (classify_line(current_line)) {
|
||||
case Route::CLIENT_START:
|
||||
client_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"start"}, path);
|
||||
break;
|
||||
case Route::CLIENT_END:
|
||||
client_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"end"}, path);
|
||||
break;
|
||||
case Route::ACT_COMMAND:
|
||||
activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
|
||||
break;
|
||||
case Route::ACT_DEATH:
|
||||
activity_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"death"}, 0.5f);
|
||||
break;
|
||||
case Route::ACT_DIMENSION:
|
||||
activity_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"dimension"}, 0.3f);
|
||||
break;
|
||||
case Route::ACT_OTHER:
|
||||
activity_events.add_row(target_user_id, log_timestamp,
|
||||
std::variant<std::string, uint8_t>{"other"}, 0.2f);
|
||||
break;
|
||||
case Route::CHAT: {
|
||||
auto chat_pair = parse_chat(current_line);
|
||||
chat_events.add_row(target_user_id, log_timestamp,
|
||||
chat_pair.first, chat_pair.second);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Insert accumulated batches into ClickHouse
|
||||
activity_events.insert_as_batch(client);
|
||||
chat_events.insert_as_batch(client);
|
||||
client_events.insert_as_batch(client);
|
||||
|
||||
current_line.clear();
|
||||
|
||||
// Clear EOF state so we can continue reading
|
||||
input_file.clear();
|
||||
|
||||
// Sleep a bit before polling for new data
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,6 @@
|
||||
#include <memory>
|
||||
#include <string>
|
||||
namespace processing_utils {
|
||||
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, bool compressed = true);
|
||||
void process_blocking(std::string path, std::shared_ptr<clickhouse::Client> client);
|
||||
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id);
|
||||
void process_blocking(std::string path, std::shared_ptr<clickhouse::Client> client, std::string target_user_id);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user