Compare commits

...

3 Commits

Author SHA1 Message Date
Mars Ultor
1b6cf3d37e nvim session file and zlib added 2025-12-24 19:41:07 -06:00
Mars Ultor
dbc28ec002 nvim session file and zlib added 2025-12-24 00:20:50 -06:00
Mars Ultor
bea7653433 created file processing module 2025-12-23 23:39:42 -06:00
7 changed files with 566 additions and 1 deletions

3
.gitmodules vendored
View File

@@ -8,3 +8,6 @@
[submodule "external/spdlog"]
path = external/spdlog
url = https://github.com/gabime/spdlog
[submodule "external/zlib"]
path = external/zlib
url = https://github.com/madler/zlib

View File

@@ -24,7 +24,7 @@ set(CMAKE_BUILD_TYPE Release)
# Dependencies (local submodules / cloned repos)
# -----------------------------
add_subdirectory(external/spdlog)
add_subdirectory(external/zlib)
add_subdirectory(external/clickhouse-cpp)
add_subdirectory(external/tomlplusplus)
# -----------------------------
@@ -62,6 +62,7 @@ target_link_libraries(${PROJECT_NAME} PRIVATE
spdlog::spdlog
clickhouse-cpp-lib
tomlplusplus::tomlplusplus
zlib
)
# -----------------------------

1
external/zlib vendored Submodule

Submodule external/zlib added at 570720b0c2

254
nvim-session Normal file
View File

