Runners in depth

Learn how to configure and use our runners

This reference guide assumes you have already completed the steps described in the 5-minute getting started experience. To test that the Nextmv CLI is correctly configured, you can optionally run the following command in your terminal. It fetches some files that are necessary to work with the Nextmv Platform.

nextmv sdk install

The Nextmv runners make it easy to read input data and options, run your algorithm, and write output data. Input data can come from Stdin, a file, or an HTTP request, for example, and output can be written to many different places. You can freely define the options of your algorithm; they automatically become available via flags and environment variables. You can also configure the encoding of the input and output data and set a variety of other options.

Let's dive in.

The CLI Runner

The CLI runner is the default runner and is intended to run your app from the Command Line Interface. It reads input data from Stdin or a file, runs your algorithm and writes the output to Stdout or a file. It also provides a --help flag that will print the options of your algorithm.

Here is a simple example.

// package main holds the implementation of a simple runner example.
package main

import (
	"context"
	"log"
	"time"

	"github.com/nextmv-io/sdk/run"
	"github.com/nextmv-io/sdk/run/schema"
)

func main() {
	err := run.CLI(algorithm).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

type input struct {
	Message string `json:"message" usage:"Message to print."`
}

type option struct {
	Duration time.Duration `json:"duration" default:"1s" usage:"Sleep duration."`
}

type output struct {
	Message string `json:"message"`
}

func algorithm(_ context.Context, input input, opts option) (schema.Output, error) {
	// sleep for the specified duration, 1s by default as defined via go tags
	time.Sleep(opts.Duration)
	return schema.NewOutput(opts, output{Message: input.Message + " World!"}), nil
}

In this file we define a function called algorithm. It takes an input struct, which holds a field called Message, and an option struct, which holds a field called Duration. The output is defined as a struct with a Message field. The algorithm waits for the duration passed in via the options and then returns the input message with " World!" appended. We call run.CLI to get the runner and then call Run on it to start the actual run. Run might return an error, so we handle it.

But how do the input and opts parameters in our algorithm function get populated when you run the program above by executing the bash script?

Automatic flag and environment variable parsing

The CLI runner will automatically parse the options of your algorithm from flags and environment variables. It also has some built-in options that you can use to configure the runner. When you build the above example and run the resulting binary with the --help flag, you will see the following output:

Nextmv Hybrid Optimization Platform VERSION
Usage:
  -duration duration
    	Sleep duration. (env DURATION) (default 1s)
  -runner.input.path string
    	The input file path (env RUNNER_INPUT_PATH)
  -runner.output.path string
    	The output file path (env RUNNER_OUTPUT_PATH)
  -runner.output.solutions string
    	{all, last} (env RUNNER_OUTPUT_SOLUTIONS) (default "last")
  -runner.profile.cpu string
    	The CPU profile file path (env RUNNER_PROFILE_CPU)
  -runner.profile.memory string
    	The memory profile file path (env RUNNER_PROFILE_MEMORY)

The interesting part is the duration flag. It is automatically generated from the option struct of our algorithm. Note that the default value is 1s, because we defined 1s as the default via the Go struct tag. The usage text we defined in the Go struct tag also appears in the help output. This means that you can document your algorithm options and they will be available via the --help flag.

The flags with the runner prefix are the runner specific options that come with the runner itself. As you can see in the bash script, we are passing a file path to the runner. runner.input.path is a built-in option that you can use to specify the path to the input file. If you don't specify a path, the runner will read from Stdin. Similarly, runner.output.path is a built-in option that you can use to specify the output path. If you don't specify a path, the runner will write to Stdout.

Input decoding and output encoding

The CLI runner can currently decode input data from JSON, XML, GOB and CSV, and encode output data to JSON, XML and GOB.

In the following example, we will use the CSV decoding for the input and GOB encoding for the output. Note that GOB is a binary encoding and therefore the output contains some special characters.

// package main holds the implementation of a simple runner example.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/nextmv-io/sdk/dataframe"
	"github.com/nextmv-io/sdk/run"
	"github.com/nextmv-io/sdk/run/cli"
	"github.com/nextmv-io/sdk/run/encode"
)

