Program Listing for File service.cpp¶
↰ Return to documentation for file (src/translator/service.cpp
)
#include "service.h"
#include <string>
#include <utility>
#include "batch.h"
#include "definitions.h"
namespace marian {
namespace bergamot {
Service::Service(Ptr<Options> options, MemoryBundle memoryBundle)
: requestId_(0),
options_(options),
vocabs_(options, std::move(memoryBundle.vocabs)),
text_processor_(vocabs_, options),
batcher_(options),
numWorkers_(std::max<int>(1, options->get<int>("cpu-threads"))),
modelMemory_(std::move(memoryBundle.model)),
shortlistMemory_(std::move(memoryBundle.shortlist))
#ifdef WASM_COMPATIBLE_SOURCE
,
blocking_translator_(DeviceId(0, DeviceType::cpu), vocabs_, options_, &modelMemory_, &shortlistMemory_)
#endif
{
#ifdef WASM_COMPATIBLE_SOURCE
blocking_translator_.initialize();
#else
workers_.reserve(numWorkers_);
for (size_t cpuId = 0; cpuId < numWorkers_; cpuId++) {
workers_.emplace_back([cpuId, this] {
marian::DeviceId deviceId(cpuId, DeviceType::cpu);
BatchTranslator translator(deviceId, vocabs_, options_, &modelMemory_, &shortlistMemory_);
translator.initialize();
Batch batch;
// Run thread mainloop
while (batcher_ >> batch) {
translator.translate(batch);
}
});
}
#endif
}
void Service::blockIfWASM() {
#ifdef WASM_COMPATIBLE_SOURCE
Batch batch;
// There's no need to do shutdown here because it's single threaded.
while (batcher_ >> batch) {
blocking_translator_.translate(batch);
}
#endif
}
std::vector<Response> Service::translateMultiple(std::vector<std::string> &&inputs, ResponseOptions responseOptions) {
// We queue the individual Requests so they get compiled at batches to be
// efficiently translated.
std::vector<std::future<Response>> responseFutures;
for (auto &input : inputs) {
std::future<Response> inputResponse = queueRequest(std::move(input), responseOptions);
responseFutures.push_back(std::move(inputResponse));
}
// Dispatch is called once per request so compilation of sentences from
// multiple Requests happen.
blockIfWASM();
// Now wait for all Requests to complete, the future to fire and return the
// compiled Responses, we can probably return the future, but WASM quirks(?).
std::vector<Response> responses;
for (auto &future : responseFutures) {
future.wait();
responses.push_back(std::move(future.get()));
}
return responses;
}
std::future<Response> Service::queueRequest(std::string &&input, ResponseOptions responseOptions) {
Segments segments;
AnnotatedText source(std::move(input));
text_processor_.process(source, segments);
std::promise<Response> responsePromise;
auto future = responsePromise.get_future();
ResponseBuilder responseBuilder(responseOptions, std::move(source), vocabs_, std::move(responsePromise));
Ptr<Request> request = New<Request>(requestId_++, std::move(segments), std::move(responseBuilder));
batcher_.addWholeRequest(request);
return future;
}
std::future<Response> Service::translate(std::string &&input, ResponseOptions responseOptions) {
std::future<Response> future = queueRequest(std::move(input), responseOptions);
blockIfWASM();
return future;
}
Service::~Service() {
batcher_.shutdown();
#ifndef WASM_COMPATIBLE_SOURCE
for (std::thread &worker : workers_) {
assert(worker.joinable());
worker.join();
}
#endif
}
} // namespace bergamot
} // namespace marian