@@ -0,0 +1,254 @@
let SessionLoad = 1
let s:so_save = &g:so | let s:siso_save = &g:siso | setg so=0 siso=0 | setl so=-1 siso=-1
let v:this_session=expand("<sfile>:p")
silent only
silent tabonly
cd /mnt/minecraft-watchdog
if expand('%') == '' && !&modified && line('$') <= 1 && getline(1) == ''
let s:wipebuf = bufnr('%')
endif
let s:shortmess_save = &shortmess
if &shortmess =~ 'A'
set shortmess=aoOA
else
set shortmess=aoO
endif
badd +26 src/main.cpp
badd +38 .gitignore
badd +1 .gitmodules
badd +65 CMakeLists.txt
badd +11 config.toml
badd +38 setup.sql
badd +1 health://
badd +56 src/database_utils.cpp
badd +42 src/database_utils.hpp
badd +1 .git/config
badd +7 src/processing_utils.hpp
badd +9 src/processing_utils.cpp
argglobal
%argdel
$argadd NvimTree_1
edit src/main.cpp
let s:save_splitbelow = &splitbelow
let s:save_splitright = &splitright
set splitbelow splitright
wincmd _ | wincmd |
vsplit
1wincmd h
wincmd w
wincmd _ | wincmd |
split
1wincmd k
wincmd _ | wincmd |
vsplit
1wincmd h
wincmd _ | wincmd |
split
1wincmd k
wincmd w
wincmd w
wincmd _ | wincmd |
split
1wincmd k
wincmd w
wincmd w
wincmd _ | wincmd |
vsplit
1wincmd h
wincmd w
let &splitbelow = s:save_splitbelow
let &splitright = s:save_splitright
wincmd t
let s:save_winminheight = &winminheight
let s:save_winminwidth = &winminwidth
set winminheight=0
set winheight=1
set winminwidth=0
set winwidth=1
exe 'vert 1resize ' . ((&columns * 30 + 173) / 347)
exe '2resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 2resize ' . ((&columns * 157 + 173) / 347)
exe '3resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 3resize ' . ((&columns * 157 + 173) / 347)
exe '4resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 4resize ' . ((&columns * 158 + 173) / 347)
exe '5resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 5resize ' . ((&columns * 158 + 173) / 347)
exe '6resize ' . ((&lines * 28 + 45) / 91)
exe 'vert 6resize ' . ((&columns * 158 + 173) / 347)
exe '7resize ' . ((&lines * 28 + 45) / 91)
exe 'vert 7resize ' . ((&columns * 157 + 173) / 347)
argglobal
enew
file NvimTree_1
balt src/main.cpp
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal nofoldenable
wincmd w
argglobal
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal foldenable
silent! normal! zE
let &fdl = &fdl
let s:l = 18 - ((17 * winheight(0) + 14) / 29)
if s:l < 1 | let s:l = 1 | endif
keepjumps exe s:l
normal! zt
keepjumps 18
normal! 069|
wincmd w
argglobal
if bufexists(fnamemodify("CMakeLists.txt", ":p")) | buffer CMakeLists.txt | else | edit CMakeLists.txt | endif
if &buftype ==# 'terminal'
silent file CMakeLists.txt
endif
balt src/main.cpp
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal foldenable
silent! normal! zE
let &fdl = &fdl
let s:l = 29 - ((28 * winheight(0) + 14) / 29)
if s:l < 1 | let s:l = 1 | endif
keepjumps exe s:l
normal! zt
keepjumps 29
normal! 08|
wincmd w
argglobal
if bufexists(fnamemodify("src/processing_utils.hpp", ":p")) | buffer src/processing_utils.hpp | else | edit src/processing_utils.hpp | endif
if &buftype ==# 'terminal'
silent file src/processing_utils.hpp
endif
balt src/main.cpp
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal foldenable
silent! normal! zE
let &fdl = &fdl
let s:l = 9 - ((8 * winheight(0) + 14) / 29)
if s:l < 1 | let s:l = 1 | endif
keepjumps exe s:l
normal! zt
keepjumps 9
normal! 0
wincmd w
argglobal
if bufexists(fnamemodify("src/processing_utils.cpp", ":p")) | buffer src/processing_utils.cpp | else | edit src/processing_utils.cpp | endif
if &buftype ==# 'terminal'
silent file src/processing_utils.cpp
endif
balt src/processing_utils.hpp
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal foldenable
silent! normal! zE
let &fdl = &fdl
let s:l = 9 - ((8 * winheight(0) + 14) / 29)
if s:l < 1 | let s:l = 1 | endif
keepjumps exe s:l
normal! zt
keepjumps 9
normal! 045|
wincmd w
argglobal
if bufexists(fnamemodify("term:///mnt/minecraft-watchdog//291786:/usr/bin/zsh", ":p")) | buffer term:///mnt/minecraft-watchdog//291786:/usr/bin/zsh | else | edit term:///mnt/minecraft-watchdog//291786:/usr/bin/zsh | endif
if &buftype ==# 'terminal'
silent file term:///mnt/minecraft-watchdog//291786:/usr/bin/zsh
endif
balt src/main.cpp
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal foldenable
let s:l = 691 - ((27 * winheight(0) + 14) / 28)
if s:l < 1 | let s:l = 1 | endif
keepjumps exe s:l
normal! zt
keepjumps 691
normal! 02|
wincmd w
argglobal
if bufexists(fnamemodify("term:///mnt/minecraft-watchdog//294756:/usr/bin/zsh", ":p")) | buffer term:///mnt/minecraft-watchdog//294756:/usr/bin/zsh | else | edit term:///mnt/minecraft-watchdog//294756:/usr/bin/zsh | endif
if &buftype ==# 'terminal'
silent file term:///mnt/minecraft-watchdog//294756:/usr/bin/zsh
endif
setlocal foldmethod=manual
setlocal foldexpr=0
setlocal foldmarker={{{,}}}
setlocal foldignore=#
setlocal foldlevel=0
setlocal foldminlines=1
setlocal foldnestmax=20
setlocal foldenable
let s:l = 1063 - ((27 * winheight(0) + 14) / 28)
if s:l < 1 | let s:l = 1 | endif
keepjumps exe s:l
normal! zt
keepjumps 1063
normal! 07|
wincmd w
2wincmd w
exe 'vert 1resize ' . ((&columns * 30 + 173) / 347)
exe '2resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 2resize ' . ((&columns * 157 + 173) / 347)
exe '3resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 3resize ' . ((&columns * 157 + 173) / 347)
exe '4resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 4resize ' . ((&columns * 158 + 173) / 347)
exe '5resize ' . ((&lines * 29 + 45) / 91)
exe 'vert 5resize ' . ((&columns * 158 + 173) / 347)
exe '6resize ' . ((&lines * 28 + 45) / 91)
exe 'vert 6resize ' . ((&columns * 158 + 173) / 347)
exe '7resize ' . ((&lines * 28 + 45) / 91)
exe 'vert 7resize ' . ((&columns * 157 + 173) / 347)
tabnext 1
if exists('s:wipebuf') && len(win_findbuf(s:wipebuf)) == 0 && getbufvar(s:wipebuf, '&buftype') isnot# 'terminal'
silent exe 'bwipe ' . s:wipebuf
endif
unlet! s:wipebuf
set winheight=1 winwidth=20
let &shortmess = s:shortmess_save
let &winminheight = s:save_winminheight
let &winminwidth = s:save_winminwidth
let s:sx = expand("<sfile>:p:r")."x.vim"
if filereadable(s:sx)
exe "source " . fnameescape(s:sx)
endif
let &g:so = s:so_save | let &g:siso = s:siso_save
set hlsearch
nohlsearch
doautoall SessionLoadPost
unlet SessionLoad
" vim: set ft=vim :

View File

