How to create a Django ViewFlow process programmatically

Summary

I am developing a web application to learn Django (Python 3.4 and Django 1.6.10). The application has complex and frequently changing workflows. I decided to integrate the Django-Viewflow library (https://github.com/viewflow/viewflow/) because it seems to be a very convenient way to handle workflows without mixing workflow logic into the application models.

In this case, I created a workflow for collecting authorship information and copyrights using the Django-Viewflow library. The workflow should start every time an author is added to a book.

My problem

The documentation provides step-by-step instructions for integrating an end-to-end workflow solution (frontend and backend). My problem is that I find it hard to control the workflow programmatically (in particular, from the Book model).

Application description

I have a Book model (the main model) with a many-to-many relationship to authors.

myApp/models.py

    from django.db import models

    # The Author and Publisher models are not shown here.


    class Book(models.Model):
        title = models.CharField(max_length=100)
        authors = models.ManyToManyField(Author)
        publisher = models.ForeignKey(Publisher)
        publication_date = models.DateField()

Workflow components:

myFlow/models.py

    from django.db import models
    from viewflow.models import Process


    class AuthorInvitation(Process):
        consent_confirmed = models.BooleanField(default=False)
        signature = models.CharField(max_length=150)

myFlow/flows.py

    from viewflow import flow
    from viewflow.base import this, Flow
    from viewflow.contrib import celery
    from viewflow.views import StartProcessView, ProcessView

    from . import models, tasks


    class AuthorInvitationFlow(Flow):
        process_cls = models.AuthorInvitation

        start = flow.Start(StartProcessView) \
            .Permission(auto_create=True) \
            .Next(this.notify)

        notify = celery.Job(tasks.send_authorship_request) \
            .Next(this.approve)

        # Field name matches AuthorInvitation.consent_confirmed in models.py.
        approve = flow.View(ProcessView, fields=["consent_confirmed", "signature"]) \
            .Permission(auto_create=True) \
            .Next(this.check_approve)

        check_approve = flow.If(cond=lambda p: p.consent_confirmed) \
            .OnTrue(this.send) \
            .OnFalse(this.end)

        send = celery.Job(tasks.send_authorship) \
            .Next(this.end)

        end = flow.End()

Question

How can I control the workflow process programmatically (start it, confirm steps, repeat steps, cancel the process ...)? I tried digging into the library's code. It seems that the activation classes contain the right methods, but I am not sure how everything should fit together.

Thanks in advance!

python workflow django django-viewflow
2 answers

Flows have two additional built-in start tasks.

StartFunction - starts a flow when the function is called somewhere:

    @flow_start_func
    def create_flow(activation, **kwargs):
        activation.prepare()
        activation.done()
        return activation


    class FunctionFlow(Flow):
        start = flow.StartFunction(create_flow) \
            .Next(this.end)


    # somewhere in the code
    FunctionFlow.start.run(**some_kwargs)

StartSignal - starts a flow when a Django signal is received:

    class SignalFlow(Flow):
        start = flow.StartSignal(some_signal, create_flow) \
            .Next(this.end)

You can see how these and the other built-in tasks are used in the Viewflow test suite.
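The idea behind a signal-driven start can be sketched in plain Python. This is only a toy dispatcher, not Django's signal framework and not Viewflow code: `ToySignal`, `author_added`, and `start_invitation` are hypothetical names, and the "process" is just a dict. In a real project the signal would be a Django signal and the receiver would be the flow's start function.

```python
class ToySignal:
    """Hypothetical stand-in for a signal: keeps receivers and calls them on send."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)

    def send(self, sender, **kwargs):
        # Call every receiver with the sender and the extra keyword arguments.
        return [receiver(sender=sender, **kwargs) for receiver in self._receivers]


author_added = ToySignal()
started = []  # processes created so far


def start_invitation(sender, **kwargs):
    # Plays the role of the start function: create a process from the signal data.
    process = {
        "book": kwargs["book"],
        "author": kwargs["author"],
        "status": "STARTED",
    }
    started.append(process)
    return process


author_added.connect(start_invitation)

# Emitting the signal is all the calling code has to do to start a process.
author_added.send(sender="Book", book="Some Book", author="Some Author")
```

The calling code only emits the signal and never references the flow, which is the decoupling a signal-based start is meant to give.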

To change the state of a task manually, first fetch the task from the database, activate it, and then call any activation method.

    task = MyFlow.task_cls.objects.get(...)
    activation = task.activate()
    if activation.undo.can_proceed():
        activation.undo()

Every activation transition has a .can_proceed() method that lets you check whether the task is in a state that allows the transition.
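The guard-then-transition pattern can be illustrated without Viewflow at all. The following is a toy model in plain Python, not Viewflow's implementation: `Transition`, `ToyActivation`, and the status names are hypothetical stand-ins chosen for illustration.

```python
class Transition:
    """A state change that is only allowed from certain source states."""

    def __init__(self, activation, sources, target):
        self.activation = activation
        self.sources = sources
        self.target = target

    def can_proceed(self):
        # True only when the current status permits this transition.
        return self.activation.status in self.sources

    def __call__(self):
        if not self.can_proceed():
            raise RuntimeError(
                "cannot move from %r to %r" % (self.activation.status, self.target)
            )
        self.activation.status = self.target


class ToyActivation:
    """Hypothetical stand-in for an activation wrapping one task."""

    def __init__(self, status="NEW"):
        self.status = status
        self.done = Transition(self, sources={"NEW"}, target="DONE")
        self.undo = Transition(self, sources={"DONE"}, target="NEW")


activation = ToyActivation()
undo_allowed_before = activation.undo.can_proceed()  # task is not finished yet
activation.done()
undo_allowed_after = activation.undo.can_proceed()   # task is finished now
if undo_allowed_after:
    activation.undo()
final_status = activation.status
```

Checking `can_proceed()` first lets calling code branch quietly instead of catching an exception from a transition that is not valid in the current state.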


I needed to start a flow instance manually or programmatically. The flow I ended up with, based on the StartFunction reference above, looks like this:

    class MyRunFlow(flow.Flow):
        process_class = Run

        start = flow.Start(ProcessCreate, fields=['schedule']). \
            Permission(auto_create=True). \
            Next(this.wait_data_collect_start)
        start2 = flow.StartFunction(process_create). \
            Next(this.wait_data_collect_start)

The important point is that process_create has access to the Process object, and this code must programmatically set the same fields that the manual form submission sets through the fields specification passed to ProcessCreate:

    @flow_start_func
    def process_create(activation: FuncActivation, **kwargs):
        #
        # Update the database record.
        #
        db_sch = Schedule.objects.get(id=kwargs['schedule'])
        activation.process.schedule = db_sch  # <<<< Same fields as ProcessCreate
        activation.process.save()
        #
        # Go!
        #
        activation.prepare()
        activation.done()
        return activation

Note that the activation subclass inside flow_start_func is a FuncActivation, which has the prepare() and save() methods. The kwargs come from the call to run, which looks something like this:

    start_node = <walk the flow class looking for nodes of type StartFunction>
    activation = start_node.run(schedule=self.id)
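One way to read the "walk the flow class" step is as a scan over class attributes filtered by type. The sketch below shows that idea with plain Python stand-ins: the `Node`, `Start`, `StartFunction`, `End`, and `ToyRunFlow` classes here are hypothetical and only mimic the shape of a flow, and `find_nodes` is an illustrative helper, not a Viewflow function. Whether the same attribute walk works on a real Flow class depends on how the installed Viewflow version stores its nodes, which is not verified here.

```python
class Node:
    """Hypothetical stand-in for a flow node."""


class Start(Node):
    pass


class StartFunction(Node):
    def __init__(self, func):
        self.func = func

    def run(self, **kwargs):
        # Forward the caller's keyword arguments to the start function.
        return self.func(**kwargs)


class End(Node):
    pass


def process_create(**kwargs):
    # Toy start function: just echo back the field it was given.
    return {"schedule": kwargs["schedule"]}


class ToyRunFlow:
    start = Start()
    start2 = StartFunction(process_create)
    end = End()


def find_nodes(flow_cls, node_type):
    """Return (name, node) pairs for attributes defined directly on flow_cls."""
    return [
        (name, value)
        for name, value in vars(flow_cls).items()
        if isinstance(value, node_type)
    ]


matches = find_nodes(ToyRunFlow, StartFunction)
name, start_node = matches[0]
result = start_node.run(schedule=42)
```

Because `vars()` only looks at attributes defined on the class itself, nodes inherited from a parent flow would need a walk over the class's MRO instead.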