
Remote task executor


Using Django Channels for real-time task submission and monitoring.

By Kostas Koutsogiannopoulos

Introduction

Channels is a Django application that uses WebSocket technology for real-time client-server communication alongside the basic HTTP request/response cycle. At the time of writing, Channels is stable enough to be a part of the basic Django installation. With Channels, developers can create web applications that exchange objects with the server in real time. That lets users interact with the application beyond plain HTTP requests. Typical use cases are chat rooms, desktop notifications, etc.

With this article we hope to inspire you with some new capabilities for your projects. The code is straightforward and easy to begin with.

Unfortunately it is going to be relatively long, so let's start with a screenshot:

In this demo, an operator submits tasks that run in parallel. Each one has its own console tab. There is also an alert area (to the right) that informs the 'operators' group about the start and end of every task.

To submit a task you need to fill in the following fields:

  • Server
  • Job Type
  • Job Name
  • Job Body

Requirements

Imagine a team of operators who submit tasks to multiple remote Linux servers.

  • An operator can choose a server and a job type (SQL, bash, Ansible playbook, JCL, anything you can describe as a Jinja2 template of a shell script), write the task and submit it.
  • The system must know how to run each type of task from a Linux shell.
  • Every operator can submit multiple tasks from the same web page.
  • Every operator can monitor the execution of each of their tasks via its stdout/stderr output.
  • The whole team must be informed in real time when a task starts.
  • The whole team must be informed in real time when a task completes, along with its exit code (success or failure).

Dependencies

We begin with a Linux machine with Python 3 and Python virtual environments installed.

Channels needs a message broker like Redis, so:

ubuntu@ubuntuDev:~$ sudo apt-get install redis-server

Create a new virtual environment, activate it and install the Python packages:

ubuntu@ubuntuDev:~$ virtualenv -p python3 operations_env

ubuntu@ubuntuDev:~$ source operations_env/bin/activate

(operations_env) ubuntu@ubuntuDev:~$ pip install django channels asgi_redis

We also need paramiko for SSH connections and jinja2 for templating our tasks:

(operations_env) ubuntu@ubuntuDev:~$ pip install paramiko jinja2

For the front end we use the following javascript libraries:

  • jquery-3.1.1
  • bootstrap-3.3.7
  • reconnecting-websocket-1.0.0 (for keeping the web socket connection always alive)
  • jquery.serialize-object-2.5.0

The Application

Start a new project and a new app:

(operations_env) ubuntu@ubuntuDev:~$ cd operations_env

(operations_env) ubuntu@ubuntuDev:~/operations_env$ django-admin startproject operations

(operations_env) ubuntu@ubuntuDev:~/operations_env$ cd operations/

(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py startapp piston

Prepare the database (we use the default sqlite3):

(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py migrate

Create the administrator:

(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py createsuperuser

 

Now create two models, one for the servers and one for our job types. For this example we store the credentials on the Server model (in production we use an LDAP directory instead):

 ~/operations_env/operations/piston/models.py

from django.db import models


class JobType(models.Model):
    name = models.CharField(max_length=50)
    date_created = models.DateTimeField( auto_now_add=True )
    date_modified = models.DateTimeField( auto_now=True )
    description = models.TextField( blank=True, max_length=1024 )
    template = models.TextField( max_length=1024 )

class Server(models.Model):
    name = models.CharField(max_length=50)
    hostname = models.CharField(max_length=50)
    username = models.CharField(max_length=50)
    password = models.CharField(max_length=50)

 

We also need to register our models to the admin application:

 ~/operations_env/operations/piston/admin.py

from django.contrib import admin
from .models import JobType, Server


@admin.register(JobType)
class JobTypeAdmin(admin.ModelAdmin):
    list_display = ('name',)


@admin.register(Server)
class ServerAdmin(admin.ModelAdmin):
    list_display = ('name',)

 

Add to your settings.py your new application:

 ~/operations_env/operations/operations/settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'piston',
]

 

You are now ready to fire up the development server, log in to the admin app and insert some new servers and job types:

(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py runserver

Every job type is a Jinja2 template of a bash script.

For example, an Ansible playbook job type may look like this:

#!/bin/bash
source /opt/ansible/bin/activate
cat << EOT >> /opt/ansible/playbooks/{{ key }}
{{ body }}
EOT
ansible-playbook -i /opt/ansible/hosts /opt/ansible/playbooks/{{ key }}
RC=$?
rm /opt/ansible/playbooks/{{ key }}
exit $RC

 

… and a DB2 SQL script job type:

#!/bin/bash
export LC_ALL=el_GR.UTF-8
source sqllib/db2profile
cat << EOT > $HOME/{{ key }}
{{ body }}
EOT
db2 +p< $HOME/{{ key }}
RC=$?
rm $HOME/{{ key }}
exit $RC
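
To make the role of the two template variables concrete, here is a minimal sketch of rendering a job type outside Django. The template text is a shortened version of the DB2 example above. The render_job helper, the session key "abc123" and the submission id are made up for illustration; the key is composed the same way the consumer later in this article composes it.

```python
from jinja2 import Template

# Shortened job type template; {{ key }} and {{ body }} are the two
# variables every job type in this article relies on.
JOB_TEMPLATE = """#!/bin/bash
cat << EOT > $HOME/{{ key }}
{{ body }}
EOT
db2 +p< $HOME/{{ key }}
RC=$?
rm $HOME/{{ key }}
exit $RC
"""


def render_job(template_text, body, session_key, sub_id):
    """Render a job type into a shell script with a per-submission key."""
    # Hypothetical helper: session key plus submission counter gives a
    # name that differs for every submission.
    key = session_key + '_' + str(sub_id)
    return Template(template_text).render(body=body, key=key)


script = render_job(JOB_TEMPLATE, "select 1 from sysibm.sysdummy1;",
                    "abc123", 1)
print(script)
```

Because the key differs for every submission, two jobs rendered from the same template write to different temporary files.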

 

Form, view and urls

 

Everything below is basic Django that you may already be familiar with:

 ~/operations_env/operations/piston/forms.py

from django import forms
from piston.models import JobType, Server


class SubmitForm(forms.Form):
    server = forms.ModelChoiceField(label='Server*', queryset=Server.objects.all().order_by('name'), required=True)
    job_type = forms.ModelChoiceField(label='Type*', queryset=JobType.objects.all().order_by('name'), required=True)
    job_name = forms.CharField(label='Name*', max_length=50, required=True)
    body = forms.CharField(
    label="Body*", widget=forms.Textarea(attrs={'rows': 10}), required=True)

 

 ~/operations_env/operations/piston/views.py

from django.views.generic.edit import FormView
from piston.forms import SubmitForm


class SubmitView(FormView):
    template_name = 'submit.html'
    form_class = SubmitForm

    def form_valid(self, form):
        return super(SubmitView, self).form_valid(form)

 

 ~/operations_env/operations/operations/urls.py

from django.conf.urls import url
from django.contrib import admin
from piston.views import SubmitView


urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^submit/', SubmitView.as_view(), name='submit'),
]

The demo template

At this point there is work to be done to deal with WebSockets. There is basically an HTML form and some JavaScript code running in two phases.

For the submission phase (on click of a button):

  1. We need to create an object from the form's data and serialize it.
  2. We need to create a unique id for each submission and include it in the serialized object. We do this for concurrency (see the comments at the end).
  3. We need to create a "console" area in the html page for each submission waiting for "console" messages.
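
Seen from the server, the serialized object from steps 1 and 2 arrives as a JSON text frame. The sketch below shows what such a frame could look like and how it might be checked before use. The field names match the form and the sub_id key used in this article; the parse_submission helper, its error handling and the example values are assumptions added for illustration and are not part of the demo.

```python
import json

REQUIRED_FIELDS = ("server", "job_type", "job_name", "body", "sub_id")


def parse_submission(raw):
    """Decode a submission frame and check that every expected field is present."""
    data = json.loads(raw)
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    return data


# What the browser could send for the first submission of a page (example values).
frame = json.dumps({
    "server": "db-server-1",
    "job_type": "db2 sql",
    "job_name": "count rows",
    "body": "select count(*) from staff;",
    "sub_id": 1,
})

submission = parse_submission(frame)
# The same counter names the console tab that will receive the output.
console_id = "console" + str(submission["sub_id"])
print(console_id)
```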

For the socket.receive phase (on arrival of a new message):

  1. We need to parse the message and determine whether it is a console message or an alert.
  2. A console message is appended to the related console, selected by its console id.
  3. An alert message is prepended to the alerts area.
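
The two message shapes that the page distinguishes can be written down explicitly. The sketch below mirrors the decision made by the JavaScript onmessage handler, in Python and purely for illustration. classify_message is a hypothetical helper; the console, div_id, color and alert keys are the ones used in this demo.

```python
import json


def classify_message(raw):
    """Return ('console', div_id, text, color), ('alert', html) or ('unknown',)."""
    data = json.loads(raw)
    if "console" in data:
        return ("console", data["div_id"], data["console"],
                data.get("color", "Black"))
    if "alert" in data:
        return ("alert", data["alert"])
    return ("unknown",)


console_frame = json.dumps(
    {"console": "<samp>hello</samp>", "div_id": "console1", "color": "Green"})
alert_frame = json.dumps({"alert": "<div class='alert alert-info'>started</div>"})

print(classify_message(console_frame))
print(classify_message(alert_frame))
```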

 

Here is the template we use for this demo:

 ~/operations_env/operations/piston/templates/submit.html

<html>
<meta charset="UTF-8">

<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
<body>
<div class="container col-md-9">
<form class="form-horizontal" action="#" id="job_form">{% csrf_token %}
    <fieldset>
           {% for field in form %}
           {% if field.name == "server" %}
           <br>
           <select class="form-control" id="id_{{ field.name }}" name="{{field.name}}">
             {% for choice in field.field.queryset %}
             <option>{{ choice.name }}</option>
             {% endfor %}
           </select>
           {% elif field.name == "job_type" %}
           <br>
           <select class="form-control" id="id_{{ field.name }}" name="{{field.name}}">
             {% for choice in field.field.queryset %}
             <option>{{ choice.name }}</option>
             {% endfor %}
           </select>
           {% elif field.name == "job_name" %}
           <br>
           <input class="form-control" id="id_{{ field.name }}" name="{{field.name}}" placeholder="{{ field.label }}">
           {% elif field.name == "body" %}
           <br>
           <textarea class="form-control" id="id_{{ field.name }}" name="{{field.name}}" placeholder="{{ field.label }}" rows="10"></textarea>
           {% endif %}
           {% endfor %}
    </fieldset>
  <br>
  <div class="controls">
      <button type="button" class="btn btn-default" name="action" onclick="clsConsole();">CLEAR</button>
      <button type="button" class="btn btn-danger submit_btn" name="action" onclick="Submit();">SUBMIT!</button>
  </div>
</form>
    <ul class="nav nav-tabs" role="tablist">
        <li><a href="#" class="add-console" style="display: none;"></a></li>
    </ul>
    <div class="tab-content">
    </div>
</div>
<div class="container col-md-3"><br><div id="messages"></div></div>
</body>
<script src="https://code.jquery.com/jquery-3.1.1.slim.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js" integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/reconnecting-websocket/1.0.0/reconnecting-websocket.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery-serialize-object/2.5.0/jquery.serialize-object.min.js"></script>
<script>
var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
var ws_console = ws_scheme + '://' + window.location.host + '/submit/';
var socket = new ReconnectingWebSocket(ws_console);
var counter = 0;

socket.onmessage = function(e) {
    var message_data = JSON.parse(e.data)
    if (typeof message_data['console'] !== 'undefined') {
        var console_line = message_data['console'];
        var div_id = message_data['div_id'];
        var color_id = message_data['color'];
        $("<div />")
            .append(console_line)
            .appendTo("#" + div_id)
            .css('color', color_id);
    } else if ( typeof message_data['alert'] !== 'undefined') {
        $('#messages').prepend(message_data['alert']);
    }
}

socket.onopen = function() {
    console.log("WS connected." );
}

if (socket.readyState == WebSocket.OPEN) socket.onopen();

function Submit() {
    counter += 1;
    var formdataser = $('#job_form').serializeObject();
    formdataser["sub_id"] = counter;
    var formdata = JSON.stringify(formdataser);
    var id = $(".nav-tabs").children().length;
    var tabId = 'console' + counter;
    var jobname = formdataser["job_name"]
    $('.add-console').closest('li').before('<li><a href="#console' + counter + '">' + jobname + '</a></li>');
    $('.tab-content').append('<div class="tab-pane" id="' + tabId + '"><br></div>');
    $('.nav-tabs li:nth-child(' + id + ') a').click();
    socket.send(formdata);
}

$(".nav-tabs").on("click", "a", function (e) {
        e.preventDefault();
        if (!$(this).hasClass('add-console')) {
            $(this).tab('show');
        }
    })
    .on("click", "span", function () {
        var anchor = $(this).siblings('a');
        $(anchor.attr('href')).remove();
        $(this).parent().remove();
        $(".nav-tabs li").children('a').first().click();
    });

function clsConsole() {
  $(".nav-tabs").html(""); 
  $(".nav-tabs").append("<li><a href='#' class='add-console' style='display: none;'></a></li>"); 
  $(".tab-content").html(""); 
}
</script>
</html>

The channels

To work with Channels we need three things: configuration, routing and consumers.

A. The channel layer configuration in settings.py:

 ~/operations_env/operations/operations/settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',
    'piston',
]

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgi_redis.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
        "ROUTING": "piston.routing.channel_routing",
    },
}
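
If Redis does not run on the same machine, the host and port could be taken from the environment instead of being hard-coded. This is only a sketch: the REDIS_HOST and REDIS_PORT variable names and the build_channel_layers helper are assumptions, while the dictionary layout is the one shown above.

```python
import os


def build_channel_layers(environ=None):
    """Build the CHANNEL_LAYERS setting, falling back to a local Redis."""
    environ = os.environ if environ is None else environ
    host = environ.get("REDIS_HOST", "localhost")  # hypothetical variable name
    port = int(environ.get("REDIS_PORT", "6379"))  # hypothetical variable name
    return {
        "default": {
            "BACKEND": "asgi_redis.RedisChannelLayer",
            "CONFIG": {"hosts": [(host, port)]},
            "ROUTING": "piston.routing.channel_routing",
        },
    }


CHANNEL_LAYERS = build_channel_layers()
```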

 

B. The file routing.py in our application directory:

 ~/operations_env/operations/piston/routing.py

from channels.routing import route
from piston.consumers import MyConsumer


channel_routing = [
    route("websocket.receive", MyConsumer),
]

 

C. And finally the file consumers.py in our application directory:

 ~/operations_env/operations/piston/consumers.py

from channels.generic.websockets import JsonWebsocketConsumer
from paramiko import SSHClient, AutoAddPolicy
from jinja2 import Template
from piston.models import JobType, Server


class MyConsumer(JsonWebsocketConsumer):

    channel_session_user = True

    def connection_groups(self, **kwargs):
        """
        Called to return the list of groups to automatically add/remove
        this connection to/from.
        """
        return ["operators"]
        #pass

    def connect(self, message, **kwargs):
        """
        Perform things on connection start
        """
        pass

    def receive(self, content, **kwargs):
        """
        Called when a message is received with decoded JSON content
        """
        ssh = SSHClient()
        ssh.set_missing_host_key_policy(AutoAddPolicy())
        server_name = content['server']
        job_type = content['job_type']
        job_name = content['job_name']
        body = str(content['body'])
        sub_id = content['sub_id']
        template = Template(JobType.objects.get(name=job_type).template)
        # Look the target server up once and read its connection details
        target = Server.objects.get(name=server_name)
        username, password, server = target.username, target.password, target.hostname
        channel_key = self.message.channel_session.session_key
        channel_user = self.message.user
        script_id = channel_key + '_' + str(sub_id)
        script = template.render( body=body, key=script_id )
        print(script)
        ssh.connect(server, username=username, password=password)
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(script)
        self.groupAlert('<div class=\"alert alert-info alert-dismissible\"><button type=\"button\" class=\"close\" data-dismiss=\"alert\" aria-label=\"Close\"><span aria-hidden=\"true\">&times;</span></button><strong>Info!</strong> The job \'%s\' started.</div>' % job_name)
        for line in iter(ssh_stdout.readline, ''):
            self.conWrite( ('<samp>'+line+'</samp>'), ('console' + str(sub_id)), 'Green')
        for line in iter(ssh_stderr.readline, ''):
            self.conWrite( ('<samp>'+line+'</samp>'), ('console' + str(sub_id)), 'Red')
        if ssh_stdout.channel.recv_exit_status() == 0:
            self.groupAlert('<div class=\"alert alert-success alert-dismissible\"><button type=\"button\" class=\"close\" data-dismiss=\"alert\" aria-label=\"Close\"><span aria-hidden=\"true\">&times;</span></button><strong>Success!</strong> The job \'%s\' completed successfuly.</div>' % job_name)
            self.conWrite( '<p class="bg-success">The job completed OK!</p>', ('console' + str(sub_id)), 'Black')
        else:
            self.groupAlert('<div class=\"alert alert-danger alert-dismissible\"><button type=\"button\" class=\"close\" data-dismiss=\"alert\" aria-label=\"Close\"><span aria-hidden=\"true\">&times;</span></button><strong>Attention!</strong> The job \'%s\' failed.</div>' % job_name)
            self.conWrite( '<p class="bg-danger">The job failed!</p>', ('console' + str(sub_id)), 'Red')
        ssh.close()

    def conCreate(self, div_id, **kwargs):
        self.send({'div_id': div_id})

    def conWrite(self, line=None, div_id='console', color='Black', **kwargs):
        self.send({'console': line, 'div_id': div_id, 'color': color})

    def groupAlert(self, alert, **kwargs):
        self.group_send('operators', {'alert': alert })

    def disconnect(self, message, **kwargs):
        """
        Perform things on connection close
        """
        pass
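
The two for loops in receive rely on the two-argument form of iter(), which keeps calling readline until it returns the empty string that marks end of file. The sketch below applies the same pattern to a local child process instead of an SSH channel, so it can be tried without a remote server. The stream_command helper and the emit callback are stand-ins for the consumer's conWrite and are not part of the demo.

```python
import subprocess
import sys


def stream_command(argv, emit):
    """Run argv, pass each output line to emit(line, color), return the exit code."""
    with subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          universal_newlines=True) as proc:
        # iter(callable, sentinel) calls readline() until it returns '' (EOF).
        for line in iter(proc.stdout.readline, ''):
            emit(line.rstrip('\n'), 'Green')
        for line in iter(proc.stderr.readline, ''):
            emit(line.rstrip('\n'), 'Red')
    # Leaving the with block waits for the child, so returncode is set here.
    return proc.returncode


lines = []
child = [sys.executable, "-c",
         "import sys; print('working'); sys.stderr.write('oops\\n'); sys.exit(3)"]
exit_code = stream_command(child, lambda text, color: lines.append((color, text)))
print(lines, exit_code)
```

As in the consumer, stdout is read to the end before stderr is touched. That is fine for short jobs, but a job that writes a lot to stderr early on can stall until that output is read.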

 

Comments

Consumers are where all the magic happens:

  1. We used generic consumers to avoid dealing with low-level operations like JSON decoding, character escaping etc. Thankfully the Channels developers provide all of this through generic consumers and some decorators.
  2. In our generic consumer "MyConsumer", every method is called on a WebSocket event, as the docstrings describe. The methods conWrite and groupAlert were written by us to write to a user's console and to alert the group, respectively.
  3. When we create the script, we render the template with two variables: body and key. The key is unique for every submission, so tasks that run in parallel on the same server and need, for example, to create a temporary file or pipe can use it to avoid clashing with each other.
  4. With the conWrite method we send the client a line of text, a console id and a font color.
  5. With the groupAlert method we send a line of text to the 'operators' group.
  6. We parse the JSON message from the client, run the task, and send back each line of the task's stdout and stderr as a message with the relevant font color.
  7. We also notify the group with a bootstrap alert at the beginning and the end of each task.
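
The three alert strings in receive differ only in the Bootstrap class, the bold label and the text, so a small helper could build them in one place. This is a sketch, and build_alert is a hypothetical name. Unlike the demo, it also passes the label and text through html.escape, so that a job name containing markup is displayed as plain text.

```python
from html import escape

ALERT_TEMPLATE = (
    '<div class="alert alert-{level} alert-dismissible">'
    '<button type="button" class="close" data-dismiss="alert" aria-label="Close">'
    '<span aria-hidden="true">&times;</span></button>'
    '<strong>{label}</strong> {text}</div>'
)


def build_alert(level, label, text):
    """Return Bootstrap 3 alert markup; level is info, success or danger."""
    return ALERT_TEMPLATE.format(level=level, label=escape(label),
                                 text=escape(text))


job_name = "nightly <backup>"
started = build_alert("info", "Info!", "The job '%s' started." % job_name)
failed = build_alert("danger", "Attention!", "The job '%s' failed." % job_name)
print(started)
```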

 

