Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
tsm-orchestration
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
UFZ TSM
tsm-orchestration
Commits
b94a4094
Commit
b94a4094
authored
1 year ago
by
Bert Palm
Browse files
Options
Downloads
Patches
Plain Diff
added requirements and other minor improvements
parent
44b4c121
No related branches found
No related tags found
1 merge request
!148
sftp sync script
Pipeline
#374901
passed
1 year ago
Stage: integration
Stage: end-to-end
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
cron/requirements.txt
+4
-1
4 additions, 1 deletion
cron/requirements.txt
cron/scripts/sftp_sync/remote_fs.py
+226
-0
226 additions, 0 deletions
cron/scripts/sftp_sync/remote_fs.py
cron/scripts/sftp_sync/sftp_sync.py
+14
-239
14 additions, 239 deletions
cron/scripts/sftp_sync/sftp_sync.py
with
244 additions
and
240 deletions
cron/requirements.txt
+
4
−
1
View file @
b94a4094
psycopg2-binary~=2.9.2
\ No newline at end of file
psycopg2-binary~=2.9.2
psycopg
minio
paramiko
\ No newline at end of file
This diff is collapsed.
Click to expand it.
cron/scripts/sftp_sync/remote_fs.py
0 → 100644
+
226
−
0
View file @
b94a4094
#!/usr/bin/env python
from
__future__
import
annotations
import
abc
import
logging
import
os
import
stat
import
time
from
typing
import
IO
from
contextlib
import
contextmanager
import
minio
from
minio.datatypes
import
Object
as
MinioObject
from
paramiko
import
(
SSHClient
,
SFTPClient
,
SFTPAttributes
,
RejectPolicy
,
MissingHostKeyPolicy
,
)
from
paramiko.config
import
SSH_PORT
logger
=
logging
.
getLogger
(
__name__
)
class RemoteFS(abc.ABC):
    """
    Minimal file-system interface shared by the sync endpoints.

    Concrete subclasses keep a `files` mapping that is keyed by path;
    the value is whatever metadata object the backend reports for
    that path.
    """

    # path -> backend specific metadata object
    files: dict[str, object]

    @abc.abstractmethod
    def exist(self, path: str) -> bool:
        """Return True if `path` is known to this file system."""

    @abc.abstractmethod
    def is_dir(self, path: str) -> bool:
        """Return True if `path` refers to a directory."""

    @abc.abstractmethod
    def last_modified(self, path: str) -> float:
        """Return the modification time of `path` as a float."""

    @abc.abstractmethod
    def size(self, path: str) -> int:
        """Return the size of `path`."""

    @abc.abstractmethod
    def open(self, path: str) -> IO[bytes]:
        """
        Open `path` for reading.

        The result is used as a context manager by `update`.
        """

    @abc.abstractmethod
    def put(self, path: str, fo: IO[bytes], size: int) -> None:
        """Write `size` bytes read from `fo` to `path`."""

    @abc.abstractmethod
    def mkdir(self, path: str) -> None:
        """Create the directory `path`."""

    def update(self, other: RemoteFS, path: str) -> None:
        """
        Update or create a file at the same location as in the other
        file system.

        Raises ValueError if `path` is a directory in `other`.
        """
        if other.is_dir(path):
            # name the offending path, otherwise the error is
            # impossible to trace back in a long sync run
            raise ValueError(f"Cannot update a directory: {path!r}")

        action = "UPDATE" if self.exist(path) else "CREATE"
        logger.debug("%s %s", action, path)

        with other.open(path) as fo:
            self.put(path, fo, other.size(path))
