commit 2b9a1ede4da37661645ce4b88f008e226ea64d3e
parent 17710299ad11527f79ed6b4a7cf0cd42ca2ee0f4
Author: Carlosokumu <carlosokumu254@gmail.com>
Date: Sat, 7 Mar 2026 18:27:30 +0300
add export ledger entrieS functions
Diffstat:
2 files changed, 271 insertions(+), 67 deletions(-)
diff --git a/dummy/usawa/core/entry_service.py b/dummy/usawa/core/entry_service.py
@@ -4,70 +4,65 @@ import uuid
from .models import LedgerEntry
from usawa.storage.ledger_repository import LedgerRepository
+import lxml.etree as ET
logg = logging.getLogger("core.entry_service")
class EntryService:
"""Business logic for ledger entries"""
-
+
def __init__(self, repository: LedgerRepository):
self.repository = repository
-
+
def save_entry(self, entry: LedgerEntry) -> tuple[bool, str]:
- """
- Save entry with business logic
-
- :param entry: Entry to save
- :type entry: LedgerEntry
- :return: (success, error_message) tuple
- :rtype: tuple[bool, str]
- """
try:
entry.tx_date = datetime.now()
entry.date_registered = datetime.now()
entry.transaction_ref = self._generate_transaction_ref()
-
+
is_valid, error_msg = entry.validate()
if not is_valid:
logg.error(f"Entry validation failed: {error_msg}")
return False, error_msg
-
+
self.repository.save(entry)
-
+
logg.info(f"Entry saved successfully")
return True, ""
-
+
except FileExistsError as e:
- error_msg = "Some file info for this entry is already recorded in the ledger"
+ error_msg = (
+ "Some file information for this entry is already recorded in the ledger"
+ )
return False, error_msg
-
+
except ValueError as e:
error_msg = f"Invalid entry data: {str(e)}"
logg.error(f"Validation error: {e}")
return False, error_msg
-
+
except IOError as e:
error_msg = f"File error: {str(e)}"
logg.error(f"File operation failed: {e}")
return False, error_msg
-
+
except Exception as e:
error_msg = f"Failed to save entry: {str(e)}"
logg.error(f"Unexpected error: {e}", exc_info=True)
return False, error_msg
-
def get_all_entries(self):
return self.repository.get_all_entries()
-
-
def _generate_transaction_ref(self) -> str:
- """Generate UUID for transaction"""
return str(uuid.uuid4())
-
-
+
def get_asset_bytes(self, digest: bytes) -> bytes:
return self.repository.get_asset_bytes(digest=digest)
-
-\ No newline at end of file
+
+ def export_all_entries_to_xml(self, output_path: str) -> tuple[bool, str]:
+ return self.repository.export_all_entries_to_xml(output_path=output_path)
+
+ def export_entry_to_xml(self, serial: int, output_path: str) -> tuple[bool, str]:
+ return self.repository.export_entry_to_xml(serial, output_path)
diff --git a/dummy/usawa/storage/ledger_repository.py b/dummy/usawa/storage/ledger_repository.py
@@ -15,16 +15,21 @@ from whee.valkey import ValkeyStore
from usawa import Ledger, DemoWallet, load
from pathlib import Path
import mimetypes
+import lxml.etree as ET
+from pathlib import Path
+import lxml.etree as ET
+from copy import deepcopy
logg = logging.getLogger("storage.ledger_repository")
+
def sha256_verify(k, v=None):
if isinstance(k, str):
k = bytes.fromhex(k)
if len(k) != 32:
- raise ValueError('expect 256 bit key')
+ raise ValueError("expect 256 bit key")
khx = hexathon.uniform(k.hex())
@@ -39,8 +44,13 @@ def sha256_verify(k, v=None):
class LedgerRepository:
"""Repository that wraps LedgerStore and handles mapping"""
-
- def __init__(self,ledger_path = None, unix_client: UnixClient = None,fs_path: str="./assets"):
+
+ def __init__(
+ self,
+ ledger_path=None,
+ unix_client: UnixClient = None,
+ fs_path: str = "./assets",
+ ):
"""
Initialize the LedgerRepository.
@@ -53,19 +63,17 @@ class LedgerRepository:
:param fs_path: Path to the directory where FSResolver stores assets.
:type fs_path: str
"""
- self.valkey_store = ValkeyStore('')
+ self.valkey_store = ValkeyStore("")
self.unix_client = unix_client
self._wallet = None
self._store = None
self.ledger_path = ledger_path
- self.resolver = FSResolver(fs_path,verifier=sha256_verify)
-
-
+ self.resolver = FSResolver(fs_path, verifier=sha256_verify)
def _init_store(self, write=False) -> tuple[LedgerStore, Ledger, DemoWallet]:
ledger_tree = load(self.ledger_path)
ledger = Ledger.from_tree(ledger_tree)
-
+
if write:
logg.debug("init store for write")
self.store = LedgerStore(self.valkey_store, ledger)
@@ -79,17 +87,20 @@ class LedgerRepository:
pk = self.store.get_key()
self._wallet = DemoWallet(privatekey=pk)
- logg.debug("wallet pk: %s pubk: %s", self._wallet.privkey().hex(), self._wallet.pubkey().hex())
+ logg.debug(
+ "wallet pk: %s pubk: %s",
+ self._wallet.privkey().hex(),
+ self._wallet.pubkey().hex(),
+ )
ledger.set_wallet(self._wallet)
ledger.acl = ACL.from_wallet(self._wallet)
self.store.load(acl=ledger.acl)
return self.store, ledger, self._wallet
-
def save(self, domain_entry: LedgerEntry) -> None:
"""
Save a domain entry to storage
-
+
:param domain_entry: Entry to save
:type domain_entry: LedgerEntry
:raises ValueError: If validation fails
@@ -99,12 +110,17 @@ class LedgerRepository:
"""
try:
store, ledger, wallet = self._init_store(write=True)
-
+
entry = EntryMapper.to_entry(domain_entry, ledger=ledger)
entry.sign(wallet)
-
- logg.debug("Mapped entry - Serial: %s, Parent: %s, Attachments: %s",entry.serial,entry.parent.hex(),entry.attachment)
-
+
+ logg.debug(
+ "Mapped entry - Serial: %s, Parent: %s, Attachments: %s",
+ entry.serial,
+ entry.parent.hex(),
+ entry.attachment,
+ )
+
for attachment in domain_entry.attachments:
try:
info = self.get_file_info(attachment)
@@ -112,39 +128,38 @@ class LedgerRepository:
attachment,
slug=info["slug"],
description=info["description"],
- mimetype=info["mimetype"]
+ mimetype=info["mimetype"],
)
store.add_asset(asset)
entry.attach(asset)
-
-
+
with open(attachment, "rb") as f:
data = f.read()
self.resolver.put(asset.get_digest(binary=True), data)
-
+
except FileNotFoundError as e:
raise IOError(f"Attachment file not found: {attachment}") from e
except PermissionError as e:
raise IOError(f"Cannot read attachment file: {attachment}") from e
-
+
store.add_entry(entry, update_ledger=True)
-
+
ledger.truncate()
ledger.sign()
logg.info(f"Successfully saved entry #{entry.serial}")
-
+
except FileExistsError as e:
logg.debug(f"Entry fileinfo already exists: {e}")
- raise
+ raise
except ValueError as e:
logg.debug(f"Validation error: {e}")
- raise
+ raise
except IOError as e:
logg.debug(f"File operation failed: {e}")
- raise
+ raise
except Exception as e:
logg.debug(f"Failed to save entry: {e}", exc_info=True)
- raise
+ raise
def get_all_entries(self) -> List[LedgerEntry]:
"""Get all entries"""
@@ -158,7 +173,6 @@ class LedgerRepository:
except Exception as e:
logg.error(f"Failed to retrieve entries: {e}")
return []
-
def get_asset_bytes(self, digest: str):
logg.debug(f"Getting asset for digest: {digest}")
@@ -168,22 +182,219 @@ class LedgerRepository:
logg.exception("Failed to get asset for digest %s: %s", digest, e)
return None
-
- def get_file_info(self,file_path: str) -> dict:
+ def get_file_info(self, file_path: str) -> dict:
path = Path(file_path)
-
- slug = path.stem
-
- mimetype, _ = mimetypes.guess_type(file_path)
-
+
+ slug = path.stem
+
+ mimetype, _ = mimetypes.guess_type(file_path)
+
if mimetype:
- kind = mimetype.split("/")[0]
+ kind = mimetype.split("/")[0]
description = f"{kind.capitalize()} file: {path.name}"
else:
description = f"File: {path.name}"
-
+
return {
- "slug": slug,
- "description": description,
- "mimetype": mimetype,
- }
-\ No newline at end of file
+ "slug": slug,
+ "description": description,
+ "mimetype": mimetype,
+ }
+
+ def export_all_entries_to_xml(self, output_path: str) -> tuple[bool, str]:
+ """
+ Export all ledger entries to XML file
+
+ :param output_path: Path where XML file will be saved
+ :type output_path: str
+ :return: (success, error_message) tuple
+ :rtype: tuple[bool, str]
+ """
+ try:
+ ledger = self.store.ledger
+
+ tree = ledger.to_tree()
+ output_file = Path(output_path)
+ output_file.parent.mkdir(parents=True, exist_ok=True)
+
+ xml_string = ET.tostring(
+ tree, encoding="utf-8", xml_declaration=True, pretty_print=True
+ )
+
+ with open(output_file, "wb") as f:
+ f.write(xml_string)
+
+ logg.info(f"Exported ledger to {output_path}")
+ return True, ""
+
+ except PermissionError as e:
+ error_msg = "Permission denied. Cannot write to the specified location."
+ logg.debug(f"Permission error: {e}")
+ return False, error_msg
+
+ except IOError as e:
+ error_msg = f"Failed to write file: {str(e)}"
+ logg.debug(f"I/O error: {e}")
+ return False, error_msg
+
+ except Exception as e:
+ error_msg = f"Failed to export ledger: {str(e)}"
+ logg.debug("Unexpected error during export")
+ return False, error_msg
+
+ def export_entry_to_xml(self, serial: int, output_path: str) -> tuple[bool, str]:
+ """
+ Export a single ledger entry to an XML file
+ """
+ try:
+ logg.debug(f"Requested export for entry serial: {serial}")
+ storage_entry = self.store.ledger.entries.get(serial)
+ if not storage_entry:
+ return False, f"Entry #{serial} not found"
+
+ xml_tree = self.store.ledger.to_tree()
+ ns_uri = xml_tree.nsmap.get(None)
+ if ns_uri is None:
+ if "}" in xml_tree.tag:
+ ns_uri = xml_tree.tag.split("}")[0].strip("{")
+ else:
+ # Final fallback
+ ns_uri = "http://usawa.defalsify.org/"
+
+ logg.debug(f"Using namespace: {ns_uri}")
+ logg.debug(f"Root tag: {xml_tree.tag}")
+ logg.debug(f"Namespace map: {xml_tree.nsmap}")
+
+ all_entries = xml_tree.findall(".//{%s}entry" % ns_uri)
+ logg.debug(f"Total entries found in XML: {len(all_entries)}")
+
+ target_entry = None
+
+ for entry_elem in all_entries:
+ data_elem = _find_child(entry_elem, "data")
+
+ if data_elem is None:
+ logg.warning(f"Entry without <data> element")
+ continue
+
+ serial_elem = _find_child(data_elem, "serial")
+
+ if serial_elem is None:
+ logg.warning("Entry found with no serial element")
+ continue
+
+ entry_serial = int(serial_elem.text)
+ logg.debug(f"Inspecting entry serial: {entry_serial}")
+
+ if entry_serial == serial:
+ target_entry = entry_elem
+ logg.debug(f"Target entry found: {serial}")
+ break
+
+ if target_entry is None:
+ return False, f"Entry #{serial} not found in XML"
+
+ root = ET.Element("{%s}ledger" % ns_uri, nsmap={None: ns_uri})
+ root.set("version", xml_tree.get("version"))
+
+ for tag in ["topic", "generated", "src", "units", "identity"]:
+ elem = _find_child(xml_tree, tag)
+ if elem is not None:
+ logg.debug(f"Copying {tag} element")
+ root.append(deepcopy(elem))
+
+ data_elem = _find_child(target_entry, "data")
+
+ if data_elem is None:
+ return False, "Target entry has no <data> element"
+
+ debit_elem = _find_child(data_elem, "debit")
+ credit_elem = _find_child(data_elem, "credit")
+
+ debit_val = 0
+ credit_val = 0
+
+ if debit_elem is not None:
+ amount_elem = _find_child(debit_elem, "amount")
+ if amount_elem is not None:
+ debit_val = int(amount_elem.text)
+
+ if credit_elem is not None:
+ amount_elem = _find_child(credit_elem, "amount")
+ if amount_elem is not None:
+ credit_val = int(amount_elem.text)
+
+ expense = -abs(debit_val)
+ asset = credit_val
+
+ incoming = ET.Element("{%s}incoming" % ns_uri)
+ incoming.set("serial", "0")
+
+ real = ET.SubElement(incoming, "{%s}real" % ns_uri)
+ real.set("unit", "BTC")
+
+ ET.SubElement(real, "{%s}income" % ns_uri).text = "0"
+ ET.SubElement(real, "{%s}expense" % ns_uri).text = str(expense)
+ ET.SubElement(real, "{%s}asset" % ns_uri).text = str(asset)
+ ET.SubElement(real, "{%s}liability" % ns_uri).text = "0"
+
+ orig_incoming = _find_child(xml_tree, "incoming")
+ if orig_incoming is not None:
+ digest = _find_child(orig_incoming, "digest")
+ if digest is not None:
+ incoming.append(deepcopy(digest))
+
+ sig = _find_child(orig_incoming, "sig")
+ if sig is not None:
+ incoming.append(deepcopy(sig))
+
+ root.append(incoming)
+
+ logg.info(f"Appending entry #{serial} to new ledger")
+ root.append(deepcopy(target_entry))
+
+ final_entries = root.findall(".//{%s}entry" % ns_uri)
+ logg.info(f"Final XML contains {len(final_entries)} entry(ies)")
+
+ output_file = Path(output_path)
+ output_file.parent.mkdir(parents=True, exist_ok=True)
+
+ xml_string = ET.tostring(
+ root,
+ encoding="utf-8",
+ xml_declaration=True,
+ pretty_print=True,
+ )
+
+ with open(output_file, "wb") as f:
+ f.write(xml_string)
+
+ logg.info(f"Successfully exported entry #{serial} -> {output_path}")
+
+ return True, ""
+
+ except PermissionError as e:
+ logg.debug(f"Permission error while exporting entry: {e}")
+ return False, "Permission denied"
+
+ except IOError as e:
+ logg.debug(f"I/O error: {e}")
+ return False, str(e)
+
+ except Exception as e:
+ logg.debug("Unexpected error during entry export")
+ return False, str(e)
+
+
+def _get_local_name(element):
+ """Extract local name from element tag (without namespace)"""
+ tag = element.tag
+ return tag.split("}")[-1] if "}" in tag else tag
+
+
+def _find_child(parent, local_name):
+ """Find child element by local name"""
+ for child in parent:
+ if _get_local_name(child) == local_name:
+ return child
+ return None