var (
	firstName = dataframe.Strings("FIRST_NAME")
	lastName  = dataframe.Strings("LAST_NAME")
)

func main() {
	err := run.CLI(
		algorithm,
		cli.Validate[dataframe.DataFrame, option, output](nil),
		cli.Decode[dataframe.DataFrame, option, output](dataframe.FromCSV()),
		cli.Encode[dataframe.DataFrame, option, output](encode.Gob()),
	).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

type option struct {
	Duration time.Duration `json:"duration" default:"1s" usage:"Sleep duration."`
}

type output struct {
	Message string `json:"message"`
}

func algorithm(_ context.Context, d dataframe.DataFrame, opts option) (output, error) {
	// sleep for the specified duration, 1s by default as defined via go tags
	time.Sleep(opts.Duration)
	d = d.
		Filter(firstName.Equals("John").Or(firstName.Equals("Jane"))).
		Filter(lastName.Equals("Doe"))

	message := ""
	for i := 0; i < d.Len(); i++ {
		message += fmt.Sprintf(
			"Hello %s %s\n", firstName.Value(d, i), lastName.Value(d, i),
		)
	}

	return output{Message: message}, nil
}

Turn your attention to the following lines:

cli.Decode[dataframe.DataFrame, option, output](dataframe.FromCSV()),
cli.Encode[dataframe.DataFrame, option, output](encode.Gob()),

Here we configure the input decoding and output encoding. Note how we are providing type hints for the generic types. The first type is the type of our input (here dataframe.DataFrame), the second is the type of our options, and the third is the type of our output.

The algorithm itself is similar to the one in the previous example, but given that we are now reading CSV data, we showcase some filtering on the DataFrame and then return a message. If you look at the output tab, you'll notice some special characters, which are there because the output is GOB encoded.

The HTTP/S Runner

The HTTP runner exposes your algorithm via an HTTP endpoint. You send the input data via a POST request and you can set the options of your algorithm via query parameters.

Synchronous runs

In the synchronous case, the output of the algorithm is returned to the caller synchronously. This means that the HTTP call will block until a run is finished. Let's look at an example.

// package main holds the implementation of a simple runner example.
package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/nextmv-io/sdk/run"
	"github.com/nextmv-io/sdk/run/schema"
)