class MinioFS(RemoteFS):
    """
    RemoteFS backed by a single Minio bucket.

    All objects of the bucket are listed once on construction and
    cached in `files`; the metadata queries below only consult that
    cache.
    """

    cl: minio.Minio
    # object name -> object metadata as returned by list_objects
    files: dict[str, MinioObject]

    @classmethod
    def from_credentials(
        cls,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket_name: str | None = None,
        secure: bool = True,
    ):
        """
        Create the Minio client from credentials and wrap it.

        NOTE(review): `bucket_name` defaults to None, but the
        constructor lists the bucket right away - presumably callers
        always pass a bucket name; confirm.
        """
        client = minio.Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )
        return cls(client=client, bucket_name=bucket_name)

    def __init__(
        self,
        client: minio.Minio,
        bucket_name: str,
    ) -> None:
        self.cl = client
        self.bucket_name = bucket_name
        self._get_files()

    def _get_files(self):
        """(Re)build the `files` cache from a recursive bucket listing."""
        listing = self.cl.list_objects(self.bucket_name, recursive=True)
        self.files = {obj.object_name: obj for obj in listing}

    def _stat(self, path: str) -> MinioObject:
        """Return the cached metadata of `path` or raise FileNotFoundError."""
        if not self.exist(path):
            raise FileNotFoundError(path)
        return self.files[path]

    def exist(self, path: str):
        return path in self.files

    def size(self, path: str):
        return self._stat(path).size

    def is_dir(self, path: str):
        return self._stat(path).is_dir

    def last_modified(self, path: str) -> float:
        """
        Return the modification time of `path` as seconds since the epoch.

        `time.mktime(dt.timetuple())` reads the datetime fields as
        local time, which shifts a timezone-aware value by the UTC
        offset of the host. `datetime.timestamp()` honours the tzinfo
        of the value instead (presumably UTC here - confirm against
        the minio version in use). Microseconds are dropped, so the
        result keeps whole-second resolution as before.
        """
        modified = self._stat(path).last_modified
        return modified.replace(microsecond=0).timestamp()

    def put(self, path: str, fo: IO[bytes], size: int):
        self.cl.put_object(
            bucket_name=self.bucket_name,
            object_name=path,
            data=fo,
            length=size,
        )

    @contextmanager
    def open(self, path) -> IO[bytes]:
        """
        Context manager yielding the response stream of object `path`.

        The response is closed and its connection released on exit.
        """
        if not self.exist(path):
            raise FileNotFoundError(path)
        # If get_object raises there is no response to clean up, so
        # the call can safely live outside the try block.
        resp = self.cl.get_object(
            bucket_name=self.bucket_name, object_name=path
        )
        try:
            yield resp
        finally:
            resp.close()
            resp.release_conn()

    def mkdir(self, path: str):
        # In Minio directories are created
        # automatically when files are created.
        pass
class FtpFS(RemoteFS):
    """
    RemoteFS backed by a directory on an SFTP server.

    The directory tree below `path` is listed once on construction
    and cached in `files`; the metadata queries below only consult
    that cache.
    """

    cl: SFTPClient
    # path relative to the sync directory -> attributes from the listing
    files: dict[str, SFTPAttributes]

    @classmethod
    def from_credentials(
        cls,
        uri,
        username,
        password,
        path,
        keyfile_path=None,
        missing_host_key_policy: MissingHostKeyPolicy | None = None,
    ):
        """
        Connect to the server given as `host[:port]` and wrap the
        resulting SFTP session.

        The port falls back to SSH_PORT if `uri` has no (or an empty)
        port part. An empty `password` is passed on as None.
        """
        host, *rest = uri.split(":")
        port = int(rest[0]) if rest and rest[0] else SSH_PORT

        ssh = SSHClient()
        if missing_host_key_policy is not None:
            ssh.set_missing_host_key_policy(missing_host_key_policy)
        try:
            ssh.connect(
                hostname=host,
                port=port,
                username=username,
                password=password or None,
                key_filename=keyfile_path,
                look_for_keys=False,  # todo maybe ?
                allow_agent=False,
                compress=True,
            )
            fs = cls(ssh.open_sftp(), path)
        except BaseException:
            # Do not leave a half-open connection behind if the
            # login, the chdir or the initial listing fails.
            ssh.close()
            raise
        # NOTE(review): some paramiko versions appear to close the
        # transport once the owning SSHClient is garbage collected;
        # keeping a reference is harmless either way - confirm.
        fs._ssh = ssh
        return fs

    def __init__(self, client: SFTPClient, path: str = ".") -> None:
        self.cl = client
        self.path = path
        self.files = {}
        # all paths in `files` are relative to this directory
        self.cl.chdir(self.path)
        self._get_files()

    def _get_files(self, path=""):
        """Record all entries below `path` in `files`, recursively."""
        # Note that directories always appear
        # before any files from that directory
        # appear.
        subdirs = []

        # we must avoid calling listdir_iter multiple
        # times, otherwise it might cause a deadlock.
        # That's why we do not recurse within the loop.
        for attrs in self.cl.listdir_iter(path):
            entry = os.path.join(path, attrs.filename)
            if stat.S_ISDIR(attrs.st_mode):
                subdirs.append(entry)
            self.files[entry] = attrs

        for subdir in subdirs:
            self._get_files(subdir)

    def _stat(self, path: str) -> SFTPAttributes:
        """Return the cached attributes of `path` or raise FileNotFoundError."""
        if not self.exist(path):
            raise FileNotFoundError(path)
        return self.files[path]

    def exist(self, path: str):
        return path in self.files

    def size(self, path: str):
        return self._stat(path).st_size

    def is_dir(self, path: str):
        return stat.S_ISDIR(self._stat(path).st_mode)

    def last_modified(self, path: str) -> float:
        return self._stat(path).st_mtime

    def put(self, path: str, fo: IO[bytes], size: int):
        self.cl.putfo(fl=fo, remotepath=path, file_size=size)

    def mkdir(self, path: str) -> None:
        logger.debug("CREATE %s", path)
        self.cl.mkdir(path)

    @contextmanager
    def open(self, path):
        """Context manager yielding the remote file `path` opened for reading."""
        if not self.exist(path):
            raise FileNotFoundError(path)
        with self.cl.open(path, mode="r") as fo:
            yield fo
