dodan user

This commit is contained in:
emaric 2024-01-18 12:03:37 +01:00
parent 75c00b0e91
commit 298219ab30
32 changed files with 912 additions and 2 deletions

View File

@ -8,6 +8,9 @@ class PointSerializer(serializers.Serializer):
def to_representation(self, instance):
return {'lat': instance.y, 'lon': instance.x}
def to_internal_value(self, data):
return Point(float(data['lon']), float(data['lat']))
class ObjektSigurnostiSerializer(serializers.ModelSerializer):
lokacija = PointSerializer()

View File

@ -45,7 +45,7 @@ class ObjektSigurnostiDetailTest(APITestCase):
def test_update_objekt_sigurnosti(self):
data = {
'lokacija': {'lat':18.457, 'lon':45.124},
'lokacija': {'lat': 45.123, 'lon': 18.456},
'naziv' : 'updated-naziv',
}

View File

@ -52,7 +52,7 @@ INSTALLED_APPS = [
'drf_yasg',
# Custom apps:
'plovidba_aplikacija',
'user',
]
MIDDLEWARE = [
@ -104,6 +104,13 @@ DATABASES["default"]["TEST"] = {
"NAME": ENV_STR("DATABASE_TEST_NAME", "test_plovidba_dev_db")
}
AUTH_USER_MODEL = "user.User"
AUTHENTICATION_BACKENDS = [
# Needed to login by username in Django admin, regardless of `allauth`
"django.contrib.auth.backends.ModelBackend",
]
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators

View File

@ -41,6 +41,7 @@ urlpatterns = [
path('api/', include('plovidba_aplikacija.urls')),
path("swagger/", api_schema_view.with_ui("swagger", cache_timeout=0), name="schema-swagger-ui"),
path("redoc/", api_schema_view.with_ui("redoc", cache_timeout=0), name="schema-redoc"),
path('user/', include('user.urls')),
]
if settings.SHOW_API_DOCS:

0
user/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

47
user/admin.py Normal file
View File

@ -0,0 +1,47 @@
from django.contrib import admin
# Register your models here.
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.contrib.auth.admin import UserAdmin
from .models import Organization, PasswordResetRequest
class OrganizationAdmin(admin.ModelAdmin):
list_display = ('name', 'contact_email')
admin.site.register(Organization, OrganizationAdmin) # noqa
class CustomUserAdmin(UserAdmin):
fieldsets = (
(None, {'fields': ('email', 'password')}),
(('Personal info'), {'fields': ('first_name', 'last_name', 'organization')}),
(('Permissions'), {
'fields': ('is_active', 'email_confirmed', 'is_staff', 'is_superuser', 'groups', 'user_permissions')
}),
(('Important dates'), {'fields': ('last_login', 'date_joined')}),
)
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('email', 'password1', 'password2'),
}),
)
list_display = (
'email', 'first_name', 'last_name', 'is_staff',
'email_confirmed', 'organization', 'language_preference'
)
list_filter = ('organization', )
search_fields = ('email', 'first_name', 'last_name')
ordering = ('email',)
readonly_fields = ('date_joined',)
admin.site.register(get_user_model(), CustomUserAdmin) # noqa
@admin.register(PasswordResetRequest)
class PasswordResetRequestAdmin(admin.ModelAdmin):
list_display = ('received_on', 'user', 'uid', 'confirmed')
list_filter = ('received_on', )

6
user/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class UserConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'user'

View File

View File

