Wooey/djangui/tasks.py

113 lines
4.0 KiB
Python
Raw Normal View History

from __future__ import absolute_import
import subprocess
import tarfile
import os
import zipfile
from django.utils.text import get_valid_filename
2015-05-11 03:36:30 +00:00
from django.core.files.storage import default_storage
from django.core.files import File
from django.conf import settings
2015-04-29 14:31:06 +00:00
from celery import Task
2015-05-01 16:18:31 +00:00
from celery import states
from celery import app
2015-04-29 14:31:06 +00:00
from celery.contrib import rdb
celery_app = app.app_or_default()
class DjanguiTask(Task):
pass
# def after_return(self, status, retval, task_id, args, kwargs, einfo):
# job, created = DjanguiJob.objects.get_or_create(djangui_celery_id=task_id)
# job.content_type.djangui_celery_state = status
# job.save()
@celery_app.task(base=DjanguiTask)
2015-05-11 03:36:30 +00:00
def submit_script(**kwargs):
2015-05-01 16:18:31 +00:00
job_id = kwargs.pop('djangui_job')
2015-05-11 03:36:30 +00:00
resubmit = kwargs.pop('djangui_resubmit', False)
from .backend import utils
from .models import DjanguiJob
job = DjanguiJob.objects.get(pk=job_id)
command = utils.get_job_commands(job=job)
if resubmit:
# clone ourselves, setting pk=None seems hackish but it works
job.pk = None
# This is where the script works from -- it is what is after the media_root since that may change between
# setups/where our user uploads are stored.
cwd = job.get_output_path()
abscwd = os.path.abspath(os.path.join(settings.MEDIA_ROOT, cwd))
job.command = ' '.join(command)
job.save_path = cwd
utils.mkdirs(abscwd)
# make sure we have the script, otherwise download it. This can happen if we have an ephemeral file system or are
# executing jobs on a worker node.
script_path = job.script.script_path
2015-05-11 22:19:16 +00:00
if not utils.get_storage(local=True).exists(script_path.path):
utils.get_storage(local=True).save(script_path.path, script_path.file)
2015-05-11 03:36:30 +00:00
job.status = DjanguiJob.RUNNING
job.save()
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=abscwd)
stdout, stderr = proc.communicate()
# tar/zip up the generated content for bulk downloads
def get_valid_file(cwd, name, ext):
out = os.path.join(cwd, name)
index = 0
while os.path.exists('{}.{}'.format(out, ext)):
index += 1
out = os.path.join(cwd, '{}_{}'.format(name, index))
return '{}.{}'.format(out, ext)
2015-05-11 03:36:30 +00:00
# fetch the job again in case the database connection was lost during the job or something else changed.
job = DjanguiJob.objects.get(pk=job_id)
2015-05-11 03:36:30 +00:00
tar_out = get_valid_file(abscwd, get_valid_filename(job.job_name), 'tar.gz')
tar = tarfile.open(tar_out, "w:gz")
2015-05-11 03:36:30 +00:00
tar_name = os.path.splitext(os.path.splitext(os.path.split(tar_out)[1])[0])[0]
tar.add(abscwd, arcname=tar_name)
tar.close()
2015-05-11 03:36:30 +00:00
zip_out = get_valid_file(abscwd, get_valid_filename(job.job_name), 'zip')
zip = zipfile.ZipFile(zip_out, "w")
arcname = os.path.splitext(os.path.split(zip_out)[1])[0]
2015-05-11 03:36:30 +00:00
zip.write(abscwd, arcname=arcname)
for root, folders, filenames in os.walk(os.path.split(zip_out)[0]):
for filename in filenames:
path = os.path.join(root, filename)
if path == tar_out:
continue
if path == zip_out:
continue
zip.write(path, arcname=os.path.join(arcname, filename))
zip.close()
2015-04-29 14:31:06 +00:00
2015-05-11 03:36:30 +00:00
# save all the files generated as well to our default storage for ephemeral storage setups
for root, folders, files in os.walk(abscwd):
for filename in files:
filepath = os.path.join(root, filename)
s3path = os.path.join(root[root.find(cwd):], filename)
2015-05-11 22:19:16 +00:00
exists = utils.get_storage(local=False).exists(s3path)
filesize = utils.get_storage(local=False).size(s3path)
2015-05-11 03:36:30 +00:00
if not exists or (exists and filesize == 0):
if exists:
2015-05-11 22:19:16 +00:00
utils.get_storage(local=False).delete(s3path)
utils.get_storage(local=False).save(s3path, File(open(filepath, 'rb')))
2015-05-11 03:36:30 +00:00
utils.create_job_fileinfo(job)
2015-05-01 16:18:31 +00:00
job.stdout = stdout
job.stderr = stderr
2015-05-06 22:59:04 +00:00
job.status = DjanguiJob.COMPLETED
2015-05-01 16:18:31 +00:00
job.save()
return (stdout, stderr)