Skip to content

Commit 1e3e990

Browse files
start porting python-nostr
1 parent e63c9d4 commit 1e3e990

File tree

4 files changed

+316
-0
lines changed

4 files changed

+316
-0
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# cipher.py: MicroPython compatibility layer for cryptography.hazmat.primitives.ciphers
2+
# Implements Cipher, algorithms.AES, and modes.CBC using ucryptolib
3+
4+
from ucryptolib import aes
5+
6+
class Cipher:
7+
"""Emulates cryptography's Cipher for AES encryption/decryption."""
8+
def __init__(self, algorithm, mode):
9+
self.algorithm = algorithm
10+
self.mode = mode
11+
self._key = algorithm.key
12+
self._iv = mode.iv if mode.iv is not None else b'\x00' * 16
13+
self._cipher = aes(self._key, 1) # Mode 1 = CBC
14+
15+
def encryptor(self):
16+
return Encryptor(self._cipher, self._iv)
17+
18+
def decryptor(self):
19+
return Decryptor(self._cipher, self._iv)
20+
21+
class Encryptor:
22+
"""Handles encryption with the initialized cipher."""
23+
def __init__(self, cipher, iv):
24+
self._cipher = cipher
25+
self._iv = iv
26+
self._buffer = bytearray()
27+
28+
def update(self, data):
29+
self._buffer.extend(data)
30+
# MicroPython's ucryptolib processes full blocks
31+
block_size = 16 # AES block size
32+
if len(self._buffer) >= block_size:
33+
to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
34+
self._buffer = self._buffer[len(to_process):]
35+
return self._cipher.encrypt(to_process)
36+
return b''
37+
38+
def finalize(self):
39+
if self._buffer:
40+
# Pad remaining data if needed (handled by caller with PKCS7)
41+
return self._cipher.encrypt(self._buffer)
42+
return b''
43+
44+
class Decryptor:
45+
"""Handles decryption with the initialized cipher."""
46+
def __init__(self, cipher, iv):
47+
self._cipher = cipher
48+
self._iv = iv
49+
self._buffer = bytearray()
50+
51+
def update(self, data):
52+
self._buffer.extend(data)
53+
block_size = 16
54+
if len(self._buffer) >= block_size:
55+
to_process = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
56+
self._buffer = self._buffer[len(to_process):]
57+
return self._cipher.decrypt(to_process)
58+
return b''
59+
60+
def finalize(self):
61+
if self._buffer:
62+
return self._cipher.decrypt(self._buffer)
63+
return b''
64+
65+
class algorithms:
66+
"""Namespace for cipher algorithms."""
67+
class AES:
68+
def __init__(self, key):
69+
if len(key) not in (16, 24, 32): # 128, 192, 256-bit keys
70+
raise ValueError("AES key must be 16, 24, or 32 bytes")
71+
self.key = key
72+
self.block_size = 128 # Bits
73+
74+
class modes:
75+
"""Namespace for cipher modes."""
76+
class CBC:
77+
def __init__(self, iv):
78+
if len(iv) != 16:
79+
raise ValueError("CBC IV must be 16 bytes")
80+
self.iv = iv
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# primitives.py: MicroPython compatibility layer for cryptography.hazmat.primitives.padding
2+
# Implements PKCS7 padding and unpadding
3+
4+
def _byte_padding_check(block_size):
5+
"""Validate block size for padding."""
6+
if not (0 <= block_size <= 2040):
7+
raise ValueError("block_size must be in range(0, 2041).")
8+
if block_size % 8 != 0:
9+
raise ValueError("block_size must be a multiple of 8.")
10+
11+
class PKCS7PaddingContext:
12+
"""Handles PKCS7 padding."""
13+
def __init__(self, block_size):
14+
_byte_padding_check(block_size)
15+
self.block_size = block_size // 8 # Convert bits to bytes
16+
self._buffer = bytearray()
17+
18+
def update(self, data):
19+
self._buffer.extend(data)
20+
# Return full blocks
21+
block_size = self.block_size
22+
if len(self._buffer) >= block_size:
23+
to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
24+
self._buffer = self._buffer[len(to_return):]
25+
return to_return
26+
return b''
27+
28+
def finalize(self):
29+
# Pad with bytes equal to padding length
30+
pad_length = self.block_size - (len(self._buffer) % self.block_size)
31+
padding = bytes([pad_length] * pad_length)
32+
self._buffer.extend(padding)
33+
result = bytes(self._buffer)
34+
self._buffer = bytearray()
35+
return result
36+
37+
class PKCS7UnpaddingContext:
38+
"""Handles PKCS7 unpadding."""
39+
def __init__(self, block_size):
40+
_byte_padding_check(block_size)
41+
self.block_size = block_size // 8
42+
self._buffer = bytearray()
43+
44+
def update(self, data):
45+
self._buffer.extend(data)
46+
# Only process complete blocks
47+
block_size = self.block_size
48+
if len(self._buffer) >= block_size:
49+
to_return = self._buffer[:len(self._buffer) - (len(self._buffer) % block_size)]
50+
self._buffer = self._buffer[len(to_return):]
51+
return to_return
52+
return b''
53+
54+
def finalize(self):
55+
if not self._buffer or len(self._buffer) % self.block_size != 0:
56+
raise ValueError("Invalid padding")
57+
pad_length = self._buffer[-1]
58+
if pad_length > self.block_size or pad_length == 0:
59+
raise ValueError("Invalid padding")
60+
if self._buffer[-pad_length:] != bytes([pad_length] * pad_length):
61+
raise ValueError("Invalid padding")
62+
result = bytes(self._buffer[:-pad_length])
63+
self._buffer = bytearray()
64+
return result
65+
66+
class PKCS7:
67+
"""PKCS7 padding implementation."""
68+
def __init__(self, block_size):
69+
_byte_padding_check(block_size)
70+
self.block_size = block_size
71+
72+
def padder(self):
73+
return PKCS7PaddingContext(self.block_size)
74+
75+
def unpadder(self):
76+
return PKCS7UnpaddingContext(self.block_size)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# dataclasses.py: Minimal MicroPython compatibility layer for Python's dataclasses
2+
# Implements @dataclass with __init__ and __repr__ generation
3+
4+
def dataclass(cls):
5+
"""Decorator to emulate Python's @dataclass, generating __init__ and __repr__."""
6+
# Get class annotations and defaults
7+
annotations = getattr(cls, '__annotations__', {})
8+
defaults = {}
9+
for name in dir(cls):
10+
if not name.startswith('__'):
11+
attr = getattr(cls, name, None)
12+
if not callable(attr) and name in annotations:
13+
defaults[name] = attr
14+
15+
# Generate __init__ method
16+
def __init__(self, *args, **kwargs):
17+
# Positional arguments
18+
fields = list(annotations.keys())
19+
for i, value in enumerate(args):
20+
if i >= len(fields):
21+
raise TypeError(f"Too many positional arguments")
22+
setattr(self, fields[i], value)
23+
24+
# Keyword arguments and defaults
25+
for name in fields:
26+
if name in kwargs:
27+
setattr(self, name, kwargs[name])
28+
elif not hasattr(self, name):
29+
if name in defaults:
30+
setattr(self, name, defaults[name])
31+
else:
32+
raise TypeError(f"Missing required argument: {name}")
33+
34+
# Generate __repr__ method
35+
def __repr__(self):
36+
fields = [
37+
f"{name}={getattr(self, name)!r}"
38+
for name in annotations
39+
]
40+
return f"{cls.__name__}({', '.join(fields)})"
41+
42+
# Attach generated methods to class
43+
setattr(cls, '__init__', __init__)
44+
setattr(cls, '__repr__', __repr__)
45+
46+
return cls

