
We ran into what I’d think is a fairly common problem, but one for which I could find no good solution.

Problem:

We have a large number of “processors”: things that accept something as input, do something with it, and generate an output that can be fed into another “processor” as input, and so on until we reach the final result. The problem is that the order of the daisy chain is driven by what the starting point is and what the desired end point is.

Some examples of this include:

  • We want to get a word count on a PDF. The PDF might be flat (a picture of text) and need OCR, or it might not need OCR. We need to first run it through something to check if it needs OCR, then if it does, we run it through an OCR tool that creates a DOCX. If it doesn’t need OCR, we run it through another tool that converts it to RTF, and then we convert the RTF to a DOCX. Once it’s a DOCX we can extract the text from it, and count the words. So we need to daisy-chain a number of tools together for this to work.
  • We have a business process workflow where some steps in the business take a unit of work through a pipeline, and each step needs the unit of work to be in a particular state before it can work on it. Enterprise Service Bus and integration framework workflows fall under this problem as well, since the process of daisy-chaining the processors can be quite complicated.

 

We came up with a generic solution to this, and in the process we came up with a few other pieces:

  1. Representing “states” in the process as nodes in a graph (ex in a document workflow: File { format: docx } or ExtractedText or Translated or TranslationReviewed or TranslatedDocumentAssembled) and processors as edges (ex: convert document with MS Office, extract text with Aspose, send to a human translator and wait to get it back, spell check, etc)
  2. Passing decision trees as parameters internally (in reality pointers to the top node, which has up to 2 nodes it points to, one that would get promoted to the top if the operation succeeds, and another one if it fails)
  3. Enriching the state machine for edges to support metadata requirements and metadata output
  4. Passing partial decision trees (branches) as parameters between multiple distributed instances of the service to offer services that only exist on some instances, to the entire system
  5. Reducing complexity of the entire distributed system with recursion and Elastic Stack
  6. Make full use of HTTP in cases where we need to pass some files and some instructions on what to do with those files, by using MultiPart requests, where most of the parts are files, and one of them is a json object describing what’s needed
  7. When submitting the request, get back an estimated time of completion, so we don’t have to keep polling every X time to see if it’s done. Instead we wait for the minimum amount of time it’s likely to be done in, then we might get an updated time estimate, and finally the result, reducing polling to as little as one call for a long-running operation and polling really frequently for tiny quick operations.
  8. Add a feedback loop to get better over time.
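The estimated-completion idea in item 7 can be sketched from the caller’s side. This is only a minimal sketch: `JobStatus`, `EtaPoller` and the `checkStatus` delegate are hypothetical names standing in for whatever status call the real service exposes, and no actual HTTP is involved.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical status shape: either the job is done, or we get a revised estimate.
public record JobStatus(bool Done, TimeSpan EstimatedRemaining, string Result);

public static class EtaPoller
{
    // Waits for the server's estimate instead of polling on a fixed interval.
    // Returns the result and the number of status calls that were needed.
    public static async Task<(string Result, int Calls)> WaitForResultAsync(
        Func<Task<JobStatus>> checkStatus,
        TimeSpan initialEstimate,
        TimeSpan minimumDelay)
    {
        var delay = initialEstimate;
        var calls = 0;
        while (true)
        {
            // Never spin faster than minimumDelay, even when the estimate is tiny.
            await Task.Delay(delay < minimumDelay ? minimumDelay : delay);
            var status = await checkStatus();
            calls++;
            if (status.Done)
                return (status.Result, calls);
            delay = status.EstimatedRemaining; // the server revised its estimate
        }
    }
}
```

With an accurate estimate, a long-running job costs a single status call, while a quick job is checked again almost immediately.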

 

Building the graph

The first challenge we have is representing the capabilities of the system in a logical way. We can get a list of all the possible general classes of states, and represent inheritance for them, which will come in handy later.

// Create one StateType node for every class that derives from ProcessableState.
var derivedTypes = ReflectionHelpers.FindAllDerivedTypes<ProcessableState>();
foreach (var type in derivedTypes)
    graphClient.Cypher
        .Create("(statetype:StateType {newState})")
        .WithParam("newState", new { type.Name })
        .ExecuteWithoutResults();

This is done with a simple helper class:

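using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public static class ReflectionHelpers
{
    // Finds every type derived from T in the assembly that declares T.
    public static List<Type> FindAllDerivedTypes<T>()
    {
        return FindAllDerivedTypes<T>(Assembly.GetAssembly(typeof(T)));
    }

    // Finds every type in the given assembly that is assignable to T, excluding T itself.
    public static List<Type> FindAllDerivedTypes<T>(Assembly assembly)
    {
        var baseType = typeof(T);
        return assembly
            .GetTypes()
            .Where(t =>
                t != baseType &&
                baseType.IsAssignableFrom(t)
            ).ToList();
    }
}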

Next, we need to import all the components (edges) we have and all the states (nodes).

var allComponents = _componentService.GetComponents().SelectMany(t => t.Value.Capabilities).ToList();
var allStates = allComponents.Select(t => t.From).Union(allComponents.Select(t => t.To)).Distinct().ToList();
foreach (var state in allStates)
{
    var stateType = state.GetType().Name;
    graphClient.Cypher
        .Match("(type:StateType)")
        .Where((Type type) => type.Name == stateType)
        .Create("(state:ProcessableState {newState})-[:IS_TYPE]->(type)")
        .WithParam("newState", state)
        .ExecuteWithoutResults();
}

foreach (var capability in allComponents)
{
    graphClient.Cypher
        .Match("(state1:ProcessableState)", "(state2:ProcessableState)")
        .Where((ProcessableState state1) => state1.UniqueId == capability.From.UniqueId)
        .AndWhere((ProcessableState state2) => state2.UniqueId == capability.To.UniqueId)
        .Create("(state1)-[:PROCESSING {capability}]->(state2)")
        .WithParam("capability", new ProcessingRelationship
        {
            Name = capability.Name,
            Weight = capability.Weight,
            OpType = capability.OpType,
            Requirements = capability.Requirements,
            Resolves = capability.Resolves
        })
        .ExecuteWithoutResults();
}

Now we have a full representation of all our possible states, and all the things that connect them. They can be sparsely or densely connected; it makes little difference.

 

Using decision trees as parameters

The next thing to do is figure out which “processors” we need to pass through and in what order. Then we need to actually run through them, possibly reaching out to other servers. More importantly, we need to figure out how long the whole thing will take, so we can report it back to the user.

To do this, we separate figuring out what to do from the actual execution. First we build a binary tree of the operations we’re going to perform, based on the shortest weighted path. We can do this because we know the node we need to get to and we have some information about the starting node, so we use Dijkstra’s shortest weighted path algorithm to find the most efficient route.

In C# using the neo4j client library, it looks something like this:

var pathsQuery = _graphClient.Cypher
    .Match("(from:ProcessableState)", "(to:ProcessableState)")
    .Where((FileState from) => from.Format == fromState.Format)
    .AndWhere((ProcessableState to) => to.UniqueId == toFileState.UniqueId)
    .Call("apoc.algo.dijkstra(from, to, 'PROCESSING>', 'Weight')")
    .Yield("path, weight")
    .Return(p => new PathsResult<string>
    {
        Nodes = Return.As<IEnumerable<Node<string>>>("nodes(path)"),
        Relationships = Return.As<IEnumerable<RelationshipInstance<ProcessingRelationship>>>("rels(path)"),
        Weight = Return.As<double>("weight")
    });

var results = pathsQuery.Results.OrderBy(t => t.Weight).ToList();
var result = results.FirstOrDefault();
if (result == null)
    throw new NotSupportedException($"Could not find path between {fromState} and {toFileState}");

var allNodes = results.SelectMany(t => t.Nodes).ToList();
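To see what the shortest weighted path search computes without a graph database at hand, here is a minimal in-memory sketch of Dijkstra’s algorithm. It is not the Neo4j implementation: `Capability` and `PathFinder` are hypothetical names, the state and processor names are plain strings, and it assumes .NET 6 or later for `PriorityQueue`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory edge: a processor that turns one state into another.
public record Capability(string From, string To, string Name, double Weight);

public static class PathFinder
{
    // Dijkstra's algorithm over directed edges with non-negative weights.
    // On success, path holds the processors to run, in order.
    public static bool TryFindPath(
        IEnumerable<Capability> edges, string start, string goal, out List<Capability> path)
    {
        path = new List<Capability>();
        var outgoing = edges.ToLookup(e => e.From);
        var distance = new Dictionary<string, double> { [start] = 0 };
        var cameFrom = new Dictionary<string, Capability>();
        var queue = new PriorityQueue<string, double>();
        queue.Enqueue(start, 0);

        while (queue.TryDequeue(out var state, out var dist))
        {
            if (dist > distance[state]) continue; // stale entry, a cheaper route was found
            if (state == goal) break;
            foreach (var edge in outgoing[state])
            {
                var candidate = dist + edge.Weight;
                if (distance.TryGetValue(edge.To, out var known) && known <= candidate) continue;
                distance[edge.To] = candidate;
                cameFrom[edge.To] = edge;
                queue.Enqueue(edge.To, candidate);
            }
        }

        if (!distance.ContainsKey(goal)) return false;
        // Walk back from the goal to the start, then flip into execution order.
        for (var current = goal; current != start; current = cameFrom[current].From)
            path.Add(cameFrom[current]);
        path.Reverse();
        return true;
    }
}
```

Because the weights live on the edges, changing a weight is enough to change which chain of processors is picked; nothing in the search itself has to change.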

Now we quickly realized that while it might be a given where we’re trying to end up, we don’t necessarily know for sure where we’re starting from. So we find the paths from every likely starting point, and add operations into the tree to determine the actual starting node during processing. Remember, this part has to be really fast because the HTTP request is waiting for an estimate on how long processing will take, so we don’t want to hold up that request while we do other operations.

 

 

Next, we loop through the paths we got back and build the processing tree. When there are multiple paths, we re-query the graph for operations that can test which candidate is the real starting point and so resolve the ambiguity. This is where the binary part of the tree comes in: a tree is built for each candidate, and the discriminator is inserted as the head node, with one branch processed if the test is positive and the other if it is negative.
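A minimal sketch of that tree shape, assuming two candidate paths that have already been found. `OpNode` and `TreeBuilder` are hypothetical names, and processors are reduced to their names to keep the structure visible.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical tree node: an operation plus the branch to promote on success or on failure.
public record OpNode(string Name, OpNode? Success = null, OpNode? Fail = null);

public static class TreeBuilder
{
    // Turns an ordered path of processor names into a chain linked through Success.
    public static OpNode? Chain(IEnumerable<string> processors) =>
        processors.Reverse().Aggregate(
            (OpNode?)null,
            (next, name) => new OpNode(name, Success: next));

    // Puts the discriminating test at the head. Each branch is the tree for one
    // possible starting state: Success if the test is positive, Fail if it is negative.
    public static OpNode WithDiscriminator(
        string test, IEnumerable<string> ifPositive, IEnumerable<string> ifNegative) =>
        new OpNode(test, Chain(ifPositive), Chain(ifNegative));
}
```

For the PDF example, the head node would be the “does it need OCR” check, with the OCR chain on one branch and the RTF conversion chain on the other.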

Each node in this tree gets the parameters it needs in one of two ways: they are added during this step from the parameters sent in the request, or they are taken from the output of the previous node during processing. The first node always has the input files, and during processing each node passes its output files as the input files to the next operation.

Now we can take the most expensive path through the tree (every node computes its own cost based on the input passed in) and estimate how long all the operations will take. During processing we record statistics on this, which allows the estimate to get better over time.
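As a rough illustration of taking the most expensive path, the sketch below walks a tree and adds each node’s expected duration to the slower of its two branches. `EstimatedOp` and `Estimator` are hypothetical names, and the fixed `Expected` value is a simplification: in the system described here each node works out its own cost from its input.

```csharp
#nullable enable
using System;

// Hypothetical node carrying a pre-computed expected duration.
public record EstimatedOp(
    string Name, TimeSpan Expected, EstimatedOp? Success = null, EstimatedOp? Fail = null);

public static class Estimator
{
    // Worst case: this node's expected time plus the slower of its two branches.
    public static TimeSpan WorstCase(EstimatedOp? node)
    {
        if (node is null) return TimeSpan.Zero;
        var success = WorstCase(node.Success);
        var fail = WorstCase(node.Fail);
        return node.Expected + (success > fail ? success : fail);
    }
}
```

Reporting the worst case means the caller’s first status check is unlikely to arrive too early, whichever branch ends up being taken.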

 

Enriching the state machine for edges to support metadata requirements and metadata output

One of the problems we briefly discussed is the multiple possible starting points caused by ambiguity, but there’s another one where we need additional information in order to be able to process it. The sender can pass this information to us, but in some cases they don’t know it, and we have operations that can determine it. Ex: we need the language of a file in order to apply the correct word count strategy.

In this case the word count processor requires language, and if we don’t have it sent to us, then we can query the graph for another processor (or sequence of processors to daisy chain) in order to resolve it. These again are just added as steps to process later as part of the processing tree, and no actual processing is done yet. This does add a bit to the estimated completion time, so it gets added there as well.
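The requirement resolution can be sketched as a planning pass over an already chosen path. This is a simplification under stated assumptions: `Step` and `RequirementPlanner` are hypothetical names, metadata keys are plain strings, and a resolver is a single step picked from a list rather than a sequence of processors found by querying the graph.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical processor description: what metadata it needs and what it produces.
public record Step(string Name, string[] Requirements, string[] Resolves);

public static class RequirementPlanner
{
    // Inserts a resolver before any step whose required metadata is not yet known.
    // Nothing is executed here; the result is only the ordered list of steps to run.
    public static List<string> Plan(
        IEnumerable<Step> path, IEnumerable<Step> resolvers, IEnumerable<string> knownMetadata)
    {
        var known = new HashSet<string>(knownMetadata);
        var resolverList = resolvers.ToList();
        var plan = new List<string>();
        foreach (var step in path)
        {
            foreach (var missing in step.Requirements.Where(r => !known.Contains(r)))
            {
                var resolver = resolverList.FirstOrDefault(r => r.Resolves.Contains(missing))
                    ?? throw new NotSupportedException(
                        $"Nothing can resolve '{missing}' for {step.Name}");
                plan.Add(resolver.Name);
                known.UnionWith(resolver.Resolves);
            }
            plan.Add(step.Name);
            known.UnionWith(step.Resolves);
        }
        return plan;
    }
}
```

If the sender supplies the language, the plan is unchanged; if not, a language detection step is slotted in ahead of the word count.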

 

Multiple distributed instances of the service

We know the current state of every operation our own node can do (some capabilities can be temporarily unavailable), and we also know the current capabilities of every other node in the network and how to reach it (I will discuss how we know that in a future article; it’s based on a distributed system discovery/gossip algorithm). When the current node doesn’t know how to do something, we build the tree up to the point where we can, and add a tree node called ProcessRemotely, whose input is the output of the last operation we were able to process here and whose output is the end state we’re after. Doing it this way allows a request to pass through as many systems as needed. The source system computes statistics for all of this, so it has an approximate idea of how long sending things out with those parameters will take, and the estimate gets better over time as it learns.
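A small sketch of that hand-off, assuming the full path is known and only some processors exist locally. `PlannedStep` and `RemotePlanner` are hypothetical names; how the remote instance is chosen and reached is left out.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical planned step: which processor runs and which states it connects.
public record PlannedStep(string Processor, string From, string To);

public static class RemotePlanner
{
    // Keeps the steps this instance can run, then replaces everything after them with
    // one ProcessRemotely step from the last local output to the requested end state.
    public static List<PlannedStep> Localize(
        IReadOnlyList<PlannedStep> path, ISet<string> localProcessors)
    {
        var plan = path.TakeWhile(s => localProcessors.Contains(s.Processor)).ToList();
        if (plan.Count == path.Count) return plan; // everything can run here

        var handOffState = path[plan.Count].From;  // output of the last local step
        var endState = path[path.Count - 1].To;
        plan.Add(new PlannedStep("ProcessRemotely", handOffState, endState));
        return plan;
    }
}
```

The receiving instance can apply the same rule to its own part of the work, which is what lets a request travel through several systems.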

 

Processing the tree

When an item is queued up for processing, it should have no unsatisfied dependencies. It should contain all the information needed to be processed immediately if the server has the required processing capabilities; otherwise the entire item can be passed on to another server that meets the requirements, since nothing else on the queue depends on it.

The item contains all the parameters for the current operation, and specifications for 2 optional operations to process in the case of a success or failure; thus a decision tree of queue-able items.

IQueableOperation
string Processor - name of the processor/engine to use
string Operation - the processor might support multiple operations, or multiple Processors might support the same operation, thus using 2 parameters
TimeSpan Timeout - time to allow for the operation to complete. If it doesn't complete in this time, it's aborted, considered failed, and the operations tree is continued
Success - a json object describing what to do in case of the operation being successful. This can be a termination object indicating that processing is complete, or another processing object
Fail - a json object describing what to do in case of the operation failing. This can be a termination object indicating that processing is complete, or another processing object
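One possible shape for such an item is sketched below as a record that round-trips through JSON with `System.Text.Json`. The record name `QueueableOperation` and the use of a null branch to stand in for the termination object are assumptions made for this sketch, and it assumes .NET 6 or later, where `TimeSpan` values can be serialized.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Sketch of a queue-able item; a null Success or Fail branch means processing stops there.
public record QueueableOperation(
    string Processor,
    string Operation,
    TimeSpan Timeout,
    QueueableOperation? Success = null,
    QueueableOperation? Fail = null);

public static class OperationJson
{
    // Serializes the head node; the nested branches carry the rest of the decision tree.
    public static string Serialize(QueueableOperation head) =>
        JsonSerializer.Serialize(head, new JsonSerializerOptions { WriteIndented = true });

    // Rebuilds the tree on whichever server picks the item up.
    public static QueueableOperation? Deserialize(string json) =>
        JsonSerializer.Deserialize<QueueableOperation>(json);
}
```

Because the whole tree travels inside the head node, the item can be moved to another server’s queue without any lookups back to the sender.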

Now that we have the full decision tree built out, all we need to do is take the top node and send it to a processor that accepts it as input. The processor can be on the same system or call out to another system itself, have a queue, or anything else it needs to perform its job, but that’s not something the main system needs to care about. All the main system does is call the processor, and at some point the processor will signal back that it completed, either successfully or with a failure.

Once the completed signal is received, we record stats on how long it took, and promote the appropriate branch to be the new head node, and recursively keep doing this until we run out of nodes. This is all there is to processing, because the individual components can worry about their job, and the decision tree is already built.
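The promote-and-continue step can be sketched as follows. This version uses a loop rather than recursion and a synchronous `process` delegate in place of the processor’s completion signal; `Operation` and `TreeRunner` are hypothetical names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical tree node: an operation plus the branch to promote on success or failure.
public record Operation(string Name, Operation? Success = null, Operation? Fail = null);

public static class TreeRunner
{
    // Runs the head node, records how long it took, then promotes the matching branch
    // to be the new head, until there are no nodes left.
    public static List<(string Name, bool Succeeded, TimeSpan Elapsed)> Run(
        Operation? head, Func<Operation, bool> process)
    {
        var stats = new List<(string Name, bool Succeeded, TimeSpan Elapsed)>();
        while (head is not null)
        {
            var timer = Stopwatch.StartNew();
            bool succeeded;
            try { succeeded = process(head); }
            catch (Exception) { succeeded = false; } // a crash counts as a failed operation
            stats.Add((head.Name, succeeded, timer.Elapsed));
            head = succeeded ? head.Success : head.Fail;
        }
        return stats;
    }
}
```

The recorded durations are the raw material for the time estimates and edge weights discussed elsewhere in this post.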

If this path fails to reach a result, we go back to the tree builder and ask it to find an alternative path, which updates the processing time estimate and starts processing the new tree. This repeats until we either get a successful result or exhaust the possible paths to one.

 

Observability

Since we have a lot of systems in play, we need to be able to know when something breaks, and we also need statistics on what all the systems are processing. This is where Elastic Stack shines. We have Beats installed on every server, some monitoring our app and web server log files, syslog/event log, and system metrics. The Beats send the information to the nearest Logstash, which parses and enriches the log entries (adding GeoIP info if it sees an IP address, browser info if it sees a UserAgent string, etc) and writes the logs into Elasticsearch, from where we visualize them using Kibana.

Notice: we don’t connect our application to the Beats, or send directly to Logstash. Instead, we write to local log files, and have the Beats pick up the logs. This has the advantage that if the Beats go down, things resume whenever we get them back up. If Logstash goes down, things eventually catch up once it is back up, and the same is true if Elasticsearch goes down. We never lose information; the worst case scenario is that we only get it once we’ve fixed communication.

 

Multipart requests

We struggled with the chicken and egg problem for a while in other systems. We need to upload some files, but they need to be associated with a project, but we can’t have a project without files. Well, HTTP actually solves this, and it looks something like this:

POST /process HTTP/1.1
User-Agent: Mozilla/5.0 Gecko/2009042316 Firefox/3.0.10
Authorization: Bearer MYJWTAUTHTOKEN
Accept-Encoding: gzip,deflate
Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7
Keep-Alive: 300
Connection: keep-alive
Content-Type: multipart/form-data; boundary=----------287032381131322
Content-Length: 514
 
------------287032381131322
Content-Disposition: form-data; name="form"; filename="form.pdf"; language="English"; password="xyz"; retention="30"
Content-Type: application/pdf
 
PDF87a.............,...........D..;
------------287032381131322
Content-Disposition: form-data; name="scan1"; filename="scan1.gif"; language="English"
Content-Type: image/gif
 
GIF87a.............,...........D..;
------------287032381131322
Content-Disposition: form-data
Content-Type: application/json

{
    "ops": [
        {
            "inputs": ["/files/0", "/files/1"],
            "opType": "OCR",
            "outputFormat": "Docx",
            "retention": 30
        }
    ]
}
------------287032381131322--

We send all the files, and the instructions for what to do with them, in one request, so we don’t have any multi-request dependencies. We also have file headers that give us info about each file, and request headers that cover things like authentication.
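On the client side, a request of this shape can be assembled with the standard `System.Net.Http` types. This is a minimal sketch: `ProcessRequestBuilder` and the part names `form` and `instructions` are illustrative choices, not a fixed contract, and the extra per-file parameters such as language are left out.

```csharp
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;

public static class ProcessRequestBuilder
{
    // Builds one multipart/form-data body that holds a file part plus a JSON part
    // with the instructions describing what to do with the uploaded files.
    public static MultipartFormDataContent Build(byte[] pdfBytes, string instructionsJson)
    {
        var content = new MultipartFormDataContent();

        // File part: raw bytes with their own Content-Type and a file name.
        var file = new ByteArrayContent(pdfBytes);
        file.Headers.ContentType = new MediaTypeHeaderValue("application/pdf");
        content.Add(file, "form", "form.pdf");

        // Instruction part: the JSON object describing the requested operations.
        content.Add(
            new StringContent(instructionsJson, Encoding.UTF8, "application/json"),
            "instructions");

        return content;
    }
}
```

The returned content can be passed to `HttpClient.PostAsync` for whichever endpoint accepts the request; the boundary and the multipart headers are generated by the framework.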

 

Feedback loop

We record statistics after every operation we process, whether it’s successful or not. Then, based on the aggregated statistics, we update the weight of each edge. We store multiple weights per edge for the parameters that are important to us, like duration vs quality loss (ex: formatting), and choose the one to use based on the incoming request.

We also accept post-processing feedback on quality of output to be posted back, and update weights and statistics using that as well.
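As an illustration of keeping several weights per edge, the sketch below folds each observation into a running mean per metric. The running mean is an assumption made for this sketch, since the aggregation actually used is not described here, and `EdgeWeights` and the metric names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-edge statistics: one running mean for each metric we care about.
public class EdgeWeights
{
    private readonly Dictionary<string, (double Mean, int Count)> _metrics = new();

    // Folds one observation into the running mean for a metric such as "duration".
    public void Record(string metric, double observed)
    {
        _metrics.TryGetValue(metric, out var current); // (0, 0) when the metric is new
        var count = current.Count + 1;
        _metrics[metric] = (current.Mean + (observed - current.Mean) / count, count);
    }

    // The weight a path search would use for a request that optimizes for this metric.
    public double WeightFor(string metric, double fallback = 1.0) =>
        _metrics.TryGetValue(metric, out var stats) ? stats.Mean : fallback;
}
```

A request that cares about speed would then search with the duration weight, while one that cares about formatting would search with the quality loss weight.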

 

Summary

Overall, such distributed systems are very resilient and offer some powerful capabilities. Although there are a lot of uncommon ideas presented here, the architecture is quite simple: each piece is responsible for only one thing, but together they solve some complicated problems.
