J'essaie d'exécuter une requête sql brute et de passer en toute sécurité un ordre par/asc/desc en fonction de l'entrée de l'utilisateur. C'est l'extrémité arrière d'une grille de données paginée. Je ne peux pas pour la vie de moi comprendre comment faire cela en toute sécurité. Les paramètres sont convertis en chaînes afin qu'Oracle ne puisse pas exécuter la requête. Je ne trouve aucun exemple de ceci n'importe où sur Internet. Quelle est la meilleure façon d'accomplir cela en toute sécurité? (Je n'utilise pas l'ORM, doit être raw sql).Comment lier en toute sécurité la colonne Oracle à ORDER BY à SQLAlchemy dans une requête brute?
Ma solution de contournement consiste à définir ASC/DESC sur une variable que j'ai définie. Cela fonctionne bien et est sûr. Cependant, comment lier un nom de colonne à ORDER BY? Est-ce que c'est possible? Je peux juste ajouter à la liste blanche un groupe de colonnes et faire quelque chose de similaire à l'ASC/DESC. J'étais juste curieux de savoir s'il y avait un moyen de le lier. Merci.
@default.route('/api/barcodes/<sort_by>/<sort_dir>', methods=['GET'])
@json_enc
def fetch_barcodes(sort_by, sort_dir):
#time.sleep(5)
# Can't use sort_dir as a parameter, so assign to variable to sanitize it
ord_dir = "DESC" if sort_dir.lower() == 'desc' else 'ASC'
records = []
stmt = text("SELECT bb_request_id,bb_barcode,bs_status, "
"TO_CHAR(bb_rec_cre_date, 'MM/DD/YYYY') AS bb_rec_cre_date "
"FROM bars_barcodes,bars_status "
"WHERE bs_status_id = bb_status_id "
"ORDER BY :ord_by :ord_dir ")
stmt = stmt.bindparams(ord_by=sort_by,ord_dir=ord_dir)
rs = db.session.execute(stmt)
records = [dict(zip(rs.keys(), row)) for row in rs]
DatabaseError: (cx_Oracle.DatabaseError) ORA-01036: nom de la variable illégale/Numéro [SQL: « SELECT bb_request_id, bb_barcode, bs_status, TO_CHAR (bb_rec_cre_date, 'MM/JJ/AAAA) AS bb_rec_cre_date dE bars_barcodes, bars_status OU bs_status_id = bb_status_id ORDER BY: ord_by: ord_dir « ] [paramètres: { 'ord_by': u'bb_rec_cre_date », 'ord_dir': 'ASC'}]
Solution UPDATE basée sur acceptée answer:
def fetch_barcodes(sort_by, sort_dir, page, rows_per_page):
ord_dir_func = desc if sort_dir.lower() == 'desc' else asc
query_limit = int(rows_per_page)
query_offset = (int(page) - 1) * query_limit
stmt = select([column('bb_request_id'),
column('bb_barcode'),
column('bs_status'),
func.to_char(column('bb_rec_cre_date'), 'MM/DD/YYYY').label('bb_rec_cre_date')]).\
select_from(table('bars_barcode')).\
select_from(table('bars_status')).\
where(column('bs_status_id') == column('bb_status_id')).\
order_by(ord_dir_func(column(sort_by))).\
limit(query_limit).offset(query_offset)
result = db.session.execute(stmt)
records = [dict(row) for row in result]
response = json_return()
response.addRecords(records)
#response.setTotal(len(records))
response.setTotal(1001)
response.setSuccess(True)
response.addMessage("Records retrieved successfully. Limit: " + str(query_limit) + ", Offset: " + str(query_offset) + " SQL: " + str(stmt))
return response
Merci pour la réponse et la tête sur le zip. J'ai copié ce code à partir de l'application d'un collègue. Je suis assez nouveau pour Python. J'avais l'intention de revenir en arrière et de comprendre un peu mieux ce concept. Merci beaucoup pour la clarification. Il est tôt ici à San Diego, donc je vais vérifier votre solution plus tard aujourd'hui. –