Advanced
The python await and async is one of the more advanced features to help run your programs faster by making sure the CPU is spending as little time as possible waiting and instead as much time as possible working. If ever you see a capable chef, you’ll know what I mean. The chef is not just following a recipe step by step (i.e. working synchronously), the chef is boiling water to cook the pasta , measuring the amount of pasta, chopping tomatoes for the pasta sauce until the water boils etc (i.e. the chef is working asynchronously). The chef is minimizing the time they are waiting idle and always working on a task. That’s the same idea with async and await.
For this tutorial, we will focus on python 3.7 as it has some of the more modern features of await and async. We will call out some of the differences for python 3.4 – 3.6.
What is async await in Python?
The async await keywords help to define in your program which parts need to run sequentially, and which parts may take sometime but other parts of the program can execute while this step completes. A modern example of this is that if you’re downloading a web page it may take a few seconds, while the download is happening you can execute other parts of your program.
How does async await work in Python?
Sometimes the best way to explain something is to show how you would achieve the same thing without the feature.
Continuing with the restaurant theme, suppose you are running a hamburger stall (you’re the waiter and the chef) and it is almost instant to collect payment for a customer and serve the final hamburger, but the most time consuming task is to cooking the beef patty which takes 2 seconds (one could only wish!).
See the below diagram:

In the above diagram:
- Step 1: you would first get the order and collect the money from Customer 1
- Step 2: you would then put a beef patty on the cook top and then wait for 2 seconds for the beef patty to cook. At the same time, Customer 1 is also waiting for 2 seconds.
- Step 3: when the beef patty is cooked, you can then plate this onto a hamburger bun
- Step 4: pass the final hamburger to Customer 1
- Step 5: You would then start to serve Customer 2 (who has already been waiting 2 seconds for you to serve Customer 1). You can then repeat steps 2-4
With the above approach, Customer 1 would have their burger in about 2 seconds, Customer 2 approx 4 seconds, and then Customer 3 approx 6 seconds.
The equivalent code would be as follows:
import time, datetime, timeit
customer_queue = [ "C1", "C2", "C3" ]
def get_next_customer():
return customer_queue.pop(0) #Get the first customer from list
def cook_hamburger(customer):
start_customer_timer = timeit.default_timer()
print( f"[{customer}]: Start cooking hamberger for customer")
time.sleep(2) # It takes 2 seconds to cook the hamburger
end_customer_timer = timeit.default_timer()
print( f"[{customer}]: Finish cooking hamberger for customer. Total {end_customer_timer-start_customer_timer} seconds\n")
def run_shop():
while customer_queue:
curr_customer = get_next_customer()
cook_hamburger(curr_customer)
def main():
print('Hamburger Shop')
start = timeit.default_timer()
run_shop()
stop = timeit.default_timer()
print(f"** Total runtime: {stop-start} seconds ***")
if __name__ == '__main__':
main()
The code above is fairly straightforward. We have a list of customers that are queuing in the list customer_queue which are being looped under the def run_shop(). For each customer (get_next_customer()), we call cook_hamburger() to cook the hamburger for 2 seconds and wait for it to complete.
Running this code you would get the following output:

As expected, the total runtime for 3 customers is 6 seconds since each customer is served sequentially.
Cooking Hamburgers Asynchronously and coding the event loop manually
Instead of serving the customer and cooking the hamburger for each customer, you can obviously do some of the tasks asynchronously, meaning you can start the task but you don’t have to sit and wait, you can do something else. See the following diagram where the chef/waiter is serving multiple customers and cooking at the same time. It’s not explicitly shown here, but the chef/waiter is constantly checking on the status of the next task and if a task doesn’t require his/her attention they’ll move on to the next task. This process of always looking for something to do is the equivalent of the “event loop”. The Event Loop is a programming construct where the logic is to always look for a task to execute and if there’s a task which will take some time it can release control to the next task in the loop.

In the above example, the following is happening:
- Step 1: you would first get the order and collect the money from Customer 1
- Step 2: you would then put a beef patty on the cook top and then let it cook, then immediately move on to the next customer while the patty is cooking.
- Step 3: you would first get the order and collect the money from Customer 2. You would also check if the first beef patty has completed cooking yet.
- Step 4: you would then put another beef patty on the cook top and then let it cook, then immediately move on to the next customer while the patty is cooking.
- …
- Step 5: When any of the beef patties are done, you would plate it
- Step 6: Pass the plated hamburger to the respective customer. Note, in the above example we’ve assumed it to be Customer 1, but it could be any customer depending on which beef patty cooked fully first.
- Step 7: When any of the beef patties are done, you would plate it, and server
This is the equivalent of the event loop. The chef/waiter is constantly checking if it needs to serve the customer or check on the hamburgers which are cooking. When there’s a hamburger is placed on the stove and we need to wait 2 seconds, the chef/waiter moves to the next task and does not wait for the 2 seconds to complete. When the hamburger is done, it is then served to the customer.
How can this be done programatically? Glad you asked:
import time ,datetime, timeit
customer_queue = [ "C1", "C2", "C3" ]
hamburger_queue = []
def get_next_customer():
if customer_queue: return customer_queue.pop(0) #Get the first customer from list
return None
def start_cooking_hamburger(customer):
print( f"[{customer}]: Start cooking hamberger for customer")
hamburger = { "customer":customer, "start_cooking_time": timeit.default_timer(), "cooked":False}
hamburger_queue.append( hamburger )
def check_hamburger_status():
curr_timer = timeit.default_timer()
#Check if it's cooking, but release control
for index, hamburger in enumerate(hamburger_queue):
elapsed_time = curr_timer-hamburger['start_cooking_time']
if elapsed_time > 2: #2 second has passed for hamrburger to cook
print( f"[{hamburger['customer']}]: Finish cooking hamberger for customer. Total {elapsed_time} seconds\n")
del hamburger_queue[ index]. #delete from list to mark as done
def run_shop():
while customer_queue or hamburger_queue: #Event loop
curr_customer = get_next_customer()
if curr_customer: start_cooking_hamburger(curr_customer)
check_hamburger_status()
def main():
print('Hamburger Shop')
start = timeit.default_timer()
run_shop()
stop = timeit.default_timer()
print(f"** Total runtime: {stop-start} seconds ***")
if __name__ == '__main__':
main()
The output of the code is as follows:

