Source code for sideshow_corepos.batch.neworder

# -*- coding: utf-8; -*-
################################################################################
#
#  Sideshow-COREPOS -- Case/Special Order Tracker for CORE-POS
#  Copyright © 2025 Lance Edgar
#
#  This file is part of Sideshow.
#
#  Sideshow is free software: you can redistribute it and/or modify it
#  under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  Sideshow is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with Sideshow.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
New Order Batch Handler for CORE-POS
"""

import decimal

import sqlalchemy as sa
from sqlalchemy import orm

from sideshow.batch import neworder as base


class NewOrderBatchHandler(base.NewOrderBatchHandler):
    """
    Custom :term:`handler` for :term:`new order batches <new order
    batch>` which can use CORE-POS as external data source for
    customers and products.

    Each ``*_external`` method opens its own CORE-POS ``office_op``
    session and always closes it before returning, including when an
    error is raised.

    See parent class
    :class:`~sideshow:sideshow.batch.neworder.NewOrderBatchHandler`
    for more info.
    """

    def autocomplete_customers_external(self, session, term, user=None):
        """
        Search CORE-POS customers whose name matches the given term.

        Every whitespace-separated word in ``term`` must appear
        (case-insensitive substring match) in either the first name
        or the last name of the customer.  A term with no words
        applies no name filter at all.

        :param session: Not used by this implementation.

        :param term: Search text to match against customer names.

        :param user: Not used by this implementation.

        :returns: List of dicts, sorted by first name then last name.
           Each dict has a ``value`` key (card number as string) and
           a ``label`` key (``str()`` of the customer record).
        """
        corepos = self.app.get_corepos_handler()
        op_model = corepos.get_model_office_op()
        op_session = corepos.make_session_office_op()
        try:
            customer_cls = op_model.CustomerClassic

            # base query
            query = op_session.query(customer_cls).join(
                op_model.MemberInfo,
                op_model.MemberInfo.card_number == customer_cls.card_number,
            )

            # filter query; multiple criteria passed to filter() are
            # joined with AND, and passing none leaves query unchanged
            criteria = [
                sa.or_(
                    customer_cls.first_name.ilike(f"%{word}%"),
                    customer_cls.last_name.ilike(f"%{word}%"),
                )
                for word in term.split()
            ]
            query = query.filter(*criteria)

            # sort query
            query = query.order_by(customer_cls.first_name, customer_cls.last_name)

            # get data
            # TODO: need max_results option
            customers = query.all()

            # get results; built while the session is still open
            return [
                {"value": str(customer.card_number), "label": str(customer)}
                for customer in customers
            ]
        finally:
            op_session.close()

    def refresh_batch_from_external_customer(self, batch):
        """
        Update customer fields on the batch from its CORE-POS customer.

        The ``batch.customer_id`` value is interpreted as a CORE-POS
        card number; the matching customer record with
        ``person_number == 1`` is used.  This sets ``customer_name``,
        ``phone_number`` and ``email_address`` on the batch.

        :param batch: Batch whose ``customer_id`` identifies the
           CORE-POS customer.

        :raises ValueError: If ``batch.customer_id`` is not made up
           of digits only, or if no matching customer is found.
        """
        corepos = self.app.get_corepos_handler()
        op_model = corepos.get_model_office_op()

        # validate before opening a session, so nothing needs cleanup
        if not batch.customer_id.isdigit():
            raise ValueError(
                f"invalid CORE-POS customer card number: {batch.customer_id}"
            )

        op_session = corepos.make_session_office_op()
        try:
            customer_cls = op_model.CustomerClassic
            try:
                customer = (
                    op_session.query(customer_cls)
                    .join(
                        op_model.MemberInfo,
                        op_model.MemberInfo.card_number == customer_cls.card_number,
                    )
                    .filter(customer_cls.card_number == int(batch.customer_id))
                    .filter(customer_cls.person_number == 1)
                    .options(orm.joinedload(customer_cls.member_info))
                    .one()
                )
            except orm.exc.NoResultFound as e:
                raise ValueError(
                    f"CORE-POS Customer not found: {batch.customer_id}"
                ) from e

            batch.customer_name = str(customer)
            batch.phone_number = customer.member_info.phone
            batch.email_address = customer.member_info.email
        finally:
            op_session.close()

    def autocomplete_products_external(self, session, term, user=None):
        """
        Search CORE-POS products whose brand/description match the term.

        Every whitespace-separated word in ``term`` must appear
        (case-insensitive substring match) in either the brand or the
        description of the product.  A term with no words applies no
        filter at all.

        :param session: Not used by this implementation.

        :param term: Search text to match against products.

        :param user: Not used by this implementation.

        :returns: List of dicts, sorted by brand then description.
           Each dict has a ``value`` key (product UPC) and a
           ``label`` key (full name made from brand, description and
           size).
        """
        corepos = self.app.get_corepos_handler()
        op_model = corepos.get_model_office_op()
        op_session = corepos.make_session_office_op()
        try:
            product_cls = op_model.Product

            # base query
            query = op_session.query(product_cls)

            # filter query; multiple criteria passed to filter() are
            # joined with AND, and passing none leaves query unchanged
            criteria = [
                sa.or_(
                    product_cls.brand.ilike(f"%{word}%"),
                    product_cls.description.ilike(f"%{word}%"),
                )
                for word in term.split()
            ]
            query = query.filter(*criteria)

            # sort query
            query = query.order_by(product_cls.brand, product_cls.description)

            # get data
            # TODO: need max_results option
            products = query.all()

            # get results; built while the session is still open
            return [
                {
                    "value": product.upc,
                    "label": self.app.make_full_name(
                        product.brand, product.description, product.size
                    ),
                }
                for product in products
            ]
        finally:
            op_session.close()

    def get_product_info_external(self, session, product_id, user=None):
        """
        Fetch info for a single CORE-POS product, looked up by UPC.

        :param session: Not used by this implementation.

        :param product_id: UPC of the CORE-POS product.

        :param user: Not used by this implementation.

        :returns: Dict of product info with keys ``product_id``,
           ``scancode``, ``brand_name``, ``description``, ``size``,
           ``full_description``, ``weighed``, ``special_order``
           (always ``False``), ``department_id``,
           ``department_name``, ``case_size`` and ``unit_price_reg``.

        :raises ValueError: If no product has the given UPC.
        """
        corepos = self.app.get_corepos_handler()
        op_model = corepos.get_model_office_op()
        op_session = corepos.make_session_office_op()
        try:
            try:
                product = (
                    op_session.query(op_model.Product)
                    .filter(op_model.Product.upc == product_id)
                    .one()
                )
            except orm.exc.NoResultFound as e:
                raise ValueError(f"CORE-POS Product not found: {product_id}") from e

            # nb. related objects (department etc.) are read here, so
            # the dict must be fully built before the session closes
            data = {
                "product_id": product.upc,
                "scancode": product.upc,
                "brand_name": product.brand,
                "description": product.description,
                "size": product.size,
                "full_description": self.app.make_full_name(
                    product.brand, product.description, product.size
                ),
                "weighed": product.scale,
                "special_order": False,
                "department_id": product.department_number,
                "department_name": (
                    product.department.name if product.department else None
                ),
                "case_size": self.get_case_size_for_external_product(product),
                "unit_price_reg": self.get_unit_price_reg_for_external_product(
                    product
                ),
                # TODO
                # 'vendor_name': product.vendor_name,
                # 'vendor_item_code': product.vendor_item_code,
            }
            return data
        finally:
            op_session.close()

    def refresh_row_from_external_product(self, row):
        """
        Update product fields on the row from its CORE-POS product.

        The product is looked up by UPC using ``row.product_id``.
        This sets the product, department, vendor, case size, unit
        cost and regular unit price fields on the row;
        ``special_order`` is always set to ``False``.

        :param row: Batch row whose ``product_id`` is the UPC of the
           CORE-POS product.

        :raises ValueError: If no product has the given UPC.
        """
        corepos = self.app.get_corepos_handler()
        op_model = corepos.get_model_office_op()
        op_session = corepos.make_session_office_op()
        try:
            try:
                product = (
                    op_session.query(op_model.Product)
                    .filter(op_model.Product.upc == row.product_id)
                    .one()
                )
            except orm.exc.NoResultFound as e:
                raise ValueError(
                    f"CORE-POS Product not found: {row.product_id}"
                ) from e

            row.product_scancode = product.upc
            row.product_brand = product.brand
            row.product_description = product.description
            row.product_size = product.size
            row.product_weighed = product.scale
            row.department_id = product.department_number
            row.department_name = (
                product.department.name if product.department else None
            )
            row.special_order = False

            # vendor fields stay empty unless there is a default item
            row.vendor_name = None
            row.vendor_item_code = None
            item = product.default_vendor_item
            if item:
                row.vendor_name = item.vendor.name if item.vendor else None
                row.vendor_item_code = item.sku

            row.case_size = self.get_case_size_for_external_product(product)
            row.unit_cost = product.cost
            row.unit_price_reg = self.get_unit_price_reg_for_external_product(product)
        finally:
            op_session.close()

    def get_case_size_for_external_product(self, product):
        """
        Return the case size for the given CORE-POS product.

        The value comes from the ``units`` of the first entry in
        ``product.vendor_items``.

        :param product: CORE-POS product record.

        :returns: :class:`python:decimal.Decimal` with 4 decimal
           places, or ``None`` if the product has no vendor items or
           the first one has no ``units`` value.
        """
        # NOTE(review): presumably vendor_items may be loaded on
        # access, in which case the session must still be open
        if product.vendor_items:
            item = product.vendor_items[0]
            if item.units is not None:
                return decimal.Decimal(f"{item.units:0.4f}")
        return None

    def get_unit_price_reg_for_external_product(self, product):
        """
        Return the regular unit price for the given CORE-POS product.

        :param product: CORE-POS product record.

        :returns: ``product.normal_price`` as a
           :class:`python:decimal.Decimal` with 3 decimal places, or
           ``None`` if the product has no normal price.
        """
        if product.normal_price is not None:
            return decimal.Decimal(f"{product.normal_price:0.3f}")
        return None