Compare commits
No commits in common. "9900ce304bb39b0c0a0263ab28e8dacfdb47cf8b" and "fee5a7db1a8ec52c2775f573c7d80542e6d8ee38" have entirely different histories.
9900ce304b
...
fee5a7db1a
7
.vscode/settings.json
vendored
7
.vscode/settings.json
vendored
@ -1,7 +0,0 @@
|
|||||||
{
|
|
||||||
"cSpell.words": [
|
|
||||||
"hyfetch",
|
|
||||||
"neofetch",
|
|
||||||
"TERMUX"
|
|
||||||
]
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
{
|
|
||||||
"backups": [
|
|
||||||
{
|
|
||||||
"path": "zsh/linux.custom_rc.zsh",
|
|
||||||
"source": "~/.custom_rc"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "lsd",
|
|
||||||
"source": "~/.config/lsd"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "bat",
|
|
||||||
"source": "~/.config/bat"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "neofetch",
|
|
||||||
"source": "~/.config/neofetch"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "hyfetch/hyfetch.json",
|
|
||||||
"source": "~/.config/hyfetch.json"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
292
sync.py
292
sync.py
@ -1,38 +1,48 @@
|
|||||||
from typing import Callable, Iterable, TypeVar
|
from genericpath import exists
|
||||||
|
from typing import Iterable
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from math import nan
|
from math import nan
|
||||||
import os
|
import os
|
||||||
from os import path
|
from os import path
|
||||||
import sys
|
from pathlib import Path
|
||||||
import shutil
|
|
||||||
|
|
||||||
#=== Global Parameters ===#
|
#=== Init ===#
|
||||||
dry_run: bool = False
|
|
||||||
|
|
||||||
#=== Utils ===#
|
backup_root: str = path.dirname(__file__)
|
||||||
import re
|
user_home: str = path.expanduser("~")
|
||||||
import hashlib
|
|
||||||
|
|
||||||
T_WaitForInput_Res = TypeVar('T_WaitForInput_Res')
|
if user_home == "~":
|
||||||
def wait_for_input (cb: Callable[[str], T_WaitForInput_Res|None]) -> T_WaitForInput_Res:
|
print("WARN: Cannot read the user home dir, do you run it in the correct script?")
|
||||||
|
exit()
|
||||||
|
else:
|
||||||
|
print("dot-config: current user home: " + user_home)
|
||||||
|
|
||||||
|
class SysType (Enum):
|
||||||
|
LINUX = "linux"
|
||||||
|
TERMUX = "termux"
|
||||||
|
WINDOWS = "windows"
|
||||||
|
if ("termux" in backup_root):
|
||||||
|
sys_type: SysType = SysType.TERMUX
|
||||||
|
elif (backup_root[0] == "/"):
|
||||||
|
sys_type: SysType = SysType.LINUX
|
||||||
|
else:
|
||||||
|
sys_type: SysType = SysType.WINDOWS
|
||||||
|
print(f"dot config: your dot-config path is {backup_root}")
|
||||||
|
print(f"dot config: your system type is {sys_type}")
|
||||||
|
print(f"Is all the informations correct? [y/n] ", end="")
|
||||||
while True:
|
while True:
|
||||||
_in = input()
|
_in = input()
|
||||||
_out = cb(_in)
|
match _in:
|
||||||
if _out != None:
|
case "y":
|
||||||
return _out
|
print("continuing...")
|
||||||
|
break;
|
||||||
|
case "n":
|
||||||
|
print("Exiting")
|
||||||
|
exit()
|
||||||
|
case _:
|
||||||
|
print("please confirm with [y/n] ", end="")
|
||||||
|
|
||||||
def replace_env_variables(input_string):
|
#=== Utils ===#
|
||||||
"""
|
|
||||||
Replaces environment variables in the input string with their current values.
|
|
||||||
"""
|
|
||||||
def replace_env(match):
|
|
||||||
env_var = match.group(1)
|
|
||||||
return os.environ.get(env_var, f"${{{env_var}}}")
|
|
||||||
|
|
||||||
# Use regular expression to find environment variable placeholders
|
|
||||||
pattern = r"%(\w+)%"
|
|
||||||
replaced_string = re.sub(pattern, replace_env, input_string)
|
|
||||||
return replaced_string
|
|
||||||
|
|
||||||
def sorted_paths (paths: Iterable[str]) -> list[str]:
|
def sorted_paths (paths: Iterable[str]) -> list[str]:
|
||||||
fin = sorted(paths)
|
fin = sorted(paths)
|
||||||
@ -41,47 +51,21 @@ def sorted_paths (paths: Iterable[str]) -> list[str]:
|
|||||||
def de_abs_path (path: str) -> str:
|
def de_abs_path (path: str) -> str:
|
||||||
return path.strip('/').strip('\\')
|
return path.strip('/').strip('\\')
|
||||||
|
|
||||||
def get_file_hash(file_path: str) -> str:
|
|
||||||
with open(file_path, 'rb') as f:
|
|
||||||
bytes = f.read() # read entire file as bytes
|
|
||||||
readable_hash = hashlib.md5(bytes).hexdigest()
|
|
||||||
return readable_hash
|
|
||||||
|
|
||||||
def ensure_file_dir (file_path: str) -> None:
|
|
||||||
dir: str = path.dirname(file_path)
|
|
||||||
if dir != '' and not path.exists(dir):
|
|
||||||
if not dry_run:
|
|
||||||
os.makedirs(dir)
|
|
||||||
print(f":created parent dir {dir}")
|
|
||||||
|
|
||||||
def copyfile (src: str, dest: str) -> None:
|
|
||||||
ensure_file_dir(dest)
|
|
||||||
if not dry_run:
|
|
||||||
shutil.copy2(src, dest)
|
|
||||||
print(f":copied {src} -> {dest}")
|
|
||||||
print(f":updated {dest}")
|
|
||||||
|
|
||||||
def delfile (file_path: str) -> None:
|
|
||||||
if not dry_run:
|
|
||||||
os.remove(file_path)
|
|
||||||
print(f":deleted {file_path}")
|
|
||||||
|
|
||||||
#=== Backup Item ===#
|
#=== Backup Item ===#
|
||||||
|
|
||||||
class BackupItem:
|
class BackupItem:
|
||||||
def __init__ (self, backup_dir: str, origin_dir: str) -> None:
|
def __init__ (self, backup_dir: str, origin_dir: str) -> None:
|
||||||
self.name: str = backup_dir
|
self.name: str = backup_dir
|
||||||
self.backup_dir: str = path.join(backup_root, backup_dir)
|
self.backup_dir: str = path.join(backup_root, backup_dir)
|
||||||
self.origin_dir: str = path.abspath(replace_env_variables(path.expanduser(origin_dir)))
|
self.origin_dir: str = path.abspath(path.expanduser(origin_dir))
|
||||||
|
|
||||||
|
table: list[BackupItem] = [
|
||||||
|
BackupItem("PowerShell", "~/Documents/PowerShell"),
|
||||||
|
BackupItem("lsd", "~/.config/lsd")
|
||||||
|
]
|
||||||
|
|
||||||
def execute_sync (backupItem: BackupItem) -> None:
|
def execute_sync (backupItem: BackupItem) -> None:
|
||||||
print(f">>> executing backup for {backupItem.name}")
|
print(f">>> executing backup for {backupItem.name}")
|
||||||
exec_gallery: list[Callable] = []
|
|
||||||
### for file mode
|
|
||||||
if (path.isfile(backupItem.origin_dir)) or (path.isfile(backupItem.backup_dir)):
|
|
||||||
exec_gallery.append(compare_file(backupItem, None))
|
|
||||||
### for dir mode
|
|
||||||
else:
|
|
||||||
all_files_tmp: list[str] = []
|
all_files_tmp: list[str] = []
|
||||||
def walk_dir (walking_dir: str):
|
def walk_dir (walking_dir: str):
|
||||||
for root, dirs, files in os.walk(walking_dir):
|
for root, dirs, files in os.walk(walking_dir):
|
||||||
@ -99,28 +83,14 @@ def execute_sync (backupItem: BackupItem) -> None:
|
|||||||
walk_dir(backupItem.backup_dir)
|
walk_dir(backupItem.backup_dir)
|
||||||
all_files: list[str] = sorted_paths(set(all_files_tmp))
|
all_files: list[str] = sorted_paths(set(all_files_tmp))
|
||||||
for file in all_files:
|
for file in all_files:
|
||||||
exec_gallery.append(compare_file(backupItem, file))
|
compare_file(backupItem, file)
|
||||||
if exec_gallery.__len__() == 0:
|
# print("\n".join(all_files))
|
||||||
print("no files to sync ~")
|
|
||||||
return
|
|
||||||
while True:
|
|
||||||
print("! sync those files now? [y/n] ", end="")
|
|
||||||
_in = input()
|
|
||||||
if _in == 'y':
|
|
||||||
for i in exec_gallery:
|
|
||||||
i()
|
|
||||||
return
|
|
||||||
elif _in == 'n':
|
|
||||||
return
|
|
||||||
|
|
||||||
def compare_file (rootBackItem: BackupItem, relative_file_path: str|None) -> Callable:
|
def compare_file (rootBackItem: BackupItem, relative_file_path: str) -> None:
|
||||||
class NewerStatus (Enum):
|
class IsNewerStatus (Enum):
|
||||||
RIGHT_MISSING = -2
|
OLDER = -1
|
||||||
RIGHT_OLDER = -1
|
NEWER = 1
|
||||||
LEFT_OLDER = 1
|
|
||||||
LEFT_MISSING = 2
|
|
||||||
DIFFERENT = nan
|
DIFFERENT = nan
|
||||||
ALL_MISSING = -999
|
|
||||||
SAME = 0
|
SAME = 0
|
||||||
class FileStatus:
|
class FileStatus:
|
||||||
def __init__(self, realpath: str) -> None:
|
def __init__(self, realpath: str) -> None:
|
||||||
@ -129,159 +99,33 @@ def compare_file (rootBackItem: BackupItem, relative_file_path: str|None) -> Cal
|
|||||||
if self.exists:
|
if self.exists:
|
||||||
self.size = path.getsize(realpath)
|
self.size = path.getsize(realpath)
|
||||||
self.edited_time = path.getmtime(realpath)
|
self.edited_time = path.getmtime(realpath)
|
||||||
def hash_of_file (self) -> str:
|
def isNewerThan (self, other):
|
||||||
|
# type: (FileStatus) -> IsNewerStatus
|
||||||
if not self.exists:
|
if not self.exists:
|
||||||
return ""
|
return IsNewerStatus.OLDER
|
||||||
return get_file_hash(self.path)
|
if not other.exists:
|
||||||
def FileSameCheck (left: FileStatus, right: FileStatus) -> NewerStatus:
|
return IsNewerStatus.NEWER
|
||||||
def check_hash_same_or (status: NewerStatus) -> NewerStatus: # TODO: add to compare
|
if self.edited_time > other.edited_time:
|
||||||
if left.hash_of_file() == right.hash_of_file():
|
return IsNewerStatus.NEWER
|
||||||
return NewerStatus.SAME
|
elif self.edited_time < other.edited_time:
|
||||||
return status
|
return IsNewerStatus.OLDER
|
||||||
if not left.exists:
|
if self.size != other.size:
|
||||||
if not right.exists:
|
return IsNewerStatus.DIFFERENT
|
||||||
return NewerStatus.ALL_MISSING
|
return IsNewerStatus.SAME
|
||||||
return NewerStatus.LEFT_MISSING
|
|
||||||
if not right.exists:
|
|
||||||
if not left.exists:
|
|
||||||
return NewerStatus.ALL_MISSING
|
|
||||||
return NewerStatus.RIGHT_MISSING
|
|
||||||
if left.edited_time > right.edited_time:
|
|
||||||
return NewerStatus.LEFT_OLDER
|
|
||||||
elif left.edited_time < right.edited_time:
|
|
||||||
return NewerStatus.RIGHT_OLDER
|
|
||||||
if left.size != right.size:
|
|
||||||
return NewerStatus.DIFFERENT
|
|
||||||
return NewerStatus.SAME
|
|
||||||
if relative_file_path == None:
|
|
||||||
backup_item: FileStatus = FileStatus(rootBackItem.backup_dir)
|
|
||||||
origin_item: FileStatus = FileStatus(rootBackItem.origin_dir)
|
|
||||||
file_id: str = rootBackItem.name
|
|
||||||
else:
|
|
||||||
backup_item: FileStatus = FileStatus(path.join(rootBackItem.backup_dir, relative_file_path))
|
backup_item: FileStatus = FileStatus(path.join(rootBackItem.backup_dir, relative_file_path))
|
||||||
origin_item: FileStatus = FileStatus(path.join(rootBackItem.origin_dir, relative_file_path))
|
origin_item: FileStatus = FileStatus(path.join(rootBackItem.origin_dir, relative_file_path))
|
||||||
file_id: str = relative_file_path
|
match origin_item.isNewerThan(backup_item):
|
||||||
# print(f"((backup_item: {backup_item.path}))")
|
case IsNewerStatus.SAME:
|
||||||
# print(f"((origin_item: {origin_item.path}))")
|
print(f"{relative_file_path} : is same")
|
||||||
def wait_for_if_remove (onSync = Callable, onRemove = Callable) -> Callable[[str], Callable|None]:
|
case IsNewerStatus.OLDER:
|
||||||
def implementation (_in: str) -> Callable|None:
|
print(f"{relative_file_path} : backup file is newer")
|
||||||
match _in:
|
case IsNewerStatus.NEWER:
|
||||||
case "s":
|
print(f"{relative_file_path} : original file is newer")
|
||||||
return onSync
|
case IsNewerStatus.DIFFERENT:
|
||||||
case "r":
|
print(f"{relative_file_path} : WARN : backup is different but cannot determine which is newer")
|
||||||
return onRemove
|
|
||||||
case "i":
|
|
||||||
return lambda: None
|
|
||||||
case _:
|
|
||||||
print("sync or remove? [s=sync/r=remove/i=ignore] ", end="")
|
|
||||||
return None
|
|
||||||
return implementation
|
|
||||||
match FileSameCheck(origin_item, backup_item):
|
|
||||||
case NewerStatus.SAME:
|
|
||||||
# print(f"{file_id} : is same")
|
|
||||||
pass
|
|
||||||
case NewerStatus.RIGHT_OLDER:
|
|
||||||
print(f"{file_id} : local file is newer")
|
|
||||||
return lambda: copyfile(origin_item.path, backup_item.path)
|
|
||||||
case NewerStatus.LEFT_OLDER:
|
|
||||||
print(f"{file_id} : backup file is newer")
|
|
||||||
return lambda: copyfile(backup_item.path, origin_item.path)
|
|
||||||
case NewerStatus.RIGHT_MISSING:
|
|
||||||
print(f"{file_id} : backup file is missing, sync or remove? [s=sync/r=remove/i=ignore] ", end="")
|
|
||||||
return wait_for_input(wait_for_if_remove(
|
|
||||||
onSync = lambda: copyfile(origin_item.path, backup_item.path),
|
|
||||||
onRemove = lambda: delfile(origin_item.path)
|
|
||||||
))
|
|
||||||
case NewerStatus.LEFT_MISSING:
|
|
||||||
print(f"{file_id} : local file is missing, sync or remove? [s=sync/r=remove/i=ignore] ", end="")
|
|
||||||
exec = wait_for_input(wait_for_if_remove(
|
|
||||||
onSync = lambda: copyfile(backup_item.path, origin_item.path),
|
|
||||||
onRemove = lambda: delfile(backup_item.path)
|
|
||||||
))
|
|
||||||
return exec
|
|
||||||
case NewerStatus.DIFFERENT:
|
|
||||||
print(f"{file_id} : backup is different with local, which to keep? [b=backup/l=local/i=ignore] ", end="")
|
|
||||||
return wait_for_input(lambda _in: (
|
|
||||||
(lambda: copyfile(backup_item.path, origin_item.path)) if _in == 'l' else
|
|
||||||
(lambda: copyfile(origin_item.path, backup_item.path)) if _in == 'b' else
|
|
||||||
(lambda: None) if _in == 'i' else
|
|
||||||
None
|
|
||||||
))
|
|
||||||
case NewerStatus.ALL_MISSING:
|
|
||||||
print(f"{file_id} : both files are missing, will skipped")
|
|
||||||
return lambda: None
|
|
||||||
|
|
||||||
#=== Init ===#
|
|
||||||
|
|
||||||
for i in sys.argv:
|
|
||||||
if i == "--help" or i == '-h':
|
|
||||||
print("Usage: sync.py")
|
|
||||||
print(" -n --dry-run : enable dry-run mode")
|
|
||||||
print(" -v --version : show version")
|
|
||||||
print(" -h --help : show this help")
|
|
||||||
exit()
|
|
||||||
if i == "--version" or i == '-v':
|
|
||||||
print("dot-config sync.py v1.annie.0-alpha1")
|
|
||||||
exit()
|
|
||||||
if i == '--dry-run' or i == '-n':
|
|
||||||
dry_run = True
|
|
||||||
print("dot-config: global dry-run mode enabled!")
|
|
||||||
|
|
||||||
backup_root: str = path.dirname(__file__)
|
|
||||||
user_home: str = path.expanduser("~")
|
|
||||||
|
|
||||||
if user_home == "~":
|
|
||||||
print("FATAL: Cannot read the user home dir, do you run it in the correct script?")
|
|
||||||
exit()
|
|
||||||
else:
|
|
||||||
print("dot-config: current user home: " + user_home)
|
|
||||||
|
|
||||||
class SysType (Enum):
|
|
||||||
LINUX = 'linux'
|
|
||||||
TERMUX = 'termux'
|
|
||||||
WINDOWS = 'windows'
|
|
||||||
if ("termux" in backup_root):
|
|
||||||
sys_type: SysType = SysType.TERMUX
|
|
||||||
elif (backup_root[0] == "/"):
|
|
||||||
sys_type: SysType = SysType.LINUX
|
|
||||||
else:
|
|
||||||
sys_type: SysType = SysType.WINDOWS
|
|
||||||
print(f"dot-config: your dot-config path is {backup_root}")
|
|
||||||
print(f"dot-config: your system type is {sys_type}")
|
|
||||||
print(f"dot-config: dry run mode is {dry_run}")
|
|
||||||
print(f"Is all the information correct? [y/n] ", end="")
|
|
||||||
while True:
|
|
||||||
_in = input()
|
|
||||||
match _in:
|
|
||||||
case "y":
|
|
||||||
print("continuing...")
|
|
||||||
break;
|
|
||||||
case "n":
|
|
||||||
print("Exiting")
|
|
||||||
exit()
|
|
||||||
case _:
|
|
||||||
print("please confirm with [y/n] ", end="")
|
|
||||||
|
|
||||||
#=== main ===#
|
#=== main ===#
|
||||||
import json
|
|
||||||
|
|
||||||
table: list[BackupItem] = []
|
|
||||||
config_file = path.join(backup_root, f"sync.{sys_type.value}.json")
|
|
||||||
if not path.isfile(config_file):
|
|
||||||
print(f"dot-config : FATAL : cannot find config file for current system in {config_file}")
|
|
||||||
exit()
|
|
||||||
with open(config_file, 'r') as config_file_raw:
|
|
||||||
config = json.load(config_file_raw)
|
|
||||||
for i in config['backups']:
|
|
||||||
here: str = i['path']
|
|
||||||
there: str = i['source']
|
|
||||||
print(f"-- loaded [{here}] <-> [{there}]")
|
|
||||||
table.append(BackupItem(here, there))
|
|
||||||
|
|
||||||
print()
|
|
||||||
for i in table:
|
for i in table:
|
||||||
# print(f"((BackupItem i : {i.name}))")
|
|
||||||
# print(f"((i.backup_dir : {i.backup_dir}))")
|
|
||||||
# print(f"((i.origin_dir : {i.origin_dir}))")
|
|
||||||
execute_sync(i)
|
execute_sync(i)
|
||||||
print()
|
|
||||||
|
@ -1,24 +0,0 @@
|
|||||||
{
|
|
||||||
"backups": [
|
|
||||||
{
|
|
||||||
"path": "zsh/termux.zshrc.zsh",
|
|
||||||
"source": "~/.zshrc"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "lsd",
|
|
||||||
"source": "~/.config/lsd"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "bat",
|
|
||||||
"source": "~/.config/bat"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "neofetch",
|
|
||||||
"source": "~/.config/neofetch"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "hyfetch/hyfetch.json",
|
|
||||||
"source": "~/.config/hyfetch.json"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
@ -1,28 +0,0 @@
|
|||||||
{
|
|
||||||
"backups": [
|
|
||||||
{
|
|
||||||
"path": "PowerShell",
|
|
||||||
"source": "~/Documents/PowerShell"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "oh-my-posh",
|
|
||||||
"source": "~/.config/oh-my-posh"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "lsd",
|
|
||||||
"source": "%AppData%/lsd"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "bat",
|
|
||||||
"source": "%AppData%/bat"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "neofetch",
|
|
||||||
"source": "~/.config/neofetch"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "hyfetch/hyfetch.json",
|
|
||||||
"source": "~/.config/hyfetch.json"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
@ -1,22 +0,0 @@
|
|||||||
#!/bin/zsh
|
|
||||||
|
|
||||||
#=== bat (cat)
|
|
||||||
alias cat='bat'
|
|
||||||
#=== lsd
|
|
||||||
alias cols='lsd'
|
|
||||||
alias crls='cols'
|
|
||||||
alias ccls='crls -g'
|
|
||||||
alias cl='ccls -lA'
|
|
||||||
alias cll='ccls -l'
|
|
||||||
alias cla='ccls -la'
|
|
||||||
alias lc='ccls'
|
|
||||||
alias ls='lc'
|
|
||||||
alias l='cl'
|
|
||||||
alias ll='cll'
|
|
||||||
alias la='cla'
|
|
||||||
alias lgg='ccls --tree'
|
|
||||||
|
|
||||||
#=== zoxide (cd)
|
|
||||||
# zoxide must init after compinit is called
|
|
||||||
eval "$(zoxide init zsh)"
|
|
||||||
alias cd='z'
|
|
Loading…
Reference in New Issue
Block a user