thread is running now too, but no full workflow executed

This commit is contained in:
2025-12-18 17:23:24 -06:00
parent 9ca1e8784f
commit e999beb8b6
4 changed files with 259 additions and 81 deletions

View File

@@ -1,16 +1,60 @@
#include "clickhouse/client.h" #include "clickhouse/client.h"
#include "cpr/api.h"
#include "cpr/response.h"
#include "nlohmann/json_fwd.hpp" #include "nlohmann/json_fwd.hpp"
#include "skwyward-api-utils.hpp" #include "skwyward-api-utils.hpp"
#include "spdlog/common.h"
#include "spdlog/spdlog.h" #include "spdlog/spdlog.h"
#include "types.hpp" #include "types.hpp"
#include <cctype>
#include <cstddef>
#include <exception> #include <exception>
#include <future>
#include <memory> #include <memory>
#include <sodium.h> #include <sodium.h>
#include <sodium/core.h> #include <sodium/core.h>
#include <string_view> #include <string_view>
#include <toml++/toml.h> #include <toml++/toml.h>
#include <unistd.h>
#include <utility>
void batched_update(std::shared_ptr<clickhouse::Client> client_ptr, std::string base_uri){
std::vector<database_utils::UserRecord> current_users = database_utils::get_all_users(client_ptr);
const size_t batch_size = size_t {5};
for (size_t base_iteration = size_t{0}; current_users.size(); base_iteration+=batch_size) {
std::vector<std::future<bool>> futures_for_this_batch = std::vector<std::future<bool>>();
for (size_t j =base_iteration; j< base_iteration + batch_size && j<current_users.size(); j++) {
spdlog::debug("Created future: {}", j);
futures_for_this_batch.push_back(std::async(std::launch::async, [current_record = current_users.at(j), base_uri, client_ptr] () {
api_utils::GradesResponse grades = api_methods::get_grades(base_uri, current_record.login.username, current_record.login.password);
std::optional<database_utils::GradeSnapshot> past_record_unique = database_utils::load_latest_grades(client_ptr, current_record.user_id);
bool is_unique = database_utils::conditionally_insert_grades(client_ptr, current_record.user_id, grades);
if(!is_unique) return is_unique;
if(!past_record_unique.has_value()) return is_unique;
std::vector<database_utils::AssignmentDiff> diffs = database_utils::diff_grade_responses(past_record_unique->response, grades);
std::optional<database_utils::GradeSnapshot> current_record_unique = database_utils::load_latest_grades(client_ptr, current_record.user_id);
if(!current_record_unique.has_value()) return is_unique;
database_utils::insert_grade_updates(client_ptr, current_record.user_id, past_record_unique->response_id, current_record_unique->response_id, diffs);
spdlog::info("Fully inserted all diffs for user {}", current_record.user_id);
return is_unique;
}));
}
for(std::future<bool>& future : futures_for_this_batch){
spdlog::info("update logged: {}",future.get());
}
}
}
int main (int argc, char *argv[]) { int main (int argc, char *argv[]) {
spdlog::set_level(spdlog::level::debug);
auto config = toml::parse_file("config.toml"); auto config = toml::parse_file("config.toml");
std::string base_uri = config["host"].value_or(""); std::string base_uri = config["host"].value_or("");
std::string test_username = config["test_username"].value_or(""); std::string test_username = config["test_username"].value_or("");
@@ -29,8 +73,8 @@ int main (int argc, char *argv[]) {
api_utils::ErrorResponse test_login = api_methods::get_auth_status(base_uri, test_username, test_password); api_utils::ErrorResponse test_login = api_methods::get_auth_status(base_uri, test_username, test_password);
spdlog::info("Auth attempt response : {}", test_login.success); spdlog::info("Auth attempt response : {}", test_login.success);
api_utils::GradesResponse test_grades = api_methods::get_grades(base_uri, test_username, test_password); //api_utils::GradesResponse test_grades = api_methods::get_grades(base_uri, test_username, test_password);
spdlog::info("Grades: {}", nlohmann::json {test_grades}.dump()); //spdlog::info("Grades: {}", nlohmann::json {test_grades}.dump());
if(sodium_init()==-1){ if(sodium_init()==-1){
spdlog::error("Sodium, the crypto lib, cannot be inited. bailing"); spdlog::error("Sodium, the crypto lib, cannot be inited. bailing");
@@ -39,11 +83,25 @@ int main (int argc, char *argv[]) {
std::shared_ptr<clickhouse::Client> client_shared_ptr = std::make_shared<clickhouse::Client>(clickhouse::ClientOptions().SetHost(clickhouse_host_name).SetPassword(clickhouse_password).SetUser(clickhouse_username).SetDefaultDatabase(clickhouse_schema)); std::shared_ptr<clickhouse::Client> client_shared_ptr = std::make_shared<clickhouse::Client>(clickhouse::ClientOptions().SetHost(clickhouse_host_name).SetPassword(clickhouse_password).SetUser(clickhouse_username).SetDefaultDatabase(clickhouse_schema));
database_utils::register_user(client_shared_ptr, test_username, test_password); // database_utils::register_user(client_shared_ptr, test_username, test_password);
spdlog::info("USER UUID: {}", database_utils::uuid_to_string( database_utils::get_user_uuid(client_shared_ptr, test_username).value() )); // spdlog::info("USER UUID: {}", database_utils::uuid_to_string( database_utils::get_user_uuid(client_shared_ptr, test_username).value() ));
spdlog::info("Connected to the database"); spdlog::info("Connected to the database");
spdlog::info("runnign a thread");
std::thread update_thread = std::thread { [client_shared_ptr, base_uri](){
while(true){
batched_update(client_shared_ptr, base_uri);
sleep(5*60);
spdlog::info("Rerunning update cycle");
}
} };
update_thread.join();
return 0; return 0;
} }

View File

@@ -153,12 +153,21 @@ void to_json(json& j, const GradesResponse& r) {
}; };
} }
void from_json(const json& j, GradesResponse& r) { void from_json(const nlohmann::json& j, api_utils::GradesResponse& r) {
j.at("success").get_to(r.success); // success may or may not exist
j.at("totalClasses").get_to(r.totalClasses); r.success = j.value("success", false);
j.at("grades").get_to(r.grades);
r.totalClasses = j.value("totalClasses", 0);
// grades MUST exist, but still guard
if (j.contains("grades") && j["grades"].is_array()) {
j.at("grades").get_to(r.grades);
} else {
r.grades.clear();
}
} }
} // namespace api_utils } // namespace api_utils
@@ -191,15 +200,30 @@ api_utils::ErrorResponse get_auth_status(
return nlohmann::json::parse(r.text) return nlohmann::json::parse(r.text)
.get<api_utils::ErrorResponse>(); .get<api_utils::ErrorResponse>();
} }
api_utils::GradesResponse get_grades(std::string url, std::string username, std::string password) { api_utils::GradesResponse get_grades(
api_utils::Login login = api_utils::Login{username, password}; std::string url,
std::string username,
std::string password
) {
api_utils::Login login{username, password};
cpr::Response r = cpr::Post( cpr::Response r = cpr::Post(
cpr::Url{url + "/fetch-grades"}, cpr::Url{url + "/fetch-grades"},
cpr::Body{nlohmann::json(login).dump()}, cpr::Body{nlohmann::json(login).dump()},
cpr::Header{{"Content-Type", "application/json"}} cpr::Header{{"Content-Type", "application/json"}}
); );
return nlohmann::json::parse(r.text).get<api_utils::GradesResponse>(); auto j = nlohmann::json::parse(r.text);
}
if (j.is_array()) {
if (j.empty()) {
throw std::runtime_error("Grades API returned empty array");
}
j = j.at(0);
}
return j.get<api_utils::GradesResponse>();
}
} // namespace api_methods } // namespace api_methods