@ -0,0 +1,42 @@
from django.contrib.auth.models import Group, Permission
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from user.models import User
class Command(BaseCommand):
help = """
Programatically set application permissions and groups\n
usage: python manage.py create_groups_and_permissions.py
"""
def handle(self, *args, **options):
# get or create groups
admin_group, created = Group.objects.get_or_create(name='Admin')
operater_group, created = Group.objects.get_or_create(name='Operater')
viewer_group, created = Group.objects.get_or_create(name='Viewer')
# define content types
ct_user = ContentType.objects.get_for_model(User)
# define permissions
application_permissions = [
# (codename, name, content_type, list_of_groups_to_assign_perm)
('add_user', 'Can add new users', ct_user, [admin_group]),
('change_user', 'Can change existing user', ct_user, [admin_group]),
('delete_user', 'Can delete existing user', ct_user, [admin_group]),
('add_data', 'Can add new data', ct_user, [admin_group, operater_group]),
]
# get or create permissions and add them to appropriate groups
for permission_tuple in application_permissions:
permission_obj, created = Permission.objects.get_or_create(
codename=permission_tuple[0],
name=permission_tuple[1],
content_type=permission_tuple[2]
)
for group in permission_tuple[3]:
group.permissions.add(permission_obj)

View File

@ -0,0 +1,71 @@
import csv
import os
import sys
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.management.base import BaseCommand
from user.models import Organization, User
class Command(BaseCommand):
help = """
Programatically create entries for model User
"""
def handle(self, *args, **options):
csv_fpath = os.path.join(settings.RESOURCES_DIR, 'users.csv')
csv.field_size_limit(sys.maxsize)
created_entries = 0
existing_entries = 0
admin_group, created = Group.objects.get_or_create(name='Admin')
operater_group, created = Group.objects.get_or_create(name='Operater')
with open(csv_fpath) as f:
reader = csv.DictReader(f)
default_organization = Organization.objects.get_or_create(name='Državna geodetska uprava')[0]
for row in reader:
username = row['usrname']
first_name = row['ime']
last_name = row['prezime']
email = row['email']
email_confirmed = True
if not email:
print("User {} {} doesn't have email but it's required. Setting fake email...".format(
first_name, last_name
))
# set fake email
email = '{}.{}@example.com'.format(first_name, last_name)
email_confirmed = False
obj, created = User.objects.get_or_create(
username=username,
first_name=first_name,
last_name=last_name,
email=email,
email_confirmed=email_confirmed,
organization=default_organization
)
if row['rola'] == 'operater':
obj.groups.add(operater_group)
elif row['rola'] == 'admin':
obj.groups.add(admin_group)
else:
obj.groups.clear()
if created:
created_entries += 1
print("Kreiran user {} {}".format(obj.first_name, obj.last_name))
else:
existing_entries += 1
print("Created: {}".format(created_entries))
print("Existing: {}".format(existing_entries))

View File

@ -0,0 +1,44 @@
# Generated by Django 4.2.9 on 2024-01-17 11:44
import django.contrib.auth.models
import django.contrib.auth.validators
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
'verbose_name': 'user',
'verbose_name_plural': 'users',
'abstract': False,
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
]

View File

Binary file not shown.

88
user/models.py Normal file
View File