func main() {
	err := run.HTTP(algorithm,
		// listen on port 9000
		run.SetAddr[input, option, schema.Output](":9000"),
		// set the maximum number of parallel requests to 2
		run.SetMaxParallel[input, option, schema.Output](2),
		// override the default logger
		run.SetLogger[input, option, schema.Output](
			log.New(os.Stdout, "[demo] - ", log.Lshortfile),
		),
	).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

type input struct {
	Message string `json:"message" usage:"Message to print."`
}

type option struct {
	Duration time.Duration `json:"duration" default:"1s" usage:"Sleep duration."`
}

type output struct {
	Message string `json:"message"`
}

func algorithm(_ context.Context, input input, opts option) (schema.Output, error) {
	// sleep for the specified duration, 1s by default as defined via go tags
	time.Sleep(opts.Duration)
	return schema.NewOutput(opts, output{Message: input.Message + " World!"}), nil
}

As you can see in the example above, the main function sets up the HTTP runner. We'll go into detail about the options in a moment. For now, turn your attention to the Bash tab. We are sending a POST request to the HTTP endpoint with the JSON input data in the body. We are also setting the duration as a query parameter. The output is returned as a JSON response. Just like you can configure decoders and encoders for the CLI runner, you can also do that for the HTTP runner in which case the input and output encodings would change appropriately - JSON is just the default.

In the Output tab you can see the response to the curl request.

In the Help tab, you can see the help output of the HTTP runner. It is similar to the help output of the CLI runner. You will find the duration option, because the algorithm hasn't changed. What has changed is the runner itself. The HTTP runner comes with different options, some of which you have seen in the code as well (address, maxparallel). Instead of setting them in the code, you can also set them via the command line flags and environment variables. If you want the HTTP runner to listen via HTTPS, you will have to set the path to both the certificate and the key file.

Now let us get back to the options for the HTTP runner and look at them in detail. SetAddr sets the address on which to listen; in the above example we are using port 9000. SetMaxParallel sets the maximum number of parallel requests. If more requests than this maximum come in at the same time, the ones that cause the overage are rejected with a 429 status (see below).

max number of parallel requests exceeded

Finally SetLogger is used to set the logger that the runner will use. In the following example we are provoking an error by sending a request with an invalid payload, which is just an opening curly bracket {.

nextmv sdk run main.go
nextmv sdk run main.go > /dev/null 2>&1 &
curl -s -X POST "http://localhost:9000?duration=500000000" -H 'Content-Type: application/json' -d '{'

Asynchronous runs

In the asynchronous case, the output of the algorithm is not returned to the caller. Instead, the caller receives a request ID and the output of the algorithm is posted to a given callback URL. Let's look at an example.

// package main holds the implementation of a simple runner example.
package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/nextmv-io/sdk/run"
	"github.com/nextmv-io/sdk/run/schema"
)

func main() {
	// start a callback server listening on port 8080
	go func() {
		handler := http.HandlerFunc(callback)
		http.Handle("/callback", handler)
		err := http.ListenAndServe(":8080", nil)
		if err != nil {
			log.Fatal(err)
		}
	}()

	err := run.HTTP(algorithm,
		// listen on port 9001
		run.SetAddr[input, option, schema.Output](":9001"),
		// set the maximum number of parallel requests to 2
		run.SetMaxParallel[input, option, schema.Output](2),
		// override the default logger
		run.SetLogger[input, option, schema.Output](
			log.New(os.Stdout, "[demo] - ", log.LstdFlags),
		),
		// send solutions to the callback URL instead of returning them directly
		run.SetHTTPRequestHandler[input, option, schema.Output](
			run.AsyncHTTPRequestHandler(
				// configure the callback URL to send solutions to the callback
				// server started above
				run.CallbackURL("http://localhost:8080/callback"),
				// a caller of the HTTP endpoint is not allowed to override the
				// default callback URL defined above
				run.RequestOverride(false),
			),
		),
	).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

func callback(_ http.ResponseWriter, r *http.Request) {
	var b bytes.Buffer
	_, err := b.ReadFrom(r.Body)
	if err != nil {
		log.Fatal(err)
	}
	file, err := os.Create("callback.txt")
	if err != nil {
		log.Fatal(err)
	}
	_, err = file.WriteString(b.String())
	if err != nil {
		log.Fatal(err)
	}
	// you can access the request ID via the request header
	// r.Header.Get("request_id")
}

type input struct {
	Message string `json:"message" usage:"Message to print."`
}

type option struct {
	Duration time.Duration `json:"duration" default:"1s" usage:"Sleep duration."`
}

type output struct {
	Message string `json:"message"`
}

func algorithm(_ context.Context, input input, opts option) (schema.Output, error) {
	// sleep for the specified duration, 1s by default as defined via go tags
	time.Sleep(opts.Duration)
	return schema.NewOutput(opts, output{Message: input.Message + " World!"}), nil
}

The first go func in the main function uses Go's net/http package to start a callback server listening on port 8080. Of course, in a real-world scenario this callback endpoint would not live in the same process and most likely not even on the same machine.

// start a callback server listening on port 8080
go func() {
	handler := http.HandlerFunc(callback)
	http.Handle("/callback", handler)
	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		log.Fatal(err)
	}
}()

Looking at the Bash tab, you can see that we are sending the same request as in the synchronous case. In contrast to the synchronous case, the response (see Output tab) is now a request ID. The callback function (see below) is configured to handle incoming requests. It writes the body of the request (which is the output of the algorithm) to a file called callback.txt (see tab callback.txt above). The request_id is the same id that the caller received when they initially sent the POST request and can be retrieved as depicted in the commented code in the callback function. This way a caller will be able to map requests to callback responses.

func callback(_ http.ResponseWriter, r *http.Request) {
	var b bytes.Buffer
	_, err := b.ReadFrom(r.Body)
	if err != nil {
		log.Fatal(err)
	}
	file, err := os.Create("callback.txt")
	if err != nil {
		log.Fatal(err)
	}
	_, err = file.WriteString(b.String())
	if err != nil {
		log.Fatal(err)
	}
	// you can access the request ID via the request header
	// r.Header.Get("request_id")
}

This is the callback-side of the example. Now let's look at the HTTP runner once more.

// send solutions to the callback URL instead of returning them directly
run.SetHTTPRequestHandler[input, option, schema.Output](
	run.AsyncHTTPRequestHandler(
		// configure the callback URL to send solutions to the callback
		// server started above
		run.CallbackURL("http://localhost:8080/callback"),
		// a caller of the HTTP endpoint is not allowed to override the
		// default callback URL defined above
		run.RequestOverride(false),
	),
),

We configured the HTTP runner to work asynchronously. The default callback URL is set to post results to the endpoint that the first part of our code opened. The second option RequestOverride configures whether a caller can override the callback URL. In this case overriding the callback URL is forbidden. In case you want to allow the caller to specify a callback URL, set this value to true. A caller then passes callback_url with the appropriate URL in the header of the request.

Example using Router

The HTTP runner can also be used with Router. The following code is taken mainly from initializing the routing template. Let's take a look.

// package main holds the implementation of a simple runner example.
package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"os"

	"github.com/nextmv-io/sdk/route"
	"github.com/nextmv-io/sdk/run"
	"github.com/nextmv-io/sdk/run/schema"
	"github.com/nextmv-io/sdk/store"
)

