import datetime import hashlib import time import json import random import string import requests from base64 import b64encode from urllib.parse import urlparse from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Hash import SHA256 from cryptography.hazmat.primitives.ciphers.aead import AESGCM import base64 from io import BytesIO # 生成二维码 def to_qr(text): import qrcode img = qrcode.make(text) bIo = BytesIO() img.save(bIo, format='PNG') ig = bIo.getvalue() data = base64.b64encode(ig) data = data.decode('utf-8') # return data # print(data) with open('pay.png', 'wb') as f: img.save(f) return data class WXPay: """ 微信 Native支付 """ def __init__(self): self.appid = "xxxxxxxxxxxxxxxxxx" # APPID self.mchid = "xxxxxxx" # 商户号 self.payment_url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/native' # Native支付下单接口 self.refund_url = 'https://api.mch.weixin.qq.com/v3/refund/domestic/refunds' # 退款接口 self.transfer_url = 'https://api.mch.weixin.qq.com/v3/transfer/batches' # 转账接口 self.notify_url = "https://xxxxxxxxxxxxxxxxxx.com" # 通知url self.serial_no = 'xxxxxxxxxxxxx' # 商户证书序列号 self.apiclient_key = './apiclient_key.pem' # 本地证书路径 # 生成签名 def get_sign(self, sign_str): rsa_key = RSA.importKey(open(self.apiclient_key).read()) signer = pkcs1_15.new(rsa_key) digest = SHA256.new(sign_str.encode('utf8')) sign = b64encode(signer.sign(digest)).decode('utf-8') return sign def request(self, url: str, method: str, data=None): data = json.dumps(data) if data else '' random_str = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32)) timestamp = str(int(time.time())) # print("---------", url.split(urlparse(url).netloc)[-1]) sign_str = '\n'.join([ method.upper(), # HTTP请求方法 url.split(urlparse(url).netloc)[-1], # path+args timestamp, # 时间戳 random_str, # 请求随机串 data, '' # 请求报文主体 ]) # 结尾空窜仅用于让后面多一个\n sign = self.get_sign(sign_str) headers = { 'Accept': 'application/json;', 'Content-Type': 'application/json; charset=UTF-8', 'Authorization': f'WECHATPAY2-SHA256-RSA2048 mchid="{self.mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{timestamp}",serial_no="{self.serial_no}"', 'Wechatpay-Serial': self.serial_no } response = requests.request(url=url, method=method, data=data, headers=headers) return response def selectorder(self, order_id): url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/out-trade-no/{}?mchid={}'.format(order_id, self.mchid) # print(url) return self.request(url, 'GET') # 支付 def payment(self, order_no, total, description): now_time = datetime.datetime.now() + datetime.timedelta(days=1) now_time_str = now_time.strftime("%Y-%m-%dT%H:%M:%S+08:00") data = { "mchid": self.mchid, "out_trade_no": order_no, # 订单号 "appid": self.appid, "description": description, # 商品描述 "notify_url": self.notify_url, "time_expire": now_time_str, "amount": { "total": total, # 总金额(分) "currency": "CNY" } } return self.request(self.payment_url, 'POST', data) # 退款 def refund(self, out_trade_no, out_refund_no, refund, reason): data = { "out_trade_no": out_trade_no, # 微信支付订单号(交易单号) "out_refund_no": out_refund_no, # 商户退款单号(商户单号) "reason": reason, # 退款原因 "notify_url": self.notify_url, # 通知Url "amount": { "total": refund, # 订单金额 "refund": refund, # 退款金额(分) "currency": "CNY" } } # print(data) return self.request(self.refund_url, 'POST', data) def application_bill(self): url = 'https://api.mch.weixin.qq.com/v3/bill/tradebill?bill_date=2022-02-28&bill_type=ALL' return self.request(url, 'GET') def transfer(self, out_batch_no, batch_name, batch_remark, total_amount, total_num, transfer_detail_list): ''' 商家转账到零钱 :param out_batch_no: 订单号 :param batch_name: 批次名字 :param batch_remark: 批次描述 :param total_amount: 总金额 :param total_num: 总笔数 :param transfer_detail_list: 详情列表 { out_detail_no: 详情订单号 transfer_amount: 金额 transfer_remark': 描述 openid': 用户openid } :return: Response ''' data = { 'appid': self.appid, 'out_batch_no': out_batch_no, 'batch_name': batch_name, 'batch_remark': batch_remark, 'total_amount': total_amount, 'total_num': total_num, 'transfer_detail_list': transfer_detail_list } return self.request(self.transfer_url, 'POST', data) def pay_decrypt(nonce, ciphertext, associated_data): # 支付结果解密 key = "xxxxxxxxxxxxxxxxx" key_bytes = str.encode(key) nonce_bytes = str.encode(nonce) ad_bytes = str.encode(associated_data) data = base64.b64decode(ciphertext) # print(nonce_bytes, data, ad_bytes) aesgcm = AESGCM(key_bytes) return aesgcm.decrypt(nonce_bytes, data, ad_bytes) def order_uuid(): """ """ order_id = str(datetime.datetime.fromtimestamp(time.time())).replace("-", "").replace(" ", "").replace(":","").replace(".", "") + str(random.randint(100, 999)) return order_id if __name__ == "__main__": wx = WXPay() order_uid = order_uuid() print(order_uid) resp = wx.payment(order_uid, 1, "订单支付-测试") print(resp.text) to_qr(resp.json()['code_url']) # --------------------------------------------------------------------------------------------------------- out_batch_no = order_uuid() batch_name = '提现' batch_remark = '用户提现' total_amount = 1 total_num = 1 transfer_detail_list = [ { 'out_detail_no': order_uuid(), 'transfer_amount': 1, 'transfer_remark': '用户提现', 'openid': 'odDp41UknzC5pD37d9EZWkyKCJk0', } ] wx = WXPay() ret = wx.transfer(out_batch_no, batch_name, batch_remark, total_amount, total_num, transfer_detail_list) print(ret.text)