internal_filesystem/lib/secrets.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# By PiggyOS
2+
3+
# secrets.py: Compatibility layer for CPython's secrets module in MicroPython
4+
# Uses urandom for cryptographically secure randomness
5+
# Implements SystemRandom, choice, randbelow, randbits, token_bytes, token_hex,
6+
# token_urlsafe, and compare_digest
7+
8+
import urandom
9+
import ubinascii
10+
import uhashlib
11+
import utime
12+
13+
class SystemRandom:
14+
"""Emulates random.SystemRandom using MicroPython's urandom."""
15+
16+
def randrange(self, start, stop=None, step=1):
17+
"""Return a random int in range(start, stop[, step])."""
18+
if stop is None:
19+
stop = start
20+
start = 0
21+
if step != 1:
22+
raise NotImplementedError("step != 1 not supported")
23+
if start >= stop:
24+
raise ValueError("empty range")
25+
range_size = stop - start
26+
return start + self._randbelow(range_size)
27+
28+
def _randbelow(self, n):
29+
"""Return a random int in [0, n)."""
30+
if n <= 0:
31+
raise ValueError("exclusive_upper_bound must be positive")
32+
k = (n.bit_length() + 7) // 8 # Bytes needed for n
33+
r = 0
34+
while True:
35+
r = int.from_bytes(self._getrandbytes(k), 'big')
36+
if r < n:
37+
return r
38+
39+
def _getrandbytes(self, n):
40+
"""Return n random bytes."""
41+
return bytearray(urandom.getrandbits(8) for _ in range(n))
42+
43+
def choice(self, seq):
44+
"""Return a randomly chosen element from a non-empty sequence."""
45+
if not seq:
46+
raise IndexError("cannot choose from an empty sequence")
47+
return seq[self._randbelow(len(seq))]
48+
49+
def randbits(self, k):
50+
"""Return a non-negative int with k random bits."""
51+
if k < 0:
52+
raise ValueError("number of bits must be non-negative")
53+
numbytes = (k + 7) // 8
54+
return int.from_bytes(self._getrandbytes(numbytes), 'big') >> (numbytes * 8 - k)
55+
56+
# Instantiate SystemRandom for module-level functions
57+
_sysrand = SystemRandom()
58+
59+
def choice(seq):
60+
"""Return a randomly chosen element from a non-empty sequence."""
61+
return _sysrand.choice(seq)
62+
63+
def randbelow(exclusive_upper_bound):
64+
"""Return a random int in [0, exclusive_upper_bound)."""
65+
return _sysrand._randbelow(exclusive_upper_bound)
66+
67+
def randbits(k):
68+
"""Return a non-negative int with k random bits."""
69+
return _sysrand.randbits(k)
70+
71+
def token_bytes(nbytes=None):
72+
"""Return a random byte string of nbytes. Default is 32 bytes."""
73+
if nbytes is None:
74+
nbytes = 32
75+
if nbytes < 0:
76+
raise ValueError("number of bytes must be non-negative")
77+
return _sysrand._getrandbytes(nbytes)
78+
79+
def token_hex(nbytes=None):
80+
"""Return a random hex string of nbytes. Default is 32 bytes."""
81+
return ubinascii.hexlify(token_bytes(nbytes)).decode()
82+
83+
def token_urlsafe(nbytes=None):
84+
"""Return a random URL-safe base64 string of nbytes. Default is 32 bytes."""
85+
if nbytes is None:
86+
nbytes = 32
87+
if nbytes < 0:
88+
raise ValueError("number of bytes must be non-negative")
89+
# Base64 encoding: 4 chars per 3 bytes, so we need ceil(nbytes * 4/3) chars
90+
# Generate enough bytes to ensure we have at least nbytes after encoding
91+
raw_bytes = token_bytes(nbytes)
92+
# Use URL-safe base64 encoding (replaces '+' with '-', '/' with '_')
93+
encoded = ubinascii.b2a_base64(raw_bytes).decode().rstrip('\n=')
94+
# Ensure length corresponds to nbytes (truncate if needed)
95+
return encoded[:int(nbytes * 4 / 3)]
96+
97+
def compare_digest(a, b):
98+
"""Return True if a and b are equal in constant time, else False."""
99+
# Convert to bytes if strings
100+
if isinstance(a, str):
101+
a = a.encode()
102+
if isinstance(b, str):
103+
b = b.encode()
104+
if not isinstance(a, (bytes, bytearray)) or not isinstance(b, (bytes, bytearray)):
105+
raise TypeError("both inputs must be bytes-like or strings")
106+
if len(a) != len(b):
107+
return False
108+
# Constant-time comparison to prevent timing attacks
109+
result = 0
110+
for x, y in zip(a, b):
111+
result |= x ^ y
112+
return result == 0
113+
114+

0 commit comments

Comments
 (0)