So there’s a few things happening here:
- There’s a new list called
hamburger_queue[]which is keeping track of each hamburger that is being cooked - The event loop is the
while customer_queue or hamburger_queuewithin therun_shop()function - We have a new function called
start_cooking_hamburger()which helps to keep track of the task to cooking starting. Why is this needed? Well in the past we would simply wait for a given task. Now, since we are doing something else while we wait, we need to remember a few things to come back to the task - We also have a new function called
check_hamburger_status()which checks the status of each hamburger being cooked (i.e. item inhamburger_queue[]), and if it is cooked (i.e. 2 seconds have passed), then it is considered complete
You may notice in the output that Customer 3 was in fact served before Customer 2. This is because that the execution order is not guarantee.
How To Work with Python bytes and bytearray
Intermediate
Working with binary data is one of those skills that feels intimidating until you understand the two types Python gives you: bytes (immutable) and bytearray (mutable). Whether you are reading a binary file format, sending raw bytes over a network socket, working with image data, or packing integers into a compact binary protocol, these two types are what you reach for. The confusion usually comes from mixing up strings (Unicode text) and bytes (raw binary data) — a distinction Python 3 enforces strictly.
Both bytes and bytearray are built into Python 3 — no imports needed for the basic operations. The struct module (also built-in) handles packing and unpacking binary data formats like those used in network protocols and file headers. The examples below all run with the standard library only.
This article covers: creating bytes and bytearrays, encoding and decoding strings, slicing and searching binary data, modifying bytearrays in place, using the struct module for binary packing, reading binary files, and a real-life binary file parser that reads PNG header metadata.
Bytes and Bytearray: Quick Example
Here is the core distinction in 15 lines — bytes is immutable, bytearray is mutable:
# bytes_quick.py
# bytes: immutable
data = b"Hello, World!"
print(type(data), data)
print("First byte:", data[0]) # Integer (72 = ord('H'))
print("Slice:", data[0:5]) # Still bytes
# bytearray: mutable
buf = bytearray(b"Hello, World!")
buf[0] = 104 # Change 'H' to 'h' (ASCII 104)
buf.extend(b" Goodbye!")
print("Modified:", buf)
print("As string:", buf.decode("utf-8"))
Output:
<class 'bytes'> b'Hello, World!'
First byte: 72
Slice: b'Hello'
Modified: bytearray(b'hello, World! Goodbye!')
As string: hello, World! Goodbye!
Indexing a bytes or bytearray object with a single integer returns an int (the byte value 0-255), not a single-character string. Slicing returns a new bytes or bytearray object. This is the most common source of confusion for developers coming from Python 2.
Creating bytes and bytearray
There are several ways to create byte sequences in Python, each suited to different situations:
| Method | Example | Use case |
|---|---|---|
Literal prefix b"" | b"hello" | Hard-coded ASCII byte strings |
str.encode() | "hello".encode("utf-8") | Converting text to bytes |
bytes(n) | bytes(10) | Zero-filled buffer of n bytes |
bytes(iterable) | bytes([72, 101, 108]) | From list of integers (0-255) |
bytes.fromhex() | bytes.fromhex("48656c6c6f") | From hex string |
bytearray(source) | bytearray(b"hello") | Mutable copy of bytes |
# bytes_creation.py
# From ASCII literal
a = b"Python"
print(a, a.hex())
# From UTF-8 string
b = "Python -- binary".encode("utf-8")
print(b)
# Zero-filled buffer
buf = bytearray(8)
print("Zeroed buffer:", buf)
# From integer list
c = bytes([80, 121, 116, 104, 111, 110])
print("From ints:", c.decode())
# From hex string
d = bytes.fromhex("deadbeef")
print("From hex:", d, d.hex())
Output:
b'Python' 507974686f6e
b'Python \xe2\x80\x94 binary'
Zeroed buffer: bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00')
From ints: Python
From hex: b'\xde\xad\xbe\xef' deadbeef
Notice how "Python -- binary".encode("utf-8") shows the em-dash encoded as three UTF-8 bytes (\xe2\x80\x94). This is why Python 3 makes the string/bytes distinction explicit — text and binary data have completely different internal representations.
Encoding and Decoding Strings
The most frequent use of bytes is converting between text (strings) and binary data. Always specify the encoding explicitly — never rely on the default, which varies by platform.
# bytes_encoding.py
text = "Cafe -- served with care"
# Encode to bytes
utf8_bytes = text.encode("utf-8")
utf16_bytes = text.encode("utf-16")
ascii_bytes = "Hello".encode("ascii")
print("UTF-8: ", utf8_bytes[:20], "...", len(utf8_bytes), "bytes")
print("UTF-16:", utf16_bytes[:20], "...", len(utf16_bytes), "bytes")
# Decode back to string
decoded = utf8_bytes.decode("utf-8")
print("Decoded:", decoded)
# Handling errors gracefully
mixed = b"Hello \x80 World" # Invalid UTF-8 byte \x80
safe = mixed.decode("utf-8", errors="replace")
print("With replace:", safe)
strict_safe = mixed.decode("utf-8", errors="ignore")
print("With ignore: ", strict_safe)
Output:
UTF-8: b'Cafe \xe2\x80\x94 served wi' ... 26 bytes
UTF-16: b'\xff\xfec\x00a\x00f\x00e\x00' ... 52 bytes
Decoded: Cafe -- served with care
With replace: Hello World
With ignore: Hello World
UTF-8 is the right choice for almost everything — it is ASCII-compatible, compact for English text, and the universal standard for the web. UTF-16 uses 2 bytes minimum per character and adds a byte-order mark. The errors parameter to .decode() controls what happens with bytes that are not valid in the given encoding: "replace" substitutes the Unicode replacement character, "ignore" silently drops the bad byte, and "strict" (default) raises a UnicodeDecodeError.
Searching and Slicing Binary Data
Because bytes supports the same sequence operations as str, you can search, slice, split, and join binary data with familiar method names.
# bytes_searching.py
data = b"Content-Type: application/json\r\nContent-Length: 42\r\n\r\n{}"
# Find the header separator
sep_pos = data.find(b"\r\n\r\n")
headers_raw = data[:sep_pos]
body = data[sep_pos + 4:]
print("Headers block:", headers_raw.decode())
print("Body:", body.decode())
# Split headers into individual lines
for line in headers_raw.split(b"\r\n"):
if b":" in line:
key, value = line.split(b": ", 1)
print(f" {key.decode()}: {value.decode()}")
# Check content type
if data.startswith(b"Content"):
print("Starts with Content -- yes")
print("Position of 'json':", data.find(b"json"))
Output:
Headers block: Content-Type: application/json
Content-Length: 42
Body: {}
Content-Type: application/json
Content-Length: 42
Starts with Content -- yes
Position of 'json': 24
This pattern — splitting HTTP headers on \r\n and splitting each header on ": " — is essentially what Python’s http module does internally. Working directly with bytes is necessary here because HTTP headers are specified as ASCII bytes, not Unicode strings.
Packing Binary Data with struct
The struct module converts between Python values and packed binary data using C-style format strings. This is how you read binary file formats, network protocol headers, and hardware data formats.
# bytes_struct.py
import struct
# Pack: Python values -> bytes
packed = struct.pack(">IHH", 1234567, 100, 200)
print("Packed bytes:", packed.hex(), "-- length:", len(packed))
# Unpack: bytes -> Python values
value1, value2, value3 = struct.unpack(">IHH", packed)
print("Unpacked:", value1, value2, value3)
# Format string key: > = big-endian, I = unsigned int (4 bytes), H = unsigned short (2 bytes)
print("\nFormat sizes:")
for fmt, name in [("B", "unsigned byte"), ("H", "unsigned short"), ("I", "unsigned int"), ("Q", "unsigned long long")]:
print(f" {fmt} ({name}): {struct.calcsize(fmt)} bytes")
# Pack multiple fields into a fixed-size record
RECORD_FMT = ">HHI" # port, flags, timestamp
record = struct.pack(RECORD_FMT, 8080, 0b11, 1713484800)
print("\nRecord hex:", record.hex())
port, flags, ts = struct.unpack(RECORD_FMT, record)
print(f"Port: {port}, Flags: {flags:02b}, Timestamp: {ts}")
Output:
Packed bytes: 0012d68700640c8 -- length: 8
Unpacked: 1234567 100 200
Format sizes:
B (unsigned byte): 1 bytes
H (unsigned short): 2 bytes
I (unsigned int): 4 bytes
Q (unsigned long long): 8 bytes
Record hex: 1f9000021ef34c00
Port: 8080, Flags: 11, Timestamp: 1713484800
The format string prefix controls byte order: > means big-endian (network byte order, most significant byte first), < means little-endian, and = uses the native byte order of the current machine. Always specify byte order explicitly when interoperating with external systems — never rely on native order for data you will send over a network or save to a file.
Reading and Writing Binary Files
Open files in binary mode with "rb" or "wb" to read and write raw bytes. This is essential for images, PDFs, audio files, and any non-text format.
# bytes_binary_file.py
# Write binary data
with open("data.bin", "wb") as f:
# Write a simple custom binary format: 4-byte magic + 2-byte version + data
header = struct.pack(">4sH", b"PYDT", 1)
payload = b"Sample binary payload data\x00" * 3
f.write(header)
f.write(struct.pack(">I", len(payload))) # payload length
f.write(payload)
import struct # ensure import at top in real scripts
# Read it back
with open("data.bin", "rb") as f:
magic, version = struct.unpack(">4sH", f.read(6))
payload_len = struct.unpack(">I", f.read(4))[0]
payload = f.read(payload_len)
print("Magic:", magic.decode())
print("Version:", version)
print("Payload length:", payload_len)
print("Payload preview:", payload[:27])
Output:
Magic: PYDT
Version: 1
Payload length: 81
Payload preview: b'Sample binary payload data\x00'
Always open binary files with "rb"/"wb" — never with just "r"/"w". On Windows, text mode performs newline translation (\r\n becomes \n), which silently corrupts binary files. The pattern of writing a magic number at the start of a file is used by virtually every binary format — PNG starts with \x89PNG\r\n\x1a\n, PDF starts with %PDF, ZIP starts with PK.
Real-Life Example: PNG Header Parser
PNG files start with a fixed 8-byte signature followed by mandatory chunks. This parser reads the PNG signature and the IHDR chunk (which contains image width, height, and color mode) without loading the entire image into memory.
# png_header_parser.py
import struct
import urllib.request
def parse_png_header(data: bytes) -> dict:
"""Parse PNG signature and IHDR chunk from raw bytes."""
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
if data[:8] != PNG_SIGNATURE:
raise ValueError("Not a PNG file -- magic bytes do not match")
# IHDR chunk starts at byte 8
# Format: 4-byte length + 4-byte type + data + 4-byte CRC
chunk_len = struct.unpack(">I", data[8:12])[0]
chunk_type = data[12:16]
if chunk_type != b"IHDR":
raise ValueError("Expected IHDR chunk")
# IHDR data: width(4) + height(4) + bit_depth(1) + color_type(1) + ...
ihdr = data[16:16 + chunk_len]
width, height, bit_depth, color_type = struct.unpack(">IIBB", ihdr[:10])
color_modes = {0: "Grayscale", 2: "RGB", 3: "Indexed", 4: "Grayscale+Alpha", 6: "RGBA"}
return {
"width": width,
"height": height,
"bit_depth": bit_depth,
"color_mode": color_modes.get(color_type, "Unknown"),
"raw_signature": data[:8].hex(),
}
# Fetch a real PNG image (Python logo)
url = "https://www.python.org/static/img/python-logo.png"
with urllib.request.urlopen(url) as resp:
raw = resp.read(200) # Only need the first 200 bytes
info = parse_png_header(raw)
for key, val in info.items():
print(f"{key:20s}: {val}")
Output:
width : 290
height : 82
bit_depth : 8
color_mode : RGBA
raw_signature : 89504e470d0a1a0a
We only read the first 200 bytes of the file — enough to get the header without downloading the full image. The IHDR chunk always appears first in any valid PNG file, making this approach reliable. This same technique works for any binary format with a fixed header structure: BMP files, WAV audio, ZIP archives, ELF executables.
Frequently Asked Questions
What is the difference between bytes and str in Python 3?
str is a sequence of Unicode code points — it represents text and knows nothing about how that text is stored on disk or sent over a network. bytes is a sequence of integers (0-255) — it represents raw binary data. Converting between them requires an explicit encoding (UTF-8, ASCII, Latin-1, etc.). You cannot concatenate a str and a bytes object; Python will raise a TypeError. This strict separation prevents the encoding bugs that were common in Python 2.
When should I use bytearray instead of bytes?
Use bytearray when you need to modify the data in place — for example, building a binary packet by writing fields one by one, or patching specific bytes in a buffer. Use bytes when the data should be immutable — for function arguments, dictionary keys, and data you will not modify. bytearray is also useful for large binary buffers where creating a new bytes object for every modification would be wasteful.
How do I print bytes as hexadecimal?
Use data.hex() to get a lowercase hex string, or data.hex(" ") (Python 3.8+) to insert a space between each byte for readability. To go the other way, call bytes.fromhex("deadbeef"). For debugging, repr(data) shows the b"..." form with \xNN escapes for non-printable bytes. Use int.from_bytes(data, byteorder="big") to convert a byte sequence to a Python integer.
What is memoryview and when do I need it?
A memoryview provides a view into the underlying buffer of a bytes or bytearray object without copying data. Slicing a memoryview returns another memoryview pointing into the same memory — very efficient for large buffers where you want to process different sections without allocating new objects. Use it when you are working with large binary streams (network buffers, numpy arrays, audio data) and performance matters. For most everyday work, slicing bytes directly is fine.
What are the most useful struct format characters?
The most commonly used format characters are: B (unsigned byte, 1 byte), H (unsigned short, 2 bytes), I (unsigned int, 4 bytes), Q (unsigned long long, 8 bytes), f (float, 4 bytes), d (double, 8 bytes), s (char array, used as Ns for N bytes), and x (padding byte). Prefix with > for big-endian (network order) or < for little-endian (most Intel/x86 data). Use struct.calcsize(fmt) to check the byte size of a format string before use.
Conclusion
Python’s bytes and bytearray types give you precise control over binary data. You have covered the full toolkit: creating byte sequences from literals, strings, integers, and hex values; encoding and decoding with explicit error handling; slicing and searching binary buffers; using the struct module to pack and unpack binary formats; reading binary files; and the PNG header parser that shows these concepts working together in a real format.
A great next project is to extend the PNG parser to scan all chunks in a PNG file (not just IHDR) and print a manifest of chunk types and sizes — PNG files can contain text metadata (tEXt chunks), color profiles (iCCP), and transparency data (tRNS) beyond the image pixels. That project will give you practice with the chunk-based parsing pattern used by many binary formats.
For the complete API reference, see the Python binary sequence types documentation and the struct module documentation.
Related Articles
Async Await Code Example in Python
In the previous section we created an asynchronous version manually. Here’s the same outcome but written with the async await syntax. As you’ll notice it is very similar to the original synchronous version:
import time, datetime, time
import asyncio
import time, datetime, timeit
customer_queue = [ "C1", "C2", "C3" ]
def get_next_customer():
return customer_queue.pop(0) #Get the first customer from list
async def cook_hamburger(customer):
start_customer_timer = timeit.default_timer()
print( f"[{customer}]: Start cooking hamberger for customer")
await asyncio.sleep(2) # Sleep but release control
end_customer_timer = timeit.default_timer()
print( f"[{customer}]: Finish cooking hamberger for customer. Total {end_customer_timer-start_customer_timer} seconds\n")
async def run_shop():
cooking_queue = []
while customer_queue:
curr_customer = get_next_customer()
cooking_queue.append( cook_hamburger(curr_customer) ) #this returns a task only
#cooking_queue[] has all the async tasks
await asyncio.gather( *cooking_queue ) #Run all in parallel
def main():
print('Hamburger Shop')
start = timeit.default_timer()
asyncio.run( run_shop() ) #Start the event loop
stop = timeit.default_timer()
print(f"** Total runtime: {stop-start} seconds ***")
if __name__ == '__main__':
main()
Output as follows:

Let’s walk through the code:
- Firstly, the async await is available from the library
asynciohence theimport asyncio - There’s funny set of
asynckeywords which precede thedef run_shop()and thedef cook_hamburger(customer)functions. In addition therun_shop()is no longer called directly, instead it is called with aasyncio.run( run_shop() )function call. So here’s what is happening:- The
asyncio.run()function is the trigger for the so-called event loop. It continues to run forever until all the tasks given to it are completed. You must pass it a function with theasync def...prefix hence whyrun_shop()has the async prefix - In the
async def run_shop()function call, the code iterates while there are customers in the queue to process, and then there’s a call tocook_hamburger(curr_customer)for each customer. A direct call to the customer does not actually call the function but instead creates a task to execute this. That is what theasynctells the compiler – that when called directly, return a task. - At the end of the function code in
def run_shop()there’s a call to functionawait asyncio.gather( *cooking_queue). There’s a few things going on here:- The
awaitkeywords indicates that you need wait for the work to complete but python can do something else in the meantime - The call to
gather()actually executes all the tasks given to it as a parameter collectively as a group and then returns the results sequentially (please note that the order of the tasks being executed may be random) - The
*customer_queuesimply expands the list into a list of parameter items. So for example ifcustomer_queue[] == [ '1', '2', '3']then thegather( *customer_queue)would be the same asgather( '1', '2', '3').
- The
- When the
await asyncio.gather( *customer_queue )is called, theawaitkeyword releases control to any activities that are pending and one of them would be to the calls to functioncook_hamburger()which was added to thecustomer_queuelist. Hence calls tocook_hamburger()would be triggered. - Within
cook_hamburger()there is also anawait asyncio.sleep(2). This simply waits for 2 seconds, however, it does not force the program to wait for the 2 seconds to complete, instead theawaitkeyword releases python to do something else in the meantime. This is similar to step 3 in Figure 2 where the chef/waiter puts the hamburger on the grill, but then doesn’t wait for the 2 second but instead does something else (i.e. serve the next customer)
- The
- The
asyncio.run()are new keywords as part of python 3.7. In older versions of python you may see the following but it is the same as simply runningasyncio.run( run_shop() ):loop = asyncio.get_event_loop()loop.run_until_complete(run_shop())loop.close()
- As you will notice, this is very similar to the synchronous code that covers Figure 1 above. This is the beauty of async/await
So remember, whenever there’s an await then that means python pauses at that point for that task to complete but then also releases python to do something else. That’s how the performance improvement occurs. In this example, the runtime of this is 2 seconds instead of the sequential 6 seconds!
Async Asynchronous Calling Another Async Function Code Example
Suppose you want t also call another async function once your first async function is completed – how do you go about this? Remember the rule, if you want to run something asynchronously, you have to use the await keyword, and that the function you’re calling has to be defined with async def ...
To continue with the restaurant theme, suppose that after the hamburger is cooked you ask an assistant to put the hamburger into a takeaway bag which takes 1 second. This is also another task that you need not ‘block’ and wait for it to complete. Hence, this action can be put into a function which is defined as an async. Here’s what the code can look like:
import time, datetime, time
import asyncio
customer_queue = [ "C1", "C2", "C3" ]
def get_next_customer():
return customer_queue.pop(0) #Get the first customer from list
async def cook_hamburger(customer):
start_customer_timer = timeit.default_timer()
print( f"[{customer}]: Start cooking hamberger for customer")
await asyncio.sleep(2) # Sleep but release control
end_customer_timer = timeit.default_timer()
print( f"[{customer}]: Finish cooking hamberger for customer. Total {end_customer_timer-start_customer_timer} seconds")
await put_hamburger_in_takeaway_bag( customer )
async def put_hamburger_in_takeaway_bag( customer):
start_customer_timer = timeit.default_timer()
print( f"[{customer}]: Start packing hamberger")
await asyncio.sleep(1) # It takes 2 seconds to cook the hamburger
end_customer_timer = timeit.default_timer()
print( f"[{customer}]: Finish packing hamberger. Total {end_customer_timer-start_customer_timer} seconds\n")
async def run_shop():
cooking_queue = []
while customer_queue:
curr_customer = get_next_customer()
cooking_queue.append( cook_hamburger(curr_customer) ) #Get each of the event loops
await asyncio.gather( *cooking_queue ) #Run all in parallel
def main():
print('Hamburger Shop')
start = timeit.default_timer()
asyncio.run( run_shop() ) #Start the event loop
stop = timeit.default_timer()
print(f"** Total runtime: {stop-start} seconds ***")
if __name__ == '__main__':
main()
The output would be:

See how once the hamburger is cooked (e.g. [C1]: Finish cooking hamburger for customer. Total 2.000924572115764 seconds), then immediately afterwards you have the [C1]: Start packing hamburger step but also gets called asynchronously.
Async Await Real World Example With Web Crawler in Python
One difficulty in learning Async / Await is that many examples provided simply provide the asyncio.sleep() as an example which is helpful to understand the concept, but not very helpful when you want to make something more useful. Let’s try a more complex example where you want to get some stock data from finance.yahoo.com and then, for that same stock, you also get the first 3 newspaper articles from news.google.com in the last 24 hours.
Now one thing you will realise is that await only works with functions that are defined as async. So you cannot call any function with await. Why? Well recall that when you call await you are expecting a function to return a task and not actually call the function, hence that function needs to be defined as async in order to tell python that it returns a task to be executed at the next available time.
Let’s see the synchronous version of the code:
import asyncio, requests, timeit
from bs4 import BeautifulSoup
from pygooglenews import GoogleNews
stock_list = [ "TSLA", "AAPL"]
def get_stock_price_data(stock):
print(f"-- getting stock data for {stock}")
data = {"stock":stock, "price_open":0, "price_close":0 }
stock_page = requests.get( 'https://finance.yahoo.com/quote/' + stock, headers={'Cache-Control': 'no-cache', "Pragma": "no-cache"})
soup = BeautifulSoup(stock_page.text, 'html.parser')
#<fin-streamer active="" class="Fw(b) Fz(36px) Mb(-4px) D(ib)" data-field="regularMarketPrice" data-pricehint="2" data-symbol="TSLA" data-test="qsp-price" data-trend="none" value="759.63">759.63</fin-streamer>
data['price_close'] = soup.find('fin-streamer', attrs={"data-symbol":stock, "data-field":"regularMarketPrice"} ).text
#<td class="Ta(end) Fw(600) Lh(14px)" data-test="OPEN-value">723.25</td>
data['price_open'] = soup.find( attrs={"data-test":"OPEN-value"}).text
return data
def get_recent_news(stock):
print(f"-- getting news data for {stock}")
gn = GoogleNews()
search = gn.search(f"stocks {stock}", when = '24h')
news = search['entries'][0:3]
return news
def print_stock_update(stock, data, news):
print(f"Stock:{ stock }")
price_change = 0
if int(float(data['price_open'])) != 0: price_change = round( 100 * ( float( data['price_close'])/float(data['price_open'])-1), 2)
print(f"Open Price:{data['price_open']} Close Price:{data['price_close']} Change:{price_change}% ")
print("Latest News:")
for news_item in news:
print( f"{news_item.published}:{news_item.source.title} - {news_item.title}" )
print("\n")
def process_stocks():
for stock in stock_list:
data = get_stock_price_data( stock )
news=[]
news = get_recent_news( stock )
print_stock_update(stock, data, news)
if __name__ == '__main__':
start_timer = timeit.default_timer()
process_stocks()
end_timer = timeit.default_timer()
print(f"** Total runtime: {end_timer-start_timer} seconds ***")
Output as follows:

