holy shit it works

This commit is contained in:
2025-12-18 18:18:52 -06:00
parent e999beb8b6
commit d305a96ca3
3 changed files with 928 additions and 255 deletions

View File

@@ -18,41 +18,109 @@
#include <unistd.h> #include <unistd.h>
#include <utility> #include <utility>
void batched_update(std::shared_ptr<clickhouse::Client> client_ptr, std::string base_uri){
std::vector<database_utils::UserRecord> current_users = database_utils::get_all_users(client_ptr);
void batched_update(std::shared_ptr<clickhouse::Client> client_ptr, std::string base_uri) {
const size_t batch_size = size_t {5}; spdlog::info("Starting batched update cycle");
std::vector<database_utils::UserRecord> current_users = database_utils::get_all_users(client_ptr);
for (size_t base_iteration = size_t{0}; current_users.size(); base_iteration+=batch_size) { if (current_users.empty()) {
std::vector<std::future<bool>> futures_for_this_batch = std::vector<std::future<bool>>(); spdlog::warn("No users to update");
return;
for (size_t j =base_iteration; j< base_iteration + batch_size && j<current_users.size(); j++) { }
spdlog::debug("Created future: {}", j); const size_t batch_size = 5;
futures_for_this_batch.push_back(std::async(std::launch::async, [current_record = current_users.at(j), base_uri, client_ptr] () { for (size_t base_iteration = 0; base_iteration < current_users.size(); base_iteration += batch_size) {
std::vector<std::future<void>> futures_for_this_batch;
api_utils::GradesResponse grades = api_methods::get_grades(base_uri, current_record.login.username, current_record.login.password); for (size_t j = base_iteration; j < base_iteration + batch_size && j < current_users.size(); j++) {
std::optional<database_utils::GradeSnapshot> past_record_unique = database_utils::load_latest_grades(client_ptr, current_record.user_id); spdlog::debug("Creating async task for user index: {}", j);
bool is_unique = database_utils::conditionally_insert_grades(client_ptr, current_record.user_id, grades);
if(!is_unique) return is_unique; futures_for_this_batch.push_back(std::async(std::launch::async,
if(!past_record_unique.has_value()) return is_unique; [current_record = current_users.at(j), base_uri, client_ptr]() {
std::vector<database_utils::AssignmentDiff> diffs = database_utils::diff_grade_responses(past_record_unique->response, grades);
std::optional<database_utils::GradeSnapshot> current_record_unique = database_utils::load_latest_grades(client_ptr, current_record.user_id); try {
if(!current_record_unique.has_value()) return is_unique; spdlog::info("Processing user: {}", current_record.login.username);
database_utils::insert_grade_updates(client_ptr, current_record.user_id, past_record_unique->response_id, current_record_unique->response_id, diffs); api_utils::GradesResponse new_grades = api_methods::get_grades(
spdlog::info("Fully inserted all diffs for user {}", current_record.user_id); base_uri,
return is_unique; current_record.login.username,
})); current_record.login.password
);
if (!new_grades.success) {
spdlog::error("Failed to fetch grades for user {}", current_record.user_id);
return;
}
bool has_changes = database_utils::has_changes(
client_ptr,
current_record.user_id,
new_grades
);
if (!has_changes) {
spdlog::info("No changes detected for user {}", current_record.user_id);
return;
}
spdlog::info("Changes detected for user {}, inserting snapshot", current_record.user_id);
std::optional<database_utils::GradeSnapshot> old_snapshot =
database_utils::load_latest_snapshot(client_ptr, current_record.user_id);
std::string new_response_id = database_utils::insert_grade_snapshot(
client_ptr,
current_record.user_id,
new_grades
);
if (new_response_id.empty()) {
spdlog::error("Failed to insert snapshot for user {}", current_record.user_id);
return;
}
if (!old_snapshot.has_value()) {
spdlog::info("First snapshot for user {}, no diffs to compute", current_record.user_id);
return;
}
std::optional<database_utils::GradeSnapshot> new_snapshot =
database_utils::load_snapshot_by_id(client_ptr, new_response_id);
if (!new_snapshot.has_value()) {
spdlog::error("Failed to load newly inserted snapshot for user {}", current_record.user_id);
return;
}
std::vector<database_utils::AssignmentDiff> diffs =
database_utils::diff_snapshots(old_snapshot.value(), new_snapshot.value());
if (diffs.empty()) {
spdlog::warn("Changes detected but no diffs found for user {} - this shouldn't happen",
current_record.user_id);
return;
}
database_utils::insert_grade_updates(
client_ptr,
current_record.user_id,
old_snapshot->response_id,
new_response_id,
diffs
);
spdlog::info("Successfully processed {} grade updates for user {}",
diffs.size(), current_record.user_id);
} catch (const std::exception& e) {
spdlog::error("Exception processing user {}: {}",
current_record.user_id, e.what());
} catch (...) {
spdlog::error("Unknown exception processing user {}", current_record.user_id);
}
}));
}
for (auto& future : futures_for_this_batch) {
future.get();
}
spdlog::info("Batch complete: processed {} users", futures_for_this_batch.size());
} }
for(std::future<bool>& future : futures_for_this_batch){ spdlog::info("Batched update cycle complete: processed {} total users", current_users.size());
spdlog::info("update logged: {}",future.get());
}
}
} }
int main (int argc, char *argv[]) { int main (int argc, char *argv[]) {
spdlog::set_level(spdlog::level::debug); spdlog::set_level(spdlog::level::debug);
auto config = toml::parse_file("config.toml"); auto config = toml::parse_file("config.toml");
@@ -83,9 +151,9 @@ int main (int argc, char *argv[]) {
std::shared_ptr<clickhouse::Client> client_shared_ptr = std::make_shared<clickhouse::Client>(clickhouse::ClientOptions().SetHost(clickhouse_host_name).SetPassword(clickhouse_password).SetUser(clickhouse_username).SetDefaultDatabase(clickhouse_schema)); std::shared_ptr<clickhouse::Client> client_shared_ptr = std::make_shared<clickhouse::Client>(clickhouse::ClientOptions().SetHost(clickhouse_host_name).SetPassword(clickhouse_password).SetUser(clickhouse_username).SetDefaultDatabase(clickhouse_schema));
// database_utils::register_user(client_shared_ptr, test_username, test_password); //database_utils::register_user(client_shared_ptr, test_username, test_password);
// spdlog::info("USER UUID: {}", database_utils::uuid_to_string( database_utils::get_user_uuid(client_shared_ptr, test_username).value() )); //spdlog::info("USER UUID: {}", database_utils::uuid_to_string( database_utils::get_user_uuid(client_shared_ptr, test_username).value() ));
spdlog::info("Connected to the database"); spdlog::info("Connected to the database");

View File

@@ -1,18 +1,19 @@
#include "types.hpp" #include "types.hpp"
#include "clickhouse/base/uuid.h" #include "clickhouse/base/uuid.h"
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <unordered_map> #include <unordered_map>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <stdexcept> #include <stdexcept>
#include <ctime>
using namespace clickhouse; using namespace clickhouse;
namespace database_utils { namespace database_utils {
// ---------------- UUID helpers ---------------- // ============================================================================
// UUID Helpers
// ============================================================================
clickhouse::UUID parse_uuid(const std::string& str) { clickhouse::UUID parse_uuid(const std::string& str) {
spdlog::debug("parse_uuid: input={}", str); spdlog::debug("parse_uuid: input={}", str);
@@ -56,28 +57,53 @@ std::string uuid_to_string(const clickhouse::UUID& u) {
s.substr(20, 12); s.substr(20, 12);
} }
// ---------------- Users ---------------- // ============================================================================
// Date Parsing Helper
// ============================================================================
uint16_t parse_date_to_clickhouse(const std::string& date_str) {
if (date_str.empty()) {
return 0; // Epoch date
}
// Try to parse YYYY-MM-DD format
std::tm tm = {};
std::istringstream ss(date_str);
ss >> std::get_time(&tm, "%Y-%m-%d");
if (ss.fail()) {
spdlog::warn("Failed to parse date: {}, using epoch", date_str);
return 0;
}
std::time_t time = std::mktime(&tm);
// ClickHouse Date is days since 1970-01-01
return static_cast<uint16_t>(time / 86400);
}
// ============================================================================
// User Operations
// ============================================================================
std::vector<UserRecord> get_all_users(const CHClient& client) { std::vector<UserRecord> get_all_users(const CHClient& client) {
spdlog::debug("get_all_users"); spdlog::debug("get_all_users");
std::vector<UserRecord> out; std::vector<UserRecord> out;
client->Select( client->Select(
"SELECT user_id, username, password FROM users", "SELECT user_id, username, password FROM users",
[&](const Block& b) { [&](const Block& b) {
spdlog::debug("get_all_users: rows={}", b.GetRowCount()); spdlog::debug("get_all_users: batch rows={}", b.GetRowCount());
for (size_t i = 0; i < b.GetRowCount(); ++i) { for (size_t i = 0; i < b.GetRowCount(); ++i) {
UserRecord u; UserRecord u;
u.user_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i)); u.user_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i));
u.login.username = b[1]->As<ColumnString>()->At(i); u.login.username = std::string(b[1]->As<ColumnString>()->At(i));
u.login.password = b[2]->As<ColumnString>()->At(i); u.login.password = std::string(b[2]->As<ColumnString>()->At(i));
out.push_back(std::move(u)); out.push_back(std::move(u));
} }
} }
); );
spdlog::info("get_all_users: total={}", out.size());
return out; return out;
} }
@@ -97,6 +123,7 @@ bool register_user(const CHClient& client,
b.AppendColumn("password", p); b.AppendColumn("password", p);
client->Insert("users", b); client->Insert("users", b);
spdlog::info("register_user: success");
return true; return true;
} }
@@ -106,198 +133,680 @@ bool authenticate_user(const CHClient& client,
spdlog::debug("authenticate_user: username={}", username); spdlog::debug("authenticate_user: username={}", username);
bool ok = false; bool ok = false;
client->Select( client->Select(
"SELECT count() FROM users " "SELECT count() FROM users "
"WHERE username = '" + username + "WHERE username = '" + username +
"' AND password = '" + password + "'", "' AND password = '" + password + "'",
[&](const Block& b) { [&](const Block& b) {
ok = b[0]->As<ColumnUInt64>()->At(0) > 0; ok = b[0]->As<ColumnUInt64>()->At(0) > 0;
spdlog::debug("authenticate_user result={}", ok);
} }
); );
spdlog::debug("authenticate_user: result={}", ok);
return ok; return ok;
} }
// ---------------- Load latest snapshot ---------------- std::optional<clickhouse::UUID> get_user_uuid(const CHClient& client,
const std::string& username) {
std::optional<GradeSnapshot>
load_latest_grades(const CHClient& client,
const std::string& user_id) {
spdlog::debug("load_latest_grades: user_id={}", user_id);
GradeSnapshot snap;
bool found = false;
client->Select(
"SELECT response_id, success, total_classes "
"FROM grade_responses "
"WHERE user_id = '" + user_id +
"' ORDER BY fetched_at DESC LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() == 0) {
spdlog::debug("load_latest_grades: no previous snapshot");
return;
}
found = true;
snap.response_id =
uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
snap.response.success =
b[1]->As<ColumnUInt8>()->At(0);
snap.response.totalClasses =
b[2]->As<ColumnInt32>()->At(0);
}
);
if (!found)
return std::nullopt;
return snap;
}
// ---------------- Diff ----------------
std::vector<AssignmentDiff>
diff_grade_responses(const api_utils::GradesResponse& old_resp,
const api_utils::GradesResponse& new_resp) {
spdlog::debug("diff_grade_responses");
std::unordered_map<std::string,
api_utils::AssignmentGrade> old_map;
for (const auto& c : old_resp.grades)
for (const auto& g : c.grades)
old_map[c.className + "::" + g.name] = g;
std::vector<AssignmentDiff> diffs;
for (const auto& c : new_resp.grades) {
for (const auto& g : c.grades) {
std::string key = c.className + "::" + g.name;
auto it = old_map.find(key);
if (it == old_map.end()) {
diffs.push_back({"", "", std::nullopt, g});
continue;
}
if (it->second.score != g.score ||
it->second.attempts != g.attempts) {
diffs.push_back({"", "", it->second, g});
}
}
}
spdlog::debug("diff_grade_responses: diffs={}", diffs.size());
return diffs;
}
// ---------------- Conditional insert ----------------
bool conditionally_insert_grades(const CHClient& client,
const std::string& user_id,
const api_utils::GradesResponse& new_resp) {
spdlog::debug("conditionally_insert_grades: user_id={}", user_id);
auto old = load_latest_grades(client, user_id);
if (!old) {
spdlog::debug(
"No previous snapshot — MUST insert first snapshot");
return true;
}
bool changed =
!diff_grade_responses(old->response, new_resp).empty();
spdlog::debug("conditionally_insert_grades: changed={}", changed);
return changed;
}
// ---------------- Grade updates ----------------
void insert_grade_updates(const CHClient& client,
const std::string& user_id_str,
const std::string& old_response_id_str,
const std::string& new_response_id_str,
const std::vector<AssignmentDiff>& diffs) {
spdlog::debug("insert_grade_updates: diffs={}", diffs.size());
if (diffs.empty()) {
spdlog::debug("insert_grade_updates: nothing to insert");
return;
}
Query query(
"INSERT INTO grade_updates "
"(user_id, old_response_id, new_response_id, "
"class_grade_id, assignment_id, assignment_name, "
"old_score, new_score, old_attempts, new_attempts) "
"VALUES "
"({user_id:String}, {old_response_id:String}, "
"{new_response_id:String}, {class_grade_id:String}, "
"{assignment_id:String}, {assignment_name:String}, "
"{old_score:Nullable(String)}, {new_score:String}, "
"{old_attempts:Nullable(String)}, {new_attempts:String})"
);
for (const auto& d : diffs) {
query.SetParam("user_id", user_id_str);
query.SetParam("old_response_id", old_response_id_str);
query.SetParam("new_response_id", new_response_id_str);
query.SetParam("class_grade_id", d.class_grade_id);
query.SetParam("assignment_id", d.assignment_id);
query.SetParam("assignment_name", d.new_grade.name);
if (d.old_grade) {
query.SetParam("old_score",
std::to_string(d.old_grade->score));
query.SetParam("old_attempts",
d.old_grade->attempts);
} else {
query.SetParam("old_score", QueryParamValue());
query.SetParam("old_attempts", QueryParamValue());
}
query.SetParam("new_score",
std::to_string(d.new_grade.score));
query.SetParam("new_attempts",
d.new_grade.attempts);
client->Execute(query);
}
spdlog::debug("insert_grade_updates: done");
}
// ---------------- Lookup user UUID ----------------
std::optional<clickhouse::UUID>
get_user_uuid(const CHClient& client,
const std::string& username) {
spdlog::debug("get_user_uuid: username={}", username); spdlog::debug("get_user_uuid: username={}", username);
std::optional<clickhouse::UUID> result; std::optional<clickhouse::UUID> result;
client->Select( client->Select(
"SELECT user_id FROM users " "SELECT user_id FROM users WHERE username = '" + username + "' LIMIT 1",
"WHERE username = '" + username + "' LIMIT 1",
[&](const Block& b) { [&](const Block& b) {
if (b.GetRowCount() == 0) { if (b.GetRowCount() > 0) {
spdlog::debug("get_user_uuid: not found"); result = b[0]->As<ColumnUUID>()->At(0);
return;
} }
result = b[0]->As<ColumnUUID>()->At(0);
spdlog::debug("get_user_uuid: found");
} }
); );
return result; return result;
} }
} // namespace database_utils // ============================================================================
// Get or Create Stable IDs
// ============================================================================
std::string get_or_create_class(const CHClient& client,
const std::string& user_id,
const api_utils::ClassGrades& class_data) {
spdlog::debug("get_or_create_class: user={}, class={}", user_id, class_data.className);
// Try to find existing class
std::string class_id;
client->Select(
"SELECT class_id FROM user_classes "
"WHERE user_id = '" + user_id + "' "
"AND class_name = '" + class_data.className + "' "
"LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
class_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
if (!class_id.empty()) {
// Update last_seen
Block update_block;
auto user_col = std::make_shared<ColumnUUID>();
auto name_col = std::make_shared<ColumnString>();
auto teacher_col = std::make_shared<ColumnString>();
auto period_col = std::make_shared<ColumnString>();
auto category_col = std::make_shared<ColumnString>();
user_col->Append(parse_uuid(user_id));
name_col->Append(class_data.className);
teacher_col->Append(class_data.teacher);
period_col->Append(class_data.period);
category_col->Append(class_data.category);
update_block.AppendColumn("user_id", user_col);
update_block.AppendColumn("class_name", name_col);
update_block.AppendColumn("teacher", teacher_col);
update_block.AppendColumn("period", period_col);
update_block.AppendColumn("category", category_col);
client->Insert("user_classes", update_block);
spdlog::debug("Updated existing class: {}", class_id);
return class_id;
}
// Create new class
Block insert_block;
auto user_col = std::make_shared<ColumnUUID>();
auto name_col = std::make_shared<ColumnString>();
auto teacher_col = std::make_shared<ColumnString>();
auto period_col = std::make_shared<ColumnString>();
auto category_col = std::make_shared<ColumnString>();
user_col->Append(parse_uuid(user_id));
name_col->Append(class_data.className);
teacher_col->Append(class_data.teacher);
period_col->Append(class_data.period);
category_col->Append(class_data.category);
insert_block.AppendColumn("user_id", user_col);
insert_block.AppendColumn("class_name", name_col);
insert_block.AppendColumn("teacher", teacher_col);
insert_block.AppendColumn("period", period_col);
insert_block.AppendColumn("category", category_col);
client->Insert("user_classes", insert_block);
// Retrieve the created class_id
client->Select(
"SELECT class_id FROM user_classes "
"WHERE user_id = '" + user_id + "' "
"AND class_name = '" + class_data.className + "' "
"ORDER BY first_seen DESC LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
class_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
spdlog::info("Created new class: {} -> {}", class_data.className, class_id);
return class_id;
}
std::string get_or_create_assignment(const CHClient& client,
const std::string& user_id,
const std::string& class_id,
const api_utils::AssignmentGrade& assignment_data) {
spdlog::debug("get_or_create_assignment: class={}, assignment={}",
class_id, assignment_data.name);
// Try to find existing assignment
std::string assignment_id;
client->Select(
"SELECT assignment_id FROM user_assignments "
"WHERE user_id = '" + user_id + "' "
"AND class_id = '" + class_id + "' "
"AND assignment_name = '" + assignment_data.name + "' "
"LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
assignment_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
if (!assignment_id.empty()) {
// Update last_seen
Block update_block;
auto class_col = std::make_shared<ColumnUUID>();
auto user_col = std::make_shared<ColumnUUID>();
auto name_col = std::make_shared<ColumnString>();
auto date_col = std::make_shared<ColumnDate>();
auto major_col = std::make_shared<ColumnUInt8>();
class_col->Append(parse_uuid(class_id));
user_col->Append(parse_uuid(user_id));
name_col->Append(assignment_data.name);
date_col->Append(parse_date_to_clickhouse(assignment_data.dueDate));
major_col->Append(assignment_data.isMajorGrade ? 1 : 0);
update_block.AppendColumn("class_id", class_col);
update_block.AppendColumn("user_id", user_col);
update_block.AppendColumn("assignment_name", name_col);
update_block.AppendColumn("due_date", date_col);
update_block.AppendColumn("is_major_grade", major_col);
client->Insert("user_assignments", update_block);
spdlog::debug("Updated existing assignment: {}", assignment_id);
return assignment_id;
}
// Create new assignment
Block insert_block;
auto class_col = std::make_shared<ColumnUUID>();
auto user_col = std::make_shared<ColumnUUID>();
auto name_col = std::make_shared<ColumnString>();
auto date_col = std::make_shared<ColumnDate>();
auto major_col = std::make_shared<ColumnUInt8>();
class_col->Append(parse_uuid(class_id));
user_col->Append(parse_uuid(user_id));
name_col->Append(assignment_data.name);
date_col->Append(parse_date_to_clickhouse(assignment_data.dueDate));
major_col->Append(assignment_data.isMajorGrade ? 1 : 0);
insert_block.AppendColumn("class_id", class_col);
insert_block.AppendColumn("user_id", user_col);
insert_block.AppendColumn("assignment_name", name_col);
insert_block.AppendColumn("due_date", date_col);
insert_block.AppendColumn("is_major_grade", major_col);
client->Insert("user_assignments", insert_block);
// Retrieve the created assignment_id
client->Select(
"SELECT assignment_id FROM user_assignments "
"WHERE user_id = '" + user_id + "' "
"AND class_id = '" + class_id + "' "
"AND assignment_name = '" + assignment_data.name + "' "
"ORDER BY first_seen DESC LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
assignment_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
spdlog::info("Created new assignment: {} -> {}", assignment_data.name, assignment_id);
return assignment_id;
}
// ============================================================================
// Insert Grade Snapshot
// ============================================================================
std::string insert_grade_snapshot(const CHClient& client,
const std::string& user_id,
const api_utils::GradesResponse& api_response) {
spdlog::info("insert_grade_snapshot: user={}", user_id);
// 1. Insert grade_response
Block response_block;
auto resp_user_col = std::make_shared<ColumnUUID>();
auto success_col = std::make_shared<ColumnUInt8>();
auto total_col = std::make_shared<ColumnInt32>();
resp_user_col->Append(parse_uuid(user_id));
success_col->Append(api_response.success ? 1 : 0);
total_col->Append(api_response.totalClasses);
response_block.AppendColumn("user_id", resp_user_col);
response_block.AppendColumn("success", success_col);
response_block.AppendColumn("total_classes", total_col);
client->Insert("grade_responses", response_block);
spdlog::debug("Inserted grade_response");
// 2. Get the response_id
std::string response_id;
client->Select(
"SELECT response_id FROM grade_responses "
"WHERE user_id = '" + user_id + "' "
"ORDER BY fetched_at DESC LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
response_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
if (response_id.empty()) {
spdlog::error("Failed to retrieve response_id after insert");
return "";
}
spdlog::debug("Got response_id: {}", response_id);
// 3. Process each class and its assignments
std::vector<std::string> class_ids;
for (const auto& class_data : api_response.grades) {
// Get or create class
std::string class_id = get_or_create_class(client, user_id, class_data);
if (class_id.empty()) {
spdlog::error("Failed to get/create class: {}", class_data.className);
continue;
}
class_ids.push_back(class_id);
// Link response to class
Block rc_block;
auto rc_response_col = std::make_shared<ColumnUUID>();
auto rc_user_col = std::make_shared<ColumnUUID>();
auto rc_class_col = std::make_shared<ColumnUUID>();
rc_response_col->Append(parse_uuid(response_id));
rc_user_col->Append(parse_uuid(user_id));
rc_class_col->Append(parse_uuid(class_id));
rc_block.AppendColumn("response_id", rc_response_col);
rc_block.AppendColumn("user_id", rc_user_col);
rc_block.AppendColumn("class_id", rc_class_col);
client->Insert("response_classes", rc_block);
// Process assignments
if (class_data.grades.empty()) continue;
Block grade_history_block;
auto gh_grade_resp_col = std::make_shared<ColumnUUID>();
auto gh_user_col = std::make_shared<ColumnUUID>();
auto gh_assign_col = std::make_shared<ColumnUUID>();
auto gh_score_col = std::make_shared<ColumnFloat64>();
auto gh_attempts_col = std::make_shared<ColumnString>();
for (const auto& assignment : class_data.grades) {
// Get or create assignment
std::string assignment_id = get_or_create_assignment(
client, user_id, class_id, assignment
);
if (assignment_id.empty()) {
spdlog::error("Failed to get/create assignment: {}", assignment.name);
continue;
}
// Add to grade history batch
gh_grade_resp_col->Append(parse_uuid(response_id));
gh_user_col->Append(parse_uuid(user_id));
gh_assign_col->Append(parse_uuid(assignment_id));
gh_score_col->Append(assignment.score);
gh_attempts_col->Append(assignment.attempts);
}
// Batch insert grade history for this class
if (gh_assign_col->Size() > 0) {
grade_history_block.AppendColumn("response_id", gh_grade_resp_col);
grade_history_block.AppendColumn("user_id", gh_user_col);
grade_history_block.AppendColumn("assignment_id", gh_assign_col);
grade_history_block.AppendColumn("score", gh_score_col);
grade_history_block.AppendColumn("attempts", gh_attempts_col);
client->Insert("assignment_grade_history", grade_history_block);
spdlog::debug("Inserted {} grade history records for class {}",
gh_assign_col->Size(), class_data.className);
}
}
spdlog::info("Successfully inserted complete snapshot: response_id={}", response_id);
return response_id;
}
// ============================================================================
// Load Snapshots
// ============================================================================
std::optional<GradeSnapshot> load_latest_snapshot(const CHClient& client,
const std::string& user_id) {
spdlog::debug("load_latest_snapshot: user={}", user_id);
// Get latest response_id
std::string response_id;
client->Select(
"SELECT response_id FROM grade_responses "
"WHERE user_id = '" + user_id + "' "
"ORDER BY fetched_at DESC LIMIT 1",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
response_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
if (response_id.empty()) {
spdlog::debug("No snapshot found for user");
return std::nullopt;
}
return load_snapshot_by_id(client, response_id);
}
std::optional<GradeSnapshot> load_snapshot_by_id(const CHClient& client,
const std::string& response_id) {
spdlog::debug("load_snapshot_by_id: response={}", response_id);
GradeSnapshot snapshot;
snapshot.response_id = response_id;
// Get user_id from response
client->Select(
"SELECT user_id FROM grade_responses WHERE response_id = '" + response_id + "'",
[&](const Block& b) {
if (b.GetRowCount() > 0) {
snapshot.user_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(0));
}
}
);
if (snapshot.user_id.empty()) {
spdlog::error("Response not found: {}", response_id);
return std::nullopt;
}
// Load all classes for this response
client->Select(
"SELECT c.class_id, c.user_id, c.class_name, c.teacher, c.period, c.category "
"FROM user_classes c "
"INNER JOIN response_classes rc ON c.class_id = rc.class_id "
"WHERE rc.response_id = '" + response_id + "'",
[&](const Block& b) {
for (size_t i = 0; i < b.GetRowCount(); ++i) {
ClassRecord cls;
cls.class_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i));
cls.user_id = uuid_to_string(b[1]->As<ColumnUUID>()->At(i));
cls.class_name = std::string(b[2]->As<ColumnString>()->At(i));
cls.teacher = std::string(b[3]->As<ColumnString>()->At(i));
cls.period = std::string(b[4]->As<ColumnString>()->At(i));
cls.category = std::string(b[5]->As<ColumnString>()->At(i));
snapshot.classes[cls.class_name] = cls;
}
}
);
// Load all assignments for these classes
std::vector<std::string> class_ids;
for (const auto& [name, cls] : snapshot.classes) {
class_ids.push_back(cls.class_id);
}
if (!class_ids.empty()) {
// Build IN clause
std::string in_clause = "(";
for (size_t i = 0; i < class_ids.size(); ++i) {
if (i > 0) in_clause += ",";
in_clause += "'" + class_ids[i] + "'";
}
in_clause += ")";
client->Select(
"SELECT a.assignment_id, a.class_id, a.user_id, a.assignment_name, "
"c.class_name "
"FROM user_assignments a "
"INNER JOIN user_classes c ON a.class_id = c.class_id "
"WHERE a.class_id IN " + in_clause,
[&](const Block& b) {
for (size_t i = 0; i < b.GetRowCount(); ++i) {
AssignmentRecord assign;
assign.assignment_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i));
assign.class_id = uuid_to_string(b[1]->As<ColumnUUID>()->At(i));
assign.user_id = uuid_to_string(b[2]->As<ColumnUUID>()->At(i));
assign.assignment_name = std::string(b[3]->As<ColumnString>()->At(i));
std::string class_name = std::string(b[4]->As<ColumnString>()->At(i));
std::string key = make_assignment_key(class_name, assign.assignment_name);
snapshot.assignments[key] = assign;
}
}
);
}
// Load all grades for this response
client->Select(
"SELECT grade_id, assignment_id, score, attempts "
"FROM assignment_grade_history "
"WHERE response_id = '" + response_id + "'",
[&](const Block& b) {
for (size_t i = 0; i < b.GetRowCount(); ++i) {
GradeRecord grade;
grade.grade_id = uuid_to_string(b[0]->As<ColumnUUID>()->At(i));
grade.assignment_id = uuid_to_string(b[1]->As<ColumnUUID>()->At(i));
grade.score = b[2]->As<ColumnFloat64>()->At(i);
grade.attempts = std::string(b[3]->As<ColumnString>()->At(i));
snapshot.grades[grade.assignment_id] = grade;
}
}
);
spdlog::info("Loaded snapshot: {} classes, {} assignments, {} grades",
snapshot.classes.size(), snapshot.assignments.size(), snapshot.grades.size());
return snapshot;
}
// ============================================================================
// Diff Operations
// ============================================================================
bool has_changes(const CHClient& client,
const std::string& user_id,
const api_utils::GradesResponse& new_api_response) {
spdlog::debug("has_changes: user={}", user_id);
auto old_snapshot = load_latest_snapshot(client, user_id);
if (!old_snapshot) {
spdlog::debug("No previous snapshot - changes detected");
return true;
}
// Build map of new grades
std::unordered_map<std::string, std::pair<double, std::string>> new_grades;
for (const auto& cls : new_api_response.grades) {
for (const auto& assignment : cls.grades) {
std::string key = make_assignment_key(cls.className, assignment.name);
new_grades[key] = {assignment.score, assignment.attempts};
}
}
// Compare with old snapshot
for (const auto& [key, assign_record] : old_snapshot->assignments) {
auto it = new_grades.find(key);
// Assignment removed?
if (it == new_grades.end()) {
spdlog::debug("Assignment removed: {}", key);
return true;
}
// Grade changed?
auto old_grade_it = old_snapshot->grades.find(assign_record.assignment_id);
if (old_grade_it != old_snapshot->grades.end()) {
const auto& old_grade = old_grade_it->second;
const auto& [new_score, new_attempts] = it->second;
if (old_grade.score != new_score || old_grade.attempts != new_attempts) {
spdlog::debug("Grade changed: {}", key);
return true;
}
}
}
// Check for new assignments
for (const auto& [key, _] : new_grades) {
if (old_snapshot->assignments.find(key) == old_snapshot->assignments.end()) {
spdlog::debug("New assignment: {}", key);
return true;
}
}
spdlog::debug("No changes detected");
return false;
}
std::vector<AssignmentDiff> diff_snapshots(const GradeSnapshot& old_snapshot,
const GradeSnapshot& new_snapshot) {
spdlog::debug("diff_snapshots");
std::vector<AssignmentDiff> diffs;
// Check for new and updated assignments
for (const auto& [key, new_assign] : new_snapshot.assignments) {
auto old_it = old_snapshot.assignments.find(key);
// New assignment
if (old_it == old_snapshot.assignments.end()) {
auto grade_it = new_snapshot.grades.find(new_assign.assignment_id);
if (grade_it != new_snapshot.grades.end()) {
AssignmentDiff diff;
diff.assignment_id = new_assign.assignment_id;
diff.class_name = key.substr(0, key.find("::"));
diff.assignment_name = new_assign.assignment_name;
diff.old_grade = std::nullopt;
diff.new_grade = grade_it->second;
diff.change_type = AssignmentDiff::NEW;
diffs.push_back(diff);
}
continue;
}
// Check if grade changed
auto old_grade_it = old_snapshot.grades.find(old_it->second.assignment_id);
auto new_grade_it = new_snapshot.grades.find(new_assign.assignment_id);
if (old_grade_it != old_snapshot.grades.end() &&
new_grade_it != new_snapshot.grades.end()) {
const auto& old_grade = old_grade_it->second;
const auto& new_grade = new_grade_it->second;
if (old_grade.score != new_grade.score ||
old_grade.attempts != new_grade.attempts) {
AssignmentDiff diff;
diff.assignment_id = new_assign.assignment_id;
diff.class_name = key.substr(0, key.find("::"));
diff.assignment_name = new_assign.assignment_name;
diff.old_grade = old_grade;
diff.new_grade = new_grade;
diff.change_type = AssignmentDiff::UPDATED;
diffs.push_back(diff);
}
}
}
// Check for removed assignments
for (const auto& [key, old_assign] : old_snapshot.assignments) {
if (new_snapshot.assignments.find(key) == new_snapshot.assignments.end()) {
auto old_grade_it = old_snapshot.grades.find(old_assign.assignment_id);
if (old_grade_it != old_snapshot.grades.end()) {
AssignmentDiff diff;
diff.assignment_id = old_assign.assignment_id;
diff.class_name = key.substr(0, key.find("::"));
diff.assignment_name = old_assign.assignment_name;
diff.old_grade = old_grade_it->second;
diff.new_grade = GradeRecord{}; // Empty
diff.change_type = AssignmentDiff::REMOVED;
diffs.push_back(diff);
}
}
}
spdlog::info("diff_snapshots: found {} changes", diffs.size());
return diffs;
}
// ============================================================================
// Insert Grade Updates
// ============================================================================
void insert_grade_updates(const CHClient& client,
const std::string& user_id,
const std::string& old_response_id,
const std::string& new_response_id,
const std::vector<AssignmentDiff>& diffs) {
spdlog::debug("insert_grade_updates: diffs={}", diffs.size());
if (diffs.empty()) {
spdlog::debug("No grade updates to insert");
return;
}
Block block;
auto user_col = std::make_shared<ColumnUUID>();
auto assign_col = std::make_shared<ColumnUUID>();
auto old_resp_col = std::make_shared<ColumnUUID>();
auto new_resp_col = std::make_shared<ColumnUUID>();
// Create nullable columns with proper nested types
auto old_score_nested = std::make_shared<ColumnFloat64>();
auto old_score_col = std::make_shared<ColumnNullable>(old_score_nested, std::make_shared<ColumnUInt8>());
auto new_score_col = std::make_shared<ColumnFloat64>();
auto old_attempts_nested = std::make_shared<ColumnString>();
auto old_attempts_col = std::make_shared<ColumnNullable>(old_attempts_nested, std::make_shared<ColumnUInt8>());
auto new_attempts_col = std::make_shared<ColumnString>();
auto change_type_col = std::make_shared<ColumnInt8>();
for (const auto& diff : diffs) {
user_col->Append(parse_uuid(user_id));
assign_col->Append(parse_uuid(diff.assignment_id));
old_resp_col->Append(parse_uuid(old_response_id));
new_resp_col->Append(parse_uuid(new_response_id));
if (diff.old_grade) {
old_score_nested->Append(diff.old_grade->score);
std::static_pointer_cast<ColumnUInt8>(old_score_col->Nulls())->Append(uint8_t(0)); // 0 = not null
old_attempts_nested->Append(diff.old_grade->attempts);
std::static_pointer_cast<ColumnUInt8>(old_attempts_col->Nulls())->Append(uint8_t(0)); // 0 = not null
} else {
old_score_nested->Append(0.0); // Dummy value
std::static_pointer_cast<ColumnUInt8>(old_score_col->Nulls())->Append(uint8_t(1)); // 1 = null
old_attempts_nested->Append(""); // Dummy value
std::static_pointer_cast<ColumnUInt8>(old_attempts_col->Nulls())->Append(uint8_t(1)); // 1 = null
}
if (diff.change_type != AssignmentDiff::REMOVED) {
new_score_col->Append(diff.new_grade.score);
new_attempts_col->Append(diff.new_grade.attempts);
} else {
new_score_col->Append(0.0); // Placeholder for removed
new_attempts_col->Append("");
}
// Map enum to int8
int8_t change_val = 0;
switch (diff.change_type) {
case AssignmentDiff::NEW: change_val = 1; break;
case AssignmentDiff::UPDATED: change_val = 2; break;
case AssignmentDiff::REMOVED: change_val = 3; break;
}
change_type_col->Append(change_val);
}
block.AppendColumn("user_id", user_col);
block.AppendColumn("assignment_id", assign_col);
block.AppendColumn("old_response_id", old_resp_col);
block.AppendColumn("new_response_id", new_resp_col);
block.AppendColumn("old_score", old_score_col);
block.AppendColumn("new_score", new_score_col);
block.AppendColumn("old_attempts", old_attempts_col);
block.AppendColumn("new_attempts", new_attempts_col);
block.AppendColumn("change_type", change_type_col);
client->Insert("grade_updates", block);
spdlog::info("Inserted {} grade updates", diffs.size());
}
} // namespace database_utils

