1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| import json import logging import os import socket import ssl import struct import sys import time import uuid import argparse
# Endpoints of the legacy APNS binary gateway (production and sandbox).
APNS_HOST = 'gateway.push.apple.com'
APNS_HOST_SANDBOX = 'gateway.sandbox.push.apple.com'
APNS_PORT = 2195

# Human-readable text for the status byte of an APNS error-response packet.
# Codes 0, 10 and 128 are part of the documented table as well; without them
# a lookup for those statuses would produce None in the log output.
APNS_ERRORS = {
    0: 'No errors encountered',
    1: 'Processing error',
    2: 'Missing device token',
    3: 'missing topic',
    4: 'missing payload',
    5: 'invalid token size',
    6: 'invalid topic size',
    7: 'invalid payload size',
    8: 'invalid token',
    10: 'Shutdown',
    128: 'Protocol error',
    255: 'Unknown',
}
def push(cert_path, device, sandbox):
    """Send three test notifications to one device via the APNS binary gateway.

    Args:
        cert_path: Path to the push certificate file handed to the TLS layer.
        device: Device token as a hexadecimal string.
        sandbox: When true, connect to APNS_HOST_SANDBOX instead of APNS_HOST.

    Exits the process with status 1 when the certificate path does not exist,
    the device token is not valid hex, the connection cannot be established,
    the gateway closes the connection, or the gateway returns an error
    response.
    """
    if not os.path.exists(cert_path):
        logging.error("Invalid certificate path: %s", cert_path)
        sys.exit(1)

    # bytearray.fromhex works on both Python 2.7 and 3, unlike str.decode('hex').
    try:
        token = bytes(bytearray.fromhex(device))
    except (TypeError, ValueError) as e:
        logging.error("Invalid device token: %s", e)
        sys.exit(1)

    # The expiry field is packed as an unsigned 32-bit integer, so it must
    # not be a float.
    expiry = int(time.time()) + 3600
    host = APNS_HOST_SANDBOX if sandbox else APNS_HOST

    raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = raw_sock
    try:
        # ssl.wrap_socket() was removed in Python 3.12; an SSLContext also
        # verifies the gateway's certificate and hostname.
        context = ssl.create_default_context()
        context.load_cert_chain(cert_path)
        sock = context.wrap_socket(raw_sock, server_hostname=host)
        sock.connect((host, APNS_PORT))
        sock.settimeout(1)
    except Exception as e:
        logging.error("Failed to connect: %s", e)
        sock.close()
        sys.exit(1)

    logging.info("Connected to APNS\n")

    try:
        for ident in range(1, 4):
            logging.info("Sending %d of 3 push notifications", ident)

            alert = 'Push Test %d: %s' % (ident, str(uuid.uuid4())[:8])
            # struct's 's' format needs bytes, not text.
            payload = json.dumps({'aps': {'alert': alert}}).encode('utf-8')

            # Frame layout: command (1), identifier, expiry, token length,
            # token, payload length, payload. The real token length is used
            # so a wrong-sized token is not silently padded or truncated.
            frame = struct.pack(
                '!BIIH%dsH%ds' % (len(token), len(payload)),
                1, ident, expiry, len(token), token, len(payload), payload
            )

            try:
                sock.sendall(frame)
            except socket.error as e:
                logging.error("Socket write error: %s", e)
            else:
                logging.info("Message sent\n")

            # The code treats any reply as an error report; a read timeout
            # is taken to mean the notification was accepted.
            try:
                response = sock.recv(6)
            except socket.timeout:
                continue
            except ssl.SSLError as e:
                # Python 2 reports a TLS read timeout as SSLError; keep the
                # best-effort behaviour but leave a trace for debugging.
                logging.debug("Ignoring SSL error while reading: %s", e)
                continue

            if len(response) < 6:
                # Empty or truncated read: the peer closed the connection.
                logging.error("Connection closed by APNS\n")
                sys.exit(1)

            _command, status, _failed_ident = struct.unpack('!BBI', response)
            logging.error(
                "APNS Error: %s\n",
                APNS_ERRORS.get(status, 'Unrecognized status %d' % status)
            )
            sys.exit(1)
    finally:
        # Runs on normal completion and on sys.exit() alike.
        sock.close()
if __name__ == '__main__':
    # Command-line entry point: parse the certificate path, the device ID and
    # the optional sandbox flag, then hand them to push().
    cli = argparse.ArgumentParser(
        description="Send test APNS notifications to device using cert"
    )
    for arg_name, arg_help in (
        ("certificate", "Push certificate"),
        ("device_id", "Device ID"),
    ):
        cli.add_argument(arg_name, help=arg_help)
    cli.add_argument(
        "-s", "--sandbox",
        action="store_true",
        help="Use APNS sandbox environment"
    )
    options = cli.parse_args()

    # Logging is configured only after argument parsing has succeeded.
    logging.basicConfig(level=logging.INFO)
    push(options.certificate, options.device_id, options.sandbox)
    logging.info("Complete\n")
|