So what’s happening here. Well, you are looping through two stocks TSLA and AAPL, and for each stock the following happens sequentially:
- A call to
data = get_stock_price_data( stock )occurs in order to make a call torequests.get( 'https://finance.yahoo.com/quote/' + stock)to get the HTML page for the TSLA stock. Effectively, this page: https://finance.yahoo.com/quote/TSLA - Next we use
BeautifulSoup()in order to find the HTML snippet that contains the stock price data for the opening price and the closing price:


- After the call to yahoo is complete, then there’s a call to
news = get_recent_news( stock )which uses the modulepygooglenewsto get the latest google news. In fact we have used this function in our previous Twitter Bot article. - Once this is all done, that output is printed out with the call to
print_stock_update(stock, data, news)
Clearly this could be called asynchronously as we are looping each time for each stock, and then also the call to get the stock data is independent to getting the news data. However, one thing has to happen sequentially is the print_stock_update(stock, data, news) which has to wait for both the async calls to complete.
One wait to try is to simply call the website download with:
stock_page = await requests.get( 'https://finance.yahoo.com/quote/' + stock, headers={'Cache-Control': 'no-cache', "Pragma": "no-cache"})
However, you will get the following error:

The reason is, as you may have guessed, is that the requests.get() is not created with the async def... construct and hence cannot be called asynchronously.
What you can do however is to use another ‘get’ web page module called httpx. This function is defined with async def... and can be called similar to requests. That same line would be re-written as:
import httpx
#....
async def get_stock_price_data(stock):
print(f"-- stock data:getting stock data for {stock}")
data = {"stock":stock, "price_open":0, "price_close":0 }
#*** instead of requests.get('https://finance.yahoo.com/quote/' + stock)) ****
client = httpx.AsyncClient()
stock_page = await client.get( 'https://finance.yahoo.com/quote/' + stock)
soup = BeautifulSoup(stock_page.text, 'html.parser')
#<fin-streamer active="" class="Fw(b) Fz(36px) Mb(-4px) D(ib)" data-field="regularMarketPrice" data-pricehint="2" data-symbol="TSLA" data-test="qsp-price" data-trend="none" value="759.63">759.63</fin-streamer>
data['price_close'] = soup.find('fin-streamer', attrs={"data-symbol":stock, "data-field":"regularMarketPrice"} ).text
#<td class="Ta(end) Fw(600) Lh(14px)" data-test="OPEN-value">723.25</td>
data['price_open'] = soup.find( attrs={"data-test":"OPEN-value"}).text
print(f"-- stock data:done {stock}")
return data
Ok, that works well. However, but what about the GoogleNews() code. There is no such async version of this function, so how can this be called asynchronously? Well for this, you can actually wrap it around a new thread. A ‘thread’ is way to run a piece of code under the same CPU process but in a parallel. It warrants a whole separate article but for now you can think of it as finding a separate space to execute this independent of the current execution path. However, to execute this in a separate thread, there’s a bit more involved.
The code looks like the following:
### Original Version
def get_recent_news(stock):
print(f"-- stock news:getting stock data for {stock}")
gn = GoogleNews()
search = gn.search(f"stocks {stock}", '24h') #Slow code to run asynchronously
news = search['entries'][0:3]
print(f"-- stock news:done {stock}")
return news
### Asynchronous Version
async def get_recent_news(stock):
print(f"-- stock news:getting stock data for {stock}")
gn = GoogleNews()
search = await asyncio.get_event_loop().run_in_executor( None, gn.search, f"stocks {stock}", '24h')
news = search['entries'][0:3]
print(f"-- stock news:done {stock}")
return news
Here what’s happening is that firstly we are using the await keyword to call the gn.search() function which is now being called through this asyncio.get_event_loop().run_in_executor( .. ) function call. What’s happening here is that we are asking the asyncio module to get access to the event loop (that piece of code that continuously checks for tasks to be done) and then to run in a separate thread. The way it is called is that the parameters must be passed in separate to the function call and hence why the parameters are to be passed in after the function name itself. You will also notice that the whole function can now be defined as async def get_recent_news(stock)
How To Mix Asynchronous And Synchronous Code With Await Async in Python
Now the final problem to be solved is how do we call the two functions of get_stock_price_data( stock ) and get_recent_news(stock) to be run asynchronously, but then wait for both to finish, and THEN run the print. This is where these steps should all be grouped under one function. This is the trick to mix asynchronous and synchronous code.
In order to run a group of tasks in parallel as a group you use asyncio.gather(). However, if you want to execute a synchronous function when ALL tasks that were given to asyncio.gather() is complete, then you should wrap it in another asyncio.gather()
async def process_stock_batch(stock):
(data, news) = await asyncio.gather( get_stock_price_data( stock ), get_recent_news(stock) )
print('-- print:request printing')
print_stock_update(stock, data, news)
print('-- print:done')
async def process_stocks():
run_stock_list = []
for stock in stock_list:
run_stock_list.append( process_stock_batch(stock) )
await asyncio.gather( *run_stock_list )
Before we solve it for the real world examples, lets show a simpler example. Suppose we had the following example:
import asyncio, timeit
async def get_web_data_A(index):
await asyncio.sleep(1)
print(f"Get Web Data-A[{index}] - sleep 1 second")
async def get_web_data_B(index):
await asyncio.sleep(1)
print(f"Get Web Data-B[{index}] - sleep 1 second")
async def process(index, start_timer):
await asyncio.gather( get_web_data_A(index), get_web_data_B(index) )
print(f"Calculate [{index}] - Elapsed time:[{timeit.default_timer()-start_timer}]")
async def run_all():
start_timer = timeit.default_timer()
for index in range(0,2):
await process(index, start_timer)
if __name__ == '__main__':
asyncio.run( run_all() )
This has the following output:

What is encouraging with this code, is that even though the call to get_web_data_A() and get_web_data_B() both sleep for 1 second, since they were doing that asynchronously, then the total runtime is still just a little over 1 second. This can be shown by the Calculate [0]... output. However, the problem is that the code still iterates each index sequentially, meaning, that index 0 is processed completely first, and once that’s done, then index 1 is processed. What we want instead is to run all the slow get_web_data_A() and get_web_data_B() first, and then run the code to calculate afterwards. This is where you need to first create the tasks for ALL the iterations, and then call gather() on all the tasks. See the following code:
import asyncio, timeit
async def get_web_data_A(index):
await asyncio.sleep(1)
print(f"Get Web Data-A[{index}] - sleep 1 second")
async def get_web_data_B(index):
await asyncio.sleep(1)
print(f"Get Web Data-B[{index}] - sleep 1 second")
async def process(index, start_timer):
await asyncio.gather( get_web_data_A(index), get_web_data_B(index) )
print(f"Calculate [{index}] - Elapsed time:[{timeit.default_timer()-start_timer}]")
async def run_all_2():
start_timer = timeit.default_timer()
task_queue = []
for index in range(0,2):
task_queue.append( process(index, start_timer) )
await asyncio.gather( *task_queue )
if __name__ == '__main__':
asyncio.run( run_all_2() )
Here, in the function async def run_all_2() when we loop, we do not call the blocking code await asyncio.gather... inside the for loop. Instead, we are adding all the tasks to call process(..) into a list called task_queue[], and then at the end of the for loop we are calling await asyncio.gather( *task_queue ) on all tasks in one go. Hence, the output is as follows:

You’ll notice that ALL the get_web_data_A() and get_web_data_B() are being called asynchronously, and then the calculate function is called on all the available data. Hence, the elapsed time for all the iterations is only 1 second, compared to the previous 2 seconds.
So what does this mean for our real world example for getting stock data from Yahoo and then calling Google News asynchronously, and then only printing the data once both are done? Well, the same principle applies. The code is as follows:
import asyncio, httpx, timeit
from bs4 import BeautifulSoup
from pygooglenews import GoogleNews
stock_list = [ "TSLA", "AAPL"]
async def get_stock_price_data(stock):
print(f"-- stock data:getting stock data for {stock}")
data = {"stock":stock, "price_open":0, "price_close":0 }
client = httpx.AsyncClient()
stock_page = await client.get( 'https://finance.yahoo.com/quote/' + stock)
soup = BeautifulSoup(stock_page.text, 'html.parser')
#<fin-streamer active="" class="Fw(b) Fz(36px) Mb(-4px) D(ib)" data-field="regularMarketPrice" data-pricehint="2" data-symbol="TSLA" data-test="qsp-price" data-trend="none" value="759.63">759.63</fin-streamer>
data['price_close'] = soup.find('fin-streamer', attrs={"data-symbol":stock, "data-field":"regularMarketPrice"} ).text
#<td class="Ta(end) Fw(600) Lh(14px)" data-test="OPEN-value">723.25</td>
data['price_open'] = soup.find( attrs={"data-test":"OPEN-value"}).text
print(f"-- stock data:done {stock}")
return data
async def get_recent_news(stock):
print(f"-- stock news:getting stock data for {stock}")
gn = GoogleNews()
search = await asyncio.get_event_loop().run_in_executor( None, gn.search, f"stocks {stock}", '24h')
news = search['entries'][0:3]
print(f"-- stock news:done {stock}")
return news
def print_stock_update(stock, data, news):
print('-- print:starting print')
print(f"Stock:{ stock }")
price_change = 0
if int(float(data['price_open'])) != 0: price_change = round( 100 * ( float( data['price_close'])/float(data['price_open'])-1), 2)
print(f"Open Price:{data['price_open']} Close Price:{data['price_close']} Change:{price_change}% ")
print("Latest News:")
for news_item in news:
print( f"{news_item.published}:{news_item.source.title} - {news_item.title}" )
print("\n")
async def process_stock_batch(stock):
(data, news) = await asyncio.gather( get_stock_price_data( stock ), get_recent_news(stock) )
print('-- print:request printing')
print_stock_update(stock, data, news)
print('-- print:done')
async def process_stocks():
run_stock_list = []
for stock in stock_list:
run_stock_list.append( process_stock_batch(stock) )
await asyncio.gather( *run_stock_list )
if __name__ == '__main__':
start_timer = timeit.default_timer()
asyncio.run( process_stocks() )
end_timer = timeit.default_timer()
print(f"** Total runtime: {end_timer-start_timer} seconds ***")
The key bit of code is in the async def process_stocks() which now iterates over each of the stocks, creates tasks, and then calls await asyncio.gather( *run_stock_list ) on all the stocks in one go, and then in the function process_stock_batch(stock) we have the asynchronous call to (data, news) = await asyncio.gather( get_stock_price_data( stock ), and then the synchronous call to print_stock_update(stock, data, news) once both web data is complete.
Conclusion
The await and async function is an incredibly useful feature of python which takes a bit of getting used to in order to understand the concept, but once you’ve got the hang of it, it can be incredibly useful to get an improve of the performance of your code by leveraging idle time where you are waiting for a task to complete. Remember to be sure about the sequencing and being mindful of whether you care to have a follow-up activity once that task is completed, or you can simply continue to execute.
This not easy to grasp as a beginner, but follow the example code above, and if you get stuck feel free to reach out through our email list below.
How To Work with Python bytes and bytearray
Intermediate
Working with binary data is one of those skills that feels intimidating until you understand the two types Python gives you: bytes (immutable) and bytearray (mutable). Whether you are reading a binary file format, sending raw bytes over a network socket, working with image data, or packing integers into a compact binary protocol, these two types are what you reach for. The confusion usually comes from mixing up strings (Unicode text) and bytes (raw binary data) — a distinction Python 3 enforces strictly.
Both bytes and bytearray are built into Python 3 — no imports needed for the basic operations. The struct module (also built-in) handles packing and unpacking binary data formats like those used in network protocols and file headers. The examples below all run with the standard library only.
This article covers: creating bytes and bytearrays, encoding and decoding strings, slicing and searching binary data, modifying bytearrays in place, using the struct module for binary packing, reading binary files, and a real-life binary file parser that reads PNG header metadata.
Bytes and Bytearray: Quick Example
Here is the core distinction in 15 lines — bytes is immutable, bytearray is mutable:
# bytes_quick.py
# bytes: immutable
data = b"Hello, World!"
print(type(data), data)
print("First byte:", data[0]) # Integer (72 = ord('H'))
print("Slice:", data[0:5]) # Still bytes
# bytearray: mutable
buf = bytearray(b"Hello, World!")
buf[0] = 104 # Change 'H' to 'h' (ASCII 104)
buf.extend(b" Goodbye!")
print("Modified:", buf)
print("As string:", buf.decode("utf-8"))
Output:
<class 'bytes'> b'Hello, World!'
First byte: 72
Slice: b'Hello'
Modified: bytearray(b'hello, World! Goodbye!')
As string: hello, World! Goodbye!
Indexing a bytes or bytearray object with a single integer returns an int (the byte value 0-255), not a single-character string. Slicing returns a new bytes or bytearray object. This is the most common source of confusion for developers coming from Python 2.
Creating bytes and bytearray
There are several ways to create byte sequences in Python, each suited to different situations:
| Method | Example | Use case |
|---|---|---|
Literal prefix b"" | b"hello" | Hard-coded ASCII byte strings |
str.encode() | "hello".encode("utf-8") | Converting text to bytes |
bytes(n) | bytes(10) | Zero-filled buffer of n bytes |
bytes(iterable) | bytes([72, 101, 108]) | From list of integers (0-255) |
bytes.fromhex() | bytes.fromhex("48656c6c6f") | From hex string |
bytearray(source) | bytearray(b"hello") | Mutable copy of bytes |
# bytes_creation.py
# From ASCII literal
a = b"Python"
print(a, a.hex())
# From UTF-8 string
b = "Python -- binary".encode("utf-8")
print(b)
# Zero-filled buffer
buf = bytearray(8)
print("Zeroed buffer:", buf)
# From integer list
c = bytes([80, 121, 116, 104, 111, 110])
print("From ints:", c.decode())
# From hex string
d = bytes.fromhex("deadbeef")
print("From hex:", d, d.hex())
Output:
b'Python' 507974686f6e
b'Python \xe2\x80\x94 binary'
Zeroed buffer: bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00')
From ints: Python
From hex: b'\xde\xad\xbe\xef' deadbeef
Notice how "Python -- binary".encode("utf-8") shows the em-dash encoded as three UTF-8 bytes (\xe2\x80\x94). This is why Python 3 makes the string/bytes distinction explicit — text and binary data have completely different internal representations.
Encoding and Decoding Strings
The most frequent use of bytes is converting between text (strings) and binary data. Always specify the encoding explicitly — never rely on the default, which varies by platform.
# bytes_encoding.py
text = "Cafe -- served with care"
# Encode to bytes
utf8_bytes = text.encode("utf-8")
utf16_bytes = text.encode("utf-16")
ascii_bytes = "Hello".encode("ascii")
print("UTF-8: ", utf8_bytes[:20], "...", len(utf8_bytes), "bytes")
print("UTF-16:", utf16_bytes[:20], "...", len(utf16_bytes), "bytes")
# Decode back to string
decoded = utf8_bytes.decode("utf-8")
print("Decoded:", decoded)
# Handling errors gracefully
mixed = b"Hello \x80 World" # Invalid UTF-8 byte \x80
safe = mixed.decode("utf-8", errors="replace")
print("With replace:", safe)
strict_safe = mixed.decode("utf-8", errors="ignore")
print("With ignore: ", strict_safe)
Output:
UTF-8: b'Cafe \xe2\x80\x94 served wi' ... 26 bytes
UTF-16: b'\xff\xfec\x00a\x00f\x00e\x00' ... 52 bytes
Decoded: Cafe -- served with care
With replace: Hello World
With ignore: Hello World
UTF-8 is the right choice for almost everything — it is ASCII-compatible, compact for English text, and the universal standard for the web. UTF-16 uses 2 bytes minimum per character and adds a byte-order mark. The errors parameter to .decode() controls what happens with bytes that are not valid in the given encoding: "replace" substitutes the Unicode replacement character, "ignore" silently drops the bad byte, and "strict" (default) raises a UnicodeDecodeError.
Searching and Slicing Binary Data
Because bytes supports the same sequence operations as str, you can search, slice, split, and join binary data with familiar method names.
# bytes_searching.py
data = b"Content-Type: application/json\r\nContent-Length: 42\r\n\r\n{}"
# Find the header separator
sep_pos = data.find(b"\r\n\r\n")
headers_raw = data[:sep_pos]
body = data[sep_pos + 4:]
print("Headers block:", headers_raw.decode())
print("Body:", body.decode())
# Split headers into individual lines
for line in headers_raw.split(b"\r\n"):
if b":" in line:
key, value = line.split(b": ", 1)
print(f" {key.decode()}: {value.decode()}")
# Check content type
if data.startswith(b"Content"):
print("Starts with Content -- yes")
print("Position of 'json':", data.find(b"json"))
Output:
Headers block: Content-Type: application/json
Content-Length: 42
Body: {}
Content-Type: application/json
Content-Length: 42
Starts with Content -- yes
Position of 'json': 24
This pattern — splitting HTTP headers on \r\n and splitting each header on ": " — is essentially what Python’s http module does internally. Working directly with bytes is necessary here because HTTP headers are specified as ASCII bytes, not Unicode strings.
Packing Binary Data with struct
The struct module converts between Python values and packed binary data using C-style format strings. This is how you read binary file formats, network protocol headers, and hardware data formats.
# bytes_struct.py
import struct
# Pack: Python values -> bytes
packed = struct.pack(">IHH", 1234567, 100, 200)
print("Packed bytes:", packed.hex(), "-- length:", len(packed))
# Unpack: bytes -> Python values
value1, value2, value3 = struct.unpack(">IHH", packed)
print("Unpacked:", value1, value2, value3)
# Format string key: > = big-endian, I = unsigned int (4 bytes), H = unsigned short (2 bytes)
print("\nFormat sizes:")
for fmt, name in [("B", "unsigned byte"), ("H", "unsigned short"), ("I", "unsigned int"), ("Q", "unsigned long long")]:
print(f" {fmt} ({name}): {struct.calcsize(fmt)} bytes")
# Pack multiple fields into a fixed-size record
RECORD_FMT = ">HHI" # port, flags, timestamp
record = struct.pack(RECORD_FMT, 8080, 0b11, 1713484800)
print("\nRecord hex:", record.hex())
port, flags, ts = struct.unpack(RECORD_FMT, record)
print(f"Port: {port}, Flags: {flags:02b}, Timestamp: {ts}")
Output:
Packed bytes: 0012d68700640c8 -- length: 8
Unpacked: 1234567 100 200
Format sizes:
B (unsigned byte): 1 bytes
H (unsigned short): 2 bytes
I (unsigned int): 4 bytes
Q (unsigned long long): 8 bytes
Record hex: 1f9000021ef34c00
Port: 8080, Flags: 11, Timestamp: 1713484800
The format string prefix controls byte order: > means big-endian (network byte order, most significant byte first), < means little-endian, and = uses the native byte order of the current machine. Always specify byte order explicitly when interoperating with external systems — never rely on native order for data you will send over a network or save to a file.
Reading and Writing Binary Files
Open files in binary mode with "rb" or "wb" to read and write raw bytes. This is essential for images, PDFs, audio files, and any non-text format.
# bytes_binary_file.py
# Write binary data
with open("data.bin", "wb") as f:
# Write a simple custom binary format: 4-byte magic + 2-byte version + data
header = struct.pack(">4sH", b"PYDT", 1)
payload = b"Sample binary payload data\x00" * 3
f.write(header)
f.write(struct.pack(">I", len(payload))) # payload length
f.write(payload)
import struct # ensure import at top in real scripts
# Read it back
with open("data.bin", "rb") as f:
magic, version = struct.unpack(">4sH", f.read(6))
payload_len = struct.unpack(">I", f.read(4))[0]
payload = f.read(payload_len)
print("Magic:", magic.decode())
print("Version:", version)
print("Payload length:", payload_len)
print("Payload preview:", payload[:27])
Output:
Magic: PYDT
Version: 1
Payload length: 81
Payload preview: b'Sample binary payload data\x00'
Always open binary files with "rb"/"wb" — never with just "r"/"w". On Windows, text mode performs newline translation (\r\n becomes \n), which silently corrupts binary files. The pattern of writing a magic number at the start of a file is used by virtually every binary format — PNG starts with \x89PNG\r\n\x1a\n, PDF starts with %PDF, ZIP starts with PK.
Real-Life Example: PNG Header Parser
PNG files start with a fixed 8-byte signature followed by mandatory chunks. This parser reads the PNG signature and the IHDR chunk (which contains image width, height, and color mode) without loading the entire image into memory.
# png_header_parser.py
import struct
import urllib.request
def parse_png_header(data: bytes) -> dict:
"""Parse PNG signature and IHDR chunk from raw bytes."""
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
if data[:8] != PNG_SIGNATURE:
raise ValueError("Not a PNG file -- magic bytes do not match")
# IHDR chunk starts at byte 8
# Format: 4-byte length + 4-byte type + data + 4-byte CRC
chunk_len = struct.unpack(">I", data[8:12])[0]
chunk_type = data[12:16]
if chunk_type != b"IHDR":
raise ValueError("Expected IHDR chunk")
# IHDR data: width(4) + height(4) + bit_depth(1) + color_type(1) + ...
ihdr = data[16:16 + chunk_len]
width, height, bit_depth, color_type = struct.unpack(">IIBB", ihdr[:10])
color_modes = {0: "Grayscale", 2: "RGB", 3: "Indexed", 4: "Grayscale+Alpha", 6: "RGBA"}
return {
"width": width,
"height": height,
"bit_depth": bit_depth,
"color_mode": color_modes.get(color_type, "Unknown"),
"raw_signature": data[:8].hex(),
}
# Fetch a real PNG image (Python logo)
url = "https://www.python.org/static/img/python-logo.png"
with urllib.request.urlopen(url) as resp:
raw = resp.read(200) # Only need the first 200 bytes
info = parse_png_header(raw)
for key, val in info.items():
print(f"{key:20s}: {val}")
Output:
width : 290
height : 82
bit_depth : 8
color_mode : RGBA
raw_signature : 89504e470d0a1a0a
We only read the first 200 bytes of the file — enough to get the header without downloading the full image. The IHDR chunk always appears first in any valid PNG file, making this approach reliable. This same technique works for any binary format with a fixed header structure: BMP files, WAV audio, ZIP archives, ELF executables.
Frequently Asked Questions
What is the difference between bytes and str in Python 3?
str is a sequence of Unicode code points — it represents text and knows nothing about how that text is stored on disk or sent over a network. bytes is a sequence of integers (0-255) — it represents raw binary data. Converting between them requires an explicit encoding (UTF-8, ASCII, Latin-1, etc.). You cannot concatenate a str and a bytes object; Python will raise a TypeError. This strict separation prevents the encoding bugs that were common in Python 2.
When should I use bytearray instead of bytes?
Use bytearray when you need to modify the data in place — for example, building a binary packet by writing fields one by one, or patching specific bytes in a buffer. Use bytes when the data should be immutable — for function arguments, dictionary keys, and data you will not modify. bytearray is also useful for large binary buffers where creating a new bytes object for every modification would be wasteful.
How do I print bytes as hexadecimal?
Use data.hex() to get a lowercase hex string, or data.hex(" ") (Python 3.8+) to insert a space between each byte for readability. To go the other way, call bytes.fromhex("deadbeef"). For debugging, repr(data) shows the b"..." form with \xNN escapes for non-printable bytes. Use int.from_bytes(data, byteorder="big") to convert a byte sequence to a Python integer.
What is memoryview and when do I need it?
A memoryview provides a view into the underlying buffer of a bytes or bytearray object without copying data. Slicing a memoryview returns another memoryview pointing into the same memory — very efficient for large buffers where you want to process different sections without allocating new objects. Use it when you are working with large binary streams (network buffers, numpy arrays, audio data) and performance matters. For most everyday work, slicing bytes directly is fine.
What are the most useful struct format characters?
The most commonly used format characters are: B (unsigned byte, 1 byte), H (unsigned short, 2 bytes), I (unsigned int, 4 bytes), Q (unsigned long long, 8 bytes), f (float, 4 bytes), d (double, 8 bytes), s (char array, used as Ns for N bytes), and x (padding byte). Prefix with > for big-endian (network order) or < for little-endian (most Intel/x86 data). Use struct.calcsize(fmt) to check the byte size of a format string before use.
Conclusion
Python’s bytes and bytearray types give you precise control over binary data. You have covered the full toolkit: creating byte sequences from literals, strings, integers, and hex values; encoding and decoding with explicit error handling; slicing and searching binary buffers; using the struct module to pack and unpack binary formats; reading binary files; and the PNG header parser that shows these concepts working together in a real format.
A great next project is to extend the PNG parser to scan all chunks in a PNG file (not just IHDR) and print a manifest of chunk types and sizes — PNG files can contain text metadata (tEXt chunks), color profiles (iCCP), and transparency data (tRNS) beyond the image pixels. That project will give you practice with the chunk-based parsing pattern used by many binary formats.
For the complete API reference, see the Python binary sequence types documentation and the struct module documentation.
Related Articles
Related Articles
Further Reading: For more details, see the Python asyncio documentation.
Frequently Asked Questions
What is async/await in Python?
async def defines a coroutine function and await pauses execution until an asynchronous operation completes. This enables concurrent I/O operations without threading, using the asyncio event loop.
When should I use async/await instead of threading?
Use async/await for I/O-bound tasks like network requests and database queries with many concurrent connections. Use threading for CPU-bound tasks or libraries that do not support async.
How do I run multiple async tasks concurrently?
Use asyncio.gather(task1(), task2()) to run multiple coroutines concurrently. Use asyncio.create_task() to schedule without immediately waiting.
What does ‘coroutine was never awaited’ mean?
You called an async function without await. Async functions return coroutine objects that must be awaited. Add await before the call or use asyncio.run() from synchronous code.
Can I mix synchronous and asynchronous code?
Yes. Use asyncio.run() to call async from sync. Use loop.run_in_executor() to run blocking functions inside async code without blocking the event loop.