mirror of
https://github.com/nottinghamtec/PyRIGS.git
synced 2026-01-17 13:32:15 +00:00
* FEAT: Initial work on revision history for assets The revision history for individual items mostly works, though it shows database ID where it should show asset ID. Recent changes feed isn't yet done. * FEAT: Initial implementation of asset activity stream * CHORE: Fix pep8 * FIX: Asset history table 'branding' * FIX: Individual asset version history is now correctly filtered * FEAT: Make revision history for suppliers accessible * CHORE: *sings* And a pep8 in a broken tree... * Refactored out duplicated code from `AssetVersionHistory * CHORE: pep8 And another random bit of wierd whitespace I found Co-authored-by: Matthew Smith <mattysmith22@googlemail.com> Closes #358
259 lines
8.5 KiB
Python
259 lines
8.5 KiB
Python
|
|
|
|
import logging
|
|
import datetime
|
|
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.shortcuts import get_object_or_404
|
|
from django.views import generic
|
|
from django.utils.functional import cached_property
|
|
from django.db.models import IntegerField, EmailField, TextField
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
from reversion.models import Version, VersionQuerySet
|
|
from diff_match_patch import diff_match_patch
|
|
|
|
from RIGS import models
|
|
|
|
logger = logging.getLogger('tec.pyrigs')
|
|
|
|
|
|
class FieldComparison(object):
|
|
def __init__(self, field=None, old=None, new=None):
|
|
self.field = field
|
|
self._old = old
|
|
self._new = new
|
|
|
|
def display_value(self, value):
|
|
if isinstance(self.field, IntegerField) and len(self.field.choices) > 0:
|
|
return [x[1] for x in self.field.choices if x[0] == value][0]
|
|
if self.field.name == "risk_assessment_edit_url":
|
|
return "completed" if value else ""
|
|
return value
|
|
|
|
@property
|
|
def old(self):
|
|
return self.display_value(self._old)
|
|
|
|
@property
|
|
def new(self):
|
|
return self.display_value(self._new)
|
|
|
|
@property
|
|
def long(self):
|
|
if isinstance(self.field, EmailField):
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def linebreaks(self):
|
|
if isinstance(self.field, TextField):
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def diff(self):
|
|
oldText = str(self.display_value(self._old)) or ""
|
|
newText = str(self.display_value(self._new)) or ""
|
|
dmp = diff_match_patch()
|
|
diffs = dmp.diff_main(oldText, newText)
|
|
dmp.diff_cleanupSemantic(diffs)
|
|
|
|
outputDiffs = []
|
|
|
|
for (op, data) in diffs:
|
|
if op == dmp.DIFF_INSERT:
|
|
outputDiffs.append({'type': 'insert', 'text': data})
|
|
elif op == dmp.DIFF_DELETE:
|
|
outputDiffs.append({'type': 'delete', 'text': data})
|
|
elif op == dmp.DIFF_EQUAL:
|
|
outputDiffs.append({'type': 'equal', 'text': data})
|
|
return outputDiffs
|
|
|
|
|
|
class ModelComparison(object):
|
|
|
|
def __init__(self, old=None, new=None, version=None, excluded_keys=[]):
|
|
# recieves two objects of the same model, and compares them. Returns an array of FieldCompare objects
|
|
try:
|
|
self.fields = old._meta.get_fields()
|
|
except AttributeError:
|
|
self.fields = new._meta.get_fields()
|
|
|
|
self.old = old
|
|
self.new = new
|
|
self.excluded_keys = excluded_keys
|
|
self.version = version
|
|
|
|
@cached_property
|
|
def revision(self):
|
|
return self.version.revision
|
|
|
|
@cached_property
|
|
def field_changes(self):
|
|
changes = []
|
|
for field in self.fields:
|
|
field_name = field.name
|
|
|
|
if field_name in self.excluded_keys:
|
|
continue # if we're excluding this field, skip over it
|
|
|
|
try:
|
|
oldValue = getattr(self.old, field_name, None)
|
|
except ObjectDoesNotExist:
|
|
oldValue = None
|
|
|
|
try:
|
|
newValue = getattr(self.new, field_name, None)
|
|
except ObjectDoesNotExist:
|
|
newValue = None
|
|
|
|
bothBlank = (not oldValue) and (not newValue)
|
|
if oldValue != newValue and not bothBlank:
|
|
comparison = FieldComparison(field, oldValue, newValue)
|
|
changes.append(comparison)
|
|
|
|
return changes
|
|
|
|
@cached_property
|
|
def fields_changed(self):
|
|
return len(self.field_changes) > 0
|
|
|
|
@cached_property
|
|
def item_changes(self):
|
|
# Recieves two event version objects and compares their items, returns an array of ItemCompare objects
|
|
|
|
item_type = ContentType.objects.get_for_model(models.EventItem)
|
|
old_item_versions = self.version.parent.revision.version_set.filter(content_type=item_type)
|
|
new_item_versions = self.version.revision.version_set.filter(content_type=item_type)
|
|
|
|
comparisonParams = {'excluded_keys': ['id', 'event', 'order']}
|
|
|
|
# Build some dicts of what we have
|
|
item_dict = {} # build a list of items, key is the item_pk
|
|
for version in old_item_versions: # put all the old versions in a list
|
|
if version.field_dict["event_id"] == int(self.new.pk):
|
|
compare = ModelComparison(old=version._object_version.object, **comparisonParams)
|
|
item_dict[version.object_id] = compare
|
|
|
|
for version in new_item_versions: # go through the new versions
|
|
if version.field_dict["event_id"] == int(self.new.pk):
|
|
try:
|
|
compare = item_dict[version.object_id] # see if there's a matching old version
|
|
compare.new = version._object_version.object # then add the new version to the dictionary
|
|
except KeyError: # there's no matching old version, so add this item to the dictionary by itself
|
|
compare = ModelComparison(new=version._object_version.object, **comparisonParams)
|
|
|
|
item_dict[version.object_id] = compare # update the dictionary with the changes
|
|
|
|
changes = []
|
|
for (_, compare) in list(item_dict.items()):
|
|
if compare.fields_changed:
|
|
changes.append(compare)
|
|
|
|
return changes
|
|
|
|
@cached_property
|
|
def items_changed(self):
|
|
return len(self.item_changes) > 0
|
|
|
|
@cached_property
|
|
def anything_changed(self):
|
|
return self.fields_changed or self.items_changed
|
|
|
|
|
|
class RIGSVersionManager(VersionQuerySet):
|
|
def get_for_multiple_models(self, model_array):
|
|
content_types = []
|
|
for model in model_array:
|
|
content_types.append(ContentType.objects.get_for_model(model))
|
|
|
|
return self.filter(content_type__in=content_types).select_related("revision").order_by("-pk")
|
|
|
|
|
|
class RIGSVersion(Version):
|
|
class Meta:
|
|
proxy = True
|
|
|
|
objects = RIGSVersionManager.as_manager()
|
|
|
|
@cached_property
|
|
def parent(self):
|
|
thisId = self.object_id
|
|
|
|
versions = RIGSVersion.objects.get_for_object_reference(self.content_type.model_class(), thisId).select_related("revision", "revision__user").all()
|
|
|
|
try:
|
|
previousVersion = versions.filter(revision_id__lt=self.revision_id).latest(
|
|
field_name='revision__date_created')
|
|
except ObjectDoesNotExist:
|
|
return False
|
|
|
|
return previousVersion
|
|
|
|
@cached_property
|
|
def changes(self):
|
|
return ModelComparison(
|
|
version=self,
|
|
new=self._object_version.object,
|
|
old=self.parent._object_version.object if self.parent else None
|
|
)
|
|
|
|
|
|
class VersionHistory(generic.ListView):
|
|
model = RIGSVersion
|
|
template_name = "RIGS/version_history.html"
|
|
paginate_by = 25
|
|
|
|
def get_queryset(self, **kwargs):
|
|
return RIGSVersion.objects.get_for_object(self.get_object()).select_related("revision", "revision__user").all()
|
|
|
|
def get_object(self, **kwargs):
|
|
return get_object_or_404(self.kwargs['model'], pk=self.kwargs['pk'])
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super(VersionHistory, self).get_context_data(**kwargs)
|
|
context['object'] = self.get_object()
|
|
|
|
return context
|
|
|
|
|
|
class ActivityTable(generic.ListView):
|
|
model = RIGSVersion
|
|
template_name = "RIGS/activity_table.html"
|
|
paginate_by = 25
|
|
|
|
def get_queryset(self):
|
|
versions = RIGSVersion.objects.get_for_multiple_models([models.Event, models.Venue, models.Person, models.Organisation, models.EventAuthorisation])
|
|
return versions
|
|
|
|
|
|
class ActivityFeed(generic.ListView):
|
|
model = RIGSVersion
|
|
template_name = "RIGS/activity_feed_data.html"
|
|
paginate_by = 25
|
|
|
|
def get_queryset(self):
|
|
versions = RIGSVersion.objects.get_for_multiple_models([models.Event, models.Venue, models.Person, models.Organisation, models.EventAuthorisation])
|
|
return versions
|
|
|
|
def get_context_data(self, **kwargs):
|
|
# Call the base implementation first to get a context
|
|
context = super(ActivityFeed, self).get_context_data(**kwargs)
|
|
|
|
maxTimeDelta = datetime.timedelta(hours=1)
|
|
|
|
items = []
|
|
|
|
for thisVersion in context['object_list']:
|
|
thisVersion.withPrevious = False
|
|
if len(items) >= 1:
|
|
timeDiff = items[-1].revision.date_created - thisVersion.revision.date_created
|
|
timeTogether = timeDiff < maxTimeDelta
|
|
sameUser = thisVersion.revision.user_id == items[-1].revision.user_id
|
|
thisVersion.withPrevious = timeTogether & sameUser
|
|
|
|
items.append(thisVersion)
|
|
|
|
return context
|