UFZ TSM / tsm-orchestration · Commits

Commit 6bb8d758, authored 1 year ago by Bert Palm

    classiefied ftp script

Parent: 917470d2
Merge request: !148 "sftp sync script"

Showing 1 changed file: cron/scripts/sftp_sync/sftp_sync.py (+263 additions, −131 deletions)
#!/usr/bin/env python
from __future__ import annotations

import abc
import os
import stat
import sys
import time
from dataclasses import dataclass
from typing import IO
from contextlib import contextmanager

from paramiko import SSHClient, WarningPolicy, SFTPClient, SFTPAttributes  # removed in this commit
import minio
import psycopg
from minio.datatypes import Object as MinioObject, Bucket
from paramiko import (  # added: replaces the single-line paramiko import above
    SSHClient,
    WarningPolicy,
    SFTPClient,
    SFTPAttributes,
    RejectPolicy,
    MissingHostKeyPolicy,
)
from paramiko.config import SSH_PORT

import logging

logger = logging.getLogger(__name__)
class RemoteFS(abc.ABC):
    files: dict[str]

    @abc.abstractmethod
    def exist(self, path: str) -> bool:
        ...

    @abc.abstractmethod
    def is_dir(self, path: str) -> bool:
        ...

    @abc.abstractmethod
    def last_modified(self, path: str) -> float:
        ...

    @abc.abstractmethod
    def size(self, path: str) -> int:
        ...

    @abc.abstractmethod
    def open(self, path: str) -> IO[bytes]:
        ...

    @abc.abstractmethod
    def put(self, path: str, fo: IO[bytes], size: int) -> None:
        ...

    @abc.abstractmethod
    def mkdir(self, path: str) -> None:
        ...

    def update(self, other: RemoteFS, path) -> None:
        """
        Update or create a file at the same location as in the other
        file system.
        """
        if other.is_dir(path):
            raise ValueError(f"Cannot update a directory")
        if self.exist(path):
            logger.debug(f"UPDATE {path}")
        else:
            logger.debug(f"CREATE {path}")
        with other.open(path) as fo:
            self.put(path, fo, other.size(path))
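# ------------------------------------------------------------------
# Not part of this commit: a minimal sketch of what a further backend
# could look like, assuming the same RemoteFS contract as MinioFS and
# FtpFS below (paths as keys of `files`, sizes in bytes, mtimes as
# floats, `open` yielding a binary file object). The class name and
# details are hypothetical; it relies on the module-level imports
# above (os, stat, IO).
# ------------------------------------------------------------------
class LocalFS(RemoteFS):
    files: dict[str, os.stat_result]

    def __init__(self, root: str = ".") -> None:
        self.root = root
        self.files = {}
        # Walk the tree so that directories are recorded before their files.
        for base, dirs, names in os.walk(root):
            for name in dirs + names:
                path = os.path.relpath(os.path.join(base, name), root)
                self.files[path] = os.stat(os.path.join(root, path))

    def exist(self, path: str) -> bool:
        return path in self.files

    def is_dir(self, path: str) -> bool:
        return stat.S_ISDIR(self.files[path].st_mode)

    def last_modified(self, path: str) -> float:
        return self.files[path].st_mtime

    def size(self, path: str) -> int:
        return self.files[path].st_size

    def open(self, path: str) -> IO[bytes]:
        return open(os.path.join(self.root, path), "rb")

    def put(self, path: str, fo: IO[bytes], size: int) -> None:
        with open(os.path.join(self.root, path), "wb") as trg:
            trg.write(fo.read(size))

    def mkdir(self, path: str) -> None:
        os.makedirs(os.path.join(self.root, path), exist_ok=True)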
class MinioFS(RemoteFS):
    cl: minio.Minio
    files: dict[str, MinioObject]

    @classmethod
    def from_credentials(
        cls,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket_name: str | None = None,
        secure: bool = True,
    ):
        cl = minio.Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )
        return cls(client=cl, bucket_name=bucket_name)

    def __init__(
        self,
        client: minio.Minio,
        bucket_name: str,
    ) -> None:
        self.cl = client
        self.bucket_name = bucket_name
        self._get_files()

    def _get_files(self):
        self.files = {
            file.object_name: file
            for file in self.cl.list_objects(self.bucket_name, recursive=True)
        }

    def exist(self, path: str):
        return path in self.files

    def size(self, path: str):
        return self.files[path].size

    def is_dir(self, path: str):
        return self.files[path].is_dir

    def last_modified(self, path: str) -> float:
        return time.mktime(self.files[path].last_modified.timetuple())

    def put(self, path: str, fo: IO[bytes], size: int):
        self.cl.put_object(
            bucket_name=self.bucket_name, object_name=path, data=fo, length=size
        )

    @contextmanager
    def open(self, path) -> IO[bytes]:
        resp = None
        try:
            resp = self.cl.get_object(bucket_name=self.bucket_name, object_name=path)
            yield resp
        finally:
            if resp is not None:
                resp.close()
                resp.release_conn()

    def mkdir(self, path: str):
        # In Minio directories are created
        # automatically when files are created.
        pass
class FtpFS(RemoteFS):
    cl: SFTPClient
    files: dict[str, SFTPAttributes]

    @classmethod
    def from_credentials(
        cls,
        uri,
        username,
        password,
        path,
        keyfile_path=None,
        missing_host_key_policy=None,
    ):
        host = uri.split(":")[0]
        port = int(f"{uri}:".split(":")[1] or SSH_PORT)
        ssh = SSHClient()
        if missing_host_key_policy is not None:
            ssh.set_missing_host_key_policy(missing_host_key_policy)
        ssh.connect(
            hostname=host,
            port=int(port),
            username=username,
            password=password or None,
            key_filename=keyfile_path,
            look_for_keys=False,  # todo maybe ?
            allow_agent=False,
            compress=True,
        )
        cl = ssh.open_sftp()
        return cls(cl, path)

    def __init__(self, client: SFTPClient, path: str = ".") -> None:
        self.cl = client
        self.path = path
        self.files = {}
        self.cl.chdir(self.path)
        self._get_files()

    def _get_files(self, path=""):
        # Note that directories always appear
        # before any files from that directory
        # appear.
        dirs = []
        # we must avoid calling listdir_iter multiple
        # times, otherwise it might cause a deadlock.
        # That's why we do not recurse within the loop.
        for attrs in self.cl.listdir_iter(path):
            file_path = os.path.join(path, attrs.filename)
            if stat.S_ISDIR(attrs.st_mode):
                dirs.append(file_path)
            self.files[file_path] = attrs
        for dir_ in dirs:
            self._get_files(dir_)

    def exist(self, path: str):
        return path in self.files

    def size(self, path: str):
        return self.files[path].st_size

    def is_dir(self, path: str):
        return stat.S_ISDIR(self.files[path].st_mode)

    def last_modified(self, path: str) -> float:
        return self.files[path].st_mtime

    def put(self, path: str, fo: IO[bytes], size: int):
        self.cl.putfo(fl=fo, remotepath=path, file_size=size)

    def mkdir(self, path: str) -> None:
        logger.debug(f"CREATE {path}")
        self.cl.mkdir(path)

    @contextmanager
    def open(self, path):
        with self.cl.open(path, mode="r") as fo:
            yield fo
@dataclass
class FtpMeta:
    uri: str
    username: str
    sync_dir: str | None = None
    password: str | None = None
    keyfile_path: str = None
    ssh_priv_key: str = None
def get_internal_ftp(conn, thing_id) -> FtpMeta:  # removed: pre-change signature
def get_internal_ftp(conn, thing_id) -> tuple[str, str, str, str]:  # added
    """
    Returns (uri, access_key, secret_key, bucket_name)
    """
    with conn.cursor() as cur:
        values = cur.execute(  # removed
            "SELECT r.fileserver_uri, r.bucket, r.access_key, r.secret_key "  # removed
        return cur.execute(  # added
            "SELECT r.fileserver_uri, r.access_key, r.secret_key, r.bucket "  # added
            "FROM tsm_thing t JOIN tsm_rawdatastorage r ON t.id = r.thing_id "
            "WHERE t.thing_id = %s",
            [thing_id],
        ).fetchone()
    return FtpMeta(*values)  # removed
def get_external_ftp(conn, thing_id) -> FtpMeta:  # removed: pre-change signature
def get_external_ftp(conn, thing_id) -> tuple[str, str, str, str]:  # added
    """
    Returns (uri, username, password, path)
    """
    with conn.cursor() as cur:
        values = cur.execute(  # removed
            "SELECT ext_sftp_uri, ext_sftp_path, ext_sftp_username, ext_sftp_password "  # removed
        return cur.execute(  # added
            "SELECT ext_sftp_uri, ext_sftp_username, ext_sftp_password, ext_sftp_path "  # added
            "FROM tsm_thing WHERE thing_id = %s",
            [thing_id],
        ).fetchone()
    ftp = FtpMeta(*values)  # removed
    ftp.keyfile_path = os.path.join(os.environ["SSH_KEYFILE_DIR"], f"{thing_id}")  # removed
    return ftp  # removed
def connect_ftp(ftp: FtpMeta) -> SFTPClient:
    ssh = SSHClient()
    ssh.set_missing_host_key_policy(WarningPolicy)
    host, *port = ftp.uri.split(":")
    port = port[0] if port else 22
    ssh.connect(
        hostname=host,
        port=port,
        username=ftp.username,
        password=ftp.password or None,
        key_filename=ftp.keyfile_path or None,
        look_for_keys=False,  # todo maybe ?
        allow_agent=False,
        compress=True,
    )
    sftp = ssh.open_sftp()
    if ftp.sync_dir:
        # might raise FileNotFoundError,
        # but that is good enough
        sftp.chdir(ftp.sync_dir)
    return sftp
def get_files(sftp, path) -> dict:
    # Note that directories always appear
    # before any files from that directory
    # appear.
    files = {}
    dirs = []
    # we must avoid calling listdir_iter multiple
    # times, otherwise it might cause a deadlock.
    # That's why we do not recurse within the loop.
    for attrs in sftp.listdir_iter(path):
        file_path = f"{path}/{attrs.filename}"
        if stat.S_ISDIR(attrs.st_mode):
            dirs.append(file_path)
        files[file_path] = attrs
    for dir_ in dirs:
        files.update(get_files(sftp, dir_))
    return files
def sync(src: SFTPClient, dest: SFTPClient):  # removed: pre-change, SFTP-to-SFTP sync
    def is_dir(attrs: SFTPAttributes) -> bool:
        return stat.S_ISDIR(attrs.st_mode)

    def mk_dir(path) -> None:
        # force creation of a dir.
        # delete file with same name
        # if necessary
        logger.info(f"CREATE {path}/")
        try:
            dest.remove(path)
        except FileNotFoundError:
            pass
        dest.mkdir(path)

    def update_file(path, src_attrs) -> None:
        logger.info(f"UPDATE {path}")
        with src.open(path, "rb") as fl:
            dest.putfo(fl, path)
        dest.utime(path, (src_attrs.st_atime, src_attrs.st_mtime))

    def needs_sync(path, src_attrs) -> bool:
        # general:
        # return True if path does not exist on dest
        # for directories:
        # return False if path is also a directory on dest
        # for files:
        # return False if the file has the same size and mtime on dest
        dest_attrs = dest_files.get(path, None)
        if dest_attrs is None:
            return True
        if is_dir(src_attrs) and is_dir(dest_attrs):
            return False
        elif is_dir(dest_attrs):
            return True
        elif (
            dest_attrs.st_size == src_attrs.st_size
            and dest_attrs.st_mtime == src_attrs.st_mtime
        ):
            return False
        else:
            return True

    dest_files = get_files(dest, ".")
    for path, attrs in get_files(src, ".").items():
        if needs_sync(path, attrs):
            if is_dir(attrs):
                mk_dir(path)
            else:
                update_file(path, attrs)
        else:
            logger.info(f"ok : {path}")
def sync(src: RemoteFS, trg: RemoteFS):  # added: RemoteFS-based replacement for the sync() above
    for path in src.files:
        # dirs
        if src.is_dir(path):
            if not trg.exist(path):
                trg.mkdir(path)
            continue
        # regular files
        if (
            not trg.exist(path)
            or src.size(path) != trg.size(path)
            or src.last_modified(path) > trg.last_modified(path)
        ):
            trg.update(src, path)
            continue
        logger.info(f"--OK-- {path}")
def my_test():
    ftp1 = connect_ftp(
        FtpMeta(
            uri="tsm.intranet.ufz.de",
            sync_dir="ftp_test",
            username="bpalm",
            password=None,
            keyfile_path=os.environ.get("TEST_KEYFILE_PATH"),
        )
    )
    src = FtpFS.from_credentials(
        uri="tsm.intranet.ufz.de",
        username="bpalm",
        password=None,
        path="ftp_test",
        keyfile_path=os.environ.get("TEST_KEYFILE_PATH"),
        missing_host_key_policy=WarningPolicy,
    )
    ftp2 = connect_ftp(
        FtpMeta(
            uri="tsm.ufz.de",
            sync_dir="fooo",
            username="bpalm",
            password=None,
            keyfile_path=os.environ.get("TEST_KEYFILE_PATH"),
        )
    )
    trg = MinioFS.from_credentials(
        endpoint="localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False,
        bucket_name="ufz-timese-demogroup-0a308373-ab29-4317-b351-1443e8a1babd",
    )
    sync(ftp1, ftp2)
    sync(src, trg)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    my_test()
    exit(1)
    if 1:
        exit(1)

    if len(sys.argv) != 2:
        raise ValueError("Expected a thing_id as first and only argument.")
    thing_id = sys.argv[1]
    ...
    ...

@@ -176,10 +301,17 @@ if __name__ == "__main__":

    for k in ["SSH_PRIV_KEY_PATH", "FTP_AUTH_DB_URI"]:
        if (os.environ.get(k) or None) is None:
            raise EnvironmentError(f"Environment variable {k} must be set")

    with psycopg.connect(os.environ["FTP_AUTH_DB_DSN"]) as conn:  # removed
    ssh_priv_key = os.path.join(os.environ["SSH_KEYFILE_DIR"], f"{thing_id}")  # added
    dsn = os.environ["FTP_AUTH_DB_DSN"]  # added
    with psycopg.connect(dsn) as conn:  # added
        ftp_ext = get_external_ftp(conn, thing_id)
        ftp_int = get_internal_ftp(conn, thing_id)
        src = connect_ftp(ftp_ext)  # removed
        dest = connect_ftp(ftp_int)  # removed
        sync(src, dest)  # removed
        target = MinioFS.from_credentials(*ftp_int, secure=True)  # added
        source = FtpFS.from_credentials(  # added
            *ftp_ext, keyfile_path=ssh_priv_key, missing_host_key_policy=WarningPolicy
        )
        sync(source, target)  # added
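For orientation, here is a minimal sketch of how the new class-based pieces are wired together, mirroring the post-change __main__ branch above. It assumes the script is importable as a module named sftp_sync; the endpoint, credential, and path values are placeholders, not values from this commit.

import os

from paramiko import WarningPolicy

from sftp_sync import FtpFS, MinioFS, sync  # assumes the script is importable

thing_id = "example-thing-id"  # placeholder

# External SFTP server to read from (placeholder values).
source = FtpFS.from_credentials(
    uri="sftp.example.org:22",
    username="logger",
    password=None,
    path="upload",
    keyfile_path=os.path.join(os.environ["SSH_KEYFILE_DIR"], thing_id),
    missing_host_key_policy=WarningPolicy,
)

# Internal MinIO object storage to write to (placeholder values).
target = MinioFS.from_credentials(
    endpoint="minio.example.org:9000",
    access_key="example-access-key",
    secret_key="example-secret-key",
    bucket_name="example-bucket",
    secure=True,
)

# Copies files that are missing on the target, differ in size,
# or are newer on the source.
sync(source, target)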