View File

@@ -1,5 +1,8 @@
#include "types.hpp" #include "types.hpp"
#include "clickhouse/base/uuid.h" #include "clickhouse/base/uuid.h"
#include <spdlog/spdlog.h>
#include <unordered_map> #include <unordered_map>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
@@ -10,19 +13,29 @@ using namespace clickhouse;
namespace database_utils { namespace database_utils {
// ---------------- UUID helpers ---------------- // ---------------- UUID helpers ----------------
clickhouse::UUID parse_uuid(const std::string& str) { clickhouse::UUID parse_uuid(const std::string& str) {
if (str.size() != 36) throw std::runtime_error("Invalid UUID string: " + str); spdlog::debug("parse_uuid: input={}", str);
if (str.size() != 36)
throw std::runtime_error("Invalid UUID string: " + str);
std::string hexstr; std::string hexstr;
for (char c : str) if (c != '-') hexstr += c; for (char c : str)
if (c != '-') hexstr += c;
if (hexstr.size() != 32) throw std::runtime_error("Invalid UUID format: " + str); if (hexstr.size() != 32)
throw std::runtime_error("Invalid UUID format: " + str);
uint64_t high = 0, low = 0; uint64_t high = 0, low = 0;
std::stringstream ss; std::stringstream ss;
ss << std::hex << hexstr.substr(0, 16); ss << std::hex << hexstr.substr(0, 16);
ss >> high; ss >> high;
ss.clear(); ss.str("");
ss.clear();
ss.str("");
ss << std::hex << hexstr.substr(16, 16); ss << std::hex << hexstr.substr(16, 16);
ss >> low; ss >> low;
@@ -33,17 +46,28 @@ std::string uuid_to_string(const clickhouse::UUID& u) {
std::stringstream ss; std::stringstream ss;
ss << std::hex << std::setw(16) << std::setfill('0') << u.first ss << std::hex << std::setw(16) << std::setfill('0') << u.first
<< std::setw(16) << std::setfill('0') << u.second; << std::setw(16) << std::setfill('0') << u.second;
std::string s = ss.str(); std::string s = ss.str();
// Insert dashes for standard UUID format
return s.substr(0,8) + "-" + s.substr(8,4) + "-" + s.substr(12,4) + "-" + s.substr(16,4) + "-" + s.substr(20,12); return s.substr(0, 8) + "-" +
s.substr(8, 4) + "-" +
s.substr(12, 4) + "-" +
s.substr(16, 4) + "-" +
s.substr(20, 12);
} }
// ---------------- Users ---------------- // ---------------- Users ----------------
std::vector<UserRecord> get_all_users(const CHClient& client) { std::vector<UserRecord> get_all_users(const CHClient& client) {
spdlog::debug("get_all_users");
std::vector<UserRecord> out; std::vector<UserRecord> out;
client->Select( client->Select(
"SELECT user_id, username, password FROM users", "SELECT user_id, username, password FROM users",
[&](const Block& b){ [&](const Block& b) {
spdlog::debug("get_all_users: rows={}", b.GetRowCount());
for (size_t i = 0; i < b.GetRowCount(); ++i) { for (size_t i = 0; i < b.GetRowCount(); ++i) {
UserRecord u; UserRecord u;
u.user_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i)); u.user_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i));
@@ -53,15 +77,22 @@ std::vector<UserRecord> get_all_users(const CHClient& client) {
} }
} }
); );
return out; return out;
} }
bool register_user(const CHClient& client, const std::string& username, const std::string& password) { bool register_user(const CHClient& client,
const std::string& username,
const std::string& password) {
spdlog::debug("register_user: username={}", username);
Block b; Block b;
auto u = std::make_shared<ColumnString>(); auto u = std::make_shared<ColumnString>();
auto p = std::make_shared<ColumnString>(); auto p = std::make_shared<ColumnString>();
u->Append(username); u->Append(username);
p->Append(password); p->Append(password);
b.AppendColumn("username", u); b.AppendColumn("username", u);
b.AppendColumn("password", p); b.AppendColumn("password", p);
@@ -69,134 +100,199 @@ bool register_user(const CHClient& client, const std::string& username, const st
return true; return true;
} }
bool authenticate_user(const CHClient& client, const std::string& username, const std::string& password) { bool authenticate_user(const CHClient& client,
const std::string& username,
const std::string& password) {
spdlog::debug("authenticate_user: username={}", username);
bool ok = false; bool ok = false;
client->Select( client->Select(
"SELECT count() FROM users WHERE username = '" + username + "' AND password = '" + password + "'", "SELECT count() FROM users "
[&](const Block& b){ ok = b[0]->As<ColumnUInt64>()->At(0) > 0; } "WHERE username = '" + username +
"' AND password = '" + password + "'",
[&](const Block& b) {
ok = b[0]->As<ColumnUInt64>()->At(0) > 0;
spdlog::debug("authenticate_user result={}", ok);
}
); );
return ok; return ok;
} }
// ---------------- Load latest snapshot ---------------- // ---------------- Load latest snapshot ----------------
std::optional<GradeSnapshot> load_latest_grades(const CHClient& client, const std::string& user_id) {
std::optional<GradeSnapshot>
load_latest_grades(const CHClient& client,
const std::string& user_id) {
spdlog::debug("load_latest_grades: user_id={}", user_id);
GradeSnapshot snap; GradeSnapshot snap;
bool found = false; bool found = false;
client->Select( client->Select(
"SELECT response_id, success, total_classes FROM grade_responses " "SELECT response_id, success, total_classes "
"WHERE user_id = '" + user_id + "' ORDER BY fetched_at DESC LIMIT 1", "FROM grade_responses "
[&](const Block& b){ "WHERE user_id = '" + user_id +
if (b.GetRowCount() == 0) return; "' ORDER BY fetched_at DESC LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() == 0) {
spdlog::debug("load_latest_grades: no previous snapshot");
return;
}
found = true; found = true;
snap.response_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0)); snap.response_id =
snap.response.success = b[1]->As<ColumnUInt8>()->At(0); uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
snap.response.totalClasses = b[2]->As<ColumnInt32>()->At(0); snap.response.success =
b[1]->As<ColumnUInt8>()->At(0);
snap.response.totalClasses =
b[2]->As<ColumnInt32>()->At(0);
} }
); );
if (!found) return std::nullopt;
if (!found)
return std::nullopt;
return snap; return snap;
} }
// ---------------- Diff ---------------- // ---------------- Diff ----------------
std::vector<AssignmentDiff> diff_grade_responses(
const api_utils::GradesResponse& old_resp, std::vector<AssignmentDiff>
const api_utils::GradesResponse& new_resp diff_grade_responses(const api_utils::GradesResponse& old_resp,
) { const api_utils::GradesResponse& new_resp) {
std::unordered_map<std::string, api_utils::AssignmentGrade> old_map; spdlog::debug("diff_grade_responses");
std::unordered_map<std::string,
api_utils::AssignmentGrade> old_map;
for (const auto& c : old_resp.grades) for (const auto& c : old_resp.grades)
for (const auto& g : c.grades) for (const auto& g : c.grades)
old_map[c.className + "::" + g.name] = g; old_map[c.className + "::" + g.name] = g;
std::vector<AssignmentDiff> diffs; std::vector<AssignmentDiff> diffs;
for (const auto& c : new_resp.grades) { for (const auto& c : new_resp.grades) {
for (const auto& g : c.grades) { for (const auto& g : c.grades) {
std::string key = c.className + "::" + g.name; std::string key = c.className + "::" + g.name;
auto it = old_map.find(key); auto it = old_map.find(key);
if (it == old_map.end()) { if (it == old_map.end()) {
diffs.push_back({"", "", std::nullopt, g}); diffs.push_back({"", "", std::nullopt, g});
continue; continue;
} }
if (it->second.score != g.score || it->second.attempts != g.attempts) {
if (it->second.score != g.score ||
it->second.attempts != g.attempts) {
diffs.push_back({"", "", it->second, g}); diffs.push_back({"", "", it->second, g});
} }
} }
} }
spdlog::debug("diff_grade_responses: diffs={}", diffs.size());
return diffs; return diffs;
} }
// ---------------- Conditional insert ---------------- // ---------------- Conditional insert ----------------
bool conditionally_insert_grades(
const CHClient& client, bool conditionally_insert_grades(const CHClient& client,
const std::string& user_id, const std::string& user_id,
const api_utils::GradesResponse& new_resp const api_utils::GradesResponse& new_resp) {
) { spdlog::debug("conditionally_insert_grades: user_id={}", user_id);
auto old = load_latest_grades(client, user_id); auto old = load_latest_grades(client, user_id);
if (!old) return true;
return !diff_grade_responses(old->response, new_resp).empty(); if (!old) {
spdlog::debug(
"No previous snapshot — MUST insert first snapshot");
return true;
}
bool changed =
!diff_grade_responses(old->response, new_resp).empty();
spdlog::debug("conditionally_insert_grades: changed={}", changed);
return changed;
} }
void insert_grade_updates( // ---------------- Grade updates ----------------
std::shared_ptr<clickhouse::Client> client,
const std::string& user_id_str, void insert_grade_updates(const CHClient& client,
const std::string& old_response_id_str, const std::string& user_id_str,
const std::string& new_response_id_str, const std::string& old_response_id_str,
const std::vector<AssignmentDiff>& diffs const std::string& new_response_id_str,
) { const std::vector<AssignmentDiff>& diffs) {
if (diffs.empty()) spdlog::debug("insert_grade_updates: diffs={}", diffs.size());
if (diffs.empty()) {
spdlog::debug("insert_grade_updates: nothing to insert");
return; return;
}
Query query( Query query(
"INSERT INTO grade_updates " "INSERT INTO grade_updates "
"(user_id, old_response_id, new_response_id, class_grade_id, assignment_id, assignment_name, " "(user_id, old_response_id, new_response_id, "
"class_grade_id, assignment_id, assignment_name, "
"old_score, new_score, old_attempts, new_attempts) " "old_score, new_score, old_attempts, new_attempts) "
"VALUES " "VALUES "
"({user_id: String}, {old_response_id: String}, {new_response_id: String}, " "({user_id:String}, {old_response_id:String}, "
"{class_grade_id: String}, {assignment_id: String}, {assignment_name: String}, " "{new_response_id:String}, {class_grade_id:String}, "
"{old_score: Nullable(String)}, {new_score: String}, " "{assignment_id:String}, {assignment_name:String}, "
"{old_attempts: Nullable(String)}, {new_attempts: String})" "{old_score:Nullable(String)}, {new_score:String}, "
"{old_attempts:Nullable(String)}, {new_attempts:String})"
); );
std::string user_uuid_str = uuid_to_string(parse_uuid(user_id_str));
std::string old_response_uuid_str = uuid_to_string(parse_uuid(old_response_id_str));
std::string new_response_uuid_str = uuid_to_string(parse_uuid(new_response_id_str));
for (const auto& d : diffs) { for (const auto& d : diffs) {
query.SetParam("user_id", user_uuid_str); query.SetParam("user_id", user_id_str);
query.SetParam("old_response_id", old_response_uuid_str); query.SetParam("old_response_id", old_response_id_str);
query.SetParam("new_response_id", new_response_uuid_str); query.SetParam("new_response_id", new_response_id_str);
query.SetParam("class_grade_id", uuid_to_string(parse_uuid(d.class_grade_id))); query.SetParam("class_grade_id", d.class_grade_id);
query.SetParam("assignment_id", uuid_to_string(parse_uuid(d.assignment_id))); query.SetParam("assignment_id", d.assignment_id);
query.SetParam("assignment_name", d.new_grade.name); query.SetParam("assignment_name", d.new_grade.name);
if (d.old_grade) { if (d.old_grade) {
query.SetParam("old_score", std::to_string(d.old_grade->score)); query.SetParam("old_score",
query.SetParam("old_attempts", d.old_grade->attempts); std::to_string(d.old_grade->score));
query.SetParam("old_attempts",
d.old_grade->attempts);
} else { } else {
query.SetParam("old_score", QueryParamValue()); query.SetParam("old_score", QueryParamValue());
query.SetParam("old_attempts", QueryParamValue()); query.SetParam("old_attempts", QueryParamValue());
} }
query.SetParam("new_score", std::to_string(d.new_grade.score)); query.SetParam("new_score",
query.SetParam("new_attempts", d.new_grade.attempts); std::to_string(d.new_grade.score));
query.SetParam("new_attempts",
d.new_grade.attempts);
client->Execute(query); client->Execute(query);
} }
spdlog::debug("insert_grade_updates: done");
} }
// ---------------- Lookup user UUID ----------------
std::optional<clickhouse::UUID>
get_user_uuid(const CHClient& client,
const std::string& username) {
spdlog::debug("get_user_uuid: username={}", username);
std::optional<clickhouse::UUID> get_user_uuid(const CHClient& client, const std::string& username) {
std::optional<clickhouse::UUID> result; std::optional<clickhouse::UUID> result;
client->Select( client->Select(
"SELECT user_id FROM users WHERE username = '" + username + "' LIMIT 1", "SELECT user_id FROM users "
[&](const clickhouse::Block& b) { "WHERE username = '" + username + "' LIMIT 1",
if (b.GetRowCount() == 0) [&](const Block& b) {
if (b.GetRowCount() == 0) {
spdlog::debug("get_user_uuid: not found");
return; return;
}
auto col_uuid = b[0]->As<clickhouse::ColumnUUID>(); result = b[0]->As<ColumnUUID>()->At(0);
auto uuid_val = col_uuid->At(0); spdlog::debug("get_user_uuid: found");
result = uuid_val; // your helper that converts UUID pair → string
} }
); );