View File

@@ -1,95 +1,143 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
#include <vector> #include <vector>
#include <unordered_map>
#include <clickhouse/client.h> #include <clickhouse/client.h>
#include "clickhouse/base/uuid.h" #include "clickhouse/base/uuid.h"
#include "skwyward-api-utils.hpp" #include "skwyward-api-utils.hpp"
namespace database_utils { namespace database_utils {
// ============================================================================
// UUID Helpers
// ============================================================================
clickhouse::UUID parse_uuid(const std::string& str); clickhouse::UUID parse_uuid(const std::string& str);
std::string uuid_to_string(const clickhouse::UUID& u); std::string uuid_to_string(const clickhouse::UUID& u);
// ---------- DB Handle ---------- // ============================================================================
// DB Handle
// ============================================================================
using CHClient = std::shared_ptr<clickhouse::Client>; using CHClient = std::shared_ptr<clickhouse::Client>;
// ---------- User ---------- // ============================================================================
// User
// ============================================================================
struct UserRecord { struct UserRecord {
std::string user_id; // UUID std::string user_id;
api_utils::Login login; api_utils::Login login;
}; };
// ---------- Snapshot ---------- // ============================================================================
// Stable Entity Records (from database)
// ============================================================================
struct ClassRecord {
std::string class_id;
std::string user_id;
std::string class_name;
std::string teacher;
std::string period;
std::string category;
};
struct AssignmentRecord {
std::string assignment_id;
std::string class_id;
std::string user_id;
std::string assignment_name;
std::string due_date;
bool is_major_grade;
};
struct GradeRecord {
std::string grade_id;
std::string assignment_id;
double score;
std::string attempts;
};
// ============================================================================
// Complete Snapshot (loaded from database with all data)
// ============================================================================
struct GradeSnapshot { struct GradeSnapshot {
std::string response_id; std::string response_id;
api_utils::GradesResponse response; std::string user_id;
// Map: "class_name" -> ClassRecord
std::unordered_map<std::string, ClassRecord> classes;
// Map: "class_name::assignment_name" -> AssignmentRecord
std::unordered_map<std::string, AssignmentRecord> assignments;
// Map: assignment_id -> GradeRecord
std::unordered_map<std::string, GradeRecord> grades;
}; };
// ---------- Assignment Diff ---------- // ============================================================================
// Assignment Diff (for grade_updates table)
// ============================================================================
struct AssignmentDiff { struct AssignmentDiff {
std::string class_grade_id; std::string assignment_id; // Stable assignment ID
std::string assignment_id; std::string class_name; // For logging
std::string assignment_name; // For logging
std::optional<api_utils::AssignmentGrade> old_grade; std::optional<GradeRecord> old_grade;
api_utils::AssignmentGrade new_grade; GradeRecord new_grade;
enum ChangeType { NEW, UPDATED, REMOVED } change_type;
}; };
// ---------- User ops ---------- // ============================================================================
// User Operations
// ============================================================================
std::vector<UserRecord> get_all_users(const CHClient& client);
std::vector<UserRecord> bool register_user(
get_all_users(const CHClient& client);
bool
register_user(
const CHClient& client, const CHClient& client,
const std::string& username, const std::string& username,
const std::string& password const std::string& password
); );
bool bool authenticate_user(
authenticate_user(
const CHClient& client, const CHClient& client,
const std::string& username, const std::string& username,
const std::string& password const std::string& password
); );
// ---------- Grades ---------- std::optional<clickhouse::UUID> get_user_uuid(
const CHClient& client,
const std::string& username
);
std::optional<GradeSnapshot> // ============================================================================
load_latest_grades( // Snapshot Operations
// ============================================================================
// Load the most recent complete snapshot for a user (with all grades)
std::optional<GradeSnapshot> load_latest_snapshot(
const CHClient& client, const CHClient& client,
const std::string& user_id const std::string& user_id
); );
// ---------- Diff ---------- // Load a specific snapshot by response_id
std::optional<GradeSnapshot> load_snapshot_by_id(
std::vector<AssignmentDiff> const CHClient& client,
diff_grade_responses( const std::string& response_id
const api_utils::GradesResponse& old_resp,
const api_utils::GradesResponse& new_resp
); );
// ---------- Conditional insert ---------- // ============================================================================
// Insert Operations
// ============================================================================
bool // Insert a complete grade snapshot from API response
conditionally_insert_grades( // Returns: response_id if successful, empty string on failure
std::string insert_grade_snapshot(
const CHClient& client, const CHClient& client,
const std::string& user_id, const std::string& user_id,
const api_utils::GradesResponse& new_resp const api_utils::GradesResponse& api_response
); );
// ---------- Persist diffs ---------- // Insert grade update records
void insert_grade_updates(
void
insert_grade_updates(
const CHClient& client, const CHClient& client,
const std::string& user_id, const std::string& user_id,
const std::string& old_response_id, const std::string& old_response_id,
@@ -97,7 +145,55 @@ insert_grade_updates(
const std::vector<AssignmentDiff>& diffs const std::vector<AssignmentDiff>& diffs
); );
std::optional<clickhouse::UUID> get_user_uuid(const CHClient& client, const std::string& username); // ============================================================================
// Diff Operations
// ============================================================================
// Check if a new API response differs from the latest snapshot
bool has_changes(
const CHClient& client,
const std::string& user_id,
const api_utils::GradesResponse& new_api_response
);
// Diff two complete snapshots
std::vector<AssignmentDiff> diff_snapshots(
const GradeSnapshot& old_snapshot,
const GradeSnapshot& new_snapshot
);
// ============================================================================
// Helper: Get or Create Stable IDs
// ============================================================================
// Get existing class_id or create new class record
std::string get_or_create_class(
const CHClient& client,
const std::string& user_id,
const api_utils::ClassGrades& class_data
);
// Get existing assignment_id or create new assignment record
std::string get_or_create_assignment(
const CHClient& client,
const std::string& user_id,
const std::string& class_id,
const api_utils::AssignmentGrade& assignment_data
);
// ============================================================================
// Utility Functions
// ============================================================================
// Parse date string from API (format: "YYYY-MM-DD" or similar)
uint16_t parse_date_to_clickhouse(const std::string& date_str);
// Build lookup key for assignments
inline std::string make_assignment_key(
const std::string& class_name,
const std::string& assignment_name
) {
return class_name + "::" + assignment_name;
}
} // namespace database_utils } // namespace database_utils