Files
PyRIGS/versioning/versioning.py
FreneticScribbler 3438489934 Add new line functionality for vehicles/drivers
Might it have been easier to create 'dummy' models like with EventItems? Probably...
2020-08-27 02:20:46 +01:00

273 lines
9.3 KiB
Python

import datetime
import logging
from diff_match_patch import diff_match_patch
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import EmailField, IntegerField, TextField
from django.shortcuts import get_object_or_404
from django.utils.functional import cached_property
from django.views import generic
from reversion.models import Version, VersionQuerySet
from RIGS import models
from assets import models as asset_models
logger = logging.getLogger('tec.pyrigs')
class FieldComparison(object):
def __init__(self, field=None, old=None, new=None):
self.field = field
self._old = old
self._new = new
def display_value(self, value):
if isinstance(self.field, IntegerField) and self.field.choices is not None and len(self.field.choices) > 0:
return [x[1] for x in self.field.choices if x[0] == value][0]
return value
@property
def old(self):
return self.display_value(self._old)
@property
def new(self):
return self.display_value(self._new)
@property
def long(self):
if isinstance(self.field, EmailField):
return True
return False
@property
def linebreaks(self):
if isinstance(self.field, TextField):
return True
return False
@property
def diff(self):
oldText = str(self.display_value(self._old)) or ""
newText = str(self.display_value(self._new)) or ""
dmp = diff_match_patch()
diffs = dmp.diff_main(oldText, newText)
dmp.diff_cleanupSemantic(diffs)
outputDiffs = []
for (op, data) in diffs:
if op == dmp.DIFF_INSERT:
outputDiffs.append({'type': 'insert', 'text': data})
elif op == dmp.DIFF_DELETE:
outputDiffs.append({'type': 'delete', 'text': data})
elif op == dmp.DIFF_EQUAL:
outputDiffs.append({'type': 'equal', 'text': data})
return outputDiffs
class ModelComparison(object):
def __init__(self, old=None, new=None, version=None, excluded_keys=[]):
# recieves two objects of the same model, and compares them. Returns an array of FieldCompare objects
try:
self.fields = old._meta.get_fields()
except AttributeError:
self.fields = new._meta.get_fields()
self.old = old
self.new = new
self.excluded_keys = excluded_keys
self.version = version
@cached_property
def revision(self):
return self.version.revision
@cached_property
def field_changes(self):
changes = []
for field in self.fields:
field_name = field.name
if field_name in self.excluded_keys:
continue # if we're excluding this field, skip over it
try:
oldValue = getattr(self.old, field_name, None)
except ObjectDoesNotExist:
oldValue = None
try:
newValue = getattr(self.new, field_name, None)
except ObjectDoesNotExist:
newValue = None
bothBlank = (not oldValue) and (not newValue)
if oldValue != newValue and not bothBlank:
comparison = FieldComparison(field, oldValue, newValue)
changes.append(comparison)
return changes
@cached_property
def fields_changed(self):
return len(self.field_changes) > 0
@cached_property
def item_changes(self):
# Recieves two event version objects and compares their items, returns an array of ItemCompare objects
item_type = ContentType.objects.get_for_model(models.EventItem)
old_item_versions = self.version.parent.revision.version_set.filter(content_type=item_type)
new_item_versions = self.version.revision.version_set.filter(content_type=item_type)
comparisonParams = {'excluded_keys': ['id', 'event', 'order']}
# Build some dicts of what we have
item_dict = {} # build a list of items, key is the item_pk
for version in old_item_versions: # put all the old versions in a list
if version.field_dict["event_id"] == int(self.new.pk):
compare = ModelComparison(old=version._object_version.object, **comparisonParams)
item_dict[version.object_id] = compare
for version in new_item_versions: # go through the new versions
if version.field_dict["event_id"] == int(self.new.pk):
try:
compare = item_dict[version.object_id] # see if there's a matching old version
compare.new = version._object_version.object # then add the new version to the dictionary
except KeyError: # there's no matching old version, so add this item to the dictionary by itself
compare = ModelComparison(new=version._object_version.object, **comparisonParams)
item_dict[version.object_id] = compare # update the dictionary with the changes
changes = []
for (_, compare) in list(item_dict.items()):
if compare.fields_changed:
changes.append(compare)
return changes
@cached_property
def items_changed(self):
return len(self.item_changes) > 0
@cached_property
def anything_changed(self):
return self.fields_changed or self.items_changed
class RIGSVersionManager(VersionQuerySet):
# TODO Rework this so its possible to define from the other apps
def get_for_multiple_models(self, model_array):
content_types = []
for model in model_array:
content_types.append(ContentType.objects.get_for_model(model))
return self.filter(content_type__in=content_types).select_related("revision").order_by(
"-revision__date_created")
class RIGSVersion(Version):
class Meta:
proxy = True
objects = RIGSVersionManager.as_manager()
@cached_property
def parent(self):
thisId = self.object_id
versions = RIGSVersion.objects.get_for_object_reference(self.content_type.model_class(), thisId).select_related(
"revision", "revision__user").all()
try:
previousVersion = versions.filter(revision_id__lt=self.revision_id).latest('revision__date_created')
except ObjectDoesNotExist:
return False
return previousVersion
@cached_property
def changes(self):
return ModelComparison(
version=self,
new=self._object_version.object,
old=self.parent._object_version.object if self.parent else None
)
class VersionHistory(generic.ListView):
model = RIGSVersion
template_name = "version_history.html"
paginate_by = 25
def get_queryset(self, **kwargs):
return RIGSVersion.objects.get_for_object(self.get_object()).select_related("revision",
"revision__user").all().order_by(
"-revision__date_created")
def get_object(self, **kwargs):
return get_object_or_404(self.kwargs['model'], pk=self.kwargs['pk'])
def get_context_data(self, **kwargs):
context = super(VersionHistory, self).get_context_data(**kwargs)
context['object'] = self.get_object()
context['id'] = self.get_object().pk
return context
class ActivityTable(generic.ListView):
model = RIGSVersion
template_name = "activity_table.html"
paginate_by = 25
def get_queryset(self):
versions = RIGSVersion.objects.get_for_multiple_models(
[models.Event, models.Venue, models.Person, models.Organisation, models.EventAuthorisation, models.RiskAssessment])
return versions.order_by("-revision__date_created")
def get_context_data(self, **kwargs):
context = super(ActivityTable, self).get_context_data(**kwargs)
context['title'] = 'Rigboard'
return context
# TODO Defined by the model, rather than with this list
def models_for_feed():
return [models.Event, models.Venue, models.Person, models.Organisation, models.EventAuthorisation, models.RiskAssessment, models.EventChecklist,
asset_models.Asset, asset_models.Supplier]
# Appears on homepage
class ActivityFeed(generic.ListView):
model = RIGSVersion
template_name = "activity_feed_data.html"
paginate_by = 25
def get_queryset(self):
versions = RIGSVersion.objects.get_for_multiple_models(
models_for_feed())
return versions.order_by("-revision__date_created")
def get_context_data(self, **kwargs):
# Call the base implementation first to get a context
context = super(ActivityFeed, self).get_context_data(**kwargs)
maxTimeDelta = datetime.timedelta(hours=1)
items = []
for thisVersion in context['object_list']:
thisVersion.withPrevious = False
if len(items) >= 1:
timeDiff = items[-1].revision.date_created - thisVersion.revision.date_created
timeTogether = timeDiff < maxTimeDelta
sameUser = thisVersion.revision.user_id == items[-1].revision.user_id
thisVersion.withPrevious = timeTogether & sameUser
items.append(thisVersion)
return context