import io
from datetime import datetime
from struct import pack
import olefile
# An encrypted ECMA376 file is stored as an OLE container.
#
# At this point, creating an Ole file is somewhat of a chore, since
# the latest OleFile (v0.47) does not really do it.
#
# See https://github.com/decalage2/olefile/issues/6
#
# This file is not meant to support all manners of OLE files; it creates
# what we need (an OLE file with an encrypted stream + supporting streams).
# Nothing more, nothing less. So, unlike OleFile, we can take _a lot_ of
# shortcuts.
#
# Probably very brittle.
#
# File format:
#
# https://github.com/libyal/libolecf/blob/main/documentation/OLE%20Compound%20File%20format.asciidoc
#
# Initial C++ code from https://github.com/herumi/msoffice (BSD-3)
[docs]def datetime2filetime(dt):
"""
Convert Python datetime.datetime to FILETIME (64 bits unsigned int)
A file time is a 64-bit value that represents the number of 100-nanosecond intervals that have elapsed
since 12:00 A.M. January 1, 1601 Coordinated Universal Time (UTC).
https://learn.microsoft.com/en-us/windows/win32/sysinfo/file-times
"""
_FILETIME_NULL_DATE = datetime(1601, 1, 1, 0, 0, 0)
return int((dt - _FILETIME_NULL_DATE).total_seconds() * 10000000)
[docs]class RedBlack:
RED = 0 # Note that this is per-spec; olefile.py shows the opposite
BLACK = 1
[docs]class DirectoryEntryType:
EMPTY = 0
STORAGE = 1
STREAM = 2
LOCK_BYTES = 3
PROPERTY = 4
ROOT_STORAGE = 5
[docs]class SectorTypes:
MAXREGSECT = 0xFFFFFFFA
DIFSECT = 0xFFFFFFFC
FATSECT = 0xFFFFFFFD
ENDOFCHAIN = 0xFFFFFFFE
FREESECT = 0xFFFFFFFF
NOSTREAM = 0xFFFFFFFF
[docs]class DSPos:
# Order in the directories array; must be in sync with getDirectoryEntries()
iRoot = 0
iEncryptionPackage = 1
iDataSpaces = 2
iVersion = 3
iDataSpaceMap = 4
iDataSpaceInfo = 5
iStongEncryptionDataSpace = 6
iTransformInfo = 7
iStrongEncryptionTransform = 8
iPrimary = 9
iEncryptionInfo = 10
dirNum = 11
[docs]class DefaultContent:
# Lifted off of Herumi/msoffice (C++ package)
# https://github.com/herumi/msoffice/blob/master/include/resource.hpp
Version = b"\x3c\x00\x00\x00\x4d\x00\x69\x00\x63\x00\x72\x00\x6f\x00\x73\x00\x6f\x00\x66\x00\x74\x00\x2e\x00\x43\x00\x6f\x00\x6e\x00\x74\x00\x61\x00\x69\x00\x6e\x00\x65\x00\x72\x00\x2e\x00\x44\x00\x61\x00\x74\x00\x61\x00\x53\x00\x70\x00\x61\x00\x63\x00\x65\x00\x73\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00"
Primary = b"\x58\x00\x00\x00\x01\x00\x00\x00\x4c\x00\x00\x00\x7b\x00\x46\x00\x46\x00\x39\x00\x41\x00\x33\x00\x46\x00\x30\x00\x33\x00\x2d\x00\x35\x00\x36\x00\x45\x00\x46\x00\x2d\x00\x34\x00\x36\x00\x31\x00\x33\x00\x2d\x00\x42\x00\x44\x00\x44\x00\x35\x00\x2d\x00\x35\x00\x41\x00\x34\x00\x31\x00\x43\x00\x31\x00\x44\x00\x30\x00\x37\x00\x32\x00\x34\x00\x36\x00\x7d\x00\x4e\x00\x00\x00\x4d\x00\x69\x00\x63\x00\x72\x00\x6f\x00\x73\x00\x6f\x00\x66\x00\x74\x00\x2e\x00\x43\x00\x6f\x00\x6e\x00\x74\x00\x61\x00\x69\x00\x6e\x00\x65\x00\x72\x00\x2e\x00\x45\x00\x6e\x00\x63\x00\x72\x00\x79\x00\x70\x00\x74\x00\x69\x00\x6f\x00\x6e\x00\x54\x00\x72\x00\x61\x00\x6e\x00\x73\x00\x66\x00\x6f\x00\x72\x00\x6d\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00"
DataSpaceMap = b"\x08\x00\x00\x00\x01\x00\x00\x00\x68\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x20\x00\x00\x00\x45\x00\x6e\x00\x63\x00\x72\x00\x79\x00\x70\x00\x74\x00\x65\x00\x64\x00\x50\x00\x61\x00\x63\x00\x6b\x00\x61\x00\x67\x00\x65\x00\x32\x00\x00\x00\x53\x00\x74\x00\x72\x00\x6f\x00\x6e\x00\x67\x00\x45\x00\x6e\x00\x63\x00\x72\x00\x79\x00\x70\x00\x74\x00\x69\x00\x6f\x00\x6e\x00\x44\x00\x61\x00\x74\x00\x61\x00\x53\x00\x70\x00\x61\x00\x63\x00\x65\x00\x00\x00"
StrongEncryptionDataSpace = b"\x08\x00\x00\x00\x01\x00\x00\x00\x32\x00\x00\x00\x53\x00\x74\x00\x72\x00\x6f\x00\x6e\x00\x67\x00\x45\x00\x6e\x00\x63\x00\x72\x00\x79\x00\x70\x00\x74\x00\x69\x00\x6f\x00\x6e\x00\x54\x00\x72\x00\x61\x00\x6e\x00\x73\x00\x66\x00\x6f\x00\x72\x00\x6d\x00\x00\x00"
[docs]class DirectoryEntry:
def __init__(
self,
name="",
_type=DirectoryEntryType.EMPTY,
color=RedBlack.RED,
leftId=SectorTypes.NOSTREAM,
rightId=SectorTypes.NOSTREAM,
childId=SectorTypes.NOSTREAM,
clsid="",
bits=0,
ct=0,
mt=0,
loc=0,
content=b"",
):
self.Name = name
self.Type = _type
self.Color = color
self.LeftSiblingId = leftId
self.RightSiblingId = rightId
self.ChildId = childId
self.CLSID = clsid
self.StateBits = bits
self.CreationTime = ct
self.ModificationTime = mt
self.StartingSectorLocation = loc
self.Content = content
[docs] def write_filetime(self, obuf, ft):
# Write the lower 32 bits and upper 32 bits, in this order.
obuf.write(pack("<II", ft & 0xFFFFFFFF, ft >> 32))
@property
def Name(self):
return self._Name
@Name.setter
def Name(self, n):
if len(n) > 31:
raise ValueError("Name cannot be longer than 31 characters")
if set("!:/").intersection(n):
raise ValueError("Name contains invalid characters (!:/)")
self._Name = n
@property
def CLSID(self):
return self._CLSID
@CLSID.setter
def CLSID(self, c):
if c and len(c) != 16:
raise ValueError("CLSID must be blank, or 16 characters long")
self._CLSID = c
@property
def LeftSiblingId(self):
return self._LeftSiblingId
@LeftSiblingId.setter
def LeftSiblingId(self, id):
self._valid_id(id)
self._LeftSiblingId = id
@property
def RightSiblingId(self):
return self._RightSiblingId
@RightSiblingId.setter
def RightSiblingId(self, id):
self._valid_id(id)
self._RightSiblingId = id
@property
def ChildId(self):
return self._ChildId
@ChildId.setter
def ChildId(self, id):
self._valid_id(id)
self._ChildId = id
def _valid_id(self, id):
if not ((id <= SectorTypes.MAXREGSECT) or (id == SectorTypes.NOSTREAM)):
raise ValueError("Invalid id received")
[docs]class ECMA376EncryptedLayout:
def __init__(self, sectorSize):
self.sectorSize = sectorSize
self.miniFatNum = 0
self.miniFatDataSectorNum = 0
self.miniFatSectors = 0
self.numMiniFatSectors = 1
self.difatSectorNum = 0
self.fatSectorNum = 0
self.difatPos = 0
self.directoryEntrySectorNum = 0
self.encryptionPackageSectorNum = 0
@property
def fatPos(self):
return self.difatPos + self.difatSectorNum
@property
def miniFatPos(self):
return self.fatPos + self.fatSectorNum
@property
def directoryEntryPos(self):
return self.miniFatPos + self.numMiniFatSectors
@property
def miniFatDataPos(self):
return self.directoryEntryPos + self.directoryEntrySectorNum
@property
def contentSectorNum(self):
return self.numMiniFatSectors + self.directoryEntrySectorNum + self.miniFatDataSectorNum + self.encryptionPackageSectorNum
@property
def miniFatDataPos(self):
return self.directoryEntryPos + self.directoryEntrySectorNum
@property
def encryptionPackagePos(self):
return self.miniFatDataPos + self.miniFatDataSectorNum
@property
def totalSectors(self):
return self.difatSectorNum + self.fatSectorNum + self.contentSectorNum
@property
def totalSize(self):
return Header.BUFFER_SIZE + self.totalSectors * self.sectorSize
@property
def offsetDirectoryEntries(self):
return Header.BUFFER_SIZE + self.directoryEntryPos * self.sectorSize
@property
def offsetMiniFatData(self):
return Header.BUFFER_SIZE + self.miniFatDataPos * self.sectorSize
@property
def offsetFat(self):
return Header.BUFFER_SIZE + self.fatPos * self.sectorSize
@property
def offsetMiniFat(self):
return Header.BUFFER_SIZE + self.miniFatPos * self.sectorSize
[docs] def offsetDifat(self, n):
return Header.BUFFER_SIZE + (self.difatPos + n) * self.sectorSize
[docs] def offsetData(self, startingSectorLocation):
return Header.BUFFER_SIZE + startingSectorLocation * self.sectorSize
[docs] def offsetMiniData(self, startingSectorLocation):
return self.offsetMiniFatData + startingSectorLocation * 64
[docs]class ECMA376Encrypted:
def __init__(self, encryptedPackage=b"", encryptionInfo=b""):
self._header = self._get_default_header()
self._dirs = self._get_directory_entries()
self.set_payload(encryptedPackage, encryptionInfo)
[docs] def write_to(self, obuf):
"""
Writes the encrypted data to obuf
"""
# Create a temporary buffer with seek/tell capabilities, we do not want to assume the passed-in buffer has such
# capabilities (ie: piping to stdout).
_obuf = io.BytesIO()
self._write_to(_obuf)
# Finalize and write to client buffer.
obuf.write(_obuf.getvalue())
[docs] def set_payload(self, encryptedPackage, encryptionInfo):
self._dirs[DSPos.iEncryptionPackage].Content = encryptedPackage
self._dirs[DSPos.iEncryptionInfo].Content = encryptionInfo
def _get_default_header(self):
return Header()
def _get_directory_entries(self):
ft = datetime2filetime(datetime.now())
directories = [ # Must follow DSPos ordering
DirectoryEntry("Root Entry", DirectoryEntryType.ROOT_STORAGE, RedBlack.RED, ct=ft, mt=ft, childId=DSPos.iEncryptionInfo),
DirectoryEntry("EncryptedPackage", DirectoryEntryType.STREAM, RedBlack.RED, ct=ft, mt=ft),
DirectoryEntry("\x06DataSpaces", DirectoryEntryType.STORAGE, RedBlack.RED, ct=ft, mt=ft, childId=DSPos.iDataSpaceMap),
DirectoryEntry("Version", DirectoryEntryType.STREAM, RedBlack.BLACK, ct=ft, mt=ft, content=DefaultContent.Version),
DirectoryEntry(
"DataSpaceMap",
DirectoryEntryType.STREAM,
RedBlack.BLACK,
ct=ft,
mt=ft,
leftId=DSPos.iVersion,
rightId=DSPos.iDataSpaceInfo,
content=DefaultContent.DataSpaceMap,
),
DirectoryEntry(
"DataSpaceInfo",
DirectoryEntryType.STORAGE,
RedBlack.BLACK,
ct=ft,
mt=ft,
rightId=DSPos.iTransformInfo,
childId=DSPos.iStongEncryptionDataSpace,
),
DirectoryEntry(
"StrongEncryptionDataSpace",
DirectoryEntryType.STREAM,
RedBlack.BLACK,
ct=ft,
mt=ft,
content=DefaultContent.StrongEncryptionDataSpace,
),
DirectoryEntry(
"TransformInfo", DirectoryEntryType.STORAGE, RedBlack.RED, ct=ft, mt=ft, childId=DSPos.iStrongEncryptionTransform
),
DirectoryEntry("StrongEncryptionTransform", DirectoryEntryType.STORAGE, RedBlack.BLACK, ct=ft, mt=ft, childId=DSPos.iPrimary),
DirectoryEntry("\x06Primary", DirectoryEntryType.STREAM, RedBlack.BLACK, ct=ft, mt=ft, content=DefaultContent.Primary),
DirectoryEntry(
"EncryptionInfo",
DirectoryEntryType.STREAM,
RedBlack.BLACK,
ct=ft,
mt=ft,
leftId=DSPos.iDataSpaces,
rightId=DSPos.iEncryptionPackage,
),
]
return directories
def _write_to(self, obuf):
layout = ECMA376EncryptedLayout(self._header.sectorSize)
self._set_sector_locations_of_streams(layout)
self._detect_sector_num(layout)
self._header.firstDirectorySectorLocation = layout.directoryEntryPos
self._header.firstMiniFatSectorLocation = layout.miniFatPos
self._header.numMiniFatSectors = layout.numMiniFatSectors
self._dirs[DSPos.iRoot].StartingSectorLocation = layout.miniFatDataPos
self._dirs[DSPos.iRoot].Content = b"\0" * (64 * layout.miniFatNum)
self._dirs[DSPos.iEncryptionPackage].StartingSectorLocation = layout.encryptionPackagePos
for i in range(min(layout.fatSectorNum, Header.FIRSTNUMDIFAT)):
self._header.difat.append(layout.fatPos + i)
self._header.numFatSectors = layout.fatSectorNum
self._header.numDifatSectors = layout.difatSectorNum
if layout.difatSectorNum > 0:
self._header.firstDifatSectorLocation = layout.difatPos
# Zero out the output buffer; some sections pad, some sections don't ... but we need the buffer to have the proper size
# so we can jump around
obuf.write(b"\0" * layout.totalSize)
obuf.seek(0)
self._header.write_to(obuf)
self._write_DIFAT(obuf, layout)
self._write_FAT_start(obuf, layout)
self._write_MiniFAT(obuf, layout)
self._write_directory_entries(obuf, layout)
self._write_Content(obuf, layout)
def _write_directory_entries(self, obuf, layout: ECMA376EncryptedLayout):
obuf.seek(layout.offsetDirectoryEntries)
for d in self._dirs:
d.write_header_to(obuf) # This must write 128 bytes, no more, no less.
if obuf.tell() != (layout.offsetDirectoryEntries + len(self._dirs) * 128):
# TODO: Use appropriate custom exception
raise Exception("Buffer did not advance as expected when writing out directory entries")
def _write_Content(self, obuf, layout: ECMA376EncryptedLayout):
for d in self._dirs:
size = len(d.Content)
if size:
if size <= 4096: # Small content goes in the minifat section
obuf.seek(layout.offsetMiniData(d.StartingSectorLocation))
obuf.write(d.Content)
else:
obuf.seek(layout.offsetData(d.StartingSectorLocation))
obuf.write(d.Content)
def _write_FAT_start(self, obuf, layout: ECMA376EncryptedLayout):
v = ([SectorTypes.DIFSECT] * layout.difatSectorNum) + ([SectorTypes.FATSECT] * layout.fatSectorNum)
v += [layout.numMiniFatSectors, layout.directoryEntrySectorNum, layout.miniFatDataSectorNum, layout.encryptionPackageSectorNum]
obuf.seek(layout.offsetFat)
self._write_FAT(obuf, v, layout.fatSectorNum * layout.sectorSize)
def _write_MiniFAT(self, obuf, layout: ECMA376EncryptedLayout):
obuf.seek(layout.offsetMiniFat)
self._write_FAT(obuf, layout.miniFatSectors, layout.numMiniFatSectors * layout.sectorSize)
def _write_FAT(self, obuf, entries, blockSize):
v = 0
startPos = obuf.tell()
max_n = blockSize // 4 # 4 bytes per entry with <I
# TODO: Use appropriate custom exception
for e in entries:
if e <= SectorTypes.MAXREGSECT:
for j in range(1, e):
v += 1
if v > max_n:
raise Exception("Attempting to write beyond block size")
obuf.write(pack("<I", v))
if v == max_n:
raise Exception("Attempting to write beyond block size")
obuf.write(pack("<I", SectorTypes.ENDOFCHAIN))
else:
if v == max_n:
raise Exception("Attempting to write beyond block size")
obuf.write(pack("<I", e))
v += 1
obuf.write(pack("<I", SectorTypes.FREESECT) * (max_n - v))
if obuf.tell() - startPos != blockSize:
# TODO: Use appropriate custom exception
raise Exception("_write_FAT() did not completely fill the block space.")
def _write_DIFAT(self, obuf, layout: ECMA376EncryptedLayout):
if layout.difatSectorNum < 1:
return
v = Header.FIRSTNUMDIFAT + layout.difatSectorNum
for i in range(layout.difatSectorNum):
obuf.seek(layout.offsetDifat(i))
for j in range(layout.sectorSize // 4 - 1): # 4 == sizeof(32 bit int)
obuf.write(pack("<I", v))
v += 1
if v > layout.difatSectorNum + layout.fatSectorNum:
for k in range(j, layout.sectorSize // 4 - 1):
obuf.write(pack("<I", SectorTypes.FREESECT))
obuf.write(pack("<I", SectorTypes.ENDOFCHAIN))
return
# The next seek is _probably_ not needed...
obuf.seek(layout.offsetDifat(i) + layout.sectorSize - 4)
obuf.write(pack("<I", layout.difatPos + i + 1))
def _detect_sector_num(self, layout: ECMA376EncryptedLayout):
numInFat = layout.sectorSize // 4 # Number of 4-bytes integers
difatSectorNum = 0
fatSectorNum = 0
for i in range(10):
a = self._get_block_num(difatSectorNum + fatSectorNum + layout.contentSectorNum, numInFat)
b = 0 if a <= Header.FIRSTNUMDIFAT else self._get_block_num(a - Header.FIRSTNUMDIFAT, numInFat - 1)
if (b == difatSectorNum) and (a == fatSectorNum):
layout.fatSectorNum = fatSectorNum
layout.difatSectorNum = difatSectorNum
return
difatSectorNum = b
fatSectorNum = a
raise IndexError("Unable to detect sector number within a reasonsable amount of loops")
def _set_sector_locations_of_streams(self, layout: ECMA376EncryptedLayout):
# Use all streams, except the encrypted package which is special (and the main reason why we're doing all this!)
streamsOfInterest = list(filter(lambda d: d.Type == DirectoryEntryType.STREAM and d.Name != "EncryptedPackage", self._dirs))
miniFatSectors = []
miniFatNum = 0
miniFatDataSectorNum = 0
pos = 0
for s in streamsOfInterest:
n = self._get_MiniFAT_sector_number(len(s.Content))
miniFatSectors.append(n)
s.StartingSectorLocation = pos
pos += n
miniFatNum = pos
miniFatDataSectorNum = self._get_block_num(miniFatNum, (self._header.sectorSize // 64))
if self._get_block_num(miniFatDataSectorNum, 128) > 1:
raise ValueError("Unexpected layout size; too large")
layout.miniFatNum = miniFatNum
layout.miniFatDataSectorNum = miniFatDataSectorNum
layout.miniFatSectors = miniFatSectors
layout.directoryEntrySectorNum = self._get_block_num(len(self._dirs), 4)
layout.encryptionPackageSectorNum = self._get_block_num(len(self._dirs[DSPos.iEncryptionPackage].Content), layout.sectorSize)
def _get_MiniFAT_sector_number(self, size):
return self._get_block_num(size, 64)
def _get_block_num(self, x, block):
return (x + block - 1) // block