func main() {
	// start a callback server listening on port 8081
	go func() {
		handler := http.HandlerFunc(callback)
		http.Handle("/callback", handler)
		err := http.ListenAndServe(":8081", nil)
		if err != nil {
			log.Fatal(err)
		}
	}()

	err := run.HTTP(solver,
		// listen on port 9004
		run.SetAddr[input, store.Options, schema.Output](":9004"),
		// set the maximum number of parallel requests to 1
		run.SetMaxParallel[input, store.Options, schema.Output](1),
		// override the default logger
		run.SetLogger[input, store.Options, schema.Output](
			log.New(os.Stdout, "[router demo] - ", log.Lshortfile),
		),
		// send solutions to the callback URL instead of returning them directly
		run.SetHTTPRequestHandler[input, store.Options, schema.Output](
			run.AsyncHTTPRequestHandler(
				// configure the callback URL to send solutions to the callback
				// server started above
				run.CallbackURL("http://localhost:8081/callback"),
				// a caller of the HTTP endpoint is not allowed to override the
				// default callback URL defined above
				run.RequestOverride(false),
			),
		),
	).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

func callback(_ http.ResponseWriter, r *http.Request) {
	var b bytes.Buffer
	_, err := b.ReadFrom(r.Body)
	if err != nil {
		log.Fatal(err)
	}
	file, err := os.Create("callback.txt")
	if err != nil {
		log.Fatal(err)
	}
	_, err = file.WriteString(b.String())
	if err != nil {
		log.Fatal(err)
	}
	// you can access the request ID via the request header
	// r.Header.Get("request_id")
}

// This struct describes the expected json input by the runner.
// Features not needed can simply be deleted or commented out, but make
// sure that the corresponding option in `solver` is also commented out.
// In case you would like to support a different input format you can
// change the struct as you see fit. You may need to change some code in
// `solver` to use the new structure.
type input struct {
	Stops               []route.Stop         `json:"stops"`
	Vehicles            []string             `json:"vehicles"`
	InitializationCosts []float64            `json:"initialization_costs"`
	Starts              []route.Position     `json:"starts"`
	Ends                []route.Position     `json:"ends"`
	Quantities          []int                `json:"quantities"`
	Capacities          []int                `json:"capacities"`
	Precedences         []route.Job          `json:"precedences"`
	Windows             []route.Window       `json:"windows"`
	Shifts              []route.TimeWindow   `json:"shifts"`
	Penalties           []int                `json:"penalties"`
	Backlogs            []route.Backlog      `json:"backlogs"`
	VehicleAttributes   []route.Attributes   `json:"vehicle_attributes"`
	StopAttributes      []route.Attributes   `json:"stop_attributes"`
	Velocities          []float64            `json:"velocities"`
	Groups              [][]string           `json:"groups"`
	ServiceTimes        []route.Service      `json:"service_times"`
	AlternateStops      []route.Alternate    `json:"alternate_stops"`
	Limits              []route.Limit        `json:"limits"`
	DurationLimits      []float64            `json:"duration_limits"`
	DistanceLimits      []float64            `json:"distance_limits"`
	ServiceGroups       []route.ServiceGroup `json:"service_groups"`
}

// solver takes the input and solver options and constructs a routing solver.
// All route features/options depend on the input format. Depending on your
// goal you can add, delete or fix options or add more input validations. Please
// see the [route package
// documentation](https://pkg.go.dev/github.com/nextmv-io/sdk/route) for further
// information on the options available to you.
func solver(ctx context.Context, i input, opts store.Options) (schema.Output, error) {
	// In case you directly expose the solver to untrusted, external input,
	// it is advisable from a security point of view to add strong
	// input validations before passing the data to the solver.

	// Define base router.
	router, err := route.NewRouter(
		i.Stops,
		i.Vehicles,
		route.Threads(2),
		route.Velocities(i.Velocities),
		route.Starts(i.Starts),
		route.Ends(i.Ends),
		route.Capacity(i.Quantities, i.Capacities),
		route.Precedence(i.Precedences),
		route.Services(i.ServiceTimes),
		route.Shifts(i.Shifts),
		route.Windows(i.Windows),
		route.Unassigned(i.Penalties),
		route.InitializationCosts(i.InitializationCosts),
		route.Backlogs(i.Backlogs),
		route.LimitDurations(
			i.DurationLimits,
			true, /*ignoreTriangular*/
		),
		route.LimitDistances(
			i.DistanceLimits,
			true, /*ignoreTriangular*/
		),
		route.Attribute(i.VehicleAttributes, i.StopAttributes),
		route.Grouper(i.Groups),
		route.Alternates(i.AlternateStops),
		route.ServiceGroups(i.ServiceGroups),
	)
	if err != nil {
		return schema.Output{}, err
	}

	solver, err := router.Solver(opts)
	solutions, err := run.AllSolutions(ctx, solver, err)
	if err != nil {
		return schema.Output{}, err
	}

	return schema.NewOutput(opts, solutions...), nil
}

This example is very similar to the asynchronous example above. After all it is using the asynchronous HTTP runner. Here are some key differences:

func solver(ctx context.Context, i input, opts store.Options) (schema.Output, error) {

Looking at the line above, you can see that we are using options and a solution structure that are not defined in this file. They come from the store package and are structures that are used by Nextmv's solver technology. store.Options exposes a field limits.duration and therefore the query parameter in the following curl statement sets limits.duration.

curl --data "@input.json" -s "http://localhost:9004?limits.duration=1000000000" -H 'Content-Type: application/json'

The last thing to mention here is not so much a difference but rather an important function to note.

solutions, err := run.AllSolutions(ctx, solver, err)

As you can see, we pass the solver and error returned by router.Solver(opts) to run.AllSolutions. Our runners expect an algorithm of the form (context, input, option) (output, error), and we adhere to that in this example. However, router.Solver returns (store.Solver, error), so to make it easy to work with our runners, we provide the run.AllSolutions function.