@ -0,0 +1,88 @@
from django.db import models
# Create your models here.
from django.contrib.auth.models import (AbstractBaseUser,BaseUserManager, PermissionsMixin)
class UserManager(BaseUserManager):
'''Manager for users'''
def create_user(self, email, password=None, **extra): #kreiran regular user
if not email:
raise ValueError('Valid e-mail must be provided!')
user = self.model(email=self.normalize_email(email), **extra)
user.username
user.set_password(password)
user.save(using=self._db)
return user
def create_superuser(self, email, password, first_name, last_name): #kreiran superuser s dodatnim zahtjevima
user = self.create_user(email, password, **{"first_name": first_name, "last_name": last_name})
user.is_staff = True
user.is_superuser = True
user.save(using=self._db)
return user
class Organization(models.Model):
name = models.CharField(max_length=128)
contact_email = models.EmailField(blank=True)
class User(AbstractBaseUser, PermissionsMixin):
LANGUAGE_CHOICES = (
('hr', 'Hrvatski'),
('en', 'English')
)
username = models.CharField(max_length=255, blank=True)
email = models.EmailField(max_length=255, unique=True)
is_active = models.BooleanField(default=True)
is_staff = models.BooleanField(default=False)
first_name = models.CharField(max_length=127, blank=True)
last_name = models.CharField(max_length=127, blank=True)
organization = models.ForeignKey(Organization, on_delete=models.SET_NULL, null=True) #povezuje korisnike s organizacijom
date_joined = models.DateTimeField(auto_now_add=True)
email_confirmed = models.BooleanField(default=False)
language_preference = models.CharField(choices=LANGUAGE_CHOICES, max_length=8, default='hr')
objects = UserManager()
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['first_name', 'last_name']
@property
def is_admin(self):
return self.groups.exists() and self.groups.first().name == 'Admin'
@property
def is_editor(self):
return self.groups.exists() and self.groups.first().name == 'Editor'
@property
def is_wiever(self):
return self.groups.exists() and self.groups.first().name == 'Wiever'
@property
def app_role_name(self):
return self.groups.first().name if self.groups.exists() else ''
@property
def app_role_id(self):
return self.groups.first().id if self.groups.exists() else None
@property
def full_name(self):
return '{} {}'.format(self.first_name, self.last_name)
class PasswordResetRequest(models.Model):
received_on = models.DateTimeField(auto_now_add=True)
uid = models.UUIDField()
user = models.ForeignKey(User, on_delete=models.CASCADE)
confirmed = models.BooleanField(default=False)
confirmed_on = models.DateTimeField(null=True)
def __str__(self):
return self.received_on.isoformat()

46
user/permissions.py Normal file
View File

@ -0,0 +1,46 @@
from django.contrib.auth.models import Group
from rest_framework import permissions
class UserPermission(permissions.BasePermission):
"""
Custom permission to only allow users with permissions
to view/add/update/delete the users.
"""
def has_permission(self, request, view):
# we cover delete permissions in has_object_permissions
if request.method == 'DELETE':
return True
if request.method in permissions.SAFE_METHOD:
return True
if not request.user.app_role_id:
return False
app_role = Group.objects.get(pk=request.user.app_role_id)
return 'add_user' in app_role.permissions.all().values_list(
'codename', flat=True)
def has_object_permission(self, request, view, obj):
if request.method == 'DELETE':
return request.user == obj
if not request.user.app_role_id:
return False
if request.method in permissions.SAFE_METHODS:
return True
app_role = Group.objects.get(pk=request.user.app_role_id)
permission_dict = {
'PATCH': 'change_user',
'PUT': 'change_user'
}
perm = permission_dict[request.method]
return perm in app_role.permissions.all().values_list('codename', flat=True)

162
user/serializers.py Normal file
View File