@@ -1,9 +1,41 @@
#include "clickhouse/client.h"
#include "spdlog/spdlog.h"
#include <filesystem>
#include <memory>
#include <string>
#include <sys/types.h>
#include <toml++/toml.h>
#include "processing_utils.hpp"
void process_directory(const std::string& dir_path,
std::shared_ptr<clickhouse::Client> client,
const std::string& target_user_id,
bool all)
{
if (all) {
for (auto& entry : std::filesystem::directory_iterator(dir_path)) {
if (!entry.is_regular_file()) continue;
std::string filename = entry.path().filename().string();
if (filename == "latest.log") continue;
if (filename.ends_with(".log") || filename.ends_with(".log.gz")) {
processing_utils::process_to_eof(entry.path().string(), client, target_user_id);
}
}
}
// Second pass: always follow latest.log
std::filesystem::path latest_path = std::filesystem::path(dir_path) / "latest.log";
if (std::filesystem::exists(latest_path) && std::filesystem::is_regular_file(latest_path)) {
processing_utils::process_blocking(latest_path.string(), client, target_user_id);
} else {
spdlog::warn("latest.log not found in {}", dir_path);
}
}
int main (int argc, char *argv[]) {
auto config = toml::parse_file("config.toml");
@@ -22,5 +54,7 @@ int main (int argc, char *argv[]) {
std::shared_ptr<clickhouse::Client> shared_client = std::make_shared<clickhouse::Client>(clickhouse::ClientOptions().SetPassword(clickouse_password).SetDefaultDatabase(clickhouse_db).SetUser(clickhouse_username).SetHost(clickhouse_host).SetPort(clickhouse_port));
spdlog::info("Server connection: {}", shared_client->GetServerInfo().display_name);
spdlog::info("runnign on given path: {}", log_path);
process_directory(log_path, shared_client, user_id, ingest_all_logs);
return 0;
}

263
src/processing_utils.cpp Normal file
View File

@@ -0,0 +1,263 @@
#include "processing_utils.hpp"
#include "spdlog/spdlog.h"
#include "zlib.h"
#include <cstdint>
#include <fstream>
#include <regex>
#include <string>
#include <variant>
#include "database_utils.hpp"
namespace processing_utils {
bool parse_date_from_filename(std::string_view path,
int& year, int& month, int& day)
{
// Match YYYY-MM-DD anywhere in the path
static const std::regex re(R"((\d{4})-(\d{2})-(\d{2}))",
std::regex::optimize);
std::cmatch m;
if (!std::regex_search(path.begin(), path.end(), m, re))
return false;
year = std::stoi(m[1].str());
month = std::stoi(m[2].str());
day = std::stoi(m[3].str());
return true;
}
bool parse_time_from_line(std::string_view line,
int& hour, int& min, int& sec)
{
// Expect: [HH:MM:SS]
if (line.size() < 10 || line[0] != '[')
return false;
hour = (line[1] - '0') * 10 + (line[2] - '0');
min = (line[4] - '0') * 10 + (line[5] - '0');
sec = (line[7] - '0') * 10 + (line[8] - '0');
return true;
}
#include <ctime>
#include <string_view>
time_t extract_timestamp_utc(std::string_view filename,
std::string_view line)
{
int year, month, day;
int hour, min, sec;
if (!parse_date_from_filename(filename, year, month, day)) {
// latest.log → use today's UTC date
std::time_t now = std::time(nullptr);
std::tm utc{};
gmtime_r(&now, &utc);
year = utc.tm_year + 1900;
month = utc.tm_mon + 1;
day = utc.tm_mday;
}
if (!parse_time_from_line(line, hour, min, sec)) {
return static_cast<time_t>(-1);
}
std::tm tm{};
tm.tm_year = year - 1900;
tm.tm_mon = month - 1;
tm.tm_mday = day;
tm.tm_hour = hour;
tm.tm_min = min;
tm.tm_sec = sec;
tm.tm_isdst = 0;
#if defined(__linux__)
return timegm(&tm);
#else
// Portable fallback
std::time_t local = std::mktime(&tm);
return local - timezone;
#endif
}
enum class Route {
NONE,
CLIENT_START,
CLIENT_END,
CHAT,
ACT_COMMAND,
ACT_DEATH,
ACT_DIMENSION,
ACT_OTHER
};
Route classify_line(std::string_view line) {
// ---- Client lifecycle ----
if (line.find("Starting Minecraft client") != std::string_view::npos)
return Route::CLIENT_START;
if (line.find("Stopping!") != std::string_view::npos)
return Route::CLIENT_END;
// ---- Chat ----
if (line.find("[CHAT] <") != std::string_view::npos)
return Route::CHAT;
// ---- Activity ----
if (line.find("[CHAT] /") != std::string_view::npos)
return Route::ACT_COMMAND;
if (line.find(" was slain by ") != std::string_view::npos ||
line.find(" fell from ") != std::string_view::npos)
return Route::ACT_DEATH;
if (line.find(" moved to the ") != std::string_view::npos)
return Route::ACT_DIMENSION;
return Route::NONE;
}
std::pair<std::string, std::string>
parse_chat(std::string_view line) {
// [CHAT] <Steve> hello world
auto start = line.find("[CHAT] <");
start += 8;
auto end = line.find('>', start);
auto msg = end + 2;
return {
std::string(line.substr(start, end - start)),
std::string(line.substr(msg))
};
}
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id){
spdlog::info("Processing file {} ", path);
gzFile gz_file_log = gzopen(path.c_str(), "rb");
char buffer[8192];
std::string current_line = std::string{};
db_utils::ClientEventsBatch client_events = db_utils::ClientEventsBatch {};
db_utils::ChatEventsBatch chat_events = db_utils::ChatEventsBatch {};
db_utils::ActivityEventsBatch activity_events = db_utils::ActivityEventsBatch {};
while(gzgets(gz_file_log, buffer, sizeof(buffer))!=nullptr){
current_line.append(buffer);
if(current_line.empty() || current_line.back()!='\n') continue;
current_line.pop_back();
std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
switch (classify_line(current_line)) {
case Route::CLIENT_START:
client_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"start"}, path);
break;
case Route::CLIENT_END:
client_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"end"}, path);
break;
case Route::ACT_COMMAND:
activity_events.add_row(target_user_id, log_timestamp, "command", float {1});
break;
case Route::ACT_DEATH:
activity_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"death"}, float {0.5});
break;
case Route::ACT_DIMENSION:
activity_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"dimension"}, float {0.3});
break;
case Route::ACT_OTHER:
activity_events.add_row(target_user_id, log_timestamp, std::variant<std::string, uint8_t> {"other"}, float {0.2});
break;
case Route::CHAT :
std::pair<std::string, std::string> chat_message_pair = parse_chat(current_line);
chat_events.add_row(target_user_id, log_timestamp, chat_message_pair.first, chat_message_pair.second) ;
}
}
activity_events.insert_as_batch(client); chat_events.insert_as_batch(client); client_events.insert_as_batch(client);
current_line.clear();
}
void process_blocking(std::string path,
std::shared_ptr<clickhouse::Client> client,
std::string target_user_id)
{
spdlog::info("Processing (blocking) file {}", path);
std::ifstream input_file(path);
if (!input_file.is_open()) {
spdlog::error("Failed to open file {}", path);
return;
}
// Seek to the end like tail -f
input_file.seekg(0, std::ios::end);
std::string current_line;
db_utils::ClientEventsBatch client_events{};
db_utils::ChatEventsBatch chat_events{};
db_utils::ActivityEventsBatch activity_events{};
while (true) {
// Read all available lines
while (std::getline(input_file, current_line)) {
if (current_line.empty()) continue;
std::time_t log_timestamp = extract_timestamp_utc(path, current_line);
switch (classify_line(current_line)) {
case Route::CLIENT_START:
client_events.add_row(target_user_id, log_timestamp,
std::variant<std::string, uint8_t>{"start"}, path);
break;
case Route::CLIENT_END:
client_events.add_row(target_user_id, log_timestamp,
std::variant<std::string, uint8_t>{"end"}, path);
break;
case Route::ACT_COMMAND:
activity_events.add_row(target_user_id, log_timestamp, "command", 1.0f);
break;
case Route::ACT_DEATH:
activity_events.add_row(target_user_id, log_timestamp,
std::variant<std::string, uint8_t>{"death"}, 0.5f);
break;
case Route::ACT_DIMENSION:
activity_events.add_row(target_user_id, log_timestamp,
std::variant<std::string, uint8_t>{"dimension"}, 0.3f);
break;
case Route::ACT_OTHER:
activity_events.add_row(target_user_id, log_timestamp,
std::variant<std::string, uint8_t>{"other"}, 0.2f);
break;
case Route::CHAT: {
auto chat_pair = parse_chat(current_line);
chat_events.add_row(target_user_id, log_timestamp,
chat_pair.first, chat_pair.second);
break;
}
default:
break;
}
}
// Insert accumulated batches into ClickHouse
activity_events.insert_as_batch(client);
chat_events.insert_as_batch(client);
client_events.insert_as_batch(client);
current_line.clear();
// Clear EOF state so we can continue reading
input_file.clear();
// Sleep a bit before polling for new data
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
}

9
src/processing_utils.hpp Normal file
View File

@@ -0,0 +1,9 @@
#pragma once
#include "clickhouse/client.h"
#include <memory>
#include <string>
namespace processing_utils {
void process_to_eof(std::string path, std::shared_ptr<clickhouse::Client>client, std::string target_user_id);
void process_blocking(std::string path, std::shared_ptr<clickhouse::Client> client, std::string target_user_id);
}