multithreaded singleton WolframLink class

This commit is contained in:
2026-02-19 19:22:20 -06:00
parent 8123c1af4a
commit 3ef111e991
2 changed files with 239 additions and 0 deletions

171
src/WolframLink.cpp Normal file
View File

@@ -0,0 +1,171 @@
#include "WolframLink.hpp"
#include <spdlog/spdlog.h>
// ----------------------------------------------------------------
// Static member definitions
// ----------------------------------------------------------------
std::shared_ptr<WolframLink> WolframLink::instance_ = nullptr;
std::mutex WolframLink::instance_mutex_;
// ----------------------------------------------------------------
// Singleton
// ----------------------------------------------------------------
std::shared_ptr<WolframLink> WolframLink::instance(WSLINK link) {
std::lock_guard<std::mutex> lock(instance_mutex_);
if (!instance_) {
if (!link) {
throw std::runtime_error("WolframLink: first call must provide a WSLINK");
}
instance_ = std::shared_ptr<WolframLink>(new WolframLink(link));
}
return instance_;
}
void WolframLink::destroy() {
std::lock_guard<std::mutex> lock(instance_mutex_);
instance_.reset();
}
// ----------------------------------------------------------------
// Constructor / Destructor
// ----------------------------------------------------------------
WolframLink::WolframLink(WSLINK link)
: link_(link)
, shutdown_(false)
, worker_([this]() { worker_loop(); })
{
spdlog::debug("WolframLink created, worker started");
}
WolframLink::~WolframLink() {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
shutdown_ = true;
}
queue_cv_.notify_all();
if (worker_.joinable()) {
worker_.join();
}
spdlog::debug("WolframLink destroyed, worker joined");
}
// ----------------------------------------------------------------
// Public API
// ----------------------------------------------------------------
std::future<std::optional<std::string>> WolframLink::evaluate(const std::string& expr) {
auto promise = std::make_shared<std::promise<std::optional<std::string>>>();
auto future = promise->get_future();
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (shutdown_) {
promise->set_value(std::nullopt);
return future;
}
queue_.push([this, expr, promise]() {
promise->set_value(evaluate_on_worker(expr));
});
}
queue_cv_.notify_one();
return future;
}
std::future<std::optional<std::string>> WolframLink::evaluate_and_log(
const std::string& label,
const std::string& expr)
{
auto promise = std::make_shared<std::promise<std::optional<std::string>>>();
auto future = promise->get_future();
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (shutdown_) {
promise->set_value(std::nullopt);
return future;
}
queue_.push([this, label, expr, promise]() {
auto result = evaluate_on_worker(expr);
if (result) {
spdlog::info("[{}]\n expr => {}\n result => {}\n",
label, expr, *result);
} else {
spdlog::error("[{}] Failed to evaluate: {}", label, expr);
}
promise->set_value(result);
});
}
queue_cv_.notify_one();
return future;
}
std::size_t WolframLink::queue_depth() const {
std::lock_guard<std::mutex> lock(queue_mutex_);
return queue_.size();
}
// ----------------------------------------------------------------
// Worker thread
// ----------------------------------------------------------------
void WolframLink::worker_loop() {
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
queue_cv_.wait(lock, [this]() {
return !queue_.empty() || shutdown_;
});
if (shutdown_ && queue_.empty()) {
break;
}
job = std::move(queue_.front());
queue_.pop();
}
try {
job();
} catch (const std::exception& e) {
spdlog::error("WolframLink worker caught exception: {}", e.what());
} catch (...) {
spdlog::error("WolframLink worker caught unknown exception");
}
}
spdlog::debug("WolframLink worker loop exiting");
}
// ----------------------------------------------------------------
// WSTP I/O — only ever called from worker_loop()
// ----------------------------------------------------------------
std::optional<std::string> WolframLink::evaluate_on_worker(const std::string& expr) {
std::string wrapped = "ToString[" + expr + ", InputForm]";
WSPutFunction(link_, "EvaluatePacket", 1);
WSPutFunction(link_, "ToExpression", 1);
WSPutString(link_, wrapped.c_str());
WSEndPacket(link_);
WSFlush(link_);
int pkt;
while ((pkt = WSNextPacket(link_)) != RETURNPKT) {
if (pkt == 0) {
spdlog::error("WSNextPacket returned 0 (link error): {}",
WSErrorMessage(link_));
return std::nullopt;
}
WSNewPacket(link_);
}
const char* result = nullptr;
if (!WSGetString(link_, &result)) {
spdlog::error("WSGetString failed: {}", WSErrorMessage(link_));
WSNewPacket(link_);
return std::nullopt;
}
std::string out(result);
WSReleaseString(link_, result);
WSNewPacket(link_);
return out;
}

68
src/WolframLink.hpp Normal file
View File

@@ -0,0 +1,68 @@
#pragma once
#include <wstp.h>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
class WolframLink {
public:
// ----------------------------------------------------------------
// Singleton access
// ----------------------------------------------------------------
static std::shared_ptr<WolframLink> instance(WSLINK link = nullptr);
static void destroy();
// ----------------------------------------------------------------
// Non-copyable / non-movable
// ----------------------------------------------------------------
WolframLink(const WolframLink&) = delete;
WolframLink& operator=(const WolframLink&) = delete;
WolframLink(WolframLink&&) = delete;
WolframLink& operator=(WolframLink&&) = delete;
~WolframLink();
// ----------------------------------------------------------------
// Public API — callable from any thread
// ----------------------------------------------------------------
std::future<std::optional<std::string>> evaluate(const std::string& expr);
std::future<std::optional<std::string>> evaluate_and_log(
const std::string& label,
const std::string& expr);
std::size_t queue_depth() const;
private:
explicit WolframLink(WSLINK link);
void worker_loop();
std::optional<std::string> evaluate_on_worker(const std::string& expr);
// ----------------------------------------------------------------
// Members
// ----------------------------------------------------------------
WSLINK link_;
std::atomic<bool> shutdown_;
mutable std::mutex queue_mutex_;
std::condition_variable queue_cv_;
std::queue<std::function<void()>> queue_;
std::thread worker_;
// ----------------------------------------------------------------
// Singleton state
// ----------------------------------------------------------------
static std::shared_ptr<WolframLink> instance_;
static std::mutex instance_mutex_;
};