@ -0,0 +1,162 @@
from typing import Any, Dict
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.password_validation import validate_password
from django.core import exceptions as django_exceptions
from django.utils.translation import gettext as _
from rest_framework import serializers
from rest_framework_simplejwt.exceptions import InvalidToken
from rest_framework_simplejwt.serializers import (TokenObtainPairSerializer,
TokenRefreshSerializer)
User = get_user_model() #kreirana varijabla User koja se referencira na User model
class UserSerializer(serializers.ModelSerializer): #serializer for the user object
class Meta:
model = get_user_model()
fields = ['email', 'password']
extra_kwargs = {'password': {'write_only': True, 'min_length': 8}}
def create(self, validated_data):
return get_user_model().objects.create_user(**validated_data)
def update(self, instance, validated_data):
password = validated_data.pop('password', None)
user = super().update(instance, validated_data)
if password:
user.set_password(password)
user.save()
return user
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
def validate(self, attrs):
data = super().validate(attrs)
refresh = self.get_token(self.user)
# Custom token names
data[settings.AUTH_REFRESH_TOKEN_NAME] = str(refresh)
data[settings.AUTH_ACCESS_TOKEN_NAME] = str(refresh.access_token)
# used to keep exact expiration time, will be removed from body before sending to client!
data["lifetime"] = refresh.lifetime
return data
@classmethod
def get_token(cls, user):
token = super(CustomTokenObtainPairSerializer, cls).get_token(user)
# TODO: Get user group goes here
# Add custom claims
token['username'] = user.email if user.USERNAME_FIELD == 'email' else user.username
token['is_staff'] = user.is_staff
token['is_superuser'] = user.is_superuser
return token
class CustomCookieTokenRefreshSerializer(TokenRefreshSerializer):
refresh = None
def validate(self, attrs):
# Check if refresh token was included
refresh_token = self.context['request'].COOKIES.get(settings.AUTH_REFRESH_TOKEN_NAME)
if (not refresh_token):
raise InvalidToken(_('No valid refresh token found!'))
attrs['refresh'] = refresh_token
refresh = self.token_class(attrs["refresh"])
data = {settings.AUTH_ACCESS_TOKEN_NAME: str(refresh.access_token)}
if settings.SIMPLE_JWT.get("ROTATE_REFRESH_TOKENS"):
if settings.SIMPLE_JWT.get("BLACKLIST_AFTER_ROTATION"):
try:
# Attempt to blacklist the given refresh token
refresh.blacklist()
except AttributeError:
pass
refresh.set_jti()
refresh.set_exp()
refresh.set_iat()
data["refresh"] = str(refresh)
data["lifetime"] = refresh.lifetime
return data
class PasswordSerializer(serializers.Serializer):
new_password = serializers.CharField(style={'input_type': 'password'})
status_codes = {
"This field may not be blank.": 601,
"This password is too short. It must contain at least 8 characters.": 602,
"This password is entirely numeric.": 603,
"This password is too common.": 604,
"The password is too similar to the username.": 605,
"The password is too similar to the email address.": 606,
"The password is too similar to the first name.": 607,
"The password is too similar to the last name.": 608
}
def validate(self, attrs):
user = self.context['request'].user
assert user is not None
try:
validate_password(attrs['new_password'], user)
except django_exceptions.ValidationError as e:
errors = []
for msg in list(e.messages):
try:
status_code = self.status_codes[msg]
except Exception:
status_code = 600
errors.append({'code': status_code, 'message': msg})
raise serializers.ValidationError({
'new_password_errors': errors
})
return super(PasswordSerializer, self).validate(attrs)
class PasswordChangeSerializer(PasswordSerializer):
old_password = serializers.CharField(style={"input_type": "password"})
class PasswordResetSerializer(serializers.Serializer):
email = serializers.CharField()
def validate(self, attrs):
email = attrs['email']
try:
User.objects.get(email=email)
except User.DoesNotExist:
raise serializers.ValidationError({
'email': "There is no user with this email"
})
return super(PasswordResetSerializer, self).validate(attrs)
class PasswordResetConfirmSerializer(PasswordSerializer):
token = serializers.CharField()
class UserDetailSerializer(serializers.ModelSerializer):
"""Detail serializer for the user object."""
organization_name = serializers.ReadOnlyField(source='organization.name')
class Meta:
model = get_user_model()
fields = ['id', 'first_name', 'last_name', 'username', 'email', 'organization_name', 'language_preference']
class UserDetailUpdateSerializer(serializers.ModelSerializer):
class Meta:
model = get_user_model()
fields = ['first_name', 'last_name', 'language_preference']

3
user/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

18
user/urls.py Normal file
View File

@ -0,0 +1,18 @@
from django.urls import path
from user import views
from rest_framework_simplejwt.views import TokenVerifyView
app_name = 'user'
urlpatterns = [
path('create/', views.CreateUserView.as_view(), name='create'),
path('activate/<str:uidb64>/<str:token>/', views.activate, name='activate'),
path('token/', views.CustomObtainTokenPairView.as_view(), name='token_obtain_pair'),
path('token/refresh/', views.CustomCookieTokenRefreshView.as_view(), name='token_refresh'),
path('token/verify/', TokenVerifyView.as_view(), name='token_verify'),
path('token/logout/', views.LogoutView.as_view(), name='token_logout'),
path('password-reset/', views.PasswordResetView.as_view(), name='password_reset'),
path('password-reset/<str:uid>/confirm/', views.PasswordResetConfirmView.as_view(), name='password_reset_confirm'),
path('me/', views.RequestUserDetailView.as_view(), name='user_detail'),
path('me/change-password/', views.ChangePasswordView.as_view(), name='change_password'),
path('<int:pk>/delete/', views.DeleteUserView.as_view(), name='user_delete')
]

42
user/utils.py Normal file
View File

@ -0,0 +1,42 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.tokens import default_token_generator
from django.core.mail import EmailMultiAlternatives
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_decode, urlsafe_base64_encode
class Util:
@staticmethod
def send_email(html_content, **data):
try:
email = EmailMultiAlternatives(**data)
email.attach_alternative(html_content, "text/html")
email.send()
except Exception:
raise Exception("Sending e-mail went wrong")
@staticmethod
def get_token(user):
try:
data = {
"uidb64": urlsafe_base64_encode(force_bytes(user.pk)),
"token": default_token_generator.make_token(user)
}
except Exception:
return None
return data
@staticmethod
def check_token(uidb64, token):
try:
User = get_user_model()
uid = urlsafe_base64_decode(uidb64).decode()
user = User.objects.get(pk=uid)
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
user = None
# Check token
if user is not None and default_token_generator.check_token(user, token):
return user
raise Exception("Token invalid")

330
user/views.py Normal file
View File

@ -0,0 +1,330 @@
from django.shortcuts import render
# Create your views here.
import uuid
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.tokens import default_token_generator
from django.contrib.sites.shortcuts import get_current_site
from django.core.mail import BadHeaderError, send_mail
from django.http import HttpResponse
from django.shortcuts import redirect
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from drf_yasg.utils import swagger_auto_schema
from rest_framework import generics, status
from rest_framework.permissions import AllowAny
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.views import (TokenObtainPairView,
TokenRefreshView)
from .models import PasswordResetRequest, User
from .permissions import UserPermission
from .serializers import (CustomCookieTokenRefreshSerializer,
CustomTokenObtainPairSerializer,
PasswordResetConfirmSerializer,
PasswordResetSerializer, PasswordSerializer,
UserDetailSerializer, UserDetailUpdateSerializer,
UserSerializer)
from .utils import Util
def activate(request, uidb64, token): # TODO: rewrite to CBV
try:
user = Util.check_token(uidb64, token)
except Exception:
return HttpResponse("Unable to verify your e-mail adress")
if user is not None:
user.email_confirmed = True
user.save()
if request.user.is_authenticated:
messages.success(request, "Hvala Vam na potvrdi email adrese.")
return redirect(settings.FRONTEND_URL)
else:
msg = "Hvala Vam na potvrdi email adrese. Sad se možete ulogirati u svoj korisnički račun."
messages.success(request, msg)
return redirect(settings.FRONTEND_URL)
else:
return HttpResponse('Vaš korisnički račun je već aktiviran ili aktivacijski link nije ispravan.')
class CreateUserView(generics.CreateAPIView):
"""Create a new user in the system."""
serializer_class = UserSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
if settings.AUTH_EMAIL_VERIFICATION in ["mandatory", "optional"]:
try:
self.send_confirmation_email(request, serializer.instance)
except Exception:
return Response(
_("User was created but server wasn't able to send e-mail!"),
status=status.HTTP_202_ACCEPTED
)
headers = self.get_success_headers(serializer.data)
response_msg = {
"account_created": serializer.data,
"info": _("Confirmation e-mail was sent to your e-mail address, please confirm it to start using this app!")
}
return Response(response_msg, status=status.HTTP_201_CREATED, headers=headers)
def send_confirmation_email(self, request, user):
token_data = Util.get_token(user)
relative_link = reverse('user:activate', kwargs=token_data)
link = request.build_absolute_uri(relative_link)
# Variables used in both templates
template_vars = {
'email': user.email,
'link': link,
'greeting': _("Hi,"),
'info': _("Please click on the link to confirm your registration!"),
'description_msg': _("Your e-mail:"),
}
text_content = render_to_string('acc_activate_email.txt', template_vars)
# Adding extra variables to html template
html_content = render_to_string('acc_activate_email.html', {
**template_vars,
'bttn_text': _("Confirm email"),
'alternate_text': _("Or go to link:")
})
data = {
"body": text_content,
"to": [user.email],
"subject": _("Please confirm your e-mail"),
}
Util.send_email(html_content, **data)
return
class CustomObtainTokenPairView(TokenObtainPairView):
permission_classes = (AllowAny,)
serializer_class = CustomTokenObtainPairSerializer
def finalize_response(self, request, response, *args, **kwargs):
if response.data.get(settings.AUTH_REFRESH_TOKEN_NAME):
# # # Move refresh token from body to HttpOnly cookie
refresh_token_name = settings.AUTH_REFRESH_TOKEN_NAME
persist = request.data.get('persist', False)
max_age = response.data['lifetime'] if persist else None
response.set_cookie(
refresh_token_name,
response.data[refresh_token_name],
max_age=max_age,
secure=settings.SIMPLE_JWT['AUTH_COOKIE_SECURE'],
httponly=settings.SIMPLE_JWT['AUTH_COOKIE_HTTP_ONLY'],
samesite=settings.SIMPLE_JWT['AUTH_COOKIE_SAMESITE']
)
# Remove from body
del response.data[refresh_token_name]
del response.data["lifetime"]
return super().finalize_response(request, response, *args, **kwargs)
class CustomCookieTokenRefreshView(TokenRefreshView):
serializer_class = CustomCookieTokenRefreshSerializer
def finalize_response(self, request, response, *args, **kwargs): #purpose is to customize the HTTP response generated by the view after refreshing an access token.
if response.data.get(settings.AUTH_REFRESH_TOKEN_NAME):
# # # Move refresh token from body to HttpOnly cookie
refresh_token_name = settings.AUTH_REFRESH_TOKEN_NAME
persist = request.data.get('persist', False)
max_age = response.data['lifetime'] if persist else None
response.set_cookie(
refresh_token_name,
response.data[refresh_token_name],
max_age=max_age,
secure=settings.SIMPLE_JWT['AUTH_COOKIE_SECURE'],
httponly=settings.SIMPLE_JWT['AUTH_COOKIE_HTTP_ONLY'],
samesite=settings.SIMPLE_JWT['AUTH_COOKIE_SAMESITE']
)
# Remove from body
del response.data[refresh_token_name]
del response.data["lifetime"]
return super().finalize_response(request, response, *args, **kwargs)
class LogoutView(APIView):
renderer_classes = [JSONRenderer]
permission_classes = []
@swagger_auto_schema(
responses={'200': 'OK', '400': 'Bad Request'},
operation_id='LogoutUser',
operation_description='Logout user and clean cookies'
)
def post(self, request, *args, **kwargs):
response = Response()
response.set_cookie(settings.AUTH_REFRESH_TOKEN_NAME, None, max_age=1, httponly=True)
response.set_cookie("sessionid", None, max_age=1, httponly=True) # logout from django admin!
return response
class RequestUserDetailView(APIView):
renderer_classes = [JSONRenderer]
permission_classes = []
@swagger_auto_schema(
responses={'200': 'OK', '400': 'Bad Request'},
operation_id='UserDetail',
operation_description='Get details for request user'
)
def get(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response(status=status.HTTP_400_BAD_REQUEST)
user = request.user
data = UserDetailSerializer(user).data
return Response(status=status.HTTP_200_OK, data=data)
@swagger_auto_schema(
responses={'200': 'OK', '400': 'Bad Request'},
operation_id='UserDetailUpdate',
operation_description='Update details for request user',
request_body=UserDetailUpdateSerializer
)
def put(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response(status=status.HTTP_400_BAD_REQUEST)
user = request.user
data = self.request.data
user.first_name = data.get('first_name', user.first_name)
user.last_name = data.get('last_name', user.last_name)
user.language_preference = data.get('language_preference', user.first_name)
user.save()
user_data = UserDetailSerializer(user).data
return Response(status=status.HTTP_200_OK, data=user_data)
class DeleteUserView(generics.DestroyAPIView):
permission_classes = [UserPermission]
queryset = User.objects.all()
class ChangePasswordView(generics.UpdateAPIView):
queryset = User.objects.all()
serializer_class = PasswordSerializer
def update(self, request, *args, **kwargs):
user = self.request.user
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if not user.check_password(serializer.data.get("old_password")):
return Response({"old_password": ["Wrong password."]}, status=status.HTTP_400_BAD_REQUEST)
user.set_password(serializer.data.get("new_password"))
user.save()
response = {'message': 'Password updated successfully'}
return Response(response, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class PasswordResetView(generics.GenericAPIView):
"""
Use this endpoint to send email to user with password reset key.
"""
serializer_class = PasswordResetSerializer
authentication_classes = []
permission_classes = []
def post(self, request, *args, **kwargs):
serializer = PasswordResetSerializer(data=request.data)
if serializer.is_valid():
try:
user = User.objects.get(email=serializer.data.get('email'))
except User.DoesNotExist:
return Response(status=status.HTTP_400_BAD_REQUEST)
# Build the password reset link
current_site = get_current_site(self.request)
domain = current_site.domain
uid = uuid.uuid4()
token = default_token_generator.make_token(user)
reset_link = "https://{domain}/password-reset/{uid}/{token}/".format(domain=domain, uid=uid, token=token)
# create password reset obj
PasswordResetRequest.objects.create(user=user, uid=uid, confirmed=False)
# send e-mail
subject = "Password reset"
message = (
"You're receiving this email because you requested a password reset for your account.\n\n"
"Please visit this url to set new password:\n{reset_link}".format(reset_link=reset_link)
)
from_email = settings.DEFAULT_FROM_EMAIL
try:
send_mail(subject, message, from_email, [user.email])
except BadHeaderError:
return Response({"error": "Invalid header found."}, status=status.HTTP_400_BAD_REQUEST)
response = {"message": "E-mail successfully sent."}
return Response(response, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class PasswordResetConfirmView(generics.GenericAPIView):
"""
Use this endpoint to finish reset password process.
"""
permission_classes = []
authentication_classes = []
def get_serializer_class(self):
return PasswordResetConfirmSerializer
def post(self, request, **kwargs):
try:
uid = uuid.UUID(self.kwargs['uid'])
obj = PasswordResetRequest.objects.get(uid=uid)
user = obj.user
except (ValueError, PasswordResetRequest.DoesNotExist):
return Response(status=status.HTTP_400_BAD_REQUEST, data={'error': 'UID is not valid'})
serializer = self.get_serializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# check if token is valid
token_valid = default_token_generator.check_token(obj.user, serializer.data["token"])
if not token_valid:
return Response(status=status.HTTP_400_BAD_REQUEST, data={'error': 'Token not valid'})
# set new password
user.set_password(serializer.data["new_password"])
user.save()
# update object in db
obj.confirmed = True
obj.confirmed_on = timezone.now()
obj.save()
return Response(status=status.HTTP_200_OK)