diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000000000000000000000000000000000000..8dd399ab55bce8a4a1ddfd0c98170f771f6e5c35 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 88 +extend-ignore = E203 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7f7a6a44a78c905530b2f3f6a18255add5539f32..9e512450a6032fa68687d434fac3c1cf0c54e603 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,6 +11,7 @@ repos: rev: 5.12.0 hooks: - id: isort + args: ["--profile", "black"] - repo: https://github.com/pycqa/flake8 rev: 6.0.0 hooks: diff --git a/transporte/__init__.py b/transporte/__init__.py index 463f27ba6bf135527d412e89b1f238442ae5e761..c7eb081b533d0a648883787cb02a2f0b331b2710 100644 --- a/transporte/__init__.py +++ b/transporte/__init__.py @@ -11,13 +11,14 @@ login_manager = LoginManager() def create_app(): """Construct the core application.""" app = Flask(__name__) - app.config.from_pyfile('config.cfg') + app.config.from_pyfile("config.cfg") db.init_app(app) mail.init_app(app) login_manager.init_app(app) with app.app_context(): from . import views # noqa: F401 + db.create_all() return app diff --git a/transporte/forms.py b/transporte/forms.py index ca42986388b8d47b8d216d86bc07a5dc009a11a1..0cabdcd258907c6ebf29bb0262e7ab8f639f3af0 100644 --- a/transporte/forms.py +++ b/transporte/forms.py @@ -1,65 +1,76 @@ - import datetime from flask_wtf import FlaskForm from flask_wtf.file import FileField from wtforms import SelectField, StringField, SubmitField, TextAreaField -from wtforms.fields import BooleanField -from wtforms.fields import TimeField +from wtforms.fields import BooleanField, TimeField from wtforms.validators import DataRequired, Email, Optional + # DateField from wtforms_components supports min/max depending on DateRange from wtforms_components import DateField, DateRange VehicleTypes = { - 'car': 'Car', - 'trailer': 'Car with trailer', - 'transporter': 'Transporter', - '7.5t': '7.5t', - '12t': '12t', - '18t': '18t', - '40t': '40t', - 'truck': 'Truck (unknown size)' + "car": "Car", + "trailer": "Car with trailer", + "transporter": "Transporter", + "7.5t": "7.5t", + "12t": "12t", + "18t": "18t", + "40t": "40t", + "truck": "Truck (unknown size)", } -Roles = { - 'user': 'User', - 'helpdesk': 'Helpdesk', - 'admin': 'Admin' -} +Roles = {"user": "User", "helpdesk": "Helpdesk", "admin": "Admin"} class LoginForm(FlaskForm): - login = StringField('Email', validators=[DataRequired(), Email(message='Please enter valid emailaddress')]) + login = StringField( + "Email", + validators=[DataRequired(), Email(message="Please enter valid emailaddress")], + ) class TransportForm(FlaskForm): - organizer = StringField('Organizer', validators=[DataRequired()]) - needs_organization = BooleanField('Needs organization') - origin = TextAreaField('Origin', validators=[DataRequired()]) - destination = TextAreaField('Destination', validators=[DataRequired()]) - date = DateField('Date', validators=[DataRequired(), DateRange( - min=datetime.date(year=2019, month=12, day=14), - max=datetime.date(year=2020, month=1, day=7))]) - time = TimeField('ETA', validators=[Optional()]) - vehicle = SelectField('Vehicle', validators=[DataRequired()], choices=[('', '')] + list(VehicleTypes.items())) - goods = TextAreaField('Goods', validators=[DataRequired()]) - vehicle_owner = StringField('Vehicle Owner') - orga_contact = TextAreaField('Orga Contact Person / Details', validators=[DataRequired()]) - driver_contact = TextAreaField('Driver Contact Person / Details') - comment = TextAreaField('Comment') - file_upload = FileField('Files', render_kw={'multiple': True}) - save = SubmitField('Save') - saveasnew = SubmitField('Save as new') + organizer = StringField("Organizer", validators=[DataRequired()]) + needs_organization = BooleanField("Needs organization") + origin = TextAreaField("Origin", validators=[DataRequired()]) + destination = TextAreaField("Destination", validators=[DataRequired()]) + date = DateField( + "Date", + validators=[ + DataRequired(), + DateRange( + min=datetime.date(year=2019, month=12, day=14), + max=datetime.date(year=2020, month=1, day=7), + ), + ], + ) + time = TimeField("ETA", validators=[Optional()]) + vehicle = SelectField( + "Vehicle", + validators=[DataRequired()], + choices=[("", "")] + list(VehicleTypes.items()), + ) + goods = TextAreaField("Goods", validators=[DataRequired()]) + vehicle_owner = StringField("Vehicle Owner") + orga_contact = TextAreaField( + "Orga Contact Person / Details", validators=[DataRequired()] + ) + driver_contact = TextAreaField("Driver Contact Person / Details") + comment = TextAreaField("Comment") + file_upload = FileField("Files", render_kw={"multiple": True}) + save = SubmitField("Save") + saveasnew = SubmitField("Save as new") class AddressForm(FlaskForm): - public = BooleanField('public') - address = TextAreaField('Address', validators=[DataRequired()]) + public = BooleanField("public") + address = TextAreaField("Address", validators=[DataRequired()]) class RoleForm(FlaskForm): - role = SelectField('Role', validators=[DataRequired()], choices=list(Roles.items())) + role = SelectField("Role", validators=[DataRequired()], choices=list(Roles.items())) class TransportFilterForm(FlaskForm): - day = SelectField('Day', choices=[]) + day = SelectField("Day", choices=[]) diff --git a/transporte/models.py b/transporte/models.py index aad1ada226521585293b2f35655825bfd9d07854..4bf5738f2a0b9b482d02e36ff446555240c89951 100644 --- a/transporte/models.py +++ b/transporte/models.py @@ -1,4 +1,3 @@ - from flask import Markup from flask import current_app as app from flask import flash, url_for @@ -13,61 +12,69 @@ from . import db, mail class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) login = db.Column(db.String(256), index=True, unique=True) - role = db.Column(db.String(64), default='user') - transports = db.relationship('Transport', backref='user', lazy='dynamic') - addresses = db.relationship('Address', backref='user', lazy='dynamic') + role = db.Column(db.String(64), default="user") + transports = db.relationship("Transport", backref="user", lazy="dynamic") + addresses = db.relationship("Address", backref="user", lazy="dynamic") def create_token(self): - s = Serializer(app.config['SECRET_KEY']) - token = s.dumps({'id': self.id}) + s = Serializer(app.config["SECRET_KEY"]) + token = s.dumps({"id": self.id}) return token def mail_token(self): token = self.create_token() - if app.config['DEBUG']: + if app.config["DEBUG"]: flash( - Markup('<b>DEBUG:</b> <a href={url}>{url}</a>'.format( - url=url_for('login_with_token', token=token, _external=True))), - 'warning') + Markup( + "<b>DEBUG:</b> <a href={url}>{url}</a>".format( + url=url_for("login_with_token", token=token, _external=True) + ) + ), + "warning", + ) return # send login email - msg = Message('Your LOC transport tool credentials!', recipients=[self.login]) - msg.body = ('Hi, na! \n\n' - 'Thank you for helping us keeping an overview of your transports :) \n' - 'Here is your login link: {}'.format(url_for('login_with_token', token=token, _external=True))) + msg = Message("Your LOC transport tool credentials!", recipients=[self.login]) + msg.body = ( + "Hi, na! \n\n" + "Thank you for helping us keeping an overview of your transports :) \n" + "Here is your login link: {}".format( + url_for("login_with_token", token=token, _external=True) + ) + ) mail.send(msg) @staticmethod def verify_login_token(token): - s = Serializer(app.config['SECRET_KEY']) + s = Serializer(app.config["SECRET_KEY"]) try: - data = s.loads(token, max_age=10*60) + data = s.loads(token, max_age=10 * 60) except SignatureExpired: # valid token, but expired return None except BadSignature: # invalid token return None - return User.query.get(data['id']) + return User.query.get(data["id"]) def __repr__(self): - return '<User {}>'.format(self.login) + return "<User {}>".format(self.login) class Address(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) public = db.Column(db.Boolean, default=False, nullable=False) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) address = db.Column(db.Text, nullable=False) class Transport(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) ticket_id = db.Column(db.Integer) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) organizer = db.Column(db.String(256), nullable=False) needs_organization = db.Column(db.Boolean, default=False, nullable=False) origin = db.Column(db.Text, nullable=False) @@ -82,17 +89,17 @@ class Transport(db.Model): comment = db.Column(db.Text) done = db.Column(db.Boolean, default=False, nullable=False) cancelled = db.Column(db.Boolean, default=False, nullable=False) - files = db.relationship('File', backref='transport', lazy='dynamic') + files = db.relationship("File", backref="transport", lazy="dynamic") def __repr__(self): - return '<Transport {}>'.format(self.id) + return "<Transport {}>".format(self.id) class File(db.Model): id = db.Column(db.Integer, primary_key=True, autoincrement=True) - transport_id = db.Column(db.Integer, db.ForeignKey('transport.id'), nullable=False) + transport_id = db.Column(db.Integer, db.ForeignKey("transport.id"), nullable=False) name = db.Column(db.String(256), nullable=False) path = db.Column(db.String(256), nullable=False) def __repr__(self): - return '<File {}>'.format(self.path) + return "<File {}>".format(self.path) diff --git a/transporte/transporte.py b/transporte/transporte.py index a28198c3701fc1c4befdbc06481dc507ffc09597..8a942e0073d604acb3611960537ce1838ef0c2f6 100644 --- a/transporte/transporte.py +++ b/transporte/transporte.py @@ -8,13 +8,13 @@ from . import create_app app = create_app() -limiter = Limiter(get_remote_address, app=app,storage_uri="memory://") +limiter = Limiter(get_remote_address, app=app, storage_uri="memory://") QRcode(app) -if app.config['DEBUG']: - import logging +if app.config["DEBUG"]: import http.client + import logging http.client.HTTPConnection.debuglevel = 1 @@ -25,5 +25,5 @@ if app.config['DEBUG']: requests_log.propagate = True -if __name__ == '__main__': - app.run('0.0.0.0', debug=app.config['DEBUG']) +if __name__ == "__main__": + app.run("0.0.0.0", debug=app.config["DEBUG"]) diff --git a/transporte/views.py b/transporte/views.py index 7a430e5e02c23ea17accb87a8ecd8174d61e79f0..ecfbf26fadb9ff5c6bb85e7052b929d872356124 100644 --- a/transporte/views.py +++ b/transporte/views.py @@ -7,30 +7,44 @@ from dateutil import parser from email_validator import validate_email from flask import Markup, abort from flask import current_app as app -from flask import (escape, flash, redirect, render_template, request, - send_from_directory, url_for) +from flask import ( + escape, + flash, + redirect, + render_template, + request, + send_from_directory, + url_for, +) from flask_login import current_user, login_required, login_user, logout_user from flask_wtf import FlaskForm from jinja2 import pass_eval_context from werkzeug.utils import secure_filename from . import db, login_manager -from .forms import (AddressForm, LoginForm, RoleForm, Roles, - TransportFilterForm, TransportForm, VehicleTypes) +from .forms import ( + AddressForm, + LoginForm, + RoleForm, + Roles, + TransportFilterForm, + TransportForm, + VehicleTypes, +) from .models import Address, File, Transport, User -from .zammad_integration import close_ticket, update_ticket +from .zammad_integration import close_ticket, update_ticket # noqa: F401 -@app.route('/') +@app.route("/") @login_required def index(): todo = ( dict( - description='Todays transports', + description="Todays transports", progress=100, ), dict( - description='Overall transports', + description="Overall transports", progress=100, ), ) @@ -38,17 +52,22 @@ def index(): query = Transport.query.filter(not Transport.cancelled) try: - todo[0]['progress'] = 100 / query.filter(Transport.date == datetime.date.today()).count() * query.filter( - Transport.date == datetime.date.today()).filter(Transport.done).count() + todo[0]["progress"] = ( + 100 + / query.filter(Transport.date == datetime.date.today()).count() + * query.filter(Transport.date == datetime.date.today()) + .filter(Transport.done) + .count() + ) except ZeroDivisionError: pass try: - todo[1]['progress'] = 100 / query.count() * query.filter(Transport.done).count() + todo[1]["progress"] = 100 / query.count() * query.filter(Transport.done).count() except ZeroDivisionError: pass - return render_template('layout.html', todo=todo) + return render_template("layout.html", todo=todo) def get_user(email): @@ -62,7 +81,8 @@ def get_user(email): return user -@app.route('/login', methods=['GET', 'POST']) + +@app.route("/login", methods=["GET", "POST"]) # @limiter.limit('10/hour') def login(): loginform = LoginForm() @@ -71,75 +91,86 @@ def login(): email = loginform.login.data.lower() try: - if not app.config['DEBUG']: + if not app.config["DEBUG"]: v = validate_email(email) - email = v['email'] # replace with normalized form + email = v["email"] # replace with normalized form except Exception as e: loginform.login.errors.append(str(e)) - return render_template('login.html', loginform=loginform) + return render_template("login.html", loginform=loginform) user = get_user(email) # create token user.mail_token() - flash('Check your inbox!') + flash("Check your inbox!") - return render_template('login.html', loginform=loginform) + return render_template("login.html", loginform=loginform) -@app.route('/login/token/<token>') +@app.route("/login/token/<token>") def login_with_token(token): user = User.verify_login_token(token) if user: login_user(user, remember=True) - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash('Invalid or expired token!') - return redirect(url_for('login')) + flash("Invalid or expired token!") + return redirect(url_for("login")) + -@app.route('/login/password/<password>') +@app.route("/login/password/<password>") def login_with_password(password): - accounts_with_this_pw = [ account['email'] for account in app.config['SPECIAL_HELPDESK_ACCOUNTS'] if account['password'] == password ] + accounts_with_this_pw = [ + account["email"] + for account in app.config["SPECIAL_HELPDESK_ACCOUNTS"] + if account["password"] == password + ] if len(accounts_with_this_pw) == 0: - return redirect(url_for('login')) + return redirect(url_for("login")) elif len(accounts_with_this_pw) > 1: - app.logger.warn("Multiple sepcial helpdesk accounts with the same password are not supported!") - return redirect(url_for('login')) + app.logger.warn( + "Multiple sepcial helpdesk accounts with the same password are not supported!" # noqa: E501 + ) + return redirect(url_for("login")) user = get_user(accounts_with_this_pw[0]) if user: login_user(user) - return redirect(url_for('index')) + return redirect(url_for("index")) else: - flash('Invalid or expired token!') - return redirect(url_for('login')) + flash("Invalid or expired token!") + return redirect(url_for("login")) + -@app.route('/logout') +@app.route("/logout") @login_required def logout(): logout_user() - flash('You have been logged out.') + flash("You have been logged out.") - return redirect(url_for('login')) + return redirect(url_for("login")) -@app.route('/transports/add', defaults={'id': None}, methods=['GET', 'POST']) -@app.route('/transports/edit/<int:id>', methods=['GET', 'POST']) +@app.route("/transports/add", defaults={"id": None}, methods=["GET", "POST"]) +@app.route("/transports/edit/<int:id>", methods=["GET", "POST"]) @login_required def edit_transport(id=None): if id is not None: transport = Transport.query.get(id) - if not (current_user.id == transport.user_id or current_user.role in ['helpdesk', 'admin']): + if not ( + current_user.id == transport.user_id + or current_user.role in ["helpdesk", "admin"] + ): transport = None else: @@ -159,17 +190,19 @@ def edit_transport(id=None): # # if ticket is new, update object with zammad ticket id # - #if transport.ticket_id is None: + # if transport.ticket_id is None: # transport.ticket_id = update_ticket(transport) # db.session.add(transport) # db.session.commit() - #else: + # else: # update_ticket(transport) - for file in request.files.getlist('file_upload'): + for file in request.files.getlist("file_upload"): if file.filename: file_name = secure_filename(file.filename) - upload_dir = os.path.join(app.config['UPLOAD_DIR'], str(transport.id)) + upload_dir = os.path.join( + app.config["UPLOAD_DIR"], str(transport.id) + ) file_path = os.path.join(upload_dir, file_name) if not os.path.isdir(os.path.join(app.root_path, upload_dir)): @@ -181,128 +214,149 @@ def edit_transport(id=None): db.session.add(f) db.session.commit() - flash('Saved') + flash("Saved") if id is None: - return redirect(url_for('edit_transport', id=transport.id)) + return redirect(url_for("edit_transport", id=transport.id)) else: transportform = TransportForm() - flash('Not authorized to edit this transport!') + flash("Not authorized to edit this transport!") addresslist = Address.query - if current_user.role not in ['admin', 'helpdesk']: + if current_user.role not in ["admin", "helpdesk"]: addresslist = addresslist.filter(Address.public) addresslist = addresslist.all() - return render_template('transport_details_edit.html', transportform=transportform, transport=transport, - addresslist=addresslist) + return render_template( + "transport_details_edit.html", + transportform=transportform, + transport=transport, + addresslist=addresslist, + ) -@app.route('/transports/list') +@app.route("/transports/list") @login_required def list_transports(): transportlist = Transport.query - if current_user.role not in ['helpdesk', 'admin']: + if current_user.role not in ["helpdesk", "admin"]: transportlist = transportlist.filter(Transport.user_id == current_user.id) - dates = transportlist.with_entities(Transport.date).distinct().order_by(Transport.date).all() + dates = ( + transportlist.with_entities(Transport.date) + .distinct() + .order_by(Transport.date) + .all() + ) - filterform = TransportFilterForm(day=request.args.get('day')) - filterform.day.choices = [('None', 'Filter by date')] + [(date[0], date[0]) for date in dates] + filterform = TransportFilterForm(day=request.args.get("day")) + filterform.day.choices = [("None", "Filter by date")] + [ + (date[0], date[0]) for date in dates + ] if filterform.day.object_data is not None: - transportlist = transportlist.filter(Transport.date == parser.parse(filterform.day.object_data).date()) + transportlist = transportlist.filter( + Transport.date == parser.parse(filterform.day.object_data).date() + ) - return render_template('transport_list.html', transportlist=transportlist, filterform=filterform) + return render_template( + "transport_list.html", transportlist=transportlist, filterform=filterform + ) -@app.route('/transports/show/<int:id>') -@app.route('/transports/show/<int:id>/<string:format>') +@app.route("/transports/show/<int:id>") +@app.route("/transports/show/<int:id>/<string:format>") @login_required def show_transport(id=None, format=None): transport = Transport.query.get(id) if transport is None or not ( - transport.user_id == current_user.id or current_user.role in ['helpdesk', 'admin']): + transport.user_id == current_user.id + or current_user.role in ["helpdesk", "admin"] + ): transport = None - flash('Transport is not available') + flash("Transport is not available") else: if transport.done: - flash('Transport is done', 'success') + flash("Transport is done", "success") elif transport.cancelled: - flash('Transport was cancelled!', 'danger') + flash("Transport was cancelled!", "danger") - if format == 'sticker': - template = 'transport_sticker.html' + if format == "sticker": + template = "transport_sticker.html" else: - template = 'transport_details.html' + template = "transport_details.html" return render_template(template, transport=transport) -@app.route('/transports/mark/<mark>/<int:id>', methods=['GET', 'POST']) +@app.route("/transports/mark/<mark>/<int:id>", methods=["GET", "POST"]) @login_required def mark_transport(mark, id=None): transport = Transport.query.get(id) if transport is None or not ( - transport.user_id == current_user.id or current_user.role in ['helpdesk', 'admin']): + transport.user_id == current_user.id + or current_user.role in ["helpdesk", "admin"] + ): transport = None - flash('Transport not available') + flash("Transport not available") elif transport.done: - flash('Transport already marked as done!') + flash("Transport already marked as done!") transport = None elif transport.cancelled: - flash('Transport cancelled!') + flash("Transport cancelled!") transport = None form = FlaskForm() if form.validate_on_submit(): - if mark == 'done' and current_user.role in ['helpdesk', 'admin']: + if mark == "done" and current_user.role in ["helpdesk", "admin"]: transport.done = True - elif mark == 'cancelled': + elif mark == "cancelled": transport.cancelled = True # # close ticket # - if transport.ticket_id: - close_ticket(transport, mark) + # if transport.ticket_id: + # close_ticket(transport, mark) db.session.add(transport) db.session.commit() - return redirect(url_for('list_transports')) + return redirect(url_for("list_transports")) - return render_template('transport_mark.html', mark=mark, transport=transport, form=form) + return render_template( + "transport_mark.html", mark=mark, transport=transport, form=form + ) -@app.route('/addresses/list') +@app.route("/addresses/list") @login_required def list_addresses(): - if not (current_user.role in ['helpdesk', 'admin']): + if not (current_user.role in ["helpdesk", "admin"]): abort(404) addresses = Address.query.all() - return render_template('address_list.html', addresslist=addresses) + return render_template("address_list.html", addresslist=addresses) -@app.route('/addresses/add', defaults={'id': None}, methods=['GET', 'POST']) -@app.route('/addresses/edit/<int:id>', methods=['GET', 'POST']) +@app.route("/addresses/add", defaults={"id": None}, methods=["GET", "POST"]) +@app.route("/addresses/edit/<int:id>", methods=["GET", "POST"]) @login_required def edit_address(id=None): - if not (current_user.role in ['helpdesk', 'admin']): + if not (current_user.role in ["helpdesk", "admin"]): abort(404) if id is not None: address = Address.query.get(id) - if not (current_user.id == address.user_id or current_user.role in ['admin']): + if not (current_user.id == address.user_id or current_user.role in ["admin"]): address = None else: address = Address() @@ -318,24 +372,30 @@ def edit_address(id=None): db.session.add(address) db.session.commit() - flash('Saved!') + flash("Saved!") if id is None: - return redirect(url_for('edit_address', id=address.id)) + return redirect(url_for("edit_address", id=address.id)) else: addressform = AddressForm() - flash('Not authorized to edit this address!') + flash("Not authorized to edit this address!") - return render_template('address_edit.html', addressform=addressform, address=address) + return render_template( + "address_edit.html", addressform=addressform, address=address + ) -@app.route('/addresses/del/<int:id>', methods=['GET', 'POST']) +@app.route("/addresses/del/<int:id>", methods=["GET", "POST"]) @login_required def delete_address(id): address = Address.query.get(id) - if address is None or current_user.id is not address.user_id or current_user.role not in ['admin']: + if ( + address is None + or current_user.id is not address.user_id + or current_user.role not in ["admin"] + ): abort(404) form = FlaskForm() @@ -344,60 +404,64 @@ def delete_address(id): db.session.delete(address) db.session.commit() - return redirect(url_for('list_addresses')) + return redirect(url_for("list_addresses")) - return render_template('address_delete.html', form=form, address=address) + return render_template("address_delete.html", form=form, address=address) -@app.route('/users/list') +@app.route("/users/list") @login_required def list_users(): - if not (current_user.role in ['admin']): + if not (current_user.role in ["admin"]): abort(404) users = User.query.all() - return render_template('user_list.html', userlist=users) + return render_template("user_list.html", userlist=users) -@app.route('/users/show/<int:id>', methods=['GET', 'POST']) +@app.route("/users/show/<int:id>", methods=["GET", "POST"]) @login_required def edit_user(id=None): - if not (current_user.role in ['admin']): + if not (current_user.role in ["admin"]): abort(404) user = User.query.get(id) roleform = RoleForm(obj=user) if user is None: - flash('User not available') + flash("User not available") elif roleform.validate_on_submit(): roleform.populate_obj(user) db.session.add(user) db.session.commit() - flash('Saved') + flash("Saved") - return render_template('user_details.html', user=user, roleform=roleform) + return render_template("user_details.html", user=user, roleform=roleform) -@app.route('/me', methods=['GET']) +@app.route("/me", methods=["GET"]) @login_required def me(): token = current_user.create_token() - return render_template('me.html', token=token) + return render_template("me.html", token=token) -@app.route('/uploads/<int:transport_id>/<path:filename>') +@app.route("/uploads/<int:transport_id>/<path:filename>") def uploaded_file(transport_id, filename): transport = Transport.query.get(transport_id) - if transport is None or not (transport.user_id == current_user.id or current_user.role in ['helpdesk', 'admin']): + if transport is None or not ( + transport.user_id == current_user.id + or current_user.role in ["helpdesk", "admin"] + ): abort(404) - return send_from_directory(os.path.join(app.config['UPLOAD_DIR'], str(transport_id)), - filename) + return send_from_directory( + os.path.join(app.config["UPLOAD_DIR"], str(transport_id)), filename + ) def format_datetime(value): @@ -405,24 +469,24 @@ def format_datetime(value): return babel.dates.format_datetime(value, format) -_paragraph_re = re.compile(r'(?:\r\n|\r|\n)') +_paragraph_re = re.compile(r"(?:\r\n|\r|\n)") @pass_eval_context def nl2br(eval_ctx, value): - result = u'<br />\n'.join(_paragraph_re.split(escape(value))) + result = "<br />\n".join(_paragraph_re.split(escape(value))) if eval_ctx.autoescape: result = Markup(result) return result -app.jinja_env.filters['datetime'] = format_datetime -app.jinja_env.filters['nl2br'] = nl2br +app.jinja_env.filters["datetime"] = format_datetime +app.jinja_env.filters["nl2br"] = nl2br @app.context_processor def inject_global_template_vars(): - return dict(app_name=app.config['APP_NAME'], vehicletypes=VehicleTypes, roles=Roles) + return dict(app_name=app.config["APP_NAME"], vehicletypes=VehicleTypes, roles=Roles) @app.context_processor diff --git a/transporte/zammad_integration.py b/transporte/zammad_integration.py index 5c3ea46f11d8c24781f2f901918260dfa79858aa..4ac74f2f8cbe06aad7d87eedad99ad1c4e089ee8 100644 --- a/transporte/zammad_integration.py +++ b/transporte/zammad_integration.py @@ -1,80 +1,102 @@ from datetime import datetime +from flask import current_app as app from flask import render_template from zammad_py import ZammadAPI -from flask import current_app as app - def close_ticket(transport, reason): - client = ZammadAPI(username=app.config['ZAMMAD_USER'], password=app.config['ZAMMAD_PASS'], - host=app.config['ZAMMAD_HOST'], is_secure=app.config['ZAMMAD_SECURE']) + client = ZammadAPI( + username=app.config["ZAMMAD_USER"], + password=app.config["ZAMMAD_PASS"], + host=app.config["ZAMMAD_HOST"], + is_secure=app.config["ZAMMAD_SECURE"], + ) messagebody = reason # - # if new helpdesk user is closing a ticket, we need to create the user in the ticket system + # if new helpdesk user is closing a ticket, + # we need to create the user in the ticket system # - zammad_search_result = client.user.search({'query': transport.user.login}) + zammad_search_result = client.user.search({"query": transport.user.login}) if zammad_search_result: zammad_user = zammad_search_result[0] else: - zammad_user = client.user.create({'firstname': transport.user.login, 'email': transport.user.login}) + zammad_user = client.user.create( + {"firstname": transport.user.login, "email": transport.user.login} + ) ticketTemplate = { - 'id': transport.ticket_id, - 'title': '[Transport] from: ' + transport.origin + ' to: ' + transport.destination, - 'group_id': app.config['ZAMMAD_GROUP_ID'], - 'customer_id': zammad_user['id'], - 'state_id': '4', - 'article': {'from': app.config['ZAMMAD_QUEUE'], - 'to': transport.user.login, - 'body': messagebody, - 'type_id': 1, - 'content_type': 'text/html'}, - 'tags': 'transport', - } + "id": transport.ticket_id, + "title": "[Transport] from: " + + transport.origin + + " to: " + + transport.destination, + "group_id": app.config["ZAMMAD_GROUP_ID"], + "customer_id": zammad_user["id"], + "state_id": "4", + "article": { + "from": app.config["ZAMMAD_QUEUE"], + "to": transport.user.login, + "body": messagebody, + "type_id": 1, + "content_type": "text/html", + }, + "tags": "transport", + } client.ticket.update(transport.ticket_id, ticketTemplate) def update_ticket(transport): - client = ZammadAPI(username=app.config['ZAMMAD_USER'], password=app.config['ZAMMAD_PASS'], - host=app.config['ZAMMAD_HOST'], is_secure=app.config['ZAMMAD_SECURE']) + client = ZammadAPI( + username=app.config["ZAMMAD_USER"], + password=app.config["ZAMMAD_PASS"], + host=app.config["ZAMMAD_HOST"], + is_secure=app.config["ZAMMAD_SECURE"], + ) # # Create user if not exists # - zammad_search_result = client.user.search({'query': transport.user.login}) + zammad_search_result = client.user.search({"query": transport.user.login}) if zammad_search_result: zammad_user = zammad_search_result[0] else: - zammad_user = client.user.create({'firstname': transport.user.login, 'email': transport.user.login}) + zammad_user = client.user.create( + {"firstname": transport.user.login, "email": transport.user.login} + ) - messagebody = render_template('email_transport_update.html', transport=transport) + messagebody = render_template("email_transport_update.html", transport=transport) ticketTemplate = { - 'title': '[Transport] from: ' + transport.origin + ' to: ' + transport.destination, - 'group_id': app.config['ZAMMAD_GROUP_ID'], - 'customer_id': zammad_user['id'], - 'state_id': '3' if transport.needs_organization else '2', - 'article': {'from': app.config['ZAMMAD_QUEUE'], - 'to': transport.user.login, - 'body': messagebody, - 'type_id': 1, - 'content_type': 'text/html'}, - 'tags': 'transport', - 'pending_time': '{}Z'.format(datetime.utcnow().isoformat()), + "title": "[Transport] from: " + + transport.origin + + " to: " + + transport.destination, + "group_id": app.config["ZAMMAD_GROUP_ID"], + "customer_id": zammad_user["id"], + "state_id": "3" if transport.needs_organization else "2", + "article": { + "from": app.config["ZAMMAD_QUEUE"], + "to": transport.user.login, + "body": messagebody, + "type_id": 1, + "content_type": "text/html", + }, + "tags": "transport", + "pending_time": "{}Z".format(datetime.utcnow().isoformat()), } # # Create a new ticket, if the transport has no ticket_id in the db # if transport.ticket_id is None: - transport.ticket_id = client.ticket.create(ticketTemplate)['id'] + transport.ticket_id = client.ticket.create(ticketTemplate)["id"] else: - ticketTemplate['id'] = transport.ticket_id + ticketTemplate["id"] = transport.ticket_id client.ticket.update(transport.ticket_id, ticketTemplate) return transport.ticket_id