This diff is collapsed.
Click to expand it.
cron/scripts/sftp_sync/sftp_sync.py
+
14
−
239
View file @
b94a4094
#!/usr/bin/env python
from
__future__
import
annotations
import
abc
import
os
import
stat
import
sys
import
time
from
dataclasses
import
dataclass
from
typing
import
IO
from
contextlib
import
contextmanager
import
minio
import
psycopg
from
minio.datatypes
import
Object
as
MinioObject
,
Bucket
from
paramiko
import
(
SSHClient
,
WarningPolicy
,
SFTPClient
,
SFTPAttributes
,
RejectPolicy
,
MissingHostKeyPolicy
,
)
from
paramiko.config
import
SSH_PORT
import
logging
from
remote_fs
import
MinioFS
,
FtpFS
,
RemoteFS
from
paramiko
import
WarningPolicy
logger
=
logging
.
getLogger
(
__name__
)
class
RemoteFS
(
abc
.
ABC
):
files
:
dict
[
str
]
@abc.abstractmethod
def
exist
(
self
,
path
:
str
)
->
bool
:
...
@abc.abstractmethod
def
is_dir
(
self
,
path
:
str
)
->
bool
:
...
@abc.abstractmethod
def
last_modified
(
self
,
path
:
str
)
->
float
:
...
@abc.abstractmethod
def
size
(
self
,
path
:
str
)
->
int
:
...
@abc.abstractmethod
def
open
(
self
,
path
:
str
)
->
IO
[
bytes
]:
...
@abc.abstractmethod
def
put
(
self
,
path
:
str
,
fo
:
IO
[
bytes
],
size
:
int
)
->
None
:
...
@abc.abstractmethod
def
mkdir
(
self
,
path
:
str
)
->
None
:
...
def
update
(
self
,
other
:
RemoteFS
,
path
)
->
None
:
"""
Update or create a file at the same location as in the other
file system.
"""
if
other
.
is_dir
(
path
):
raise
ValueError
(
f
"
Cannot update a directory
"
)
if
self
.
exist
(
path
):
logger
.
debug
(
f
"
UPDATE
{
path
}
"
)
else
:
logger
.
debug
(
f
"
CREATE
{
path
}
"
)
with
other
.
open
(
path
)
as
fo
:
self
.
put
(
path
,
fo
,
other
.
size
(
path
))
class
MinioFS
(
RemoteFS
):
cl
:
minio
.
Minio
files
:
dict
[
str
,
MinioObject
]
@classmethod
def
from_credentials
(
cls
,
endpoint
:
str
,
access_key
:
str
,
secret_key
:
str
,
bucket_name
:
str
|
None
=
None
,
secure
:
bool
=
True
,
):
cl
=
minio
.
Minio
(
endpoint
=
endpoint
,
access_key
=
access_key
,
secret_key
=
secret_key
,
secure
=
secure
,
)
return
cls
(
client
=
cl
,
bucket_name
=
bucket_name
)
def
__init__
(
self
,
client
:
minio
.
Minio
,
bucket_name
:
str
,
)
->
None
:
self
.
cl
=
client
self
.
bucket_name
=
bucket_name
self
.
_get_files
()
def
_get_files
(
self
):
self
.
files
=
{
file
.
object_name
:
file
for
file
in
self
.
cl
.
list_objects
(
self
.
bucket_name
,
recursive
=
True
)
}
def
exist
(
self
,
path
:
str
):
return
path
in
self
.
files
def
size
(
self
,
path
:
str
):
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
return
self
.
files
[
path
].
size
def
is_dir
(
self
,
path
:
str
):
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
return
self
.
files
[
path
].
is_dir
def
last_modified
(
self
,
path
:
str
)
->
float
:
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
return
time
.
mktime
(
self
.
files
[
path
].
last_modified
.
timetuple
())
def
put
(
self
,
path
:
str
,
fo
:
IO
[
bytes
],
size
:
int
):
self
.
cl
.
put_object
(
bucket_name
=
self
.
bucket_name
,
object_name
=
path
,
data
=
fo
,
length
=
size
)
@contextmanager
def
open
(
self
,
path
)
->
IO
[
bytes
]:
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
resp
=
None
try
:
resp
=
self
.
cl
.
get_object
(
bucket_name
=
self
.
bucket_name
,
object_name
=
path
)
yield
resp
finally
:
if
resp
is
not
None
:
resp
.
close
()
resp
.
release_conn
()
def
mkdir
(
self
,
path
:
str
):
# In Minio directories are created
# automatically when files are created.
pass
class
FtpFS
(
RemoteFS
):
cl
:
SFTPClient
files
:
dict
[
str
,
SFTPAttributes
]
@classmethod
def
from_credentials
(
cls
,
uri
,
username
,
password
,
path
,
keyfile_path
=
None
,
missing_host_key_policy
=
None
,
):
host
=
uri
.
split
(
"
:
"
)[
0
]
port
=
int
(
f
"
{
uri
}
:
"
.
split
(
"
:
"
)[
1
]
or
SSH_PORT
)
ssh
=
SSHClient
()
if
missing_host_key_policy
is
not
None
:
ssh
.
set_missing_host_key_policy
(
missing_host_key_policy
)
ssh
.
connect
(
hostname
=
host
,
port
=
int
(
port
),
username
=
username
,
password
=
password
or
None
,
key_filename
=
keyfile_path
,
look_for_keys
=
False
,
# todo maybe ?
allow_agent
=
False
,
compress
=
True
,
)
cl
=
ssh
.
open_sftp
()
return
cls
(
cl
,
path
)
def
__init__
(
self
,
client
:
SFTPClient
,
path
:
str
=
"
.
"
)
->
None
:
self
.
cl
=
client
self
.
path
=
path
self
.
files
=
{}
self
.
cl
.
chdir
(
self
.
path
)
self
.
_get_files
()
def
_get_files
(
self
,
path
=
""
):
# Note that directories always appear
# before any files from that directory
# appear.
dirs
=
[]
# we must avoid calling listdir_iter multiple
# times, otherwise it might cause a deadlock.
# That's why we do not recurse within the loop.
for
attrs
in
self
.
cl
.
listdir_iter
(
path
):
file_path
=
os
.
path
.
join
(
path
,
attrs
.
filename
)
if
stat
.
S_ISDIR
(
attrs
.
st_mode
):
dirs
.
append
(
file_path
)
self
.
files
[
file_path
]
=
attrs
for
dir_
in
dirs
:
self
.
_get_files
(
dir_
)
def
exist
(
self
,
path
:
str
):
return
path
in
self
.
files
def
size
(
self
,
path
:
str
):
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
return
self
.
files
[
path
].
st_size
def
is_dir
(
self
,
path
:
str
):
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
return
stat
.
S_ISDIR
(
self
.
files
[
path
].
st_mode
)
def
last_modified
(
self
,
path
:
str
)
->
float
:
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
return
self
.
files
[
path
].
st_mtime
def
put
(
self
,
path
:
str
,
fo
:
IO
[
bytes
],
size
:
int
):
self
.
cl
.
putfo
(
fl
=
fo
,
remotepath
=
path
,
file_size
=
size
)
def
mkdir
(
self
,
path
:
str
)
->
None
:
logger
.
debug
(
f
"
CREATE
{
path
}
"
)
self
.
cl
.
mkdir
(
path
)
@contextmanager
def
open
(
self
,
path
):
if
not
self
.
exist
(
path
):
raise
FileNotFoundError
(
path
)
with
self
.
cl
.
open
(
path
,
mode
=
"
r
"
)
as
fo
:
yield
fo
@dataclass
class
FtpMeta
:
uri
:
str
username
:
str
sync_dir
:
str
|
None
=
None
password
:
str
|
None
=
None
ssh_priv_key
:
str
=
None
def
get_internal_ftp
(
conn
,
thing_id
)
->
tuple
[
str
,
str
,
str
,
str
]:
def
get_minio_credentials
(
conn
,
thing_id
)
->
tuple
[
str
,
str
,
str
,
str
]:
"""
Returns (uri, access_key, secret_key, bucket_name)
"""
with
conn
.
cursor
()
as
cur
:
return
cur
.
execute
(
...
...
@@ -250,7 +22,7 @@ def get_internal_ftp(conn, thing_id) -> tuple[str, str, str, str]:
).
fetchone
()
def
get_external_ftp
(
conn
,
thing_id
)
->
tuple
[
str
,
str
,
str
,
str
]:
def
get_external_ftp
_credentials
(
conn
,
thing_id
)
->
tuple
[
str
,
str
,
str
,
str
]:
"""
Returns (uri, username, password, path)
"""
with
conn
.
cursor
()
as
cur
:
return
cur
.
execute
(
...
...
@@ -263,7 +35,6 @@ def get_external_ftp(conn, thing_id) -> tuple[str, str, str, str]:
def
sync
(
src
:
RemoteFS
,
trg
:
RemoteFS
):
for
path
in
src
.
files
:
logger
.
info
(
f
"
SYNCING:
{
path
}
"
)
# dirs
...
...
@@ -283,24 +54,28 @@ def sync(src: RemoteFS, trg: RemoteFS):
if
__name__
==
"
__main__
"
:
logging
.
basicConfig
(
level
=
logging
.
INFO
)
logging
.
basicConfig
(
level
=
os
.
environ
.
get
(
"
LOG_LEVEL
"
,
"
INFO
"
).
upper
())
if
len
(
sys
.
argv
)
!=
2
:
raise
ValueError
(
"
Expected a thing_id as first and only argument.
"
)
thing_id
=
sys
.
argv
[
1
]
for
k
in
[
"
SSH_PRIV_KEY_PATH
"
,
"
FTP_AUTH_DB_URI
"
]:
for
k
in
[
"
SSH_PRIV_KEY_PATH
"
,
"
FTP_AUTH_DB_URI
"
,
"
MINIO_SECURE
"
]:
if
(
os
.
environ
.
get
(
k
)
or
None
)
is
None
:
raise
EnvironmentError
(
"
Environment variable {k} must be set
"
)
ssh_priv_key
=
os
.
path
.
join
(
os
.
environ
[
"
SSH_KEYFILE_DIR
"
],
f
"
{
thing_id
}
"
)
dsn
=
os
.
environ
[
"
FTP_AUTH_DB_DSN
"
]
minio_secure
=
(
# ensure True as default
False
if
os
.
environ
[
"
MINIO_SECURE
"
].
lower
()
in
[
"
false
"
,
"
0
"
]
else
True
)
with
psycopg
.
connect
(
dsn
)
as
conn
:
ftp_ext
=
get_external_ftp
(
conn
,
thing_id
)
ftp_int
=
get_in
ternal_ftp
(
conn
,
thing_id
)
ftp_ext
=
get_external_ftp
_credentials
(
conn
,
thing_id
)
ftp_int
=
get_
m
in
io_credentials
(
conn
,
thing_id
)
target
=
MinioFS
.
from_credentials
(
*
ftp_int
,
secure
=
Tru
e
)
target
=
MinioFS
.
from_credentials
(
*
ftp_int
,
secure
=
minio_secur
e
)
source
=
FtpFS
.
from_credentials
(
*
ftp_ext
,
keyfile_path
=
ssh_priv_key
,
missing_host_key_policy
=
WarningPolicy
*
ftp_ext
,
keyfile_path
=
ssh_priv_key
,
missing_host_key_policy
=
WarningPolicy
()
)
sync
(
source
,
target
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment